diff --git a/src/firefly/controller.py b/src/firefly/controller.py index 47c7ef12..24167193 100644 --- a/src/firefly/controller.py +++ b/src/firefly/controller.py @@ -11,7 +11,7 @@ from ophydregistry import Registry from qasync import asyncSlot from qtpy import QtCore, QtWidgets -from qtpy.QtCore import Signal +from qtpy.QtCore import Signal, Slot from qtpy.QtGui import QIcon, QKeySequence from qtpy.QtWidgets import QAction, QErrorMessage @@ -319,9 +319,6 @@ def setup_window_actions(self): icon=qta.icon("mdi.sine-wave"), ) - def update_queue_controls(self, status): - print(status) - @asyncSlot(QAction) async def finalize_new_window(self, action): """Slot for providing new windows for after a new window is created.""" @@ -329,7 +326,9 @@ async def finalize_new_window(self, action): self.queue_status_changed.connect(action.window.update_queue_status) self.queue_status_changed.connect(action.window.update_queue_controls) if getattr(self, "_queue_client", None) is not None: - self._queue_client.check_queue_status(force=True) + status = await self._queue_client.queue_status() + action.window.update_queue_status(status) + action.window.update_queue_controls(status) action.display.queue_item_submitted.connect(self.add_queue_item) # Send the current devices to the window await action.window.update_devices(self.registry) @@ -573,9 +572,7 @@ def prepare_queue_client(self, client=None, api=None): self.actions.queue_controls["halt"].triggered.connect(client.halt_runengine) self.actions.queue_controls["abort"].triggered.connect(client.abort_runengine) self.actions.queue_controls["stop_queue"].triggered.connect(client.stop_queue) - self.check_queue_status_action.triggered.connect( - partial(client.check_queue_status, True) - ) + self.check_queue_status_action.triggered.connect(partial(client.update, True)) # Connect signals/slots for queueserver state changes client.status_changed.connect(self.queue_status_changed) client.in_use_changed.connect(self.queue_in_use_changed) @@ -610,6 +607,7 @@ def start(self): def update_devices_allowed(self, devices): pass + @Slot(str) def enable_queue_controls(self, re_state): """Enable/disable the navbar buttons that control the queue. diff --git a/src/firefly/kafka_client.py b/src/firefly/kafka_client.py index 0d15c3b1..ea60e856 100644 --- a/src/firefly/kafka_client.py +++ b/src/firefly/kafka_client.py @@ -50,7 +50,7 @@ async def consumer_loop(self): async for record in consumer: try: doc_type, doc = record.value - print(f"Received kafka record. {doc_type=}") + log.debug(f"Received kafka record. {doc_type=}") self._process_document(doc_type, doc) except Exception as ex: log.exception(ex) @@ -86,7 +86,7 @@ def _process_document(self, doc_type, doc): except KeyError: log.warning("fUnknown descriptor UID {descriptor_uid}") return - print(f"Emitting run updated: {run_uid=}") + log.info(f"Emitting run updated: {run_uid=}") self.run_updated.emit(run_uid) elif doc_type == "stop": run_uid = doc["run_start"] diff --git a/src/firefly/main_window.py b/src/firefly/main_window.py index 82b76da4..af89222b 100644 --- a/src/firefly/main_window.py +++ b/src/firefly/main_window.py @@ -8,6 +8,7 @@ from pydm.main_window import PyDMMainWindow from qtpy import QtGui, QtWidgets +from firefly.queue_client import is_in_use from haven import load_config log = logging.getLogger(__name__) @@ -111,10 +112,11 @@ def update_queue_controls(self, new_status): def update_queue_status(self, status): """Update the queue status labels.""" - self.ui.environment_label.setText(status["worker_environment_state"]) - new_length = status["items_in_queue"] + worker_state = status.get("worker_environment_state", "—") + self.ui.environment_label.setText(worker_state) + new_length = status.get("items_in_queue", "—") self.ui.queue_length_label.setText(f"({new_length})") - self.ui.re_label.setText(status["re_state"]) + self.ui.re_label.setText(status.get("re_state", "—")) # Notify the display of the new status display = self.display_widget() display.update_queue_status(status) @@ -385,7 +387,7 @@ def setup_menu_actions(self, actions): def update_queue_controls(self, new_status): """Update the queue controls to match the state of the queueserver.""" super().update_queue_controls(new_status) - self.ui.navbar.setVisible(bool(new_status["in_use"])) + self.ui.navbar.setVisible(is_in_use(new_status)) # ----------------------------------------------------------------------------- diff --git a/src/firefly/queue_button.py b/src/firefly/queue_button.py index ff39b16e..492dc702 100644 --- a/src/firefly/queue_button.py +++ b/src/firefly/queue_button.py @@ -21,7 +21,7 @@ def __init__(self, *args, **kwargs): self.setDisabled(True) def update_queue_style(self, status: dict): - if status["worker_environment_exists"]: + if status.get("worker_environment_exists", False): self.setEnabled(True) else: # Should be disabled because the queue is closed diff --git a/src/firefly/queue_client.py b/src/firefly/queue_client.py index 829155a9..666dcdf8 100644 --- a/src/firefly/queue_client.py +++ b/src/firefly/queue_client.py @@ -1,6 +1,4 @@ import logging -import time -import warnings from typing import Mapping, Optional from bluesky_queueserver_api import comm_base @@ -14,6 +12,18 @@ log = logging.getLogger() +def is_in_use(status): + # Add a new key for whether the queue is busy (length > 0 or running) + has_queue = status.get("items_in_queue", 0) > 0 + is_running = status.get("manager_state") in [ + "paused", + "starting_queue", + "executing_queue", + "executing_task", + ] + return has_queue or is_running + + def queueserver_api(): try: config = load_config()["queueserver"] @@ -25,13 +35,73 @@ def queueserver_api(): return api +def queue_status(status_mapping: Mapping[str, str] = {}): + """A generative coroutine that tracks the status of the queueserver. + + Parameters + ========== + status_mapping + Maps the queuestatus parameters that are sent onto the update + signal names to yield + + Yields + ====== + to_update + Dictionary with signals to emit as keys, and tuples of ``*args`` + to emit as values. So ``{self.status_changed: ("spam", + "eggs")}`` results in ``self.status_changed.emit("spam", + "eggs")``. + + Sends + ===== + status + The most recent updated status from the queueserver. + + """ + to_update = {} + last_status = {} + was_in_use = None + while True: + status = yield to_update + to_update = {} + if status != last_status: + to_update["status_changed"] = (status,) + # Check individual status items to see if they've changed + status_diff = { + key: val + for key, val in status.items() + if key not in last_status or val != last_status[key] + } + log.debug(f"Received updated queue status: {status_diff}") + updated_params = { + status_mapping[key]: (val,) + for key, val in status_diff.items() + if key in status_mapping + } + to_update.update(updated_params) + # Decide if the queue is being used + now_in_use = is_in_use(status) + if now_in_use != was_in_use: + to_update["in_use_changed"] = (now_in_use,) + was_in_use = now_in_use + # Stash this status for the next time around + last_status = status + + class QueueClient(QObject): api: REManagerAPI _last_queue_status: Optional[dict] = None last_update: float = -1 - timeout: float = 0.2 - min_timeout: float = 0.2 + timeout: float = 0.5 timer: QTimer + parameter_mapping: Mapping[str, str] = { + "queue_autostart_enabled": "autostart_changed", + "worker_environment_exists": "environment_opened", + "worker_environment_state": "environment_state_changed", + "manager_state": "manager_state_changed", + "re_state": "re_state_changed", + "devices_allowed_uid": "devices_allowed_changed", + } # Signals responding to queue changes status_changed = Signal(dict) @@ -45,9 +115,6 @@ class QueueClient(QObject): re_state_changed = Signal(str) # New state devices_changed = Signal(dict) - def start(self): - self.timer.start(int(self.timeout * 1000)) - def __init__(self, *args, api, **kwargs): self.api = api super().__init__(*args, **kwargs) @@ -55,6 +122,13 @@ def __init__(self, *args, api, **kwargs): # Setup timer for updating the queue self.timer = QTimer() self.timer.timeout.connect(self.update) + # Create the generator to keep track of the queue states + self.status = queue_status(status_mapping=self.parameter_mapping) + next(self.status) # Prime the generator + + def start(self): + # Start the time so that it triggers status updates + self.timer.start(int(self.timeout * 1000)) @asyncSlot(bool) async def open_environment(self, to_open): @@ -67,30 +141,6 @@ async def open_environment(self, to_open): else: log.error(f"Failed to open/close environment: {result['msg']}") - @asyncSlot() - async def update(self): - now = time.monotonic() - if now >= self.last_update + self.timeout: - if not load_config()["beamline"]["hardware_is_present"]: - log.warning("Beamline not connected, skipping queue client update.") - self.timeout = 60 # Just update every 1 minute - self.last_update = now - return - log.debug("Updating queue client.") - try: - await self._check_queue_status() - except comm_base.RequestTimeoutError as e: - # If we can't reach the server, wait for a minute and retry - self.timeout = min(60, self.timeout * 2) - log.warn(str(e)) - warnings.warn(str(e)) - log.info(f"Retrying in {self.timeout} seconds.") - else: - # Update succeeded, so wait for a bit - self.timeout = self.min_timeout - finally: - self.last_update = now - @asyncSlot(bool) async def request_pause(self, *args, defer: bool = True, **kwargs): """Ask the queueserver run engine to pause. @@ -113,10 +163,9 @@ async def add_queue_item(self, item): self.check_result(result) except (RuntimeError, comm_base.RequestFailedError) as ex: # Request failed, so force a UI update - await self.check_queue_status(force=True) raise - else: - await self.check_queue_status(force=False) + finally: + await self.update() @asyncSlot(bool) async def toggle_autostart(self, enable: bool): @@ -124,11 +173,8 @@ async def toggle_autostart(self, enable: bool): try: result = await self.api.queue_autostart(enable) self.check_result(result, task="toggle auto-start") - except (RuntimeError, comm_base.RequestFailedError): - # Request failed, so force a UI update - await self.check_queue_status(force=True) - else: - await self.check_queue_status(force=False) + finally: + await self.update() @asyncSlot(bool) async def stop_queue(self, stop: bool): @@ -142,11 +188,8 @@ async def stop_queue(self, stop: bool): try: result = await api_call self.check_result(result, task="toggle stop queue") - except (RuntimeError, comm_base.RequestFailedError): - # Request failed, so force a UI update - await self.check_queue_status(force=True) - else: - await self.check_queue_status(force=False) + finally: + await self.update() @asyncSlot() async def start_queue(self): @@ -188,88 +231,50 @@ def check_result(self, result: Mapping, task: str = "control queue server"): raise RuntimeError(msg) @asyncSlot() - async def check_queue_status(self, force=False, *args, **kwargs): + async def update(self): """Get an update queue status from queue server and notify slots. - Parameters - ========== - force - If false (default), the ``queue_status_changed`` signal will - be emitted only if the queue status has changed since last - check. - *args, *kwargs - Unused arguments from qt widgets - Emits ===== self.status_changed With the updated queue status. """ - try: - await self._check_queue_status(force=force) - except comm_base.RequestTimeoutError as e: - log.warning(str(e)) - warnings.warn(str(e)) - - async def _check_queue_status(self, force: bool = False): - """Get an update queue status from queue server and notify slots. + new_status = await self.queue_status() + signals_changed = self.status.send(new_status) + # Check individual components of the status if they've changed + if signals_changed != {}: + log.debug(f"Emitting changed signals: {signals_changed}") + for signal_name, args in signals_changed.items(): + if hasattr(self, signal_name): + signal = getattr(self, signal_name) + signal.emit(*args) + # Check for new available devices + if "devices_allowed_changed" in signals_changed: + await self.update_devices() - Similar to ``check_queue_status`` but without the exception - handling. + async def queue_status(self) -> dict: + """Get the latest queue status from the queue server. Parameters ========== - force - If false (default), the ``queue_status_changed`` signal will - be emitted only if the queue status has changed since last - check. - - Emits - ===== - self.status_changed - With the updated queue status. + status + The response from the queueserver regarding its status. If + the queueserver is not reachable, then + ``status['manager_state']`` will be ``"disconnected"``. """ - new_status = await self.api.status() - # Add a new key for whether the queue is busy (length > 0 or running) - has_queue = new_status["items_in_queue"] > 0 - is_running = new_status["manager_state"] in [ - "paused", - "starting_queue", - "executing_queue", - "executing_task", - ] - new_status.setdefault("in_use", has_queue or is_running) - # Check individual components of the status if they've changed - signals_to_check = [ - # (status key, signal to emit) - ("worker_environment_exists", self.environment_opened), - ("worker_environment_state", self.environment_state_changed), - ("manager_state", self.manager_state_changed), - ("re_state", self.re_state_changed), - ("items_in_queue", self.length_changed), - ("in_use", self.in_use_changed), - ("queue_stop_pending", self.queue_stop_changed), - ("queue_autostart_enabled", self.autostart_changed), - ] - if force: - log.debug(f"Forcing queue server status update: {new_status}") - for key, signal in signals_to_check: - is_new = key not in self._last_queue_status - has_changed = new_status[key] != self._last_queue_status.get(key) - if is_new or has_changed or force: - signal.emit(new_status[key]) - # Check for new available devices - if new_status["devices_allowed_uid"] != self._last_queue_status.get( - "devices_allowed_uid" - ): - await self.update_devices() - # check the whole status to see if it's changed - has_changed = new_status != self._last_queue_status - if has_changed or force: - self.status_changed.emit(new_status) - self._last_queue_status = new_status + + try: + status = await self.api.status() + except comm_base.RequestTimeoutError as e: + log.warning("Could not reach queueserver ZMQ.") + status = { + "manager_state": "N.C.", + "worker_environment_state": "N.C.", + "re_state": "N.C.", + } + return status async def update_devices(self): "Emit the latest dict of available devices." diff --git a/src/firefly/tests/test_main_window.py b/src/firefly/tests/test_main_window.py index 304b0635..96615656 100644 --- a/src/firefly/tests/test_main_window.py +++ b/src/firefly/tests/test_main_window.py @@ -42,10 +42,10 @@ def test_navbar_autohide(qtbot, actions): window.show() navbar = window.ui.navbar # Pretend the queue has some things in it - window.update_queue_controls({"in_use": True}) + window.update_queue_controls({"items_in_queue": 5}) assert navbar.isVisible() # Make the queue be empty - window.update_queue_controls({"in_use": False}) + window.update_queue_controls({"items_in_queue": 0}) assert not navbar.isVisible() diff --git a/src/firefly/tests/test_queue_client.py b/src/firefly/tests/test_queue_client.py index 1c09f971..a3f4128a 100644 --- a/src/firefly/tests/test_queue_client.py +++ b/src/firefly/tests/test_queue_client.py @@ -1,9 +1,9 @@ import datetime as dt import time +from collections import ChainMap from unittest.mock import AsyncMock import pytest -from pytestqt.exceptions import TimeoutError from qtpy.QtCore import QTimer from qtpy.QtWidgets import QAction @@ -246,6 +246,13 @@ def client(): yield client +@pytest.fixture() +def status(): + status_ = queue_client.queue_status(queue_client.QueueClient.parameter_mapping) + next(status_) + return status_ + + def test_client_timer(client): assert isinstance(client.timer, QTimer) @@ -293,12 +300,8 @@ async def test_run_plan(client, qtbot): api.item_add.return_value = {"success": True, "qsize": 2} new_status = qs_status.copy() new_status["items_in_queue"] = 2 - api.status.return_value = new_status # Send a plan - with qtbot.waitSignal( - client.length_changed, timeout=1000, check_params_cb=lambda l: l == 2 - ): - await client.add_queue_item({}) + await client.add_queue_item({}) # Check if the API sent it api.item_add.assert_called_once_with(item={}) @@ -336,26 +339,23 @@ async def test_stop_queue(client, qtbot): @pytest.mark.asyncio -async def test_check_queue_status(client, qtbot): - # Check that the queue length is changed - signals = [ - client.status_changed, - client.environment_opened, - client.environment_state_changed, - client.re_state_changed, - client.autostart_changed, - client.manager_state_changed, - client.in_use_changed, - ] - with qtbot.waitSignals(signals): - await client.check_queue_status() - return - # Check that it isn't emitted a second time - with pytest.raises(TimeoutError): - with qtbot.waitSignals(signals, timeout=10): - client.check_queue_status() +async def test_send_status(status, qtbot): + to_update = status.send(qs_status) + assert to_update == { + "status_changed": (qs_status,), + "environment_opened": (False,), + "environment_state_changed": ("closed",), + "re_state_changed": (None,), + "autostart_changed": (False,), + "manager_state_changed": ("idle",), + "in_use_changed": (False,), + "devices_allowed_changed": ("a5ddff29-917c-462e-ba66-399777d2442a",), + } + # Check that it isn't updated a second time + to_update = status.send(qs_status) + assert to_update == {} # Now check a non-empty length queue - new_status = qs_status.copy() + new_status = ChainMap({}, qs_status) new_status.update( { "worker_environment_exists": True, @@ -369,9 +369,14 @@ async def test_check_queue_status(client, qtbot): # "plan_queue_uid": "f682e6fa-983c-4bd8-b643-b3baec2ec764", } ) - client.api.status.return_value = new_status - with qtbot.waitSignals(signals): - client.check_queue_status() + to_update = status.send(new_status) + assert to_update == { + "environment_opened": (True,), + "environment_state_changed": ("initializing",), + "re_state_changed": ("idle",), + "manager_state_changed": ("creating_environment",), + "status_changed": (new_status,), + } @pytest.mark.asyncio @@ -410,7 +415,7 @@ async def test_devices_available(client, qtbot): @pytest.mark.asyncio -async def test_update_status(client, time_machine, monkeypatch): +async def test_update(client, time_machine, monkeypatch): api = client.api monkeypatch.setattr( queue_client, "load_config", lambda: {"beamline": {"hardware_is_present": True}} diff --git a/src/queueserver/queueserver_startup.py b/src/queueserver/queueserver_startup.py index 3e673a37..ef6e912a 100755 --- a/src/queueserver/queueserver_startup.py +++ b/src/queueserver/queueserver_startup.py @@ -3,16 +3,9 @@ import bluesky.preprocessors as bpp # noqa: F401 import databroker # noqa: F401 -from bluesky.plan_stubs import ( # noqa: F401 - abs_set, - mv, - mvr, - null, - pause, - rel_set, - sleep, - stop, -) +from bluesky.plan_stubs import abs_set # noqa: F401 +from bluesky.plan_stubs import mv as _mv # noqa: F401 +from bluesky.plan_stubs import mvr, null, pause, rel_set, sleep, stop # noqa: F401 from bluesky.plans import ( # noqa: F401 count, grid_scan, @@ -59,3 +52,43 @@ name = sanitize_name(cpt.name) # Add the device as a variable in module's globals globals().setdefault(name, cpt) + + +# Workaround for https://github.com/bluesky/bluesky-queueserver/issues/310 +from collections.abc import Hashable + + +def mv( + *args, + group: Hashable | None = None, + **kwargs, +): + """ + Move one or more devices to a setpoint. Wait for all to complete. + + If more than one device is specified, the movements are done in parallel. + + Parameters + ---------- + args : + device1, value1, device2, value2, ... + group : string, optional + Used to mark these as a unit to be waited on. + kwargs : + passed to obj.set() + + Yields + ------ + msg : Msg + + Returns + ------- + statuses : + Tuple of n statuses, one for each move operation + + See Also + -------- + :func:`bluesky.plan_stubs.abs_set` + :func:`bluesky.plan_stubs.mvr` + """ + yield from _mv(*args, group=group, **kwargs)