"""
Promise you can await.

:Example:

.. code-block:: python

    import asyncio
    from prompy.awaitable import AwaitablePromise
    from prompy.networkio.async_call import call

    async def call_starter(resolve, _):
        google = await call('http://www.google.com')
        resolve(google)

    p = AwaitablePromise(call_starter)

    @p.then
    def then(result):
        print(result)
        asyncio.get_event_loop().stop()

    @p.catch
    def catch(err):
        asyncio.get_event_loop().stop()
        raise err

    asyncio.get_event_loop().run_forever()
"""
import asyncio
from typing import Any
from prompy.container import BasePromiseRunner
from prompy.errors import UnhandledPromiseError
from prompy.promise import Promise, CompleteCallback,\
CatchCallback, ThenCallback, PromiseStarter, PromiseState
from prompy.promtools import promise_wrap
class AwaitablePromise(Promise):
    """
    asyncio compatible promise.

    Await it to get the result.
    Need a running loop to actually start the executor.

    The outcome is mirrored into ``self.future``: once every ``then`` /
    ``catch`` handler and then every ``complete`` handler has finished,
    the future receives either the resolved result or the stored error.
    """

    def __init__(self, starter: PromiseStarter,
                 then: ThenCallback = None,
                 catch: CatchCallback = None,
                 complete: CompleteCallback = None,
                 loop: asyncio.AbstractEventLoop = None):
        """
        :param starter: promise starter, forwarded to the base class.
        :param then: optional initial ``then`` callback.
        :param catch: optional initial ``catch`` callback.
        :param complete: optional initial ``complete`` callback.
        :param loop: event loop to run on;
            defaults to ``asyncio.get_event_loop()``.
        """
        super().__init__(starter, then, catch, complete,
                         raise_again=False,
                         start_now=False)
        self.loop = loop or asyncio.get_event_loop()
        self.future: asyncio.Future = self.loop.create_future()
        # The starter is not run here: ``exec`` is queued on the loop, so
        # nothing happens until the loop is running.
        self.loop.call_soon_threadsafe(self.exec)

    def resolve(self, result: Any):
        """
        Record ``result`` and run every ``then`` callback with it.

        The promise is finished as fulfilled once all the callbacks
        (coroutine or plain) are done, or right away when there are none.

        :param result: value the promise resolves with.
        """
        self._result = result
        self._results.append(result)
        if not self._then:
            return self._finish(PromiseState.fulfilled)
        handlers = tuple(self._then)
        # A single countdown shared by all handlers: building one per
        # handler would only ever fire when there is exactly one handler.
        on_done = self._done(self._finish,
                             PromiseState.fulfilled,
                             len(handlers))
        for handler in handlers:
            self._ensure_awaited(handler(result), callback=on_done)

    def reject(self, error: Exception):
        """
        Record ``error`` and run every ``catch`` callback with it.

        :param error: the exception the promise is rejected with.
        :raise UnhandledPromiseError: when no ``catch`` callback is
            registered.
        """
        self._error = error
        if not self._catch:
            self._state = PromiseState.rejected
            # NOTE(review): the future is left pending on this path, so a
            # coroutine awaiting this promise is not woken up - confirm
            # this is intended.
            raise UnhandledPromiseError(
                f"Unhandled promise exception: {self.id}") from error
        catchers = tuple(self._catch)
        # NOTE(review): a handled rejection finishes with the ``fulfilled``
        # state, yet ``_on_complete`` still sets the error on the future
        # because ``self._error`` is set - confirm this is intended.
        on_done = self._done(self._finish,
                             PromiseState.fulfilled,
                             len(catchers))
        for catcher in catchers:
            self._ensure_awaited(catcher(error), callback=on_done)

    def _finish(self, state):
        """
        Run the ``complete`` callbacks, then settle the future.

        :param state: state to store once everything is done.
        """
        if not self._complete:
            self._on_complete(state)
            return
        completers = tuple(self._complete)
        on_done = self._done(self._on_complete, state, len(completers))
        for completer in completers:
            self._ensure_awaited(completer(self.result, self._error),
                                 callback=on_done)

    def _on_complete(self, state):
        """
        Settle ``self.future`` and store the final state.

        The future gets the stored error when there is one, the result
        otherwise.
        """
        if self._error:
            self.future.set_exception(self._error)
        else:
            self.future.set_result(self.result)
        self._state = state

    def _done(self, on_finish, state, to_do):
        """
        Build a countdown callback.

        :param on_finish: called with ``state`` on the ``to_do``-th call.
        :param state: value forwarded to ``on_finish``.
        :param to_do: number of calls to wait for.
        :return: a callable accepting (and ignoring) any arguments, so it
            can be used as a task done-callback or called directly.
        """
        # A plain counter is used rather than an ``asyncio.Queue``: the
        # queue's ``loop`` argument is gone in recent Python versions.
        finished = 0

        def _one_done(*args, **kwargs):
            nonlocal finished
            finished += 1
            if finished == to_do:
                on_finish(state)

        return _one_done

    def __await__(self):
        """Wait for the underlying future and return its result."""
        while not self.future.done():
            yield from self.future
        return self.future.result()

    def callback_handler(self, obj: Any):
        """Schedule ``obj`` on the loop when it is a coroutine."""
        self._ensure_awaited(obj)

    def _ensure_awaited(self, obj, callback=None):
        """
        Make sure a callback's return value gets awaited.

        :param obj: return value of a callback; a coroutine is wrapped in
            a task on ``self.loop``, anything else is ignored.
        :param callback: optional callable invoked when ``obj`` is done:
            as a task done-callback for a coroutine, immediately and
            without arguments otherwise.
        """
        if asyncio.iscoroutine(obj):
            task = self.loop.create_task(obj)
            if callback:
                task.add_done_callback(callback)
        elif callback:
            callback()

    @property
    def error(self):
        """
        :raise: invalid state if the promise was not completed.
        :return: the exception or the handled error
        """
        return self.future.exception() or self._error

    @staticmethod
    def wrap(func):
        """Wrap ``func`` with ``promise_wrap`` using this promise type."""
        return promise_wrap(func, prom_type=AwaitablePromise)
class AsyncPromiseRunner(BasePromiseRunner):
    """Run the loop forever"""

    def __init__(self, loop: asyncio.AbstractEventLoop = None):
        """
        :param loop: event loop to drive; defaults to
            ``asyncio.get_event_loop()``, the same convention as the
            ``loop`` argument of ``AwaitablePromise``.
        """
        self.loop = loop or asyncio.get_event_loop()

    def stop(self):
        """Stop the event loop."""
        self.loop.stop()

    def add_promise(self, promise: Promise):
        """
        Do nothing: this runner keeps no record of promises.

        NOTE(review): presumably promises are expected to schedule
        themselves on the loop - confirm against the other runners.
        """
        pass

    def start(self):
        """Run the event loop until ``stop`` is called."""
        self.loop.run_forever()