From 5aa33807da535740baba89b33573971d481d4ea8 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Sun, 3 Mar 2024 13:18:37 -0500 Subject: [PATCH 01/13] fixing for M1 Mac --- .gitignore | 2 ++ pyproject.toml | 4 ++-- src/tqdm_publisher/demo/__init__.py | 0 {demo => src/tqdm_publisher/demo}/client.html | 0 {demo => src/tqdm_publisher/demo}/client.py | 0 .../tqdm_publisher/demo/demo_command_line_interface.py | 0 {demo => src/tqdm_publisher/demo}/server.py | 0 7 files changed, 4 insertions(+), 2 deletions(-) create mode 100644 src/tqdm_publisher/demo/__init__.py rename {demo => src/tqdm_publisher/demo}/client.html (100%) rename {demo => src/tqdm_publisher/demo}/client.py (100%) rename demo/demo_cli.py => src/tqdm_publisher/demo/demo_command_line_interface.py (100%) rename {demo => src/tqdm_publisher/demo}/server.py (100%) diff --git a/.gitignore b/.gitignore index 34fbcc3..e8708a1 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ dist .coverage .coverage.* codecov.xml + +.DS_Store diff --git a/pyproject.toml b/pyproject.toml index 2518f55..90d792c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,8 +50,8 @@ demo = [ "Homepage" = "https://github.com/catalystneuro/tqdm_publisher" "Bug Tracker" = "https://github.com/catalystneuro/tqdm_publisher/issues" -[project.gui-scripts] -tqdm_publisher = "demo.demo_cli:main" +[project.scripts] +tqdm_publisher = "tqdm_publisher.demo.demo_cli:main" [tool.black] line-length = 120 diff --git a/src/tqdm_publisher/demo/__init__.py b/src/tqdm_publisher/demo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/demo/client.html b/src/tqdm_publisher/demo/client.html similarity index 100% rename from demo/client.html rename to src/tqdm_publisher/demo/client.html diff --git a/demo/client.py b/src/tqdm_publisher/demo/client.py similarity index 100% rename from demo/client.py rename to src/tqdm_publisher/demo/client.py diff --git a/demo/demo_cli.py b/src/tqdm_publisher/demo/demo_command_line_interface.py similarity index 100% rename from demo/demo_cli.py rename to src/tqdm_publisher/demo/demo_command_line_interface.py diff --git a/demo/server.py b/src/tqdm_publisher/demo/server.py similarity index 100% rename from demo/server.py rename to src/tqdm_publisher/demo/server.py From 59e757b5f1c15653188f7847326b6749aa77ad98 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Wed, 6 Mar 2024 14:12:27 -0800 Subject: [PATCH 02/13] Fix name of file targeted --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 90d792c..ed45425 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ classifiers = [ "Operating System :: MacOS", "Operating System :: Unix", ] + dependencies = [ "tqdm>=4.49.0" ] @@ -51,7 +52,7 @@ demo = [ "Bug Tracker" = "https://github.com/catalystneuro/tqdm_publisher/issues" [project.scripts] -tqdm_publisher = "tqdm_publisher.demo.demo_cli:main" +tqdm_publisher = "tqdm_publisher.demo.demo_command_line_interface:main" [tool.black] line-length = 120 From ed1422adab0f5a21ab5f3fc6f1eb0384bbdb858f Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Sun, 10 Mar 2024 17:23:23 -0700 Subject: [PATCH 03/13] Simplify demo structure (#39) * Simplify synchronous demo * Update docs * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- README.md | 50 +++++++---- src/tqdm_publisher/demo/client.html | 58 ++++++++++--- src/tqdm_publisher/demo/server.py | 130 +++++----------------------- 3 files changed, 100 insertions(+), 138 deletions(-) diff --git a/README.md b/README.md index 6dd30cf..2086356 100644 --- a/README.md +++ b/README.md @@ -11,36 +11,50 @@ This is useful if you want to use `tqdm` to track the progress of a long-running ```bash pip install tqdm_publisher ``` +## Getting Started +### Basic Usage +To monitor the progress of an existing `tqdm` progress bar, simply swap the `tqdm`and `TQDMPublisher` constructors. Then, declare a callback function to handle progress updates, and subscribe it to the `TQDMPublisher` updates using the `subscribe` method _before iteration begins_. -## Usage +#### Original Code ```python import random -import asyncio +import time -from tqdm_publisher import TQDMPublisher +from tqdm import tqdm + +N_TASKS = 100 + +# Create a list of tasks +durations = [ random.uniform(0, 1.0) for _ in range(N_TASKS) ] + +# Create a progress bar +progress_bar = tqdm(durations) -async def sleep_func(sleep_duration = 1): - await asyncio.sleep(delay=sleep_duration) +# Iterate over the progress bar +for duration in progress_bar: + time.sleep(duration) # Execute the task +``` -async def run_multiple_sleeps(sleep_durations): +#### Modified Code - tasks = [] +```python +import random +import time - for sleep_duration in sleep_durations: - task = asyncio.create_task(sleep_func(sleep_duration=sleep_duration)) - tasks.append(task) +from tqdm_publisher import TQDMPublisher - progress_bar = TQDMPublisher(asyncio.as_completed(tasks), total=len(tasks)) - callback_id = progress_bar.subscribe(lambda info: print('Progress Update', info)) +N_TASKS = 100 +durations = [ random.uniform(0, 1.0) for _ in range(N_TASKS) ] +progress_bar = TQDMPublisher(durations) - for f in progress_bar: - await f +# Declare a callback function to handle progress updates +on_update = lambda info: print('Progress Update', info) - progress_bar.unsubscribe(callback_id) +# Subscribe the callback to the TQDMPublisher +progress_bar.subscribe(on_update) -number_of_tasks = 10**5 -sleep_durations = [random.uniform(0, 5.0) for _ in range(number_of_tasks)] -asyncio.run(run_multiple_sleeps(sleep_durations=sleep_durations)) +for duration in progress_bar: + time.sleep(duration) ``` ## Demo diff --git a/src/tqdm_publisher/demo/client.html b/src/tqdm_publisher/demo/client.html index 67ed769..3b716e2 100644 --- a/src/tqdm_publisher/demo/client.html +++ b/src/tqdm_publisher/demo/client.html @@ -74,35 +74,67 @@

tqdm_progress

diff --git a/src/tqdm_publisher/demo/server.py b/src/tqdm_publisher/demo/server.py index 02d68dd..abaa212 100644 --- a/src/tqdm_publisher/demo/server.py +++ b/src/tqdm_publisher/demo/server.py @@ -4,6 +4,7 @@ import json import random import threading +import time from typing import List from uuid import uuid4 @@ -12,136 +13,51 @@ from tqdm_publisher import TQDMPublisher -async def sleep_func(sleep_duration: float = 1) -> float: - await asyncio.sleep(delay=sleep_duration) +def generate_task_durations(n=100) -> List[float]: + return [random.uniform(0, 1.0) for _ in range(n)] -def create_tasks(): - n = 10**5 - sleep_durations = [random.uniform(0, 5.0) for _ in range(n)] - tasks = [] - - for sleep_duration in sleep_durations: - task = asyncio.create_task(sleep_func(sleep_duration=sleep_duration)) - tasks.append(task) - - return tasks - - -class ProgressHandler: - def __init__(self): - self.started = False - self.callbacks = [] - self.callback_ids = [] - - def subscribe(self, callback): - self.callbacks.append(callback) - - if hasattr(self, "progress_bar"): - self._subscribe(callback) - - def unsubscribe(self, callback_id): - self.progress_bar.unsubscribe(callback_id) - - def clear(self): - self.callbacks = [] - self._clear() - - def _clear(self): - for callback_id in self.callback_ids: - self.unsubscribe(callback_id) - - self.callback_ids = [] - - async def run(self): - for f in self.progress_bar: - await f - - def stop(self): - self.started = False - self.clear() - self.thread.join() - - def _subscribe(self, callback): - callback_id = self.progress_bar.subscribe(callback) - self.callback_ids.append(callback_id) - - async def run(self): - if hasattr(self, "progress_bar"): - print("Progress bar already running") - return - - self.tasks = create_tasks() - self.progress_bar = TQDMPublisher(asyncio.as_completed(self.tasks), total=len(self.tasks)) - - for callback in self.callbacks: - self._subscribe(callback) - - for f in self.progress_bar: - await f - - self._clear() - del self.progress_bar - - def thread_loop(self): - while self.started: - asyncio.run(self.run()) - - def start(self): - if self.started: - return - - self.started = True - - self.thread = threading.Thread(target=self.thread_loop) # Start infinite loop of progress bar thread - self.thread.start() - - -progress_handler = ProgressHandler() +def start_progress_bar(id, callback): + durations = generate_task_durations() + progress_bar = TQDMPublisher(durations) + progress_bar.subscribe(lambda info: callback(id, info)) + for duration in progress_bar: + time.sleep(duration) class WebSocketHandler: def __init__(self): self.clients = {} - - # Initialize with any state you need pass - def handle_task_result(self, task): - try: - task.result() # This will re-raise any exception that occurred in the task - except websockets.exceptions.ConnectionClosedOK: - print("WebSocket closed while sending message") - except Exception as e: - print(f"Error in task: {e}") + async def send(self, id, data): + await self.clients[id].send(json.dumps(data)) async def handler(self, websocket): - id = str(uuid4()) - self.clients[id] = websocket # Register client connection + identifier = str(uuid4()) + self.clients[identifier] = websocket # Register client connection - progress_handler.start() # Start if not started + def on_progress(id, info): - def on_progress(info): - task = asyncio.create_task(websocket.send(json.dumps(info))) - task.add_done_callback(self.handle_task_result) # Handle task result or exception - - progress_handler.subscribe(on_progress) + asyncio.run(self.send(identifier, dict(id=id, payload=info))) try: async for message in websocket: - print("Message from client received:", message) + + info = json.loads(message) + + if info["command"] == "start": + thread = threading.Thread(target=start_progress_bar, args=[info["id"], on_progress]) + thread.start() finally: - # This is called when the connection is closed - del self.clients[id] - if len(self.clients) == 0: - progress_handler.stop() + del self.clients[identifier] # This is called when the connection is closed async def spawn_server(): handler = WebSocketHandler().handler async with websockets.serve(handler, "", 8000): - await asyncio.Future() # run forever + await asyncio.Future() def main(): From 74a0a055b625bedf5470d93ea1c8ba7422af0a03 Mon Sep 17 00:00:00 2001 From: codycbakerphd Date: Sun, 10 Mar 2024 22:05:04 -0400 Subject: [PATCH 04/13] suggestions for readability --- pyproject.toml | 2 +- .../{demo => _demo}/__init__.py | 0 .../{demo/client.html => _demo/_client.html} | 6 +- .../_demo/_demo_command_line_interface.py | 43 +++++++++ src/tqdm_publisher/_demo/_server.py | 89 +++++++++++++++++++ src/tqdm_publisher/demo/client.py | 12 --- .../demo/demo_command_line_interface.py | 41 --------- src/tqdm_publisher/demo/server.py | 68 -------------- 8 files changed, 136 insertions(+), 125 deletions(-) rename src/tqdm_publisher/{demo => _demo}/__init__.py (100%) rename src/tqdm_publisher/{demo/client.html => _demo/_client.html} (94%) create mode 100644 src/tqdm_publisher/_demo/_demo_command_line_interface.py create mode 100644 src/tqdm_publisher/_demo/_server.py delete mode 100644 src/tqdm_publisher/demo/client.py delete mode 100644 src/tqdm_publisher/demo/demo_command_line_interface.py delete mode 100644 src/tqdm_publisher/demo/server.py diff --git a/pyproject.toml b/pyproject.toml index ed45425..942e7fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,7 @@ demo = [ "Bug Tracker" = "https://github.com/catalystneuro/tqdm_publisher/issues" [project.scripts] -tqdm_publisher = "tqdm_publisher.demo.demo_command_line_interface:main" +tqdm_publisher = "tqdm_publisher._demo._demo_command_line_interface:_command_line_interface" [tool.black] line-length = 120 diff --git a/src/tqdm_publisher/demo/__init__.py b/src/tqdm_publisher/_demo/__init__.py similarity index 100% rename from src/tqdm_publisher/demo/__init__.py rename to src/tqdm_publisher/_demo/__init__.py diff --git a/src/tqdm_publisher/demo/client.html b/src/tqdm_publisher/_demo/_client.html similarity index 94% rename from src/tqdm_publisher/demo/client.html rename to src/tqdm_publisher/_demo/_client.html index 3b716e2..0e01ef3 100644 --- a/src/tqdm_publisher/demo/client.html +++ b/src/tqdm_publisher/_demo/_client.html @@ -131,10 +131,10 @@

tqdm_progress

const { element, progress } = createProgressBar(); barElements.appendChild(element); - const id = Math.random().toString(36).substring(7); - bars[id] = progress; + const progress_bar_id = Math.random().toString(36).substring(7); + bars[progress_bar_id] = progress; - client.socket.send(JSON.stringify({ command: 'start', id })); + client.socket.send(JSON.stringify({ command: 'start', progress_bar_id })); }) diff --git a/src/tqdm_publisher/_demo/_demo_command_line_interface.py b/src/tqdm_publisher/_demo/_demo_command_line_interface.py new file mode 100644 index 0000000..2f1764c --- /dev/null +++ b/src/tqdm_publisher/_demo/_demo_command_line_interface.py @@ -0,0 +1,43 @@ +import os +import subprocess +import sys +from pathlib import Path + +from ._server import run_demo + +DEMO_BASE_FOLDER_PATH = Path(__file__).parent + +CLIENT_FILE_PATH = DEMO_BASE_FOLDER_PATH / "_client.html" +SERVER_FILE_PATH = DEMO_BASE_FOLDER_PATH / "_server.py" + + +def _command_line_interface(): + """A simple command line interface for running the demo for TQDM Publisher.""" + if len(sys.argv) <= 1: + print("No input provided. Please specify a command (e.g. 'demo').") + return + + command = sys.argv[1] + if "-" in command: + print( + f"No command provided, but flag {command} was received. " + "Please specify a command before the flag (e.g. 'demo')." + ) + return + + flags_list = sys.argv[2:] + if len(flags_list) > 0: + print(f"No flags are accepted at this time, but flags {flags_list} were received.") + return + + if command == "demo": + # For convenience - automatically pop-up a browser window on the locally hosted HTML page + if sys.platform == "win32": + os.system(f'start "" "{CLIENT_FILE_PATH}"') + else: + subprocess.run(["open", CLIENT_FILE_PATH]) + + run_demo() + + else: + print(f"{command} is an invalid command.") diff --git a/src/tqdm_publisher/_demo/_server.py b/src/tqdm_publisher/_demo/_server.py new file mode 100644 index 0000000..dbd18ab --- /dev/null +++ b/src/tqdm_publisher/_demo/_server.py @@ -0,0 +1,89 @@ +import asyncio +import json +import threading +import time +from typing import Dict, Any, List +from uuid import uuid4 + +import websockets + +from tqdm_publisher import TQDMPublisher + + +def start_progress_bar(*, client_id: str, progress_bar_id: str, client_callback: callable) -> None: + """ + Emulate running the specified number of tasks by sleeping the specified amount of time on each iteration. + + Defaults are chosen for a deterministic and regular update period of one second for a total time of one minute. + """ + all_task_durations_in_seconds = [1.0 for _ in range(60)] # One minute at one second per update + progress_bar = TQDMPublisher(iterable=all_task_durations_in_seconds) + + def run_function_on_progress_update(format_dict: dict) -> None: + """ + This is the injected callback that will be run on each update of the TQDM object. + + Its first and only positional argument must be the `format_dict` of the TQDM instance. Additional customization + on outside parameters must be achieved by defining those fields at an outer scope and defining this + server-specific callback inside the local scope. + + In this demo, we will execute the `client_callback` whose protocol is known only to the WebSocketHandler. + It has + """ + client_callback(client_id=client_id, progress_bar_id=progress_bar_id, format_dict=format_dict) + + progress_bar.subscribe(callback=run_function_on_progress_update) + + for task_duration in progress_bar: + time.sleep(task_duration) + + +class WebSocketHandler: + """Describe this class.""" + + def __init__(self) -> None: + """Initialize the mapping of client IDs to .""" + self.clients: Dict[str, Any] = dict() + + def function_to_run_on_progress_update(self, *, client_id: str, progress_bar_id: str, format_dict: dict) -> None: + """This is...""" + asyncio.run(self.send(client_id=client_id, data=dict(progress_bar_id=progress_bar_id, format_dict=format_dict))) + + async def send(self, client_id: str, data: dict) -> None: + """Send an arbitrary JSON object `data` to the client identifier by `client_id`.""" + await self.clients[client_id].send(json.dumps(obj=data)) + + async def handler(self, websocket) -> None: + """Describe what the handler does.""" + client_id = str(uuid4()) + self.clients[client_id] = websocket # Register client connection + + try: + async for message in websocket: + message_from_client = json.loads(message) + + if message_from_client["command"] == "start": + thread = threading.Thread( + target=start_progress_bar, + kwargs=dict( + client_id=client_id, + progress_bar_id=message_from_client["progress_bar_id"], + client_callback=self.function_to_run_on_progress_update, + ), + ) + thread.start() + finally: # This is called when the connection is closed + if client_id in self.clients: + del self.clients[client_id] + + +async def spawn_server() -> None: + """Spawn the server asynchronously.""" + handler = WebSocketHandler().handler + async with websockets.serve(handler, "", 8000): + await asyncio.Future() + + +def run_demo() -> None: + """Trigger the execution of the asynchronous spawn.""" + asyncio.run(spawn_server()) diff --git a/src/tqdm_publisher/demo/client.py b/src/tqdm_publisher/demo/client.py deleted file mode 100644 index 73bf883..0000000 --- a/src/tqdm_publisher/demo/client.py +++ /dev/null @@ -1,12 +0,0 @@ -import subprocess -from pathlib import Path - -client_path = Path(__file__).parent / "client.html" - - -def main(): - subprocess.run(["open", client_path]) - - -if __name__ == "__main__": - main() diff --git a/src/tqdm_publisher/demo/demo_command_line_interface.py b/src/tqdm_publisher/demo/demo_command_line_interface.py deleted file mode 100644 index 766d806..0000000 --- a/src/tqdm_publisher/demo/demo_command_line_interface.py +++ /dev/null @@ -1,41 +0,0 @@ -import subprocess -import sys -from pathlib import Path - -demo_base_path = Path(__file__).parent - -client_path = demo_base_path / "client.html" -server_path = demo_base_path / "server.py" - - -def main(): - if len(sys.argv) <= 1: - print("No command provided. Please specify a command (e.g. 'demo').") - return - - command = sys.argv[1] - - flags_list = sys.argv[2:] - - client_flag = "--client" in flags_list - server_flag = "--server" in flags_list - both_flags = "--server" in flags_list and "--client" in flags_list - - flags = dict( - client=not server_flag or both_flags, - server=not client_flag or both_flags, - ) - - if command == "demo": - if flags["client"]: - subprocess.run(["open", client_path]) - - if flags["server"]: - subprocess.run(["python", server_path]) - - else: - print(f"{command} is an invalid command.") - - -if __name__ == "__main__": - main() diff --git a/src/tqdm_publisher/demo/server.py b/src/tqdm_publisher/demo/server.py deleted file mode 100644 index abaa212..0000000 --- a/src/tqdm_publisher/demo/server.py +++ /dev/null @@ -1,68 +0,0 @@ -#!/usr/bin/env python - -import asyncio -import json -import random -import threading -import time -from typing import List -from uuid import uuid4 - -import websockets - -from tqdm_publisher import TQDMPublisher - - -def generate_task_durations(n=100) -> List[float]: - return [random.uniform(0, 1.0) for _ in range(n)] - - -def start_progress_bar(id, callback): - durations = generate_task_durations() - progress_bar = TQDMPublisher(durations) - progress_bar.subscribe(lambda info: callback(id, info)) - for duration in progress_bar: - time.sleep(duration) - - -class WebSocketHandler: - def __init__(self): - self.clients = {} - pass - - async def send(self, id, data): - await self.clients[id].send(json.dumps(data)) - - async def handler(self, websocket): - identifier = str(uuid4()) - self.clients[identifier] = websocket # Register client connection - - def on_progress(id, info): - - asyncio.run(self.send(identifier, dict(id=id, payload=info))) - - try: - async for message in websocket: - - info = json.loads(message) - - if info["command"] == "start": - thread = threading.Thread(target=start_progress_bar, args=[info["id"], on_progress]) - thread.start() - - finally: - del self.clients[identifier] # This is called when the connection is closed - - -async def spawn_server(): - handler = WebSocketHandler().handler - async with websockets.serve(handler, "", 8000): - await asyncio.Future() - - -def main(): - asyncio.run(spawn_server()) - - -if __name__ == "__main__": - main() From b1550944dfe70909fc02c0ae945b42bba067437b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 11 Mar 2024 02:10:10 +0000 Subject: [PATCH 05/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tqdm_publisher/_demo/_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demo/_server.py b/src/tqdm_publisher/_demo/_server.py index dbd18ab..8a02380 100644 --- a/src/tqdm_publisher/_demo/_server.py +++ b/src/tqdm_publisher/_demo/_server.py @@ -2,7 +2,7 @@ import json import threading import time -from typing import Dict, Any, List +from typing import Any, Dict, List from uuid import uuid4 import websockets From c6852931ec6088e158d387fce230ae491ff6f9a0 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Mon, 11 Mar 2024 10:29:00 -0700 Subject: [PATCH 06/13] Update descriptions and types --- src/tqdm_publisher/_demo/_server.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/src/tqdm_publisher/_demo/_server.py b/src/tqdm_publisher/_demo/_server.py index 8a02380..284fc75 100644 --- a/src/tqdm_publisher/_demo/_server.py +++ b/src/tqdm_publisher/_demo/_server.py @@ -28,7 +28,6 @@ def run_function_on_progress_update(format_dict: dict) -> None: server-specific callback inside the local scope. In this demo, we will execute the `client_callback` whose protocol is known only to the WebSocketHandler. - It has """ client_callback(client_id=client_id, progress_bar_id=progress_bar_id, format_dict=format_dict) @@ -39,25 +38,34 @@ def run_function_on_progress_update(format_dict: dict) -> None: class WebSocketHandler: - """Describe this class.""" + """ + This is a class that handles the WebSocket connections and the communication protocol + between the server and the client. + + While we could have implemented this as a function, a class is chosen here to maintain reference + to the clients within a defined scope. + """ def __init__(self) -> None: """Initialize the mapping of client IDs to .""" self.clients: Dict[str, Any] = dict() - def function_to_run_on_progress_update(self, *, client_id: str, progress_bar_id: str, format_dict: dict) -> None: - """This is...""" + def forward_progress_to_client(self, *, client_id: str, progress_bar_id: str, format_dict: dict) -> None: + """This is the function that will run on every update of the TQDM object. It will forward the progress to the client.""" asyncio.run(self.send(client_id=client_id, data=dict(progress_bar_id=progress_bar_id, format_dict=format_dict))) async def send(self, client_id: str, data: dict) -> None: """Send an arbitrary JSON object `data` to the client identifier by `client_id`.""" await self.clients[client_id].send(json.dumps(obj=data)) - async def handler(self, websocket) -> None: - """Describe what the handler does.""" + async def handler(self, websocket: websockets.server.WebSocketServerProtocol) -> None: + """Register new WebSocket clients and handle their messages.""" client_id = str(uuid4()) - self.clients[client_id] = websocket # Register client connection + + # Register client connection + self.clients[client_id] = websocket + # Wait for messages from the client try: async for message in websocket: message_from_client = json.loads(message) @@ -68,11 +76,13 @@ async def handler(self, websocket) -> None: kwargs=dict( client_id=client_id, progress_bar_id=message_from_client["progress_bar_id"], - client_callback=self.function_to_run_on_progress_update, + client_callback=self.forward_progress_to_client, ), ) thread.start() - finally: # This is called when the connection is closed + + # Deregister the client when the connection is closed + finally: if client_id in self.clients: del self.clients[client_id] From 5ff8d04cd1f686d5a54f214d7b2a21ad5733f50b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 11 Mar 2024 17:29:29 +0000 Subject: [PATCH 07/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tqdm_publisher/_demo/_server.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/tqdm_publisher/_demo/_server.py b/src/tqdm_publisher/_demo/_server.py index 284fc75..eef2700 100644 --- a/src/tqdm_publisher/_demo/_server.py +++ b/src/tqdm_publisher/_demo/_server.py @@ -39,10 +39,10 @@ def run_function_on_progress_update(format_dict: dict) -> None: class WebSocketHandler: """ - This is a class that handles the WebSocket connections and the communication protocol - between the server and the client. - - While we could have implemented this as a function, a class is chosen here to maintain reference + This is a class that handles the WebSocket connections and the communication protocol + between the server and the client. + + While we could have implemented this as a function, a class is chosen here to maintain reference to the clients within a defined scope. """ @@ -61,9 +61,9 @@ async def send(self, client_id: str, data: dict) -> None: async def handler(self, websocket: websockets.server.WebSocketServerProtocol) -> None: """Register new WebSocket clients and handle their messages.""" client_id = str(uuid4()) - - # Register client connection - self.clients[client_id] = websocket + + # Register client connection + self.clients[client_id] = websocket # Wait for messages from the client try: @@ -82,7 +82,7 @@ async def handler(self, websocket: websockets.server.WebSocketServerProtocol) -> thread.start() # Deregister the client when the connection is closed - finally: + finally: if client_id in self.clients: del self.clients[client_id] From 8b0d408995ad8af1ff6a13396967688001e44895 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Mon, 11 Mar 2024 10:30:09 -0700 Subject: [PATCH 08/13] Update _server.py --- src/tqdm_publisher/_demo/_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demo/_server.py b/src/tqdm_publisher/_demo/_server.py index 284fc75..8829fbe 100644 --- a/src/tqdm_publisher/_demo/_server.py +++ b/src/tqdm_publisher/_demo/_server.py @@ -58,7 +58,7 @@ async def send(self, client_id: str, data: dict) -> None: """Send an arbitrary JSON object `data` to the client identifier by `client_id`.""" await self.clients[client_id].send(json.dumps(obj=data)) - async def handler(self, websocket: websockets.server.WebSocketServerProtocol) -> None: + async def handler(self, websocket: websockets.WebSocketServerProtocol) -> None: """Register new WebSocket clients and handle their messages.""" client_id = str(uuid4()) From fc7e3cf3302ede9f8b04264ce886f58b605ff7bb Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Mon, 11 Mar 2024 10:37:17 -0700 Subject: [PATCH 09/13] Fix demo --- src/tqdm_publisher/_demo/_client.html | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/tqdm_publisher/_demo/_client.html b/src/tqdm_publisher/_demo/_client.html index 0e01ef3..65d60c4 100644 --- a/src/tqdm_publisher/_demo/_client.html +++ b/src/tqdm_publisher/_demo/_client.html @@ -120,9 +120,8 @@

tqdm_progress

onopen: () => console.log('Connected'), onclose: () => console.log('Disconnected'), onmessage: (data) => { - const { id, payload } = JSON.parse(event.data); - console.log(id, payload, bars[id]); - bars[id].style.width = 100 * (payload.n / payload.total) + '%'; + const { progress_bar_id, format_dict } = JSON.parse(event.data); + bars[progress_bar_id].style.width = 100 * (format_dict.n / format_dict.total) + '%'; } }); From 36aeaff968134705ee9ae9060a2f27e4d6be129a Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Mon, 11 Mar 2024 10:44:18 -0700 Subject: [PATCH 10/13] Document the frontend --- src/tqdm_publisher/_demo/_client.html | 80 +++------------------------ src/tqdm_publisher/_demo/_client.js | 72 ++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 73 deletions(-) create mode 100644 src/tqdm_publisher/_demo/_client.js diff --git a/src/tqdm_publisher/_demo/_client.html b/src/tqdm_publisher/_demo/_client.html index 65d60c4..bdd4d8e 100644 --- a/src/tqdm_publisher/_demo/_client.html +++ b/src/tqdm_publisher/_demo/_client.html @@ -12,6 +12,7 @@ Concurrent Client Demo + + +

