Skip to content

Commit

Permalink
Merge pull request #321 from spc-group/queueserver
Browse files Browse the repository at this point in the history
Firefly-Queueserver improvements
  • Loading branch information
canismarko authored Dec 27, 2024
2 parents d7c5456 + efab527 commit 3d46fc4
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 168 deletions.
14 changes: 6 additions & 8 deletions src/firefly/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from ophydregistry import Registry
from qasync import asyncSlot
from qtpy import QtCore, QtWidgets
from qtpy.QtCore import Signal
from qtpy.QtCore import Signal, Slot
from qtpy.QtGui import QIcon, QKeySequence
from qtpy.QtWidgets import QAction, QErrorMessage

Expand Down Expand Up @@ -319,17 +319,16 @@ def setup_window_actions(self):
icon=qta.icon("mdi.sine-wave"),
)

def update_queue_controls(self, status):
print(status)

@asyncSlot(QAction)
async def finalize_new_window(self, action):
"""Slot for providing new windows for after a new window is created."""
action.window.setup_menu_actions(actions=self.actions)
self.queue_status_changed.connect(action.window.update_queue_status)
self.queue_status_changed.connect(action.window.update_queue_controls)
if getattr(self, "_queue_client", None) is not None:
self._queue_client.check_queue_status(force=True)
status = await self._queue_client.queue_status()
action.window.update_queue_status(status)
action.window.update_queue_controls(status)
action.display.queue_item_submitted.connect(self.add_queue_item)
# Send the current devices to the window
await action.window.update_devices(self.registry)
Expand Down Expand Up @@ -573,9 +572,7 @@ def prepare_queue_client(self, client=None, api=None):
self.actions.queue_controls["halt"].triggered.connect(client.halt_runengine)
self.actions.queue_controls["abort"].triggered.connect(client.abort_runengine)
self.actions.queue_controls["stop_queue"].triggered.connect(client.stop_queue)
self.check_queue_status_action.triggered.connect(
partial(client.check_queue_status, True)
)
self.check_queue_status_action.triggered.connect(partial(client.update, True))
# Connect signals/slots for queueserver state changes
client.status_changed.connect(self.queue_status_changed)
client.in_use_changed.connect(self.queue_in_use_changed)
Expand Down Expand Up @@ -610,6 +607,7 @@ def start(self):
def update_devices_allowed(self, devices):
pass

@Slot(str)
def enable_queue_controls(self, re_state):
"""Enable/disable the navbar buttons that control the queue.
Expand Down
4 changes: 2 additions & 2 deletions src/firefly/kafka_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def consumer_loop(self):
async for record in consumer:
try:
doc_type, doc = record.value
print(f"Received kafka record. {doc_type=}")
log.debug(f"Received kafka record. {doc_type=}")
self._process_document(doc_type, doc)
except Exception as ex:
log.exception(ex)
Expand Down Expand Up @@ -86,7 +86,7 @@ def _process_document(self, doc_type, doc):
except KeyError:
log.warning("fUnknown descriptor UID {descriptor_uid}")
return
print(f"Emitting run updated: {run_uid=}")
log.info(f"Emitting run updated: {run_uid=}")
self.run_updated.emit(run_uid)
elif doc_type == "stop":
run_uid = doc["run_start"]
Expand Down
10 changes: 6 additions & 4 deletions src/firefly/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pydm.main_window import PyDMMainWindow
from qtpy import QtGui, QtWidgets

from firefly.queue_client import is_in_use
from haven import load_config

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -111,10 +112,11 @@ def update_queue_controls(self, new_status):

def update_queue_status(self, status):
"""Update the queue status labels."""
self.ui.environment_label.setText(status["worker_environment_state"])
new_length = status["items_in_queue"]
worker_state = status.get("worker_environment_state", "—")
self.ui.environment_label.setText(worker_state)
new_length = status.get("items_in_queue", "—")
self.ui.queue_length_label.setText(f"({new_length})")
self.ui.re_label.setText(status["re_state"])
self.ui.re_label.setText(status.get("re_state", "—"))
# Notify the display of the new status
display = self.display_widget()
display.update_queue_status(status)
Expand Down Expand Up @@ -385,7 +387,7 @@ def setup_menu_actions(self, actions):
def update_queue_controls(self, new_status):
"""Update the queue controls to match the state of the queueserver."""
super().update_queue_controls(new_status)
self.ui.navbar.setVisible(bool(new_status["in_use"]))
self.ui.navbar.setVisible(is_in_use(new_status))


# -----------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/firefly/queue_button.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self, *args, **kwargs):
self.setDisabled(True)

def update_queue_style(self, status: dict):
if status["worker_environment_exists"]:
if status.get("worker_environment_exists", False):
self.setEnabled(True)
else:
# Should be disabled because the queue is closed
Expand Down
Loading

0 comments on commit 3d46fc4

Please sign in to comment.