"""Methods for working with promises."""
import functools
from typing import Callable
import time
from prompy.promise import Promise, PromiseState
[docs]def promise_wrap(func, prom_type=Promise, **kw) -> Callable[..., Promise]:
"""Wraps a function return in a promise resolve."""
@functools.wraps(func)
def _wrap(*args, **kwargs) -> Promise:
def _prom(resolve, reject):
try:
resolve(func(*args, **kwargs))
except Exception as error:
reject(error)
return prom_type(_prom, **kw)
return _wrap
class _AllPromiseWrap:
def __init__(self, promises):
self.promises = promises
self.num_promises = len(promises)
self.res = 0
self.rejected = False
self.results = []
self.rejection = []
self._resolve = None
self._reject = None
def __call__(self, resolve, reject):
self._resolve = resolve
self._reject = reject
if not self.rejected and self.num_promises == self.res:
resolve(self.results)
elif sum((1 for x in self.promises if x.state == PromiseState.fulfilled)) == self.num_promises:
resolve([p.result for p in self.promises])
elif self.rejected or sum((1 for x in self.promises if x.state == PromiseState.rejected)) > 0:
reject(Exception("Promise all reject"))
def then(self, result):
self.res += 1
if not self.rejected:
self.results.append(result)
if self.res == self.num_promises and self._resolve:
self._resolve(self.results)
def catch(self, err):
self.rejected = True
if self._reject:
self._reject(err)
[docs]def pall(*promises, prom_type=Promise, **kwargs) -> Promise:
"""Wrap all the promises in a single one that resolve when all promises are done."""
starter = _AllPromiseWrap(promises)
for p in promises:
p.then(starter.then).catch(starter.catch)
return prom_type(starter, **kwargs)
[docs]def piter(func, iterable, prom_type=Promise, **kwargs) -> Promise:
"""Applies func to iterable in a promise, like a map and foreach (starter->then)."""
def starter(resolve, _):
for i in iterable:
resolve(func(i))
return prom_type(starter, **kwargs)
[docs]def later(func, delay, wait_func=time.sleep, prom_type=Promise, **kwargs) -> Callable[..., Promise]:
"""Bad, do not use."""
@functools.wraps(func)
def _wrap(*args, **kw):
def starter(resolve, reject):
wait_func(delay)
try:
resolve(func(*args, **kw))
except Exception as err:
reject(err)
return prom_type(starter, **kwargs)
return _wrap