diff --git a/src/tqdm_publisher/__init__.py b/src/tqdm_publisher/__init__.py index bdc4b31..4e9c451 100644 --- a/src/tqdm_publisher/__init__.py +++ b/src/tqdm_publisher/__init__.py @@ -1,4 +1,5 @@ +from ._progress_handler import TQDMProgressHandler +from ._progress_subscriber import TQDMProgressSubscriber from ._publisher import TQDMPublisher -from ._subscriber import TQDMProgressSubscriber -__all__ = ["TQDMPublisher", "TQDMProgressSubscriber"] +__all__ = ["TQDMPublisher", "TQDMProgressSubscriber", "TQDMProgressHandler"] diff --git a/src/tqdm_publisher/_progress_handler.py b/src/tqdm_publisher/_progress_handler.py new file mode 100644 index 0000000..a3b6e29 --- /dev/null +++ b/src/tqdm_publisher/_progress_handler.py @@ -0,0 +1,27 @@ +import queue + +from ._progress_subscriber import TQDMProgressSubscriber + + +class TQDMProgressHandler: + def __init__(self): + self.listeners = [] + + def listen(self): + q = queue.Queue(maxsize=25) + self.listeners.append(q) + return q + + def create(self, iterable, additional_metadata: dict = dict(), **tqdm_kwargs): + return TQDMProgressSubscriber( + iterable, + lambda progress_update: self._announce(dict(**progress_update, **additional_metadata)), + **tqdm_kwargs, + ) + + def _announce(self, msg): + for i in reversed(range(len(self.listeners))): + try: + self.listeners[i].put_nowait(msg) + except queue.Full: + del self.listeners[i] diff --git a/src/tqdm_publisher/_progress_subscriber.py b/src/tqdm_publisher/_progress_subscriber.py index b892f4e..289d3d6 100644 --- a/src/tqdm_publisher/_progress_subscriber.py +++ b/src/tqdm_publisher/_progress_subscriber.py @@ -2,13 +2,6 @@ class TQDMProgressSubscriber(TQDMPublisher): - def __init__(self, iterable, *, announcer: "?", request_id: str, message: dict, **tqdm_kwargs): + def __init__(self, iterable, on_progress_update: callable, **tqdm_kwargs): super().__init__(iterable, **tqdm_kwargs) - - def on_progress_update(format_dict) -> None: - """ - Describe what this announcer is all about... - """ - announcer.announce(dict(request_id=request_id, **message)) - - self.subscribe(callback=on_progress_update) + self.subscribe(lambda format_dict: on_progress_update(dict(progress_bar_id=self.id, format_dict=format_dict)))