tqdm_progress

Create multiple progress bars to test concurrent subscriptions
+ +
-
- -
- + +
- - - diff --git a/src/tqdm_publisher/_demo/_client.js b/src/tqdm_publisher/_demo/_client.js new file mode 100644 index 0000000..b078373 --- /dev/null +++ b/src/tqdm_publisher/_demo/_client.js @@ -0,0 +1,72 @@ + +// Grab bar container from HTML +const barContainer = document.querySelector('#bars'); + +// Create a progress bar and append it to the bar container +const createProgressBar = () => { + const element = document.createElement('div'); + element.classList.add('progress'); + const progress = document.createElement('div'); + element.appendChild(progress); + barContainer.appendChild(element); + return { element, progress }; +} + +// Create a simple WebSocket client wrapper class +class ProgressClient { + + #connect = (props = {}) => { + + const { + onopen = () => {}, + onclose = () => {}, + onmessage = () => {} + } = props; + + this.socket = new WebSocket('ws://localhost:8000'); + this.socket.addEventListener('open', onopen); + + // Attemp to reconnect every second if the connection is closed + this.socket.addEventListener('close', () => { + onclose(); + setTimeout(() => this.#connect(props), 1000); + }); + + this.socket.addEventListener('message', onmessage); + } + + constructor(props) { + this.#connect(props); + } + + close() { + this.socket.close(); + } + +} + + +const bars = {} // Track progress bars + + +// Update the specified progress bar when a message is received from the server +const onProgressUpdate = (event) => { + const { progress_bar_id, format_dict } = JSON.parse(event.data); + bars[progress_bar_id].style.width = 100 * (format_dict.n / format_dict.total) + '%'; +} + +// Create a new WebSocket client +const client = new ProgressClient({ onmessage: onProgressUpdate }); + +// Declare that the HTML Button should create a new progress bar when clicked +const button = document.querySelector('button'); +button.addEventListener('click', () => { + const { element, progress } = createProgressBar(); // Create a progress bar + + barContainer.appendChild(element); // Render the progress bar + + const progress_bar_id = Math.random().toString(36).substring(7); // Create a unique ID for the progress bar + bars[progress_bar_id] = progress; // Track the progress bar + + client.socket.send(JSON.stringify({ command: 'start', progress_bar_id })); // Send a message to the server to start the progress bar +}) From 38051fdb64d163e4de650e931cd88bfa063373d3 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 11 Mar 2024 17:45:05 +0000 Subject: [PATCH 11/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tqdm_publisher/_demo/_client.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demo/_client.html b/src/tqdm_publisher/_demo/_client.html index bdd4d8e..0e514eb 100644 --- a/src/tqdm_publisher/_demo/_client.html +++ b/src/tqdm_publisher/_demo/_client.html @@ -70,6 +70,6 @@

