Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add queue handler #49

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/tqdm_publisher/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from ._progress_handler import TQDMProgressHandler
from ._progress_subscriber import TQDMProgressSubscriber
from ._publisher import TQDMPublisher
from ._subscriber import TQDMProgressSubscriber

__all__ = ["TQDMPublisher", "TQDMProgressSubscriber"]
__all__ = ["TQDMPublisher", "TQDMProgressSubscriber", "TQDMProgressHandler"]
27 changes: 27 additions & 0 deletions src/tqdm_publisher/_progress_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import queue

from ._progress_subscriber import TQDMProgressSubscriber


class TQDMProgressHandler:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docstrings everywhere would be helpful

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure I'll add these when we dial it in—unless you'd prefer them for initial review.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It helps with review to have a least basic descriptors of the intention of your design, yes

def __init__(self):
self.listeners = []

def listen(self):
q = queue.Queue(maxsize=25)
self.listeners.append(q)
return q

def create(self, iterable, additional_metadata: dict = dict(), **tqdm_kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this has to be a separate novel method create instead of the __init__?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In particular, it's a little odd to have to use this roundabout approach to 'create' a new one. You might see this structure as a class constructor like

@classmethod
def from_iterable(cls, iterable: Iterable, ...) -> Self:
    ....

but in general, inheriting from one of the parents and using OOP pattern would be preferred

Copy link
Collaborator Author

@garrettmflynn garrettmflynn Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class handles more than one progress bar, so we can't just directly inherit from one of those classes.

This could instead be called register and receive an instantiated class from tqdm_publisher:

progress_bar = TQDMProgressSubscriber(iterable)
progress_handler.register(progress_bar, additional_metadata=dict(request_id=request_id))

Where:

def register(self, progress_bar, additional_metadata: dict = dict()):
    progress_bar.subscribe(lambda progress_update: self._announce(dict(
        **progress_update,
        **additional_metadata
    ))

It only doesn't work this way because it wouldn't be compatible with passing a progress_bar_class that is instantiated with progress_bar_options.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add this as another method—but AFAIK we need the original for our downstream code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class handles more than one progress bar

Why do we need a single class to handle more than one bar?

The natural use of tqdm that we seek to replace is one progress bar per iterable we want to track progress on, we're just looking to reroute the normal stdout printouts to a frontend client. So is that not possible with $N$ progress handlers (one per $N$ iterables) replacing $N$ previous tqdm bars?

Copy link
Collaborator Author

@garrettmflynn garrettmflynn Mar 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I'd written this down for myself but never noted it on the new PRs. We need a single class to handle more than one bar's updates and forward them to the client.

Server-side events require that the connection is established on one endpoint (listen_to_neuroconv_events) and pass updates from other endpoints (e.g. convert_to_nwb, inspect_nwb_file, upload_folder_to_dandi, etc) here—usually in a queue-based instance like the MessageAnnouncer.

The new TQDMProgressHandler class organizes the messages from one or more progress bars into a queue, allowing these updates to conform to SSE's structural requirement and, in general, could be used to avoid overloading when running many parallel bars like in the WebSockets demo.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I was trying to communicate this here, things just got quite spread out: #46 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha. OK, I see now, thanks

I will think more on this then...

return TQDMProgressSubscriber(
iterable,
lambda progress_update: self._announce(dict(**progress_update, **additional_metadata)),
**tqdm_kwargs,
)

def _announce(self, msg):
for i in reversed(range(len(self.listeners))):
try:
self.listeners[i].put_nowait(msg)
except queue.Full:
del self.listeners[i]
11 changes: 2 additions & 9 deletions src/tqdm_publisher/_progress_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@


class TQDMProgressSubscriber(TQDMPublisher):
def __init__(self, iterable, *, announcer: "?", request_id: str, message: dict, **tqdm_kwargs):
def __init__(self, iterable, on_progress_update: callable, **tqdm_kwargs):
super().__init__(iterable, **tqdm_kwargs)

def on_progress_update(format_dict) -> None:
"""
Describe what this announcer is all about...
"""
announcer.announce(dict(request_id=request_id, **message))

self.subscribe(callback=on_progress_update)
self.subscribe(lambda format_dict: on_progress_update(dict(progress_bar_id=self.id, format_dict=format_dict)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to undo this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only because there was never anything called an announcer here, and callbacks are a generic way to notify another part of code about the changes here. Whereas passing an announcer as an argument assumes a certain workflow for the user, as implemented here, and ultimately runs / provides a value that will run a callback anyways

Loading