From aeae8f8c74ac8854a00ab3610373137799c52974 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Wed, 20 Mar 2024 11:31:20 -0700 Subject: [PATCH 1/2] Create a queue handler for cases where a single callback is used across all messages on all progress bars --- src/tqdm_publisher/__init__.py | 5 +++-- src/tqdm_publisher/_progress_handler.py | 24 ++++++++++++++++++++++ src/tqdm_publisher/_progress_subscriber.py | 12 ++--------- 3 files changed, 29 insertions(+), 12 deletions(-) create mode 100644 src/tqdm_publisher/_progress_handler.py diff --git a/src/tqdm_publisher/__init__.py b/src/tqdm_publisher/__init__.py index bdc4b31..1d0c5fe 100644 --- a/src/tqdm_publisher/__init__.py +++ b/src/tqdm_publisher/__init__.py @@ -1,4 +1,5 @@ from ._publisher import TQDMPublisher -from ._subscriber import TQDMProgressSubscriber +from ._progress_subscriber import TQDMProgressSubscriber +from ._progress_handler import TQDMProgressHandler -__all__ = ["TQDMPublisher", "TQDMProgressSubscriber"] +__all__ = ["TQDMPublisher", "TQDMProgressSubscriber", "TQDMProgressHandler"] diff --git a/src/tqdm_publisher/_progress_handler.py b/src/tqdm_publisher/_progress_handler.py new file mode 100644 index 0000000..386c477 --- /dev/null +++ b/src/tqdm_publisher/_progress_handler.py @@ -0,0 +1,24 @@ +import queue +from ._progress_subscriber import TQDMProgressSubscriber + +class TQDMProgressHandler: + def __init__(self): + self.listeners = [] + + def listen(self): + q = queue.Queue(maxsize=25) + self.listeners.append(q) + return q + + def create(self, iterable, additional_metadata: dict = dict(), **tqdm_kwargs): + return TQDMProgressSubscriber(iterable, lambda progress_update: self._announce(dict( + **progress_update, + **additional_metadata + )), **tqdm_kwargs) + + def _announce(self, msg): + for i in reversed(range(len(self.listeners))): + try: + self.listeners[i].put_nowait(msg) + except queue.Full: + del self.listeners[i] \ No newline at end of file diff --git a/src/tqdm_publisher/_progress_subscriber.py b/src/tqdm_publisher/_progress_subscriber.py index b892f4e..b818eec 100644 --- a/src/tqdm_publisher/_progress_subscriber.py +++ b/src/tqdm_publisher/_progress_subscriber.py @@ -1,14 +1,6 @@ from ._publisher import TQDMPublisher - class TQDMProgressSubscriber(TQDMPublisher): - def __init__(self, iterable, *, announcer: "?", request_id: str, message: dict, **tqdm_kwargs): + def __init__(self, iterable, on_progress_update: callable, **tqdm_kwargs): super().__init__(iterable, **tqdm_kwargs) - - def on_progress_update(format_dict) -> None: - """ - Describe what this announcer is all about... - """ - announcer.announce(dict(request_id=request_id, **message)) - - self.subscribe(callback=on_progress_update) + self.subscribe(lambda format_dict: on_progress_update(dict(progress_bar_id=self.id, format_dict=format_dict))) \ No newline at end of file From 0249b4a5777a02f648f1b87cd488e823ba1e06e8 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 20 Mar 2024 18:32:32 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tqdm_publisher/__init__.py | 4 ++-- src/tqdm_publisher/_progress_handler.py | 15 +++++++++------ src/tqdm_publisher/_progress_subscriber.py | 3 ++- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/tqdm_publisher/__init__.py b/src/tqdm_publisher/__init__.py index 1d0c5fe..4e9c451 100644 --- a/src/tqdm_publisher/__init__.py +++ b/src/tqdm_publisher/__init__.py @@ -1,5 +1,5 @@ -from ._publisher import TQDMPublisher -from ._progress_subscriber import TQDMProgressSubscriber from ._progress_handler import TQDMProgressHandler +from ._progress_subscriber import TQDMProgressSubscriber +from ._publisher import TQDMPublisher __all__ = ["TQDMPublisher", "TQDMProgressSubscriber", "TQDMProgressHandler"] diff --git a/src/tqdm_publisher/_progress_handler.py b/src/tqdm_publisher/_progress_handler.py index 386c477..a3b6e29 100644 --- a/src/tqdm_publisher/_progress_handler.py +++ b/src/tqdm_publisher/_progress_handler.py @@ -1,6 +1,8 @@ import queue + from ._progress_subscriber import TQDMProgressSubscriber + class TQDMProgressHandler: def __init__(self): self.listeners = [] @@ -9,16 +11,17 @@ def listen(self): q = queue.Queue(maxsize=25) self.listeners.append(q) return q - + def create(self, iterable, additional_metadata: dict = dict(), **tqdm_kwargs): - return TQDMProgressSubscriber(iterable, lambda progress_update: self._announce(dict( - **progress_update, - **additional_metadata - )), **tqdm_kwargs) + return TQDMProgressSubscriber( + iterable, + lambda progress_update: self._announce(dict(**progress_update, **additional_metadata)), + **tqdm_kwargs, + ) def _announce(self, msg): for i in reversed(range(len(self.listeners))): try: self.listeners[i].put_nowait(msg) except queue.Full: - del self.listeners[i] \ No newline at end of file + del self.listeners[i] diff --git a/src/tqdm_publisher/_progress_subscriber.py b/src/tqdm_publisher/_progress_subscriber.py index b818eec..289d3d6 100644 --- a/src/tqdm_publisher/_progress_subscriber.py +++ b/src/tqdm_publisher/_progress_subscriber.py @@ -1,6 +1,7 @@ from ._publisher import TQDMPublisher + class TQDMProgressSubscriber(TQDMPublisher): def __init__(self, iterable, on_progress_update: callable, **tqdm_kwargs): super().__init__(iterable, **tqdm_kwargs) - self.subscribe(lambda format_dict: on_progress_update(dict(progress_bar_id=self.id, format_dict=format_dict))) \ No newline at end of file + self.subscribe(lambda format_dict: on_progress_update(dict(progress_bar_id=self.id, format_dict=format_dict)))