Source code for prompy.threadio.promise_queue

import queue
import threading
import uuid

from typing import Callable

import time

from prompy.container import PromiseContainer, BasePromiseRunner
from prompy.promise import Promise


[docs]class PromiseQueue(PromiseContainer): __thread_index = 0
[docs] def __init__(self, start=False, max_idle=0.5, on_stop: Callable = None, queue_timeout=0.01, interval=0.01, daemon=False): super().__init__() self.index = PromiseQueue.__thread_index self._thread = threading.Thread(target=self._run, name=f"PromiseQueue-{self.index}") self._thread.daemon = daemon PromiseQueue.__thread_index += 1 self._queue = queue.Queue() self._lock: threading.Lock = threading.Lock() self._stop_event = threading.Event() self._running = False self._idle_time = 0 self._max_idle = max_idle self._started = False self._on_stop = on_stop self._queue_timeout = queue_timeout self._interval = interval self._error = None if start: self.start()
[docs] def add_promise(self, promise: Promise): super(PromiseQueue, self).add_promise(promise) self._queue.put(promise.id)
def _run(self): self._running = True idle_start = None while self._running: try: current = self._queue.get(block=False, timeout=self._queue_timeout) idle_start = None promise = self._promises[current] self._lock.acquire() if not promise.canceled: promise.exec() self._lock.release() self._stop_event.wait(self._interval) if self._stop_event.is_set(): self._running = False except queue.Empty: if not idle_start: idle_start = time.time() else: idle_time = time.time() - idle_start if idle_time > self._max_idle: self._running = False except Exception as e: self._running = False self._error = e self._stopped() raise e self._stopped()
[docs] def start(self): if not self._started: self._thread.start() self._started = True
[docs] def cancel(self, cancel_id: uuid.UUID): prom = self._promises.get(cancel_id) if prom and not prom.canceled: prom.canceled = True
[docs] def stop(self): self._stop_event.set()
@property def running(self): return self._running @property def error(self): return self._error def _stopped(self): if self._on_stop: self._on_stop(self)
[docs]class PromiseQueuePool(BasePromiseRunner):
[docs] def __init__(self, pool_size=8, start=False, max_idle=0.5, daemon=False): self._max_idle = max_idle self.pool_size = pool_size self._daemon = daemon self._pool = queue.Queue(maxsize=pool_size) self._on_thread_stop = None if start: self.start()
[docs] def add_promise(self, promise: Promise): if self._pool.qsize() < self.pool_size: self._add_queue() while True: pq = self._pool.get() if not pq.running: self._add_queue() else: pq.add_promise(promise) self._pool.put(pq) return
def _add_queue(self): pq = PromiseQueue(start=True, max_idle=self._max_idle, on_stop=self._thread_stopped, daemon=self._daemon) self._pool.put(pq)
[docs] def stop(self): while True: try: pq = self._pool.get() pq.stop() except queue.Empty: break
[docs] def start(self): while self._pool.qsize() < self.pool_size: try: self._add_queue() except queue.Full: return
[docs] def is_running(self): running = [] while True: try: pq = self._pool.get_nowait() if pq.running: running.append(pq) except queue.Empty: break is_running = len(running) > 0 for r in running: self._pool.put_nowait(r) return is_running
[docs] def on_thread_stop(self, func): self._on_thread_stop = func
def _thread_stopped(self, t): if self._on_thread_stop: self._on_thread_stop(t)