From 10bc9c0f24979ca1cc177f45a44750427a3be08a Mon Sep 17 00:00:00 2001 From: Aaron Fuller <159853563+afullerx@users.noreply.github.com> Date: Thu, 19 Dec 2024 05:32:56 -0500 Subject: [PATCH] Add message history and retransmission (#3199) This PR attempts to resolve #3143 by adding a message history to `outbox` and providing for the retransmission of missed messages in order to resynchronize the client's state during a reconnection. If this cannot be accomplished, a reload of the page is triggered. The goal of this is to prevent a connected client's state from ever being out of sync with the server. For the auto-index page, a history duration of 30 seconds was arbitrarily chosen. Since this value only determines when the UI is updated through resending messages instead of a page reload, the UI should stay properly synchronized regardless of this value. For a `ui.page`, the history duration is computed based on the expected lifetime of the `client` object. Currently, with the default `reconnect_timeout = 3.0`, this is a max of 9 seconds. With this change, a re-evaluation of this default could be warranted. Now that UI state can be resynchronized indefinitely, discarding the user's page after only 5-9s of disconnection seems premature. See https://github.com/zauberzeug/nicegui/issues/3143#issuecomment-2151376569 for more. --- Open tasks (October 24, 2024): - [x] `message_history_length` isn't being used - [x] handle reconnect when next message ID has already been pruned - [x] fix failing pytests - [x] fix test_no_object_duplication_on_index_client - [x] Should the auto-index client reload when trying to reconnect (because there is no message history)? -> No. - [x] ack message to keep history short - [x] thorough test - [x] test On Air --------- Co-authored-by: Falko Schindler Co-authored-by: Rodja Trappe --- nicegui/air.py | 10 ++++- nicegui/app/app_config.py | 3 ++ nicegui/client.py | 18 ++++---- nicegui/nicegui.py | 14 ++++-- nicegui/outbox.py | 68 ++++++++++++++++++++++++----- nicegui/page.py | 6 ++- nicegui/static/nicegui.js | 21 ++++++++- nicegui/testing/general_fixtures.py | 1 + nicegui/ui_run.py | 3 ++ nicegui/ui_run_with.py | 3 ++ 10 files changed, 123 insertions(+), 24 deletions(-) diff --git a/nicegui/air.py b/nicegui/air.py index a4b37a972..ffa8233f0 100644 --- a/nicegui/air.py +++ b/nicegui/air.py @@ -134,7 +134,7 @@ def _handle_handshake(data: Dict[str, Any]) -> bool: core.app.storage.copy_tab(data['old_tab_id'], data['tab_id']) client.tab_id = data['tab_id'] client.on_air = True - client.handle_handshake() + client.handle_handshake(data.get('next_message_id')) return True @self.relay.on('client_disconnect') @@ -178,6 +178,14 @@ def _handle_javascript_response(data: Dict[str, Any]) -> None: client = Client.instances[client_id] client.handle_javascript_response(data['msg']) + @self.relay.on('ack') + def _handle_ack(data: Dict[str, Any]) -> None: + client_id = data['client_id'] + if client_id not in Client.instances: + return + client = Client.instances[client_id] + client.outbox.prune_history(data['msg']['next_message_id']) + @self.relay.on('out_of_time') async def _handle_out_of_time() -> None: print('Sorry, you have reached the time limit of this NiceGUI On Air preview.', flush=True) diff --git a/nicegui/app/app_config.py b/nicegui/app/app_config.py index c36e8c134..579d586d8 100644 --- a/nicegui/app/app_config.py +++ b/nicegui/app/app_config.py @@ -32,6 +32,7 @@ class AppConfig: language: Language = field(init=False) binding_refresh_interval: float = field(init=False) reconnect_timeout: float = field(init=False) + message_history_length: int = field(init=False) tailwind: bool = field(init=False) prod_js: bool = field(init=False) show_welcome_message: bool = field(init=False) @@ -47,6 +48,7 @@ def add_run_config(self, language: Language, binding_refresh_interval: float, reconnect_timeout: float, + message_history_length: int, tailwind: bool, prod_js: bool, show_welcome_message: bool, @@ -60,6 +62,7 @@ def add_run_config(self, self.language = language self.binding_refresh_interval = binding_refresh_interval self.reconnect_timeout = reconnect_timeout + self.message_history_length = message_history_length self.tailwind = tailwind self.prod_js = prod_js self.show_welcome_message = show_welcome_message diff --git a/nicegui/client.py b/nicegui/client.py index b1e883927..384316fb4 100644 --- a/nicegui/client.py +++ b/nicegui/client.py @@ -63,6 +63,7 @@ def __init__(self, page: page, *, request: Optional[Request]) -> None: self._deleted = False self.tab_id: Optional[str] = None + self.page = page self.outbox = Outbox(self) with Element('q-layout', _client=self).props('view="hhh lpr fff"').classes('nicegui-layout') as self.layout: @@ -75,7 +76,6 @@ def __init__(self, page: page, *, request: Optional[Request]) -> None: self._head_html = '' self._body_html = '' - self.page = page self.storage = ObservableDict() self.connect_handlers: List[Union[Callable[..., Any], Awaitable]] = [] @@ -123,7 +123,11 @@ def build_response(self, request: Request, status_code: int = 200) -> Response: elements = json.dumps({ id: element._to_dict() for id, element in self.elements.items() # pylint: disable=protected-access }) - socket_io_js_query_params = {**core.app.config.socket_io_js_query_params, 'client_id': self.id} + socket_io_js_query_params = { + **core.app.config.socket_io_js_query_params, + 'client_id': self.id, + 'next_message_id': self.outbox.next_message_id, + } vue_html, vue_styles, vue_scripts, imports, js_imports = generate_resources(prefix, self.elements.values()) return templates.TemplateResponse( request=request, @@ -227,8 +231,10 @@ def on_disconnect(self, handler: Union[Callable[..., Any], Awaitable]) -> None: """Add a callback to be invoked when the client disconnects.""" self.disconnect_handlers.append(handler) - def handle_handshake(self) -> None: + def handle_handshake(self, next_message_id: Optional[int]) -> None: """Cancel pending disconnect task and invoke connect handlers.""" + if next_message_id is not None: + self.outbox.try_rewind(next_message_id) if self._disconnect_task: self._disconnect_task.cancel() self._disconnect_task = None @@ -241,11 +247,7 @@ def handle_handshake(self) -> None: def handle_disconnect(self) -> None: """Wait for the browser to reconnect; invoke disconnect handlers if it doesn't.""" async def handle_disconnect() -> None: - if self.page.reconnect_timeout is not None: - delay = self.page.reconnect_timeout - else: - delay = core.app.config.reconnect_timeout # pylint: disable=protected-access - await asyncio.sleep(delay) + await asyncio.sleep(self.page.resolve_reconnect_timeout()) for t in self.disconnect_handlers: self.safe_invoke(t) for t in core.app._disconnect_handlers: # pylint: disable=protected-access diff --git a/nicegui/nicegui.py b/nicegui/nicegui.py index c08115465..893b4f81b 100644 --- a/nicegui/nicegui.py +++ b/nicegui/nicegui.py @@ -3,7 +3,7 @@ import urllib.parse from contextlib import asynccontextmanager from pathlib import Path -from typing import Dict +from typing import Any, Dict import socketio from fastapi import HTTPException, Request @@ -163,7 +163,7 @@ async def _exception_handler_500(request: Request, exception: Exception) -> Resp @sio.on('handshake') -async def _on_handshake(sid: str, data: Dict[str, str]) -> bool: +async def _on_handshake(sid: str, data: Dict[str, Any]) -> bool: client = Client.instances.get(data['client_id']) if not client: return False @@ -175,7 +175,7 @@ async def _on_handshake(sid: str, data: Dict[str, str]) -> bool: else: client.environ = sio.get_environ(sid) await sio.enter_room(sid, client.id) - client.handle_handshake() + client.handle_handshake(data.get('next_message_id')) return True @@ -203,3 +203,11 @@ def _on_javascript_response(_: str, msg: Dict) -> None: if not client: return client.handle_javascript_response(msg) + + +@sio.on('ack') +def _on_ack(_: str, msg: Dict) -> None: + client = Client.instances.get(msg['client_id']) + if not client: + return + client.outbox.prune_history(msg['next_message_id']) diff --git a/nicegui/outbox.py b/nicegui/outbox.py index b65a759bf..18d19e5b1 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import time from collections import deque from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Tuple @@ -10,10 +11,16 @@ from .client import Client from .element import Element -ClientId = str ElementId = int + +ClientId = str MessageType = str -Message = Tuple[ClientId, MessageType, Any] +Payload = Any +Message = Tuple[ClientId, MessageType, Payload] + +MessageId = int +MessageTime = float +HistoryEntry = Tuple[MessageId, MessageTime, Message] class Outbox: @@ -22,8 +29,12 @@ def __init__(self, client: Client) -> None: self.client = client self.updates: Dict[ElementId, Optional[Element]] = {} self.messages: Deque[Message] = deque() + self.message_history: Deque[HistoryEntry] = deque() + self.next_message_id: int = 0 + self._should_stop = False self._enqueue_event: Optional[asyncio.Event] = None + if core.app.is_started: background_tasks.create(self.loop(), name=f'outbox loop {client.id}') else: @@ -46,7 +57,7 @@ def enqueue_delete(self, element: Element) -> None: self.updates[element.id] = None self._set_enqueue_event() - def enqueue_message(self, message_type: MessageType, data: Any, target_id: ClientId) -> None: + def enqueue_message(self, message_type: MessageType, data: Payload, target_id: ClientId) -> None: """Enqueue a message for the given client.""" self.client.check_existence() self.messages.append((target_id, message_type, data)) @@ -77,12 +88,12 @@ async def loop(self) -> None: element_id: None if element is None else element._to_dict() # pylint: disable=protected-access for element_id, element in self.updates.items() } - coros.append(self._emit('update', data, self.client.id)) + coros.append(self._emit((self.client.id, 'update', data))) self.updates.clear() if self.messages: - for target_id, message_type, data in self.messages: - coros.append(self._emit(message_type, data, target_id)) + for message in self.messages: + coros.append(self._emit(message)) self.messages.clear() for coro in coros: @@ -95,10 +106,47 @@ async def loop(self) -> None: core.app.handle_exception(e) await asyncio.sleep(0.1) - async def _emit(self, message_type: MessageType, data: Any, target_id: ClientId) -> None: - await core.sio.emit(message_type, data, room=target_id) - if core.air is not None and core.air.is_air_target(target_id): - await core.air.emit(message_type, data, room=target_id) + async def _emit(self, message: Message) -> None: + client_id, message_type, data = message + data['_id'] = self.next_message_id + + await core.sio.emit(message_type, data, room=client_id) + if core.air is not None and core.air.is_air_target(client_id): + await core.air.emit(message_type, data, room=client_id) + + if not self.client.shared: + self.message_history.append((self.next_message_id, time.time(), message)) + max_age = core.sio.eio.ping_interval + core.sio.eio.ping_timeout + self.client.page.resolve_reconnect_timeout() + while self.message_history and self.message_history[0][1] < time.time() - max_age: + self.message_history.popleft() + while len(self.message_history) > core.app.config.message_history_length: + self.message_history.popleft() + + self.next_message_id += 1 + + def try_rewind(self, target_message_id: MessageId) -> None: + """Rewind to the given message ID and discard all messages before it.""" + # nothing to do, the next message ID is already the target message ID + if self.next_message_id == target_message_id: + return + + # rewind to the target message ID + while self.message_history: + self.next_message_id, _, message = self.message_history.pop() + self.messages.appendleft(message) + if self.next_message_id == target_message_id: + self.message_history.clear() + self._set_enqueue_event() + return + + # target message ID not found, reload the page + if not self.client.shared: + self.client.run_javascript('window.location.reload()') + + def prune_history(self, next_message_id: MessageId) -> None: + """Prune the message history up to the given message ID.""" + while self.message_history and self.message_history[0][0] < next_message_id: + self.message_history.popleft() def stop(self) -> None: """Stop the outbox loop.""" diff --git a/nicegui/page.py b/nicegui/page.py index f6493ba80..c4f46d249 100644 --- a/nicegui/page.py +++ b/nicegui/page.py @@ -56,7 +56,7 @@ def __init__(self, :param dark: whether to use Quasar's dark mode (defaults to `dark` argument of `run` command) :param language: language of the page (defaults to `language` argument of `run` command) :param response_timeout: maximum time for the decorated function to build the page (default: 3.0 seconds) - :param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 0.0 seconds) + :param reconnect_timeout: maximum time the server waits for the browser to reconnect (defaults to `reconnect_timeout` argument of `run` command)) :param api_router: APIRouter instance to use, can be left `None` to use the default :param kwargs: additional keyword arguments passed to FastAPI's @app.get method """ @@ -94,6 +94,10 @@ def resolve_language(self) -> Optional[str]: """Return the language of the page.""" return self.language if self.language is not ... else core.app.config.language + def resolve_reconnect_timeout(self) -> float: + """Return the reconnect_timeout of the page.""" + return self.reconnect_timeout if self.reconnect_timeout is not None else core.app.config.reconnect_timeout + def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]: core.app.remove_route(self.path) # NOTE make sure only the latest route definition is used parameters_of_decorated_func = list(inspect.signature(func).parameters.keys()) diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index bc92ef26f..7c86d490c 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -269,6 +269,15 @@ function download(src, filename, mediaType, prefix) { } } +function ack() { + if (window.ackedMessageId >= window.nextMessageId) return; + window.socket.emit("ack", { + client_id: window.clientId, + next_message_id: window.nextMessageId, + }); + window.ackedMessageId = window.nextMessageId; +} + async function loadDependencies(element, prefix, version) { if (element.component) { const { name, key, tag } = element.component; @@ -310,6 +319,7 @@ window.onbeforeunload = function () { function createApp(elements, options) { replaceUndefinedAttributes(elements, 0); + setInterval(() => ack(), 3000); return (app = Vue.createApp({ data() { return { @@ -324,6 +334,8 @@ function createApp(elements, options) { window.clientId = options.query.client_id; const url = window.location.protocol === "https:" ? "wss://" : "ws://" + window.location.host; window.path_prefix = options.prefix; + window.nextMessageId = options.query.next_message_id; + window.ackedMessageId = -1; window.socket = io(url, { path: `${options.prefix}/_nicegui_ws/socket.io`, query: options.query, @@ -337,6 +349,7 @@ function createApp(elements, options) { client_id: window.clientId, tab_id: TAB_ID, old_tab_id: OLD_TAB_ID, + next_message_id: window.nextMessageId, }; window.socket.emit("handshake", args, (ok) => { if (!ok) { @@ -375,7 +388,7 @@ function createApp(elements, options) { replaceUndefinedAttributes(this.elements, id); } }, - run_javascript: (msg) => runJavascript(msg["code"], msg["request_id"]), + run_javascript: (msg) => runJavascript(msg.code, msg.request_id), open: (msg) => { const url = msg.path.startsWith("/") ? options.prefix + msg.path : msg.path; const target = msg.new_tab ? "_blank" : "_self"; @@ -388,6 +401,12 @@ function createApp(elements, options) { let isProcessingSocketMessage = false; for (const [event, handler] of Object.entries(messageHandlers)) { window.socket.on(event, async (...args) => { + if (args.length > 0 && args[0]._id !== undefined) { + const message_id = args[0]._id; + if (message_id < window.nextMessageId) return; + window.nextMessageId = message_id + 1; + delete args[0]._id; + } socketMessageQueue.push(() => handler(...args)); if (!isProcessingSocketMessage) { while (socketMessageQueue.length > 0) { diff --git a/nicegui/testing/general_fixtures.py b/nicegui/testing/general_fixtures.py index a7205c8fc..f03f96c41 100644 --- a/nicegui/testing/general_fixtures.py +++ b/nicegui/testing/general_fixtures.py @@ -91,6 +91,7 @@ def prepare_simulation(request: pytest.FixtureRequest) -> None: language='en-US', binding_refresh_interval=0.1, reconnect_timeout=3.0, + message_history_length=1000, tailwind=True, prod_js=True, show_welcome_message=False, diff --git a/nicegui/ui_run.py b/nicegui/ui_run.py index 31d9d3fc6..708fd87ba 100644 --- a/nicegui/ui_run.py +++ b/nicegui/ui_run.py @@ -53,6 +53,7 @@ def run(*, language: Language = 'en-US', binding_refresh_interval: float = 0.1, reconnect_timeout: float = 3.0, + message_history_length: int = 1000, fastapi_docs: Union[bool, DocsConfig] = False, show: bool = True, on_air: Optional[Union[str, Literal[True]]] = None, @@ -86,6 +87,7 @@ def run(*, :param language: language for Quasar elements (default: `'en-US'`) :param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly) :param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds) + :param message_history_length: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable) :param fastapi_docs: enable FastAPI's automatic documentation with Swagger UI, ReDoc, and OpenAPI JSON (bool or dictionary as described `here`_, default: `False`) :param show: automatically open the UI in a browser tab (default: `True`) :param on_air: tech preview: `allows temporary remote access `_ if set to `True` (default: disabled) @@ -114,6 +116,7 @@ def run(*, language=language, binding_refresh_interval=binding_refresh_interval, reconnect_timeout=reconnect_timeout, + message_history_length=message_history_length, tailwind=tailwind, prod_js=prod_js, show_welcome_message=show_welcome_message, diff --git a/nicegui/ui_run_with.py b/nicegui/ui_run_with.py index 8d69e2bba..a7a4e8640 100644 --- a/nicegui/ui_run_with.py +++ b/nicegui/ui_run_with.py @@ -19,6 +19,7 @@ def run_with( language: Language = 'en-US', binding_refresh_interval: float = 0.1, reconnect_timeout: float = 3.0, + message_history_length: int = 1000, mount_path: str = '/', on_air: Optional[Union[str, Literal[True]]] = None, tailwind: bool = True, @@ -36,6 +37,7 @@ def run_with( :param language: language for Quasar elements (default: `'en-US'`) :param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly) :param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds) + :param message_history_length: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable) :param mount_path: mount NiceGUI at this path (default: `'/'`) :param on_air: tech preview: `allows temporary remote access `_ if set to `True` (default: disabled) :param tailwind: whether to use Tailwind CSS (experimental, default: `True`) @@ -52,6 +54,7 @@ def run_with( language=language, binding_refresh_interval=binding_refresh_interval, reconnect_timeout=reconnect_timeout, + message_history_length=message_history_length, tailwind=tailwind, prod_js=prod_js, show_welcome_message=show_welcome_message,