tqdm_progress

-
+
From 9426dc57b4dca632ed9cd5171cd3f8dd1801aa6c Mon Sep 17 00:00:00 2001 From: Cody Baker <51133164+CodyCBakerPhD@users.noreply.github.com> Date: Mon, 11 Mar 2024 13:56:21 -0400 Subject: [PATCH 12/13] Update src/tqdm_publisher/_demo/_client.js --- src/tqdm_publisher/_demo/_client.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demo/_client.js b/src/tqdm_publisher/_demo/_client.js index b078373..edf3670 100644 --- a/src/tqdm_publisher/_demo/_client.js +++ b/src/tqdm_publisher/_demo/_client.js @@ -26,7 +26,7 @@ class ProgressClient { this.socket = new WebSocket('ws://localhost:8000'); this.socket.addEventListener('open', onopen); - // Attemp to reconnect every second if the connection is closed + // Attempt to reconnect every second if the connection is closed this.socket.addEventListener('close', () => { onclose(); setTimeout(() => this.#connect(props), 1000); From d24f4a6ff15af10faaf5647ca3409b90d61148b0 Mon Sep 17 00:00:00 2001 From: Cody Baker <51133164+CodyCBakerPhD@users.noreply.github.com> Date: Mon, 11 Mar 2024 14:24:19 -0400 Subject: [PATCH 13/13] Functional websocket declaration (#42) * Update _server.py * more keyword arguments; break up docstring; remove unused capture * more keyword arguments --------- Co-authored-by: Garrett Michael Flynn --- src/tqdm_publisher/_demo/_server.py | 79 ++++++++++------------------- 1 file changed, 27 insertions(+), 52 deletions(-) diff --git a/src/tqdm_publisher/_demo/_server.py b/src/tqdm_publisher/_demo/_server.py index 1a697ab..ac821dc 100644 --- a/src/tqdm_publisher/_demo/_server.py +++ b/src/tqdm_publisher/_demo/_server.py @@ -2,22 +2,20 @@ import json import threading import time -from typing import Any, Dict, List -from uuid import uuid4 import websockets -from tqdm_publisher import TQDMPublisher +import tqdm_publisher -def start_progress_bar(*, client_id: str, progress_bar_id: str, client_callback: callable) -> None: +def start_progress_bar(*, progress_bar_id: str, client_callback: callable) -> None: """ Emulate running the specified number of tasks by sleeping the specified amount of time on each iteration. Defaults are chosen for a deterministic and regular update period of one second for a total time of one minute. """ all_task_durations_in_seconds = [1.0 for _ in range(60)] # One minute at one second per update - progress_bar = TQDMPublisher(iterable=all_task_durations_in_seconds) + progress_bar = tqdm_publisher.TQDMPublisher(iterable=all_task_durations_in_seconds) def run_function_on_progress_update(format_dict: dict) -> None: """ @@ -29,7 +27,7 @@ def run_function_on_progress_update(format_dict: dict) -> None: In this demo, we will execute the `client_callback` whose protocol is known only to the WebSocketHandler. """ - client_callback(client_id=client_id, progress_bar_id=progress_bar_id, format_dict=format_dict) + client_callback(progress_bar_id=progress_bar_id, format_dict=format_dict) progress_bar.subscribe(callback=run_function_on_progress_update) @@ -37,60 +35,37 @@ def run_function_on_progress_update(format_dict: dict) -> None: time.sleep(task_duration) -class WebSocketHandler: - """ - This is a class that handles the WebSocket connections and the communication protocol - between the server and the client. - - While we could have implemented this as a function, a class is chosen here to maintain reference - to the clients within a defined scope. - """ - - def __init__(self) -> None: - """Initialize the mapping of client IDs to .""" - self.clients: Dict[str, Any] = dict() - - def forward_progress_to_client(self, *, client_id: str, progress_bar_id: str, format_dict: dict) -> None: - """This is the function that will run on every update of the TQDM object. It will forward the progress to the client.""" - asyncio.run(self.send(client_id=client_id, data=dict(progress_bar_id=progress_bar_id, format_dict=format_dict))) +async def handler(websocket: websockets.WebSocketServerProtocol) -> None: + """Handle messages from the client and manage the client connections.""" - async def send(self, client_id: str, data: dict) -> None: - """Send an arbitrary JSON object `data` to the client identifier by `client_id`.""" - await self.clients[client_id].send(json.dumps(obj=data)) - - async def handler(self, websocket: websockets.WebSocketServerProtocol) -> None: - """Register new WebSocket clients and handle their messages.""" - client_id = str(uuid4()) - - # Register client connection - self.clients[client_id] = websocket + def forward_progress_to_client(*, progress_bar_id: str, format_dict: dict) -> None: + """ + This is the function that will run on every update of the TQDM object. - # Wait for messages from the client - try: - async for message in websocket: - message_from_client = json.loads(message) + It will forward the progress to the client. + """ + asyncio.run( + websocket.send(message=json.dumps(obj=dict(progress_bar_id=progress_bar_id, format_dict=format_dict))) + ) - if message_from_client["command"] == "start": - thread = threading.Thread( - target=start_progress_bar, - kwargs=dict( - client_id=client_id, - progress_bar_id=message_from_client["progress_bar_id"], - client_callback=self.forward_progress_to_client, - ), - ) - thread.start() + # Wait for messages from the client + async for message in websocket: + message_from_client = json.loads(message) - # Deregister the client when the connection is closed - finally: - if client_id in self.clients: - del self.clients[client_id] + if message_from_client["command"] == "start": + thread = threading.Thread( + target=start_progress_bar, + kwargs=dict( + progress_bar_id=message_from_client["progress_bar_id"], + client_callback=forward_progress_to_client, + ), + ) + thread.start() async def spawn_server() -> None: """Spawn the server asynchronously.""" - handler = WebSocketHandler().handler - async with websockets.serve(handler, "", 8000): + async with websockets.serve(ws_handler=handler, host="", port=8000): await asyncio.Future()