Source code for prompy.processio.process_promise

"""Experimental multiprocess promise."""
import time

from prompy.errors import PromiseRejectionError, UnhandledPromiseError
from prompy.promise import Promise, PromiseStarter, PromiseState, ThenCallback, CatchCallback, TPromiseResults

from prompy.function_serializer import serialize_fun, deserialize_fun


[docs]class ProcessPromise(Promise): """ Experimental Promise for a multiprocessing backend. Should only use for long running functions. Closures are not serialized properly, only their values are kept. This goes for starter and callbacks: * Objects need to be marshal compatible. * Need to import any module at function level. """
[docs] def __init__(self, starter: PromiseStarter, namespace=None, *args, **kwargs): super().__init__(starter, *args, **kwargs) self.namespace = namespace self._starter = serialize_fun(starter)
[docs] def exec(self): try: starter = deserialize_fun(self._starter, namespace=self.namespace) starter(self.resolve, self.reject) self._state = PromiseState.fulfilled except Exception as error: self.reject(error) if self._raise_again: raise PromiseRejectionError(f"Promise {self.id} was rejected") from error finally: self.completed_at = time.time() for c in self._complete: c(self.result, self._error)
[docs] def then(self, func: ThenCallback): self._then.append(serialize_fun(func)) return self
[docs] def catch(self, func: CatchCallback): self._catch.append(serialize_fun(func)) return self
[docs] def resolve(self, result: TPromiseResults): self._result = result self._results.append(result) for t in self._then: then = deserialize_fun(t, self.namespace) then(result)
[docs] def reject(self, error: Exception): self._error = error self._state = PromiseState.rejected if not self._catch: raise UnhandledPromiseError(f"Unhandled promise exception: {self.id}") from error for c in self._catch: catch = deserialize_fun(c, self.namespace) catch(error)