Source code for prompy.threadio.pooled_caller

import prompy.networkio.url_tools
from prompy.promise import Promise
from prompy.networkio import urlcall
from prompy.threadio.promise_queue import PromiseQueuePool
from prompy.container import container_wrap, BasePromiseContainer


[docs]class PooledCaller(BasePromiseContainer): """ Class wrapper for urlcall. Auto-add calls to a PromiseQueuePool to be resolved. """
[docs] def __init__(self, **pool_kwargs): self._pool = PromiseQueuePool(**pool_kwargs)
[docs] @container_wrap def call(self, url, **kwargs) -> Promise[prompy.networkio.url_tools.UrlCallResponse]: return urlcall.url_call(url, **kwargs)
[docs] def json_call(self, url, **kwargs): prom = urlcall.json_call(url, **kwargs) self._pool.add_promise(prom) return prom
[docs] def get(self, url, **kwargs): prom = urlcall.get(url, **kwargs) self._pool.add_promise(prom) return prom
[docs] def post(self, url, **kwargs): prom = urlcall.post(url, **kwargs) self._pool.add_promise(prom) return prom
[docs] def put(self, url, **kwargs): prom = urlcall.put(url, **kwargs) self._pool.add_promise(prom) return prom
[docs] @container_wrap def head(self, url, **kwargs): return urlcall.url_call(url, **kwargs)
[docs] def add_promise(self, promise: Promise): self._pool.add_promise(promise)