diff --git a/nicegui/air.py b/nicegui/air.py
index a4b37a972..ffa8233f0 100644
--- a/nicegui/air.py
+++ b/nicegui/air.py
@@ -134,7 +134,7 @@ def _handle_handshake(data: Dict[str, Any]) -> bool:
core.app.storage.copy_tab(data['old_tab_id'], data['tab_id'])
client.tab_id = data['tab_id']
client.on_air = True
- client.handle_handshake()
+ client.handle_handshake(data.get('next_message_id'))
return True
@self.relay.on('client_disconnect')
@@ -178,6 +178,14 @@ def _handle_javascript_response(data: Dict[str, Any]) -> None:
client = Client.instances[client_id]
client.handle_javascript_response(data['msg'])
+ @self.relay.on('ack')
+ def _handle_ack(data: Dict[str, Any]) -> None:
+ client_id = data['client_id']
+ if client_id not in Client.instances:
+ return
+ client = Client.instances[client_id]
+ client.outbox.prune_history(data['msg']['next_message_id'])
+
@self.relay.on('out_of_time')
async def _handle_out_of_time() -> None:
print('Sorry, you have reached the time limit of this NiceGUI On Air preview.', flush=True)
diff --git a/nicegui/app/app_config.py b/nicegui/app/app_config.py
index c36e8c134..579d586d8 100644
--- a/nicegui/app/app_config.py
+++ b/nicegui/app/app_config.py
@@ -32,6 +32,7 @@ class AppConfig:
language: Language = field(init=False)
binding_refresh_interval: float = field(init=False)
reconnect_timeout: float = field(init=False)
+ message_history_length: int = field(init=False)
tailwind: bool = field(init=False)
prod_js: bool = field(init=False)
show_welcome_message: bool = field(init=False)
@@ -47,6 +48,7 @@ def add_run_config(self,
language: Language,
binding_refresh_interval: float,
reconnect_timeout: float,
+ message_history_length: int,
tailwind: bool,
prod_js: bool,
show_welcome_message: bool,
@@ -60,6 +62,7 @@ def add_run_config(self,
self.language = language
self.binding_refresh_interval = binding_refresh_interval
self.reconnect_timeout = reconnect_timeout
+ self.message_history_length = message_history_length
self.tailwind = tailwind
self.prod_js = prod_js
self.show_welcome_message = show_welcome_message
diff --git a/nicegui/client.py b/nicegui/client.py
index b1e883927..384316fb4 100644
--- a/nicegui/client.py
+++ b/nicegui/client.py
@@ -63,6 +63,7 @@ def __init__(self, page: page, *, request: Optional[Request]) -> None:
self._deleted = False
self.tab_id: Optional[str] = None
+ self.page = page
self.outbox = Outbox(self)
with Element('q-layout', _client=self).props('view="hhh lpr fff"').classes('nicegui-layout') as self.layout:
@@ -75,7 +76,6 @@ def __init__(self, page: page, *, request: Optional[Request]) -> None:
self._head_html = ''
self._body_html = ''
- self.page = page
self.storage = ObservableDict()
self.connect_handlers: List[Union[Callable[..., Any], Awaitable]] = []
@@ -123,7 +123,11 @@ def build_response(self, request: Request, status_code: int = 200) -> Response:
elements = json.dumps({
id: element._to_dict() for id, element in self.elements.items() # pylint: disable=protected-access
})
- socket_io_js_query_params = {**core.app.config.socket_io_js_query_params, 'client_id': self.id}
+ socket_io_js_query_params = {
+ **core.app.config.socket_io_js_query_params,
+ 'client_id': self.id,
+ 'next_message_id': self.outbox.next_message_id,
+ }
vue_html, vue_styles, vue_scripts, imports, js_imports = generate_resources(prefix, self.elements.values())
return templates.TemplateResponse(
request=request,
@@ -227,8 +231,10 @@ def on_disconnect(self, handler: Union[Callable[..., Any], Awaitable]) -> None:
"""Add a callback to be invoked when the client disconnects."""
self.disconnect_handlers.append(handler)
- def handle_handshake(self) -> None:
+ def handle_handshake(self, next_message_id: Optional[int]) -> None:
"""Cancel pending disconnect task and invoke connect handlers."""
+ if next_message_id is not None:
+ self.outbox.try_rewind(next_message_id)
if self._disconnect_task:
self._disconnect_task.cancel()
self._disconnect_task = None
@@ -241,11 +247,7 @@ def handle_handshake(self) -> None:
def handle_disconnect(self) -> None:
"""Wait for the browser to reconnect; invoke disconnect handlers if it doesn't."""
async def handle_disconnect() -> None:
- if self.page.reconnect_timeout is not None:
- delay = self.page.reconnect_timeout
- else:
- delay = core.app.config.reconnect_timeout # pylint: disable=protected-access
- await asyncio.sleep(delay)
+ await asyncio.sleep(self.page.resolve_reconnect_timeout())
for t in self.disconnect_handlers:
self.safe_invoke(t)
for t in core.app._disconnect_handlers: # pylint: disable=protected-access
diff --git a/nicegui/nicegui.py b/nicegui/nicegui.py
index c08115465..893b4f81b 100644
--- a/nicegui/nicegui.py
+++ b/nicegui/nicegui.py
@@ -3,7 +3,7 @@
import urllib.parse
from contextlib import asynccontextmanager
from pathlib import Path
-from typing import Dict
+from typing import Any, Dict
import socketio
from fastapi import HTTPException, Request
@@ -163,7 +163,7 @@ async def _exception_handler_500(request: Request, exception: Exception) -> Resp
@sio.on('handshake')
-async def _on_handshake(sid: str, data: Dict[str, str]) -> bool:
+async def _on_handshake(sid: str, data: Dict[str, Any]) -> bool:
client = Client.instances.get(data['client_id'])
if not client:
return False
@@ -175,7 +175,7 @@ async def _on_handshake(sid: str, data: Dict[str, str]) -> bool:
else:
client.environ = sio.get_environ(sid)
await sio.enter_room(sid, client.id)
- client.handle_handshake()
+ client.handle_handshake(data.get('next_message_id'))
return True
@@ -203,3 +203,11 @@ def _on_javascript_response(_: str, msg: Dict) -> None:
if not client:
return
client.handle_javascript_response(msg)
+
+
+@sio.on('ack')
+def _on_ack(_: str, msg: Dict) -> None:
+ client = Client.instances.get(msg['client_id'])
+ if not client:
+ return
+ client.outbox.prune_history(msg['next_message_id'])
diff --git a/nicegui/outbox.py b/nicegui/outbox.py
index b65a759bf..18d19e5b1 100644
--- a/nicegui/outbox.py
+++ b/nicegui/outbox.py
@@ -1,6 +1,7 @@
from __future__ import annotations
import asyncio
+import time
from collections import deque
from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Tuple
@@ -10,10 +11,16 @@
from .client import Client
from .element import Element
-ClientId = str
ElementId = int
+
+ClientId = str
MessageType = str
-Message = Tuple[ClientId, MessageType, Any]
+Payload = Any
+Message = Tuple[ClientId, MessageType, Payload]
+
+MessageId = int
+MessageTime = float
+HistoryEntry = Tuple[MessageId, MessageTime, Message]
class Outbox:
@@ -22,8 +29,12 @@ def __init__(self, client: Client) -> None:
self.client = client
self.updates: Dict[ElementId, Optional[Element]] = {}
self.messages: Deque[Message] = deque()
+ self.message_history: Deque[HistoryEntry] = deque()
+ self.next_message_id: int = 0
+
self._should_stop = False
self._enqueue_event: Optional[asyncio.Event] = None
+
if core.app.is_started:
background_tasks.create(self.loop(), name=f'outbox loop {client.id}')
else:
@@ -46,7 +57,7 @@ def enqueue_delete(self, element: Element) -> None:
self.updates[element.id] = None
self._set_enqueue_event()
- def enqueue_message(self, message_type: MessageType, data: Any, target_id: ClientId) -> None:
+ def enqueue_message(self, message_type: MessageType, data: Payload, target_id: ClientId) -> None:
"""Enqueue a message for the given client."""
self.client.check_existence()
self.messages.append((target_id, message_type, data))
@@ -77,12 +88,12 @@ async def loop(self) -> None:
element_id: None if element is None else element._to_dict() # pylint: disable=protected-access
for element_id, element in self.updates.items()
}
- coros.append(self._emit('update', data, self.client.id))
+ coros.append(self._emit((self.client.id, 'update', data)))
self.updates.clear()
if self.messages:
- for target_id, message_type, data in self.messages:
- coros.append(self._emit(message_type, data, target_id))
+ for message in self.messages:
+ coros.append(self._emit(message))
self.messages.clear()
for coro in coros:
@@ -95,10 +106,47 @@ async def loop(self) -> None:
core.app.handle_exception(e)
await asyncio.sleep(0.1)
- async def _emit(self, message_type: MessageType, data: Any, target_id: ClientId) -> None:
- await core.sio.emit(message_type, data, room=target_id)
- if core.air is not None and core.air.is_air_target(target_id):
- await core.air.emit(message_type, data, room=target_id)
+ async def _emit(self, message: Message) -> None:
+ client_id, message_type, data = message
+ data['_id'] = self.next_message_id
+
+ await core.sio.emit(message_type, data, room=client_id)
+ if core.air is not None and core.air.is_air_target(client_id):
+ await core.air.emit(message_type, data, room=client_id)
+
+ if not self.client.shared:
+ self.message_history.append((self.next_message_id, time.time(), message))
+ max_age = core.sio.eio.ping_interval + core.sio.eio.ping_timeout + self.client.page.resolve_reconnect_timeout()
+ while self.message_history and self.message_history[0][1] < time.time() - max_age:
+ self.message_history.popleft()
+ while len(self.message_history) > core.app.config.message_history_length:
+ self.message_history.popleft()
+
+ self.next_message_id += 1
+
+ def try_rewind(self, target_message_id: MessageId) -> None:
+ """Rewind to the given message ID and discard all messages before it."""
+ # nothing to do, the next message ID is already the target message ID
+ if self.next_message_id == target_message_id:
+ return
+
+ # rewind to the target message ID
+ while self.message_history:
+ self.next_message_id, _, message = self.message_history.pop()
+ self.messages.appendleft(message)
+ if self.next_message_id == target_message_id:
+ self.message_history.clear()
+ self._set_enqueue_event()
+ return
+
+ # target message ID not found, reload the page
+ if not self.client.shared:
+ self.client.run_javascript('window.location.reload()')
+
+ def prune_history(self, next_message_id: MessageId) -> None:
+ """Prune the message history up to the given message ID."""
+ while self.message_history and self.message_history[0][0] < next_message_id:
+ self.message_history.popleft()
def stop(self) -> None:
"""Stop the outbox loop."""
diff --git a/nicegui/page.py b/nicegui/page.py
index f6493ba80..c4f46d249 100644
--- a/nicegui/page.py
+++ b/nicegui/page.py
@@ -56,7 +56,7 @@ def __init__(self,
:param dark: whether to use Quasar's dark mode (defaults to `dark` argument of `run` command)
:param language: language of the page (defaults to `language` argument of `run` command)
:param response_timeout: maximum time for the decorated function to build the page (default: 3.0 seconds)
- :param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 0.0 seconds)
+ :param reconnect_timeout: maximum time the server waits for the browser to reconnect (defaults to `reconnect_timeout` argument of `run` command))
:param api_router: APIRouter instance to use, can be left `None` to use the default
:param kwargs: additional keyword arguments passed to FastAPI's @app.get method
"""
@@ -94,6 +94,10 @@ def resolve_language(self) -> Optional[str]:
"""Return the language of the page."""
return self.language if self.language is not ... else core.app.config.language
+ def resolve_reconnect_timeout(self) -> float:
+ """Return the reconnect_timeout of the page."""
+ return self.reconnect_timeout if self.reconnect_timeout is not None else core.app.config.reconnect_timeout
+
def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
core.app.remove_route(self.path) # NOTE make sure only the latest route definition is used
parameters_of_decorated_func = list(inspect.signature(func).parameters.keys())
diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js
index bc92ef26f..7c86d490c 100644
--- a/nicegui/static/nicegui.js
+++ b/nicegui/static/nicegui.js
@@ -269,6 +269,15 @@ function download(src, filename, mediaType, prefix) {
}
}
+function ack() {
+ if (window.ackedMessageId >= window.nextMessageId) return;
+ window.socket.emit("ack", {
+ client_id: window.clientId,
+ next_message_id: window.nextMessageId,
+ });
+ window.ackedMessageId = window.nextMessageId;
+}
+
async function loadDependencies(element, prefix, version) {
if (element.component) {
const { name, key, tag } = element.component;
@@ -310,6 +319,7 @@ window.onbeforeunload = function () {
function createApp(elements, options) {
replaceUndefinedAttributes(elements, 0);
+ setInterval(() => ack(), 3000);
return (app = Vue.createApp({
data() {
return {
@@ -324,6 +334,8 @@ function createApp(elements, options) {
window.clientId = options.query.client_id;
const url = window.location.protocol === "https:" ? "wss://" : "ws://" + window.location.host;
window.path_prefix = options.prefix;
+ window.nextMessageId = options.query.next_message_id;
+ window.ackedMessageId = -1;
window.socket = io(url, {
path: `${options.prefix}/_nicegui_ws/socket.io`,
query: options.query,
@@ -337,6 +349,7 @@ function createApp(elements, options) {
client_id: window.clientId,
tab_id: TAB_ID,
old_tab_id: OLD_TAB_ID,
+ next_message_id: window.nextMessageId,
};
window.socket.emit("handshake", args, (ok) => {
if (!ok) {
@@ -375,7 +388,7 @@ function createApp(elements, options) {
replaceUndefinedAttributes(this.elements, id);
}
},
- run_javascript: (msg) => runJavascript(msg["code"], msg["request_id"]),
+ run_javascript: (msg) => runJavascript(msg.code, msg.request_id),
open: (msg) => {
const url = msg.path.startsWith("/") ? options.prefix + msg.path : msg.path;
const target = msg.new_tab ? "_blank" : "_self";
@@ -388,6 +401,12 @@ function createApp(elements, options) {
let isProcessingSocketMessage = false;
for (const [event, handler] of Object.entries(messageHandlers)) {
window.socket.on(event, async (...args) => {
+ if (args.length > 0 && args[0]._id !== undefined) {
+ const message_id = args[0]._id;
+ if (message_id < window.nextMessageId) return;
+ window.nextMessageId = message_id + 1;
+ delete args[0]._id;
+ }
socketMessageQueue.push(() => handler(...args));
if (!isProcessingSocketMessage) {
while (socketMessageQueue.length > 0) {
diff --git a/nicegui/testing/general_fixtures.py b/nicegui/testing/general_fixtures.py
index a7205c8fc..f03f96c41 100644
--- a/nicegui/testing/general_fixtures.py
+++ b/nicegui/testing/general_fixtures.py
@@ -91,6 +91,7 @@ def prepare_simulation(request: pytest.FixtureRequest) -> None:
language='en-US',
binding_refresh_interval=0.1,
reconnect_timeout=3.0,
+ message_history_length=1000,
tailwind=True,
prod_js=True,
show_welcome_message=False,
diff --git a/nicegui/ui_run.py b/nicegui/ui_run.py
index 31d9d3fc6..708fd87ba 100644
--- a/nicegui/ui_run.py
+++ b/nicegui/ui_run.py
@@ -53,6 +53,7 @@ def run(*,
language: Language = 'en-US',
binding_refresh_interval: float = 0.1,
reconnect_timeout: float = 3.0,
+ message_history_length: int = 1000,
fastapi_docs: Union[bool, DocsConfig] = False,
show: bool = True,
on_air: Optional[Union[str, Literal[True]]] = None,
@@ -86,6 +87,7 @@ def run(*,
:param language: language for Quasar elements (default: `'en-US'`)
:param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly)
:param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds)
+ :param message_history_length: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable)
:param fastapi_docs: enable FastAPI's automatic documentation with Swagger UI, ReDoc, and OpenAPI JSON (bool or dictionary as described `here`_, default: `False`)
:param show: automatically open the UI in a browser tab (default: `True`)
:param on_air: tech preview: `allows temporary remote access `_ if set to `True` (default: disabled)
@@ -114,6 +116,7 @@ def run(*,
language=language,
binding_refresh_interval=binding_refresh_interval,
reconnect_timeout=reconnect_timeout,
+ message_history_length=message_history_length,
tailwind=tailwind,
prod_js=prod_js,
show_welcome_message=show_welcome_message,
diff --git a/nicegui/ui_run_with.py b/nicegui/ui_run_with.py
index 8d69e2bba..a7a4e8640 100644
--- a/nicegui/ui_run_with.py
+++ b/nicegui/ui_run_with.py
@@ -19,6 +19,7 @@ def run_with(
language: Language = 'en-US',
binding_refresh_interval: float = 0.1,
reconnect_timeout: float = 3.0,
+ message_history_length: int = 1000,
mount_path: str = '/',
on_air: Optional[Union[str, Literal[True]]] = None,
tailwind: bool = True,
@@ -36,6 +37,7 @@ def run_with(
:param language: language for Quasar elements (default: `'en-US'`)
:param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly)
:param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds)
+ :param message_history_length: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable)
:param mount_path: mount NiceGUI at this path (default: `'/'`)
:param on_air: tech preview: `allows temporary remote access `_ if set to `True` (default: disabled)
:param tailwind: whether to use Tailwind CSS (experimental, default: `True`)
@@ -52,6 +54,7 @@ def run_with(
language=language,
binding_refresh_interval=binding_refresh_interval,
reconnect_timeout=reconnect_timeout,
+ message_history_length=message_history_length,
tailwind=tailwind,
prod_js=prod_js,
show_welcome_message=show_welcome_message,