Skip to content

Commit

Permalink
Add message history and retransmission (#3199)
Browse files Browse the repository at this point in the history
This PR attempts to resolve #3143 by adding a message history to
`outbox` and providing for the retransmission of missed messages in
order to resynchronize the client's state during a reconnection. If this
cannot be accomplished, a reload of the page is triggered. The goal of
this is to prevent a connected client's state from ever being out of
sync with the server.

For the auto-index page, a history duration of 30 seconds was
arbitrarily chosen. This value only determines whether the UI is
resynchronized by resending messages or by a page reload, so the UI
should stay properly synchronized regardless of its exact value.

For a `ui.page`, the history duration is computed based on the expected
lifetime of the `client` object. Currently, with the default
`reconnect_timeout = 3.0`, this is a max of 9 seconds. With this change,
a re-evaluation of this default could be warranted. Now that UI state
can be resynchronized indefinitely, discarding the user's page after
only 5-9s of disconnection seems premature. See
#3143 (comment)
for more.

---

Open tasks (October 24, 2024):

- [x] `message_history_length` isn't being used
- [x] handle reconnect when next message ID has already been pruned
- [x] fix failing pytests
- [x] fix test_no_object_duplication_on_index_client
- [x] Should the auto-index client reload when trying to reconnect
(because there is no message history)? -> No.
- [x] ack message to keep history short
- [x] thorough test
- [x] test On Air

---------

Co-authored-by: Falko Schindler <[email protected]>
Co-authored-by: Rodja Trappe <[email protected]>
  • Loading branch information
3 people authored Dec 19, 2024
1 parent 6cb7e39 commit 10bc9c0
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 24 deletions.
10 changes: 9 additions & 1 deletion nicegui/air.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def _handle_handshake(data: Dict[str, Any]) -> bool:
core.app.storage.copy_tab(data['old_tab_id'], data['tab_id'])
client.tab_id = data['tab_id']
client.on_air = True
client.handle_handshake()
client.handle_handshake(data.get('next_message_id'))
return True

@self.relay.on('client_disconnect')
Expand Down Expand Up @@ -178,6 +178,14 @@ def _handle_javascript_response(data: Dict[str, Any]) -> None:
client = Client.instances[client_id]
client.handle_javascript_response(data['msg'])

@self.relay.on('ack')
def _handle_ack(data: Dict[str, Any]) -> None:
    """Handle an ``ack`` event from the relay by pruning the client's message history.

    Events for client IDs that are not in ``Client.instances`` are ignored.
    """
    client = Client.instances.get(data['client_id'])
    if client is None:
        return
    # the relay wraps the browser's payload in ``msg``
    client.outbox.prune_history(data['msg']['next_message_id'])

@self.relay.on('out_of_time')
async def _handle_out_of_time() -> None:
print('Sorry, you have reached the time limit of this NiceGUI On Air preview.', flush=True)
Expand Down
3 changes: 3 additions & 0 deletions nicegui/app/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class AppConfig:
language: Language = field(init=False)
binding_refresh_interval: float = field(init=False)
reconnect_timeout: float = field(init=False)
message_history_length: int = field(init=False)
tailwind: bool = field(init=False)
prod_js: bool = field(init=False)
show_welcome_message: bool = field(init=False)
Expand All @@ -47,6 +48,7 @@ def add_run_config(self,
language: Language,
binding_refresh_interval: float,
reconnect_timeout: float,
message_history_length: int,
tailwind: bool,
prod_js: bool,
show_welcome_message: bool,
Expand All @@ -60,6 +62,7 @@ def add_run_config(self,
self.language = language
self.binding_refresh_interval = binding_refresh_interval
self.reconnect_timeout = reconnect_timeout
self.message_history_length = message_history_length
self.tailwind = tailwind
self.prod_js = prod_js
self.show_welcome_message = show_welcome_message
Expand Down
18 changes: 10 additions & 8 deletions nicegui/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, page: page, *, request: Optional[Request]) -> None:
self._deleted = False
self.tab_id: Optional[str] = None

self.page = page
self.outbox = Outbox(self)

with Element('q-layout', _client=self).props('view="hhh lpr fff"').classes('nicegui-layout') as self.layout:
Expand All @@ -75,7 +76,6 @@ def __init__(self, page: page, *, request: Optional[Request]) -> None:
self._head_html = ''
self._body_html = ''

self.page = page
self.storage = ObservableDict()

self.connect_handlers: List[Union[Callable[..., Any], Awaitable]] = []
Expand Down Expand Up @@ -123,7 +123,11 @@ def build_response(self, request: Request, status_code: int = 200) -> Response:
elements = json.dumps({
id: element._to_dict() for id, element in self.elements.items() # pylint: disable=protected-access
})
socket_io_js_query_params = {**core.app.config.socket_io_js_query_params, 'client_id': self.id}
socket_io_js_query_params = {
**core.app.config.socket_io_js_query_params,
'client_id': self.id,
'next_message_id': self.outbox.next_message_id,
}
vue_html, vue_styles, vue_scripts, imports, js_imports = generate_resources(prefix, self.elements.values())
return templates.TemplateResponse(
request=request,
Expand Down Expand Up @@ -227,8 +231,10 @@ def on_disconnect(self, handler: Union[Callable[..., Any], Awaitable]) -> None:
"""Add a callback to be invoked when the client disconnects."""
self.disconnect_handlers.append(handler)

def handle_handshake(self) -> None:
def handle_handshake(self, next_message_id: Optional[int]) -> None:
"""Cancel pending disconnect task and invoke connect handlers."""
if next_message_id is not None:
self.outbox.try_rewind(next_message_id)
if self._disconnect_task:
self._disconnect_task.cancel()
self._disconnect_task = None
Expand All @@ -241,11 +247,7 @@ def handle_handshake(self) -> None:
def handle_disconnect(self) -> None:
"""Wait for the browser to reconnect; invoke disconnect handlers if it doesn't."""
async def handle_disconnect() -> None:
if self.page.reconnect_timeout is not None:
delay = self.page.reconnect_timeout
else:
delay = core.app.config.reconnect_timeout # pylint: disable=protected-access
await asyncio.sleep(delay)
await asyncio.sleep(self.page.resolve_reconnect_timeout())
for t in self.disconnect_handlers:
self.safe_invoke(t)
for t in core.app._disconnect_handlers: # pylint: disable=protected-access
Expand Down
14 changes: 11 additions & 3 deletions nicegui/nicegui.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import urllib.parse
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Dict
from typing import Any, Dict

import socketio
from fastapi import HTTPException, Request
Expand Down Expand Up @@ -163,7 +163,7 @@ async def _exception_handler_500(request: Request, exception: Exception) -> Resp


@sio.on('handshake')
async def _on_handshake(sid: str, data: Dict[str, str]) -> bool:
async def _on_handshake(sid: str, data: Dict[str, Any]) -> bool:
client = Client.instances.get(data['client_id'])
if not client:
return False
Expand All @@ -175,7 +175,7 @@ async def _on_handshake(sid: str, data: Dict[str, str]) -> bool:
else:
client.environ = sio.get_environ(sid)
await sio.enter_room(sid, client.id)
client.handle_handshake()
client.handle_handshake(data.get('next_message_id'))
return True


Expand Down Expand Up @@ -203,3 +203,11 @@ def _on_javascript_response(_: str, msg: Dict) -> None:
if not client:
return
client.handle_javascript_response(msg)


@sio.on('ack')
def _on_ack(_: str, msg: Dict) -> None:
    """Handle an ``ack`` socket event by pruning the acknowledged part of the client's message history.

    Messages referring to an unknown client are ignored.
    """
    client = Client.instances.get(msg['client_id'])
    if client:
        client.outbox.prune_history(msg['next_message_id'])
68 changes: 58 additions & 10 deletions nicegui/outbox.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import time
from collections import deque
from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Tuple

Expand All @@ -10,10 +11,16 @@
from .client import Client
from .element import Element

ClientId = str
ElementId = int

ClientId = str
MessageType = str
Message = Tuple[ClientId, MessageType, Any]
Payload = Any
Message = Tuple[ClientId, MessageType, Payload]

MessageId = int
MessageTime = float
HistoryEntry = Tuple[MessageId, MessageTime, Message]


class Outbox:
Expand All @@ -22,8 +29,12 @@ def __init__(self, client: Client) -> None:
self.client = client
self.updates: Dict[ElementId, Optional[Element]] = {}
self.messages: Deque[Message] = deque()
self.message_history: Deque[HistoryEntry] = deque()
self.next_message_id: int = 0

self._should_stop = False
self._enqueue_event: Optional[asyncio.Event] = None

if core.app.is_started:
background_tasks.create(self.loop(), name=f'outbox loop {client.id}')
else:
Expand All @@ -46,7 +57,7 @@ def enqueue_delete(self, element: Element) -> None:
self.updates[element.id] = None
self._set_enqueue_event()

def enqueue_message(self, message_type: MessageType, data: Any, target_id: ClientId) -> None:
def enqueue_message(self, message_type: MessageType, data: Payload, target_id: ClientId) -> None:
"""Enqueue a message for the given client."""
self.client.check_existence()
self.messages.append((target_id, message_type, data))
Expand Down Expand Up @@ -77,12 +88,12 @@ async def loop(self) -> None:
element_id: None if element is None else element._to_dict() # pylint: disable=protected-access
for element_id, element in self.updates.items()
}
coros.append(self._emit('update', data, self.client.id))
coros.append(self._emit((self.client.id, 'update', data)))
self.updates.clear()

if self.messages:
for target_id, message_type, data in self.messages:
coros.append(self._emit(message_type, data, target_id))
for message in self.messages:
coros.append(self._emit(message))
self.messages.clear()

for coro in coros:
Expand All @@ -95,10 +106,47 @@ async def loop(self) -> None:
core.app.handle_exception(e)
await asyncio.sleep(0.1)

async def _emit(self, message_type: MessageType, data: Any, target_id: ClientId) -> None:
await core.sio.emit(message_type, data, room=target_id)
if core.air is not None and core.air.is_air_target(target_id):
await core.air.emit(message_type, data, room=target_id)
async def _emit(self, message: Message) -> None:
    """Emit a single message and record it in the message history.

    The payload is tagged with the current ``next_message_id`` under the key ``_id`` before it is sent
    via ``core.sio`` and, if the target is an On Air target, via ``core.air`` as well.
    For non-shared clients the message is appended to ``message_history``,
    which is then trimmed by age and by ``message_history_length``.
    Finally the message ID counter is incremented.
    """
    target_id, event_name, payload = message
    payload['_id'] = self.next_message_id

    await core.sio.emit(event_name, payload, room=target_id)
    air = core.air
    if air is not None and air.is_air_target(target_id):
        await air.emit(event_name, payload, room=target_id)

    if not self.client.shared:
        history = self.message_history
        history.append((self.next_message_id, time.time(), message))

        # entries older than ping interval + ping timeout + reconnect timeout are dropped
        eio = core.sio.eio
        max_age = eio.ping_interval + eio.ping_timeout + self.client.page.resolve_reconnect_timeout()
        while history and history[0][1] < time.time() - max_age:
            history.popleft()

        # enforce the configured maximum number of stored messages
        while len(history) > core.app.config.message_history_length:
            history.popleft()

    self.next_message_id += 1

def try_rewind(self, target_message_id: MessageId) -> None:
    """Rewind the outbox so that the next emitted message is the one with the given ID.

    History entries are moved back to the front of the message queue, newest first,
    until the entry with ``target_message_id`` is reached; the remaining history is then discarded.
    If the ID is not found in the history, a page reload is requested for non-shared clients.
    """
    if self.next_message_id == target_message_id:
        return  # already in sync, nothing to resend

    history = self.message_history
    while history:
        entry_id, _, message = history.pop()
        self.next_message_id = entry_id
        self.messages.appendleft(message)
        if entry_id == target_message_id:
            history.clear()
            self._set_enqueue_event()
            return

    # the requested message is not in the history anymore, so reload the page
    if not self.client.shared:
        self.client.run_javascript('window.location.reload()')

def prune_history(self, next_message_id: MessageId) -> None:
    """Drop all history entries whose message ID is below the given ID.

    Entries are removed from the old end of ``message_history`` until the oldest
    remaining entry has an ID of at least ``next_message_id``.
    """
    history = self.message_history
    while history:
        oldest_id = history[0][0]
        if oldest_id >= next_message_id:
            break
        history.popleft()

def stop(self) -> None:
"""Stop the outbox loop."""
Expand Down
6 changes: 5 additions & 1 deletion nicegui/page.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def __init__(self,
:param dark: whether to use Quasar's dark mode (defaults to `dark` argument of `run` command)
:param language: language of the page (defaults to `language` argument of `run` command)
:param response_timeout: maximum time for the decorated function to build the page (default: 3.0 seconds)
:param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 0.0 seconds)
:param reconnect_timeout: maximum time the server waits for the browser to reconnect (defaults to `reconnect_timeout` argument of `run` command)
:param api_router: APIRouter instance to use, can be left `None` to use the default
:param kwargs: additional keyword arguments passed to FastAPI's @app.get method
"""
Expand Down Expand Up @@ -94,6 +94,10 @@ def resolve_language(self) -> Optional[str]:
"""Return the language of the page."""
return self.language if self.language is not ... else core.app.config.language

def resolve_reconnect_timeout(self) -> float:
    """Return the page's reconnect timeout, falling back to the app-wide configuration if it is ``None``."""
    if self.reconnect_timeout is None:
        return core.app.config.reconnect_timeout
    return self.reconnect_timeout

def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
core.app.remove_route(self.path) # NOTE make sure only the latest route definition is used
parameters_of_decorated_func = list(inspect.signature(func).parameters.keys())
Expand Down
21 changes: 20 additions & 1 deletion nicegui/static/nicegui.js
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,15 @@ function download(src, filename, mediaType, prefix) {
}
}

// Tell the server which message ID the client expects next,
// unless that ID has already been acknowledged.
function ack() {
  const nextId = window.nextMessageId;
  if (window.ackedMessageId >= nextId) return;
  const payload = {
    client_id: window.clientId,
    next_message_id: nextId,
  };
  window.socket.emit("ack", payload);
  window.ackedMessageId = nextId;
}

async function loadDependencies(element, prefix, version) {
if (element.component) {
const { name, key, tag } = element.component;
Expand Down Expand Up @@ -310,6 +319,7 @@ window.onbeforeunload = function () {

function createApp(elements, options) {
replaceUndefinedAttributes(elements, 0);
setInterval(() => ack(), 3000);
return (app = Vue.createApp({
data() {
return {
Expand All @@ -324,6 +334,8 @@ function createApp(elements, options) {
window.clientId = options.query.client_id;
const url = window.location.protocol === "https:" ? "wss://" : "ws://" + window.location.host;
window.path_prefix = options.prefix;
window.nextMessageId = options.query.next_message_id;
window.ackedMessageId = -1;
window.socket = io(url, {
path: `${options.prefix}/_nicegui_ws/socket.io`,
query: options.query,
Expand All @@ -337,6 +349,7 @@ function createApp(elements, options) {
client_id: window.clientId,
tab_id: TAB_ID,
old_tab_id: OLD_TAB_ID,
next_message_id: window.nextMessageId,
};
window.socket.emit("handshake", args, (ok) => {
if (!ok) {
Expand Down Expand Up @@ -375,7 +388,7 @@ function createApp(elements, options) {
replaceUndefinedAttributes(this.elements, id);
}
},
run_javascript: (msg) => runJavascript(msg["code"], msg["request_id"]),
run_javascript: (msg) => runJavascript(msg.code, msg.request_id),
open: (msg) => {
const url = msg.path.startsWith("/") ? options.prefix + msg.path : msg.path;
const target = msg.new_tab ? "_blank" : "_self";
Expand All @@ -388,6 +401,12 @@ function createApp(elements, options) {
let isProcessingSocketMessage = false;
for (const [event, handler] of Object.entries(messageHandlers)) {
window.socket.on(event, async (...args) => {
if (args.length > 0 && args[0]._id !== undefined) {
const message_id = args[0]._id;
if (message_id < window.nextMessageId) return;
window.nextMessageId = message_id + 1;
delete args[0]._id;
}
socketMessageQueue.push(() => handler(...args));
if (!isProcessingSocketMessage) {
while (socketMessageQueue.length > 0) {
Expand Down
1 change: 1 addition & 0 deletions nicegui/testing/general_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def prepare_simulation(request: pytest.FixtureRequest) -> None:
language='en-US',
binding_refresh_interval=0.1,
reconnect_timeout=3.0,
message_history_length=1000,
tailwind=True,
prod_js=True,
show_welcome_message=False,
Expand Down
3 changes: 3 additions & 0 deletions nicegui/ui_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def run(*,
language: Language = 'en-US',
binding_refresh_interval: float = 0.1,
reconnect_timeout: float = 3.0,
message_history_length: int = 1000,
fastapi_docs: Union[bool, DocsConfig] = False,
show: bool = True,
on_air: Optional[Union[str, Literal[True]]] = None,
Expand Down Expand Up @@ -86,6 +87,7 @@ def run(*,
:param language: language for Quasar elements (default: `'en-US'`)
:param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly)
:param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds)
:param message_history_length: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable)
:param fastapi_docs: enable FastAPI's automatic documentation with Swagger UI, ReDoc, and OpenAPI JSON (bool or dictionary as described `here<https://fastapi.tiangolo.com/tutorial/metadata/>`_, default: `False`)
:param show: automatically open the UI in a browser tab (default: `True`)
:param on_air: tech preview: `allows temporary remote access <https://nicegui.io/documentation/section_configuration_deployment#nicegui_on_air>`_ if set to `True` (default: disabled)
Expand Down Expand Up @@ -114,6 +116,7 @@ def run(*,
language=language,
binding_refresh_interval=binding_refresh_interval,
reconnect_timeout=reconnect_timeout,
message_history_length=message_history_length,
tailwind=tailwind,
prod_js=prod_js,
show_welcome_message=show_welcome_message,
Expand Down
3 changes: 3 additions & 0 deletions nicegui/ui_run_with.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def run_with(
language: Language = 'en-US',
binding_refresh_interval: float = 0.1,
reconnect_timeout: float = 3.0,
message_history_length: int = 1000,
mount_path: str = '/',
on_air: Optional[Union[str, Literal[True]]] = None,
tailwind: bool = True,
Expand All @@ -36,6 +37,7 @@ def run_with(
:param language: language for Quasar elements (default: `'en-US'`)
:param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly)
:param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds)
:param message_history_length: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable)
:param mount_path: mount NiceGUI at this path (default: `'/'`)
:param on_air: tech preview: `allows temporary remote access <https://nicegui.io/documentation/section_configuration_deployment#nicegui_on_air>`_ if set to `True` (default: disabled)
:param tailwind: whether to use Tailwind CSS (experimental, default: `True`)
Expand All @@ -52,6 +54,7 @@ def run_with(
language=language,
binding_refresh_interval=binding_refresh_interval,
reconnect_timeout=reconnect_timeout,
message_history_length=message_history_length,
tailwind=tailwind,
prod_js=prod_js,
show_welcome_message=show_welcome_message,
Expand Down

0 comments on commit 10bc9c0

Please sign in to comment.