Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Firefly-Queueserver improvements #321

Merged
merged 11 commits into from
Dec 27, 2024
14 changes: 6 additions & 8 deletions src/firefly/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from ophydregistry import Registry
from qasync import asyncSlot
from qtpy import QtCore, QtWidgets
from qtpy.QtCore import Signal
from qtpy.QtCore import Signal, Slot
from qtpy.QtGui import QIcon, QKeySequence
from qtpy.QtWidgets import QAction, QErrorMessage

Expand Down Expand Up @@ -319,17 +319,16 @@ def setup_window_actions(self):
icon=qta.icon("mdi.sine-wave"),
)

def update_queue_controls(self, status):
print(status)

@asyncSlot(QAction)
async def finalize_new_window(self, action):
"""Slot for providing new windows for after a new window is created."""
action.window.setup_menu_actions(actions=self.actions)
self.queue_status_changed.connect(action.window.update_queue_status)
self.queue_status_changed.connect(action.window.update_queue_controls)
if getattr(self, "_queue_client", None) is not None:
self._queue_client.check_queue_status(force=True)
status = await self._queue_client.queue_status()
action.window.update_queue_status(status)
action.window.update_queue_controls(status)
action.display.queue_item_submitted.connect(self.add_queue_item)
# Send the current devices to the window
await action.window.update_devices(self.registry)
Expand Down Expand Up @@ -573,9 +572,7 @@ def prepare_queue_client(self, client=None, api=None):
self.actions.queue_controls["halt"].triggered.connect(client.halt_runengine)
self.actions.queue_controls["abort"].triggered.connect(client.abort_runengine)
self.actions.queue_controls["stop_queue"].triggered.connect(client.stop_queue)
self.check_queue_status_action.triggered.connect(
partial(client.check_queue_status, True)
)
self.check_queue_status_action.triggered.connect(partial(client.update, True))
# Connect signals/slots for queueserver state changes
client.status_changed.connect(self.queue_status_changed)
client.in_use_changed.connect(self.queue_in_use_changed)
Expand Down Expand Up @@ -610,6 +607,7 @@ def start(self):
def update_devices_allowed(self, devices):
pass

@Slot(str)
def enable_queue_controls(self, re_state):
"""Enable/disable the navbar buttons that control the queue.

Expand Down
4 changes: 2 additions & 2 deletions src/firefly/kafka_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def consumer_loop(self):
async for record in consumer:
try:
doc_type, doc = record.value
print(f"Received kafka record. {doc_type=}")
log.debug(f"Received kafka record. {doc_type=}")
self._process_document(doc_type, doc)
except Exception as ex:
log.exception(ex)
Expand Down Expand Up @@ -86,7 +86,7 @@ def _process_document(self, doc_type, doc):
except KeyError:
log.warning("fUnknown descriptor UID {descriptor_uid}")
return
print(f"Emitting run updated: {run_uid=}")
log.info(f"Emitting run updated: {run_uid=}")
self.run_updated.emit(run_uid)
elif doc_type == "stop":
run_uid = doc["run_start"]
Expand Down
11 changes: 7 additions & 4 deletions src/firefly/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

from haven import load_config

from .queue_client import is_in_use
canismarko marked this conversation as resolved.
Show resolved Hide resolved

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -111,10 +113,11 @@ def update_queue_controls(self, new_status):

def update_queue_status(self, status):
"""Update the queue status labels."""
self.ui.environment_label.setText(status["worker_environment_state"])
new_length = status["items_in_queue"]
worker_state = status.get("worker_environment_state", "—")
self.ui.environment_label.setText(worker_state)
new_length = status.get("items_in_queue", "—")
self.ui.queue_length_label.setText(f"({new_length})")
self.ui.re_label.setText(status["re_state"])
self.ui.re_label.setText(status.get("re_state", "—"))
# Notify the display of the new status
display = self.display_widget()
display.update_queue_status(status)
Expand Down Expand Up @@ -385,7 +388,7 @@ def setup_menu_actions(self, actions):
def update_queue_controls(self, new_status):
"""Update the queue controls to match the state of the queueserver."""
super().update_queue_controls(new_status)
self.ui.navbar.setVisible(bool(new_status["in_use"]))
self.ui.navbar.setVisible(is_in_use(new_status))


# -----------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/firefly/queue_button.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self, *args, **kwargs):
self.setDisabled(True)

def update_queue_style(self, status: dict):
if status["worker_environment_exists"]:
if status.get("worker_environment_exists", False):
self.setEnabled(True)
else:
# Should be disabled because the queue is closed
Expand Down
Loading