"""Promise for python"""
import collections
import enum
import uuid
from typing import Callable, Any, List, Union, Deque, TypeVar, Generic, Tuple
from prompy.errors import UnhandledPromiseError, PromiseRejectionError
# Type of the value(s) a promise resolves with.  The string passed to TypeVar
# must be identical to the variable it is assigned to.
TPromiseResults = TypeVar('TPromiseResults')

# Generics don't work with callbacks; check the promise's ``result`` property
# for the actual type.
# A complete callback receives the promise result and the rejection error; the
# error is None when the promise was fulfilled.
CompleteCallback = Callable[
    [Union[List[TPromiseResults], TPromiseResults], Union[Exception, None]],
    None]
# A then callback receives the resolved value.
ThenCallback = Callable[[TPromiseResults], None]
# A catch callback receives the rejection error.
CatchCallback = Callable[[Exception], None]
# The starter (executor) receives the promise's resolve and reject methods.
PromiseStarter = Callable[[Callable, Callable], None]
class PromiseState(enum.Enum):
    """Lifecycle states of a promise."""

    # Initial state, before any resolve/reject has finished.
    pending = 1
    # Set once a resolve call has finished.
    fulfilled = 2
    # Set once a reject call has finished (or found no catch handler).
    rejected = 3
class Promise(Generic[TPromiseResults]):
    """
    Promise interface

    Based on js Promises.

    Basic usage:

    `p = Promise(lambda resolve, reject: resolve('Hello')).then(print)`

    The starter is only run when :meth:`exec` is called (or when the promise
    is created with ``start_now=True``).
    """

    def __init__(self, starter: PromiseStarter,
                 then: ThenCallback = None,
                 catch: CatchCallback = None,
                 complete: CompleteCallback = None,
                 raise_again: bool = False,
                 start_now: bool = False,
                 results_buffer_size: int = 100):
        """
        Promise takes at least a starter method with params to this promise
        resolve and reject. Does not call exec by default but with start_now
        the execution will be synchronous.

        :param starter: otherwise known as executor.
        :param then: initial resolve callback
        :param catch: initial catch callback
        :param complete: initial complete callback
        :param raise_again: raise the rejection error again.
        :param start_now: call :meth:`exec` at the end of construction.
        :param results_buffer_size: number of results to keep in the buffer.
        """
        # Initialised here; nothing else in this class reads or updates them.
        self.canceled = False
        self.completed_at = None
        self._promise_id: uuid.UUID = uuid.uuid4()
        self._then: List[ThenCallback] = [then] if then else []
        self._catch: List[CatchCallback] = [catch] if catch else []
        self._complete: List[CompleteCallback] = [complete] if complete else []
        self._raise_again = raise_again
        self._starter = starter
        self._result: Any = None
        # Bounded buffer: the oldest results are dropped once it is full.
        self._results: Deque = collections.deque(maxlen=results_buffer_size)
        self._error: Union[Exception, None] = None
        self._state = PromiseState.pending
        if start_now:
            self.exec()

    def then(self, func: ThenCallback):
        """
        Add a callback to resolve

        If the promise is already fulfilled the callback is invoked
        immediately with the current :attr:`result`.

        :param func: callback to resolve
        :return: this promise, to allow chaining.
        """
        self._then.append(func)
        if self.state == PromiseState.fulfilled:
            # Same treatment as callbacks fired from resolve(): the return
            # value (e.g. a generator) goes through callback_handler.
            self.callback_handler(func(self.result))
        return self

    def catch(self, func: CatchCallback):
        """
        Add a callback to rejection

        If the promise is already rejected the callback is invoked
        immediately with the stored error.

        :param func: callback to rejection
        :return: this promise, to allow chaining.
        """
        self._catch.append(func)
        if self.state == PromiseState.rejected:
            # Same treatment as callbacks fired from reject().
            self.callback_handler(func(self.error))
        return self

    def complete(self, func: CompleteCallback):
        """
        Add a callback to finally block

        If the promise is no longer pending the callback is invoked
        immediately with the current result and error, mirroring what
        :meth:`then` and :meth:`catch` do for late registrations.

        :param func: callback receiving ``(result, error)``.
        :return: this promise, to allow chaining.
        """
        self._complete.append(func)
        if self.state != PromiseState.pending:
            self.callback_handler(func(self.result, self._error))
        return self

    def resolve(self, result: TPromiseResults):
        """
        Resolve the promise, called by executor.

        Stores the result, runs every then callback with it, then runs the
        complete callbacks and marks the promise fulfilled.

        :param result: the resolved value.
        :return:
        """
        self._result = result  # result always the last resolved
        self._results.append(result)
        for t in self._then:
            self.callback_handler(t(result))
        self._finish(PromiseState.fulfilled)

    def reject(self, error: Exception):
        """
        Reject the promise.

        Runs every catch callback with the error, then the complete
        callbacks, and marks the promise rejected.

        :param error: the rejection error.
        :return:
        :raises UnhandledPromiseError: when no catch callback is registered.
            The promise is marked rejected first; complete callbacks are not
            run on this path.
        """
        self._error = error
        if not self._catch:
            self._state = PromiseState.rejected
            raise UnhandledPromiseError(
                f"Unhandled promise exception: {self.id}") from error
        for c in self._catch:
            self.callback_handler(c(error))
        self._finish(PromiseState.rejected)

    def _finish(self, state):
        """Run the complete callbacks, then record the final state."""
        for c in self._complete:
            self.callback_handler(c(self.result, self._error))
        # The state is only updated after the callbacks have run, so they
        # still observe the previous state.
        self._state = state

    def exec(self):
        """
        Execute the starter method.

        The starter is called with this promise's resolve and reject
        methods and its return value is passed to :meth:`callback_handler`.
        Any ``Exception`` raised while doing so is handed to :meth:`reject`.

        :return:
        :raises PromiseRejectionError: after rejecting, when ``raise_again``
            was set.
        :raises UnhandledPromiseError: propagated from :meth:`reject` when no
            catch callback is registered.
        """
        try:
            started = self._starter(self.resolve, self.reject)
            self.callback_handler(started)
        except Exception as error:
            # NOTE(review): if the starter itself called reject() with no
            # catch handler, the UnhandledPromiseError it raised lands here
            # and is passed to reject() again, replacing the stored error.
            self.reject(error)
            if self._raise_again:
                raise PromiseRejectionError(
                    f"Promise {self.id} was rejected") from error

    @property
    def id(self) -> uuid.UUID:
        """Unique id generated for this promise at construction."""
        return self._promise_id

    @property
    def result(self) -> Union[Tuple[TPromiseResults], TPromiseResults]:
        """
        Tuple of the buffered results when more than one is buffered,
        otherwise the last resolved value (None before any resolve).
        """
        return tuple(self._results) if len(self._results) > 1 else self._result

    @property
    def error(self) -> Exception:
        """The last error passed to reject, or None."""
        return self._error

    @property
    def state(self) -> PromiseState:
        """Current state of the promise."""
        return self._state

    def callback_handler(self, obj: Any):
        """
        Override to handle the return value of callbacks.

        The default implementation exhausts generator-like return values.

        :param obj: The return value of a callback
        :return:
        """
        self._handle_generator_callback(obj)

    # noinspection PyMethodMayBeStatic
    def _handle_generator_callback(self, obj):
        """Run an unsized iterable (e.g. a generator) to exhaustion."""
        # Sized containers (str, list, dict, ...) are plain values and are
        # left alone; only iterables without __len__ are consumed.
        if hasattr(obj, '__iter__') and not hasattr(obj, '__len__'):
            # A for loop also copes with iterables that are not themselves
            # iterators, where a bare next() would raise TypeError.
            for _ in obj:
                pass