prompy.processio package

prompy.processio.process_promise module

Experimental multiprocess promise.

class prompy.processio.process_promise.ProcessPromise(starter, namespace=None, *args, **kwargs)[source]

Bases: prompy.promise.Promise

Experimental Promise for a multiprocessing backend. It should only be used for long-running functions.

Closures are not serialized properly; only their values are kept.

This applies to the starter and to callbacks:

  • Objects need to be marshal compatible.
  • Any module a function uses must be imported at function level.
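The constraints above can be illustrated with the standard library alone. The following is a minimal sketch, assuming the backend ships a function's code object with marshal and rebuilds it in a fresh namespace; how prompy does this internally is not shown in this reference, and the starter function here is only an example.

```python
import builtins
import marshal
import types


def starter(resolve, reject):
    # Import at function level: the parent's module-level imports do not
    # travel with the code object when it is rebuilt elsewhere.
    import math
    resolve(math.sqrt(16))


# Serialize only the code object (assumption: a marshal-based transport).
payload = marshal.dumps(starter.__code__)

# Rebuild the function with empty globals, simulating another process.
rebuilt = types.FunctionType(marshal.loads(payload), {"__builtins__": builtins})

results = []
rebuilt(results.append, lambda error: None)
print(results)
```

Had `math` been imported at module level instead, the rebuilt function would raise a NameError, because its new globals contain nothing but the builtins.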

__init__(starter, namespace=None, *args, **kwargs)[source]

A promise takes at least a starter function, which receives this promise's resolve and reject as parameters. exec is not called by default; if start_now is set, the starter is executed immediately and synchronously.

Parameters:
  • starter (Callable[[Callable, Callable], None]) – otherwise known as executor.
  • then – initial resolve callback
  • catch – initial catch callback
  • complete – initial complete callback
  • raise_again – raise the rejection error again.
  • start_now
  • results_buffer_size – number of results to keep in the buffer.
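The starter contract described above can be sketched without prompy. MiniPromise below is a hypothetical stand-in written for illustration, not the library's class; it only mirrors the documented names (starter, then, catch, start_now, exec, resolve, reject) and ignores the other parameters.

```python
class MiniPromise:
    """Hypothetical stand-in, NOT prompy's Promise: shows the starter contract."""

    def __init__(self, starter, then=None, catch=None, start_now=False):
        self._starter = starter
        self._on_resolve = [then] if then else []
        self._on_reject = [catch] if catch else []
        if start_now:
            self.exec()  # runs synchronously, inside the constructor

    def then(self, func):
        self._on_resolve.append(func)

    def catch(self, func):
        self._on_reject.append(func)

    def resolve(self, result):
        for callback in self._on_resolve:
            callback(result)

    def reject(self, error):
        for callback in self._on_reject:
            callback(error)

    def exec(self):
        # The starter receives this promise's resolve and reject.
        try:
            self._starter(self.resolve, self.reject)
        except Exception as error:
            self.reject(error)


def starter(resolve, reject):
    resolve(21 * 2)


seen = []
promise = MiniPromise(starter, then=seen.append)
before = list(seen)  # the starter has not run yet: exec is not called by default
promise.exec()
```

After exec, seen holds the resolved value, while before is still empty because nothing ran at construction time.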
catch(func)[source]

Add a callback to rejection

Parameters:func (Callable[[Exception], None]) –
Returns:
exec()[source]

Execute the starter method.

Returns:
reject(error)[source]

Reject the promise.

Parameters:error (Exception) –
Returns:
resolve(result)[source]

Resolve the promise, called by executor.

Parameters:result (~PromiseReturnType) –
Returns:
then(func)[source]

Add a callback to resolve

Parameters:func (Callable[[~PromiseReturnType], None]) – callback to resolve
Returns:

prompy.processio.process_containers module

Experimental multiprocessing promise containers.

class prompy.processio.process_containers.ProcessPromiseQueue(on_idle=None, max_idle=2, poll_time=0.01, error_list=None, idle_check=False, raise_again=True)[source]

Bases: prompy.container.BasePromiseContainer

A queue of process promises.

Usage: multiprocessing.Process(target=ProcessPromiseQueue.run)

__init__(on_idle=None, max_idle=2, poll_time=0.01, error_list=None, idle_check=False, raise_again=True)[source]

Queue initializer.

Parameters:
  • on_idle (Optional[Callable]) – callback to call when the queue is idle
  • max_idle (float) – maximum time the queue can stay idle.
  • poll_time (float) – the frequency of queue timeouts.
  • error_list (Optional[multiprocessing.Queue]) – a multiprocessing container used to exchange errors.
  • idle_check (bool) – whether to use the idle timeout.
  • raise_again (bool) – whether to raise errors again after catch (this stops the queue).
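One plausible reading of poll_time, max_idle and on_idle is a polling loop with an idle timeout. The sketch below is an assumption about those semantics, not prompy's implementation; run_queue is a hypothetical helper, and a thread-safe queue.Queue stands in for the multiprocessing queue so the example runs in a single process.

```python
import queue
import time


def run_queue(tasks, poll_time=0.01, max_idle=0.05, on_idle=None):
    """Drain `tasks`, stopping once nothing has arrived for `max_idle` seconds."""
    results = []
    idle_since = time.monotonic()
    while True:
        try:
            # Each poll blocks for at most poll_time seconds.
            task = tasks.get(timeout=poll_time)
        except queue.Empty:
            if time.monotonic() - idle_since >= max_idle:
                if on_idle is not None:
                    on_idle()
                return results
            continue
        results.append(task())
        idle_since = time.monotonic()  # activity resets the idle clock


tasks = queue.Queue()
tasks.put(lambda: 1 + 1)
tasks.put(lambda: "done")

idle_calls = []
results = run_queue(tasks, on_idle=lambda: idle_calls.append(True))
```

A short poll_time keeps the loop responsive to new work, while max_idle bounds how long an empty queue keeps its process alive.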
add_promise(promise)[source]

Add a promise to the container.

Parameters:promise (ProcessPromise[]) –
Returns:
errors
id
Return type:int
num_tasks

The number of promises still to resolve.

Return type:int
run()[source]
running
class prompy.processio.process_containers.PromiseProcessPool(pool_size=10, queue_options=None)[source]

Bases: prompy.container.BasePromiseRunner

A pool of promise queues to which promises are added.

__init__(pool_size=10, queue_options=None)[source]
Parameters:
  • pool_size – number of processes that will be spawned.
  • queue_options – options passed to each spawned queue.
add_promise(promise)[source]

Add a promise to the container.

Parameters:promise (ProcessPromise[]) –
Returns:
get_errors()[source]

Get all the errors from the processes. The errors are consumed: once returned, they are no longer held by the pool.
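"Consumed" means a second call does not return the same errors again. A minimal sketch of that behaviour, assuming errors are held in a queue; drain_errors is a hypothetical helper, and queue.Queue stands in for the multiprocessing container:

```python
import queue


def drain_errors(error_queue):
    """Return every error currently queued, removing each one as it is read."""
    errors = []
    while True:
        try:
            errors.append(error_queue.get_nowait())
        except queue.Empty:
            return errors


error_queue = queue.Queue()
error_queue.put(ValueError("bad input"))
error_queue.put(KeyError("missing"))

first = drain_errors(error_queue)   # both errors, in insertion order
second = drain_errors(error_queue)  # empty: the errors were consumed
```

Callers that need the errors later should keep the returned list rather than call the getter again.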

num_tasks

Sum of all tasks still in queue.

start()[source]
stop()[source]