
Add queue handler #49

Closed

Conversation

garrettmflynn (Collaborator)

This PR provides a TQDMProgressHandler class that queues messages and allows users to manage several progress bars by listening to this queue.

@garrettmflynn garrettmflynn self-assigned this Mar 20, 2024
self.listeners.append(q)
return q

def create(self, iterable, additional_metadata: dict = dict(), **tqdm_kwargs):
Member

Is there a reason this has to be a separate method, create, rather than being done in __init__?

Member

In particular, it's a little odd to have to use this roundabout approach to 'create' a new one. You might see this structure as a class constructor like

@classmethod
def from_iterable(cls, iterable: Iterable, ...) -> Self:
    ....

but in general, inheriting from one of the parents and using OOP pattern would be preferred

garrettmflynn (Collaborator Author), Mar 20, 2024

This class handles more than one progress bar, so we can't just directly inherit from one of those classes.

This could instead be called register and receive an instantiated class from tqdm_publisher:

progress_bar = TQDMProgressSubscriber(iterable)
progress_handler.register(progress_bar, additional_metadata=dict(request_id=request_id))

Where:

def register(self, progress_bar, additional_metadata: dict = dict()):
    progress_bar.subscribe(lambda progress_update: self._announce(dict(
        **progress_update,
        **additional_metadata
    )))

The only reason it doesn't work this way is that it wouldn't be compatible with passing a progress_bar_class that is instantiated with progress_bar_options.

garrettmflynn (Collaborator Author)

We can add this as another method, but as far as I know we need the original for our downstream code.

Member

> This class handles more than one progress bar

Why do we need a single class to handle more than one bar?

The natural use of tqdm that we seek to replace is one progress bar per iterable whose progress we want to track; we're just looking to reroute the normal stdout printouts to a frontend client. So is that not possible with N progress handlers (one per iterable) replacing the N previous tqdm bars?
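As a sketch of this one-handler-per-iterable idea (all names here are illustrative, not from the PR): each bar gets its own callback that reroutes tqdm's format_dict to a send function instead of stdout.

```python
# Hypothetical sketch: one callback per iterable, rerouting the updates that
# tqdm would normally print to stdout into a per-request send function.
def make_progress_callback(send, request_id):
    def on_progress_update(format_dict):
        # Tag each update so the frontend can route it to the right bar.
        send(dict(request_id=request_id, **format_dict))
    return on_progress_update

sent = []
callback = make_progress_callback(sent.append, request_id="abc")
for n in range(1, 4):
    callback(dict(n=n, total=3))  # tqdm would supply format_dict itself
print(sent[-1])  # {'request_id': 'abc', 'n': 3, 'total': 3}
```

With real tqdm subclasses, the callback would be registered once per bar; the frontend then demultiplexes by request_id.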

garrettmflynn (Collaborator Author), Mar 21, 2024

Ah I'd written this down for myself but never noted it on the new PRs. We need a single class to handle more than one bar's updates and forward them to the client.

Server-sent events require that the connection is established on one endpoint (listen_to_neuroconv_events) while updates from other endpoints (e.g. convert_to_nwb, inspect_nwb_file, upload_folder_to_dandi, etc.) are passed to it, usually via a queue-based instance like the MessageAnnouncer.

The new TQDMProgressHandler class organizes the messages from one or more progress bars into a queue, allowing these updates to conform to SSE's structural requirement and, in general, could be used to avoid overloading when running many parallel bars like in the WebSockets demo.
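A minimal sketch of that queue-based fan-out (class and method names here are hypothetical, simplified from the pattern described above, not the PR's actual implementation):

```python
import queue

class QueueProgressHandler:
    """Hypothetical sketch: funnel updates from many bars into client queues."""

    def __init__(self):
        self.listeners = []  # one queue per connected SSE client

    def listen(self) -> queue.Queue:
        # Each SSE connection gets its own queue to poll for updates.
        q = queue.Queue(maxsize=25)
        self.listeners.append(q)
        return q

    def announce(self, message: dict) -> None:
        # Fan one bar's update out to every listening client queue.
        for listener in list(self.listeners):
            try:
                listener.put_nowait(message)
            except queue.Full:
                # Drop clients that have stopped draining their queue.
                self.listeners.remove(listener)

handler = QueueProgressHandler()
client_queue = handler.listen()
handler.announce(dict(progress_bar_id="bar-1", n=5, total=10))
print(client_queue.get_nowait())  # {'progress_bar_id': 'bar-1', 'n': 5, 'total': 10}
```

Any number of progress bars can call announce concurrently; the single SSE endpoint only ever drains its own queue, which satisfies the one-connection constraint.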

garrettmflynn (Collaborator Author)

Ah I was trying to communicate this here, things just got quite spread out: #46 (comment)

Member

Aha. OK, I see now, thanks

I will think more on this then...

from ._progress_subscriber import TQDMProgressSubscriber


class TQDMProgressHandler:
Member

docstrings everywhere would be helpful

garrettmflynn (Collaborator Author)

Sure, I'll add these once we dial it in, unless you'd prefer them for initial review.

Member

It helps with review to have at least basic descriptions of the intent of your design, yes.


def on_progress_update(format_dict) -> None:
"""
Describe what this announcer is all about...
"""
announcer.announce(dict(request_id=request_id, **format_dict))

self.subscribe(callback=on_progress_update)
self.subscribe(lambda format_dict: on_progress_update(dict(progress_bar_id=self.id, format_dict=format_dict)))
Member

Any reason to undo this?

garrettmflynn (Collaborator Author)

Only because there was never anything called an announcer here, and callbacks are a generic way to notify another part of the code about changes. Passing an announcer as an argument assumes a certain workflow for the user, as implemented here, and ultimately just provides a value that will run a callback anyway.

@CodyCBakerPhD (Member)

@garrettmflynn I have a mild concern about the single-Queue (in Flask) approach with a parallelized TQDM scheme, and wanted to test how quickly items in the queue are resolved relative to the rate of progress in the bars (which can also be throttled indirectly, display-wise, using mininterval on each TQDM).

Could you put together a minimal demo to test this out? It might require us to modify the TQDM options passed at the higher level to make it work well.

@garrettmflynn (Collaborator Author)

Sure, I'll give it a shot. The mininterval solution, however, wouldn't remove the possibility that the parallel bars submit their updates in a way that overlaps when the WebSocket (for the demo) attempts to send them back to the browser.

Since we have a single update point (e.g. /events in Flask, WebSockets in the demo), we can't remove this possibility without also pushing all updates through a single pipe that returns values serially.

@CodyCBakerPhD (Member)

Yeah, I'm thinking of the pipe in the batch-update sense, which could still work as long as it's fat enough, responsive enough, and the underlying updates aren't spamming faster than a certain rate.

@garrettmflynn (Collaborator Author)

Ah okay, I think I see where you're going. We could always update our approach to send events in batches (i.e. empty the queue and send all of these updates back to the client as an array) rather than serially.

Is this something you're considering?

We'd still have to catch the events using a single handler, though the queue throughput would be less concerning.
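As a sketch of that batching idea (helper name hypothetical, not from the PR): drain whatever is pending and send it as one array, so the client receives a single message per flush instead of one per update.

```python
import queue

def drain_queue(q: queue.Queue, max_items: int = 100) -> list:
    """Empty up to max_items pending updates and return them as one batch."""
    batch = []
    while len(batch) < max_items:
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            break  # nothing left to flush this cycle
    return batch

updates = queue.Queue()
for i in range(3):
    updates.put(dict(progress_bar_id=f"bar-{i}", n=i))

# One message to the client instead of three separate sends.
batch = drain_queue(updates)
print(len(batch))  # 3
```

The single handler would call something like drain_queue on each flush interval, which also caps how often the WebSocket or SSE endpoint has to write.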

@CodyCBakerPhD (Member)

That's kind of what I want to see: whether that's necessary, or whether there's something else we need to add.

In general, it would also be nice to have another demo here that uses a Flask-based approach, as a simpler way of testing out these ideas before going to the GUIDE.

@garrettmflynn (Collaborator Author), Mar 25, 2024

Any recommendations on testing this given that #32 is still unmerged from main and each PR has a fairly complicated base configuration?

@garrettmflynn (Collaborator Author), Mar 25, 2024

I can merge #32 or #37 into this, but then we're starting to hybridize these changes so that the focus is somewhat more vague.

@CodyCBakerPhD (Member)

> Any recommendations on testing this given that #32 is still unmerged from main and each PR has a fairly complicated base configuration?

I would open a fresh branch pointing directly to main, not relying on the other branches; that will make the changelog more explicit for the new demo.

As for the server parallelization strategy, the type shown in https://github.com/catalystneuro/tqdm_publisher/pull/37/files#diff-24ab6a2ba2fbdc9ca50aad1f34e47ede20b5e11fb11615ad9ef65c40027dc055R65-R70 should still be sufficient for describing our use case

@garrettmflynn (Collaborator Author)

Gotcha, thanks!

@CodyCBakerPhD (Member)

And this one too, replaced by #51?

@garrettmflynn (Collaborator Author)

Yep! This is fine to close.

@CodyCBakerPhD CodyCBakerPhD deleted the add_queue_handler branch April 6, 2024 22:14