diff --git a/src/conftest.py b/src/conftest.py
index 660a7284..804a72bb 100644
--- a/src/conftest.py
+++ b/src/conftest.py
@@ -280,12 +280,20 @@ async def filters(sim_registry):
),
},
metadata={
- "plan_name": "xafs_scan",
"start": {
"plan_name": "xafs_scan",
+ "esaf_id": "1337",
+ "proposal_id": "158839",
+ "beamline_id": "255-ID-Z",
+ "sample_name": "NMC-532",
+ "sample_formula": "LiNi0.5Mn0.3Co0.2O2",
+ "edge": "Ni-K",
"uid": "7d1daf1d-60c7-4aa7-a668-d1cd97e5335f",
"hints": {"dimensions": [[["energy_energy"], "primary"]]},
},
+ "stop": {
+ "exit_status": "success",
+ },
},
),
"9d33bf66-9701-4ee3-90f4-3be730bc226c": MapAdapter(
@@ -357,6 +365,7 @@ async def filters(sim_registry):
mapping = {
"255id_testing": MapAdapter(bluesky_mapping),
+ "255bm_testing": MapAdapter(bluesky_mapping),
}
tree = MapAdapter(mapping)
@@ -367,13 +376,12 @@ def tiled_client():
app = build_app(tree)
with Context.from_app(app) as context:
client = from_context(context)
- yield client["255id_testing"]
+ yield client
@pytest.fixture()
def catalog(tiled_client):
- cat = Catalog(client=tiled_client)
- # cat = mock.AsyncMock()
+ cat = Catalog(client=tiled_client["255id_testing"])
return cat
diff --git a/src/firefly/controller.py b/src/firefly/controller.py
index 24167193..899f87f3 100644
--- a/src/firefly/controller.py
+++ b/src/firefly/controller.py
@@ -15,7 +15,7 @@
from qtpy.QtGui import QIcon, QKeySequence
from qtpy.QtWidgets import QAction, QErrorMessage
-from haven import beamline, load_config
+from haven import beamline, load_config, tiled_client
from haven.exceptions import ComponentNotFound, InvalidConfiguration
from haven.utils import titleize
@@ -333,11 +333,18 @@ async def finalize_new_window(self, action):
# Send the current devices to the window
await action.window.update_devices(self.registry)
- def finalize_run_browser_window(self, action):
- """Connect up signals that are specific to the run browser window."""
+ @asyncSlot(QAction)
+ async def finalize_run_browser_window(self, action):
+ """Connect up run browser signals and load initial data."""
display = action.display
self.run_updated.connect(display.update_running_scan)
self.run_stopped.connect(display.update_running_scan)
+ # Set initial state for the run_browser
+ client = tiled_client(catalog=None)
+ config = load_config()["database"]["tiled"]
+ await display.setup_database(
+ tiled_client=client, catalog_name=config["default_catalog"]
+ )
def finalize_status_window(self, action):
"""Connect up signals that are specific to the voltmeters window."""
@@ -652,12 +659,6 @@ async def add_queue_item(self, item):
if getattr(self, "_queue_client", None) is not None:
await self._queue_client.add_queue_item(item)
- @QtCore.Slot()
- def show_sample_viewer_window(self):
- return self.show_window(
- FireflyMainWindow, ui_dir / "sample_viewer.ui", name="sample_viewer"
- )
-
@QtCore.Slot(bool)
def set_open_environment_action_state(self, is_open: bool):
"""Update the readback value for opening the queueserver environment."""
diff --git a/src/firefly/kafka_client.py b/src/firefly/kafka_client.py
index ea60e856..3e8440f0 100644
--- a/src/firefly/kafka_client.py
+++ b/src/firefly/kafka_client.py
@@ -1,6 +1,7 @@
import asyncio
import logging
import warnings
+from uuid import uuid4
import msgpack
from aiokafka import AIOKafkaConsumer
@@ -40,7 +41,7 @@ async def consumer_loop(self):
self.kafka_consumer = AIOKafkaConsumer(
config["queueserver"]["kafka_topic"],
bootstrap_servers="fedorov.xray.aps.anl.gov:9092",
- group_id="my-group",
+ group_id=str(uuid4()),
value_deserializer=msgpack.loads,
)
consumer = self.kafka_consumer
diff --git a/src/firefly/run_browser/client.py b/src/firefly/run_browser/client.py
index fb579258..32cac56c 100644
--- a/src/firefly/run_browser/client.py
+++ b/src/firefly/run_browser/client.py
@@ -1,65 +1,97 @@
+import asyncio
import datetime as dt
import logging
import warnings
from collections import OrderedDict
+from functools import partial
from typing import Mapping, Sequence
import numpy as np
import pandas as pd
+from qasync import asyncSlot
from tiled import queries
from haven import exceptions
-from haven.catalog import Catalog
+from haven.catalog import Catalog, run_in_executor
log = logging.getLogger(__name__)
class DatabaseWorker:
selected_runs: Sequence = []
+ catalog: Catalog = None
- def __init__(self, catalog=None, *args, **kwargs):
- if catalog is None:
- catalog = Catalog()
- self.catalog = catalog
+ def __init__(self, tiled_client, *args, **kwargs):
+ self.client = tiled_client
super().__init__(*args, **kwargs)
+ @asyncSlot(str)
+ async def change_catalog(self, catalog_name: str):
+ """Change the catalog being used for pulling data.
+
+ *catalog_name* should be an entry in *worker.tiled_client()*.
+ """
+
+ def get_catalog(name):
+ return Catalog(self.client[name])
+
+ loop = asyncio.get_running_loop()
+ self.catalog = await loop.run_in_executor(None, get_catalog, catalog_name)
+
+ @run_in_executor
+ def catalog_names(self):
+ return list(self.client.keys())
+
+ async def stream_names(self):
+ awaitables = [scan.stream_names() for scan in self.selected_runs]
+ all_streams = await asyncio.gather(*awaitables)
+ # Flatten the lists
+ streams = [stream for streams in all_streams for stream in streams]
+ return list(set(streams))
+
async def filtered_nodes(self, filters: Mapping):
case_sensitive = False
log.debug(f"Filtering nodes: {filters}")
- filter_params = [
- # (filter_name, query type, metadata key)
- ("user", queries.Contains, "start.proposal_users"),
- ("proposal", queries.Eq, "start.proposal_id"),
- ("esaf", queries.Eq, "start.esaf_id"),
- ("sample", queries.Contains, "start.sample_name"),
- ("exit_status", queries.Eq, "stop.exit_status"),
- ("plan", queries.Eq, "start.plan_name"),
- ("edge", queries.Contains, "start.edge"),
- ]
+ filter_params = {
+ # filter_name: (query type, metadata key)
+ "plan": (queries.Eq, "start.plan_name"),
+ "sample": (queries.Contains, "start.sample_name"),
+ "formula": (queries.Contains, "start.sample_formula"),
+ "edge": (queries.Contains, "start.edge"),
+ "exit_status": (queries.Eq, "stop.exit_status"),
+ "user": (queries.Contains, "start.proposal_users"),
+ "proposal": (queries.Eq, "start.proposal_id"),
+ "esaf": (queries.Eq, "start.esaf_id"),
+ "beamline": (queries.Eq, "start.beamline_id"),
+ "before": (partial(queries.Comparison, "le"), "end.time"),
+ "after": (partial(queries.Comparison, "ge"), "start.time"),
+ "full_text": (queries.FullText, ""),
+ "standards_only": (queries.Eq, "start.is_standard"),
+ }
# Apply filters
runs = self.catalog
- for filter_name, Query, md_name in filter_params:
- val = filters.get(filter_name, "")
- if val != "":
- runs = await runs.search(Query(md_name, val))
- full_text = filters.get("full_text", "")
- if full_text != "":
- runs = await runs.search(
- queries.FullText(full_text, case_sensitive=case_sensitive)
- )
+ for filter_name, filter_value in filters.items():
+ if filter_name not in filter_params:
+ continue
+ Query, md_name = filter_params[filter_name]
+ if Query is queries.FullText:
+ runs = await runs.search(Query(filter_value, case_sensitive=case_sensitive))
+ else:
+ runs = await runs.search(Query(md_name, filter_value))
return runs
async def load_distinct_fields(self):
"""Get distinct metadata fields for filterable metadata."""
new_fields = {}
target_fields = [
- "sample_name",
- "proposal_users",
- "proposal_id",
- "esaf_id",
- "sample_name",
- "plan_name",
- "edge",
+ "start.plan_name",
+ "start.sample_name",
+ "start.sample_formula",
+ "start.edge",
+ "stop.exit_status",
+ "start.proposal_id",
+ "start.esaf_id",
+ "start.beamline_id",
]
# Get fields from the database
response = await self.catalog.distinct(*target_fields)
@@ -118,11 +150,13 @@ async def load_all_runs(self, filters: Mapping = {}):
all_runs.append(run_data)
return all_runs
- async def signal_names(self, hinted_only: bool = False):
+ async def signal_names(self, stream: str, *, hinted_only: bool = False):
"""Get a list of valid signal names (data columns) for selected runs.
Parameters
==========
+ stream
+ The Tiled stream name to fetch.
hinted_only
If true, only signals with the kind="hinted" parameter get
picked.
@@ -131,9 +165,9 @@ async def signal_names(self, hinted_only: bool = False):
xsignals, ysignals = [], []
for run in self.selected_runs:
if hinted_only:
- xsig, ysig = await run.hints()
+ xsig, ysig = await run.hints(stream=stream)
else:
- df = await run.data()
+ df = await run.data(stream=stream)
xsig = ysig = df.columns
xsignals.extend(xsig)
ysignals.extend(ysig)
@@ -160,32 +194,34 @@ async def load_selected_runs(self, uids):
self.selected_runs = runs
return runs
- async def images(self, signal):
+ async def images(self, signal: str, stream: str):
"""Load the selected runs as 2D or 3D images suitable for plotting."""
images = OrderedDict()
for idx, run in enumerate(self.selected_runs):
# Load datasets from the database
try:
- image = await run[signal]
+ image = await run.__getitem__(signal, stream=stream)
except KeyError as exc:
log.exception(exc)
else:
images[run.uid] = image
return images
- async def all_signals(self, hinted_only=False) -> dict:
+ async def all_signals(self, stream: str, *, hinted_only=False) -> dict:
"""Produce dataframes with all signals for each run.
The keys of the dictionary are the labels for each curve, and
the corresponding value is a pandas dataframe with the scan data.
"""
- xsignals, ysignals = await self.signal_names(hinted_only=hinted_only)
+ xsignals, ysignals = await self.signal_names(
+ hinted_only=hinted_only, stream=stream
+ )
# Build the dataframes
dfs = OrderedDict()
for run in self.selected_runs:
# Get data from the database
- df = await run.data(signals=xsignals + ysignals)
+ df = await run.data(signals=xsignals + ysignals, stream=stream)
dfs[run.uid] = df
return dfs
@@ -194,6 +230,8 @@ async def signals(
x_signal,
y_signal,
r_signal=None,
+ *,
+ stream: str,
use_log=False,
use_invert=False,
use_grad=False,
@@ -233,7 +271,7 @@ async def signals(
if uids is not None and run.uid not in uids:
break
# Get data from the database
- df = await run.data(signals=signals)
+ df = await run.data(signals=signals, stream=stream)
# Check for missing signals
missing_x = x_signal not in df.columns and df.index.name != x_signal
missing_y = y_signal not in df.columns
diff --git a/src/firefly/run_browser/display.py b/src/firefly/run_browser/display.py
index 039e026e..be2f9243 100644
--- a/src/firefly/run_browser/display.py
+++ b/src/firefly/run_browser/display.py
@@ -1,15 +1,20 @@
import asyncio
+import datetime as dt
import logging
from collections import Counter
from contextlib import contextmanager
-from functools import wraps
+from functools import partial, wraps
from typing import Mapping, Optional, Sequence
import qtawesome as qta
import yaml
+from ophyd import Device as ThreadedDevice
+from ophyd_async.core import Device
+from pydm import PyDMChannel
from qasync import asyncSlot
-from qtpy.QtCore import Qt
+from qtpy.QtCore import QDateTime, Qt
from qtpy.QtGui import QStandardItem, QStandardItemModel
+from tiled.client.container import Container
from firefly import display
from firefly.run_browser.client import DatabaseWorker
@@ -48,21 +53,40 @@ class RunBrowserDisplay(display.FireflyDisplay):
selected_runs: list
_running_db_tasks: Mapping
+ proposal_channel: PyDMChannel
+ esaf_channel: PyDMChannel
+
export_dialog: Optional[ExportDialog] = None
# Counter for keeping track of UI hints for long DB hits
_busy_hinters: Counter
- def __init__(self, root_node=None, args=None, macros=None, **kwargs):
+ def __init__(self, args=None, macros=None, **kwargs):
super().__init__(args=args, macros=macros, **kwargs)
self.selected_runs = []
self._running_db_tasks = {}
self._busy_hinters = Counter()
- self.db = DatabaseWorker(catalog=root_node)
- # Load the list of all runs for the selection widget
- self.db_task(self.load_runs(), name="init_load_runs")
- # Load the list of filters' field values into the comboboxes
- self.db_task(self.update_combobox_items(), name="update_combobox_items")
+ self.reset_default_filters()
+
+ async def setup_database(self, tiled_client: Container, catalog_name: str):
+ """Prepare to use a set of databases accessible through *tiled_client*.
+
+ Parameters
+ ==========
+ Each key in *tiled_client* should be the name of a catalog to make available."""
+ self.db = DatabaseWorker(tiled_client)
+ self.ui.catalog_combobox.addItems(await self.db.catalog_names())
+ self.ui.catalog_combobox.setCurrentText(catalog_name)
+ await self.change_catalog(catalog_name)
+
+ @asyncSlot(str)
+ async def change_catalog(self, catalog_name: str):
+ """Activate a different catalog in the Tiled server."""
+ await self.db_task(self.db.change_catalog(catalog_name), name="change_catalog")
+ await self.db_task(
+ asyncio.gather(self.load_runs(), self.update_combobox_items()),
+ name="change_catalog",
+ )
def db_task(self, coro, name="default task"):
"""Executes a co-routine as a database task. Existing database
@@ -106,28 +130,45 @@ async def load_runs(self):
self.runs_total_label.setText(str(self.ui.runs_model.rowCount()))
def clear_filters(self):
- self.ui.filter_proposal_combobox.setCurrentText("")
- self.ui.filter_esaf_combobox.setCurrentText("")
+ self.ui.filter_plan_combobox.setCurrentText("")
self.ui.filter_sample_combobox.setCurrentText("")
+ self.ui.filter_formula_combobox.setCurrentText("")
+ self.ui.filter_edge_combobox.setCurrentText("")
self.ui.filter_exit_status_combobox.setCurrentText("")
+ self.ui.filter_user_combobox.setCurrentText("")
+ self.ui.filter_proposal_combobox.setCurrentText("")
+ self.ui.filter_esaf_combobox.setCurrentText("")
self.ui.filter_current_proposal_checkbox.setChecked(False)
self.ui.filter_current_esaf_checkbox.setChecked(False)
- self.ui.filter_plan_combobox.setCurrentText("")
+ self.ui.filter_beamline_combobox.setCurrentText("")
+ self.ui.filter_after_checkbox.setChecked(False)
+ self.ui.filter_before_checkbox.setChecked(False)
self.ui.filter_full_text_lineedit.setText("")
- self.ui.filter_edge_combobox.setCurrentText("")
- self.ui.filter_user_combobox.setCurrentText("")
+ self.ui.filter_standards_checkbox.setChecked(False)
+
+ def reset_default_filters(self):
+ self.clear_filters()
+ self.ui.filter_exit_status_combobox.setCurrentText("success")
+ self.ui.filter_current_esaf_checkbox.setChecked(True)
+ self.ui.filter_current_proposal_checkbox.setChecked(True)
+ self.ui.filter_after_checkbox.setChecked(True)
+ last_week = dt.datetime.now().astimezone() - dt.timedelta(days=7)
+ last_week = QDateTime.fromSecsSinceEpoch(int(last_week.timestamp()))
+ self.ui.filter_after_datetimeedit.setDateTime(last_week)
async def update_combobox_items(self):
""""""
with self.busy_hints(run_table=False, run_widgets=False, filter_widgets=True):
fields = await self.db.load_distinct_fields()
for field_name, cb in [
- ("proposal_users", self.ui.filter_proposal_combobox),
- ("proposal_id", self.ui.filter_user_combobox),
- ("esaf_id", self.ui.filter_esaf_combobox),
- ("sample_name", self.ui.filter_sample_combobox),
("plan_name", self.ui.filter_plan_combobox),
+ ("sample_name", self.ui.filter_sample_combobox),
+ ("sample_formula", self.ui.filter_formula_combobox),
("edge", self.ui.filter_edge_combobox),
+ ("exit_status", self.ui.filter_exit_status_combobox),
+ ("proposal_id", self.ui.filter_proposal_combobox),
+ ("esaf_id", self.ui.filter_esaf_combobox),
+ ("beamline_id", self.ui.filter_beamline_combobox),
]:
if field_name in fields.keys():
old_text = cb.currentText()
@@ -135,36 +176,15 @@ async def update_combobox_items(self):
cb.addItems(fields[field_name])
cb.setCurrentText(old_text)
- @asyncSlot()
- @cancellable
- async def sleep_slot(self):
- await self.db_task(self.print_sleep())
-
- async def print_sleep(self):
- with self.busy_hints(run_widgets=True, run_table=True, filter_widgets=True):
- label = self.ui.sleep_label
- label.setText(f"3...")
- await asyncio.sleep(1)
- old_text = label.text()
- label.setText(f"{old_text}2...")
- await asyncio.sleep(1)
- old_text = label.text()
- label.setText(f"{old_text}1...")
- await asyncio.sleep(1)
- old_text = label.text()
- label.setText(f"{old_text}done!")
-
def customize_ui(self):
self.load_models()
# Setup controls for select which run to show
-
self.ui.run_tableview.selectionModel().selectionChanged.connect(
self.update_selected_runs
)
self.ui.refresh_runs_button.setIcon(qta.icon("fa5s.sync"))
self.ui.refresh_runs_button.clicked.connect(self.reload_runs)
- # Sleep controls for testing async timing
- self.ui.sleep_button.clicked.connect(self.sleep_slot)
+ self.ui.reset_filters_button.clicked.connect(self.reset_default_filters)
# Respond to changes in displaying the 1d plot
for signal in [
self.ui.signal_y_combobox.currentTextChanged,
@@ -192,6 +212,8 @@ def customize_ui(self):
self.ui.invert_checkbox_2d.stateChanged.connect(self.update_2d_plot)
self.ui.gradient_checkbox_2d.stateChanged.connect(self.update_2d_plot)
self.ui.plot_2d_hints_checkbox.stateChanged.connect(self.update_2d_signals)
+ # Select a new catalog
+ self.ui.catalog_combobox.currentTextChanged.connect(self.change_catalog)
# Respond to filter controls getting updated
self.ui.filters_widget.returnPressed.connect(self.refresh_runs_button.click)
# Respond to controls for the current run
@@ -207,6 +229,47 @@ def customize_ui(self):
# Create a new export dialog for saving files
self.export_dialog = ExportDialog(parent=self)
+ async def update_devices(self, registry):
+ try:
+ bss_device = registry["bss"]
+ except KeyError:
+ log.warning("Could not find device 'bss', disabling 'current' filters.")
+ self.ui.filter_current_proposal_checkbox.setChecked(False)
+ self.ui.filter_current_proposal_checkbox.setEnabled(False)
+ self.ui.filter_current_esaf_checkbox.setChecked(False)
+ self.ui.filter_current_esaf_checkbox.setEnabled(False)
+ else:
+ self.setup_bss_channels(bss_device)
+ await super().update_devices(registry)
+
+ def setup_bss_channels(self, bss: Device | ThreadedDevice):
+ """Setup channels to update the proposal and ESAF ID boxes."""
+ if getattr(self, "proposal_channel", None) is not None:
+ self.proposal_channel.disconnect()
+ self.proposal_channel = PyDMChannel(
+ address=f"haven://{bss.proposal.proposal_id.name}",
+ value_slot=partial(
+ self.update_bss_filter,
+ combobox=self.ui.filter_proposal_combobox,
+ checkbox=self.ui.filter_current_proposal_checkbox,
+ ),
+ )
+ if getattr(self, "esaf_channel", None) is not None:
+ self.esaf_channel.disconnect()
+ self.esaf_channel = PyDMChannel(
+ address=f"haven://{bss.esaf.esaf_id.name}",
+ value_slot=partial(
+ self.update_bss_filter,
+ combobox=self.ui.filter_esaf_combobox,
+ checkbox=self.ui.filter_current_esaf_checkbox,
+ ),
+ )
+
+ def update_bss_filter(self, text: str, *, combobox, checkbox):
+ """If *checkbox* is checked, update *combobox* with new *text*."""
+ if checkbox.checkState():
+ combobox.setCurrentText(text)
+
def auto_range(self):
self.plot_1d_view.autoRange()
@@ -276,6 +339,20 @@ def busy_hints(self, run_widgets=True, run_table=True, filter_widgets=True):
self._busy_hinters.subtract(hinters)
self.update_busy_hints()
+ @asyncSlot()
+ async def update_streams(self, *args):
+ """Update the list of available streams to choose from."""
+ stream_names = await self.db.stream_names()
+ # Sort so that "primary" is first
+ stream_names = sorted(stream_names, key=lambda x: x != "primary")
+ self.ui.stream_combobox.clear()
+ self.ui.stream_combobox.addItems(stream_names)
+
+ @property
+ def stream(self):
+ current_text = self.ui.stream_combobox.currentText()
+ return current_text or "primary"
+
@asyncSlot()
@cancellable
async def update_multi_signals(self, *args):
@@ -286,7 +363,8 @@ async def update_multi_signals(self, *args):
# Determine valid list of columns to choose from
use_hints = self.ui.plot_multi_hints_checkbox.isChecked()
signals_task = self.db_task(
- self.db.signal_names(hinted_only=use_hints), "multi signals"
+ self.db.signal_names(hinted_only=use_hints, stream=self.stream),
+ "multi signals",
)
xcols, ycols = await signals_task
# Update the comboboxes with new signals
@@ -310,7 +388,8 @@ async def update_1d_signals(self, *args):
# Determine valid list of columns to choose from
use_hints = self.ui.plot_1d_hints_checkbox.isChecked()
signals_task = self.db_task(
- self.db.signal_names(hinted_only=use_hints), "1D signals"
+ self.db.signal_names(hinted_only=use_hints, stream=self.stream),
+ name="1D signals",
)
xcols, ycols = await signals_task
self.multi_y_signals = ycols
@@ -336,7 +415,8 @@ async def update_2d_signals(self, *args):
# Determine valid list of dependent signals to choose from
use_hints = self.ui.plot_2d_hints_checkbox.isChecked()
xcols, vcols = await self.db_task(
- self.db.signal_names(hinted_only=use_hints), "2D signals"
+ self.db.signal_names(hinted_only=use_hints, stream=self.stream),
+ "2D signals",
)
# Update the UI with the list of controls
val_cb.clear()
@@ -352,7 +432,7 @@ async def update_multi_plot(self, *args):
return
use_hints = self.ui.plot_multi_hints_checkbox.isChecked()
runs = await self.db_task(
- self.db.all_signals(hinted_only=use_hints), "multi-plot"
+ self.db.all_signals(hinted_only=use_hints, stream=self.stream), "multi-plot"
)
self.ui.plot_multi_view.plot_runs(runs, xsignal=x_signal)
@@ -413,6 +493,7 @@ async def update_1d_plot(self, *args, uids: Sequence[str] = None):
use_invert=use_invert,
use_grad=use_grad,
uids=uids,
+ stream=self.stream,
),
"1D plot",
)
@@ -449,7 +530,9 @@ async def update_2d_plot(self):
use_log = self.ui.logarithm_checkbox_2d.isChecked()
use_invert = self.ui.invert_checkbox_2d.isChecked()
use_grad = self.ui.gradient_checkbox_2d.isChecked()
- images = await self.db_task(self.db.images(value_signal), "2D plot")
+ images = await self.db_task(
+ self.db.images(value_signal, stream=self.stream), "2D plot"
+ )
# Get axis labels
# Eventually this will be replaced with robust choices for plotting multiple images
metadata = await self.db_task(self.db.metadata(), "2D plot")
@@ -523,6 +606,7 @@ async def update_selected_runs(self, *args):
self.update_multi_signals(),
self.update_1d_signals(),
self.update_2d_signals(),
+ self.update_streams(),
)
# Update the plots
self.clear_plots()
@@ -531,19 +615,28 @@ async def update_selected_runs(self, *args):
def filters(self, *args):
new_filters = {
- "proposal": self.ui.filter_proposal_combobox.currentText(),
- "esaf": self.ui.filter_esaf_combobox.currentText(),
- "sample": self.ui.filter_sample_combobox.currentText(),
- "exit_status": self.ui.filter_exit_status_combobox.currentText(),
- "use_current_proposal": bool(
- self.ui.filter_current_proposal_checkbox.checkState()
- ),
- "use_current_esaf": bool(self.ui.filter_current_esaf_checkbox.checkState()),
"plan": self.ui.filter_plan_combobox.currentText(),
- "full_text": self.ui.filter_full_text_lineedit.text(),
+ "sample": self.ui.filter_sample_combobox.currentText(),
+ "formula": self.ui.filter_formula_combobox.currentText(),
"edge": self.ui.filter_edge_combobox.currentText(),
+ "exit_status": self.ui.filter_exit_status_combobox.currentText(),
"user": self.ui.filter_user_combobox.currentText(),
+ "proposal": self.ui.filter_proposal_combobox.currentText(),
+ "esaf": self.ui.filter_esaf_combobox.currentText(),
+ "beamline": self.ui.filter_beamline_combobox.currentText(),
+ "full_text": self.ui.filter_full_text_lineedit.text(),
}
+ # Special handling for the time-based filters
+ if self.ui.filter_after_checkbox.checkState():
+ after = self.ui.filter_after_datetimeedit.dateTime().toSecsSinceEpoch()
+ new_filters["after"] = after
+ if self.ui.filter_before_checkbox.checkState():
+ before = self.ui.filter_before_datetimeedit.dateTime().toSecsSinceEpoch()
+ new_filters["before"] = before
+ # Limit the search to standards only
+ if self.ui.filter_standards_checkbox.checkState():
+ new_filters["standards_only"] = True
+ # Only include values that were actually filled in
null_values = ["", False]
new_filters = {k: v for k, v in new_filters.items() if v not in null_values}
return new_filters
diff --git a/src/firefly/run_browser/run_browser.ui b/src/firefly/run_browser/run_browser.ui
index d4caa98d..7555e73b 100644
--- a/src/firefly/run_browser/run_browser.ui
+++ b/src/firefly/run_browser/run_browser.ui
@@ -7,420 +7,609 @@
0
0
1408
- 775
+ 961
Run Browser
-
+
-
Qt::Horizontal
-
+
-
-
-
-
- 1
- 0
-
-
-
-
- 16777215
- 16777215
-
-
-
-
- 100
- 0
-
-
-
- true
-
-
-
- -
-
-
- 0
-
-
- 0
+
+
+ Qt::Vertical
-
-
-
-
- Filters
-
-
- true
-
-
- true
-
-
-
- -
-
-
- Qt::Horizontal
-
-
-
- 40
- 20
-
-
-
-
- -
-
-
-
- 0
- 0
-
-
-
-
- 0
- 0
-
+
+
+
-
+
+
+ 0
+
+
-
+
+
+ true
+
+
+ Catalog:
+
+
+
+ -
+
+
+ true
+
+
+
+ -
+
+
+ Qt::Horizontal
+
+
+
+ 40
+ 20
+
+
+
+
+
+
+ -
+
+
+
+ 1
+ 0
+
+
+
+
+ 16777215
+ 16777215
+
+
+
+
+ 100
+ 0
+
+
+
+ true
+
+
+
+
+
+
+
+
+ 0
-
- -
-
-
- Qt::Horizontal
-
-
-
- 40
- 20
-
-
-
-
- -
-
-
- 0
-
-
- 0
-
-
-
- -
-
-
- total
+
-
+
+
+ 0
+
+
+ 0
+
+
-
+
+
+ 1
+
+
+ 0
+
+
-
+
+
+ 0
+
+
+ 0
+
+
+
+ -
+
+
+ total
+
+
+
+
+
+ -
+
+
+ Qt::Horizontal
+
+
+
+ 40
+ 20
+
+
+
+
+ -
+
+
+ Retrieve the list of runs matching the filters.
+
+
+ Refresh
+
+
+
+
+
+ -
+
+
+ true
+
+
+
+
+ 0
+ 0
+ 376
+ 591
+
+
+
-
+
+
+
+ 0
+
+
+ 0
+
+
+ 0
+
+
+ 0
+
+
-
+
+
+ Qt::AlignRight|Qt::AlignTrailing|Qt::AlignVCenter
+
+
+ 0
+
+
-
+
+
+ Plan:
+
+
+
+ -
+
+
+ true
+
+
+
+
+
+
+ -
+
+
+ Sample:
+
+
+
+ -
+
+
+ The sample composition to look for. Supports regular expression. E.g. Sb.*Te
+
+
+ true
+
+
+
+
+
+
+ -
+
+
+ Formula:
+
+
+
+ -
+
+
+ true
+
+
+
+ -
+
+
+ Edge:
+
+
+
+ -
+
+
+ The X-ray absorption edge, or energy in electron-volts, that an energy scan was collected. Supports regular expressions. E.g. Ni_K
+
+
+ true
+
+
+
+
+
+
+ -
+
+
+ true
+
+
+ Status:
+
+
+
+ -
+
+
+ true
+
+
+ true
+
+
+ success
+
+
+
+ -
+
+
+ User:
+
+
+
+ -
+
+
+ true
+
+
+
+
+
+
+ -
+
+
+ Proposal:
+
+
+
+ -
+
+
-
+
+
+ false
+
+
+
+ 0
+ 0
+
+
+
+ true
+
+
+
+ -
+
+
+ true
+
+
+
+ 0
+ 0
+
+
+
+
+ 0
+ 0
+
+
+
+ Current
+
+
+ true
+
+
+
+
+
+ -
+
+
+ ESAF:
+
+
+
+ -
+
+
-
+
+
+ false
+
+
+ true
+
+
+
+ -
+
+
+ true
+
+
+
+ 0
+ 0
+
+
+
+ Current
+
+
+ true
+
+
+ true
+
+
+ false
+
+
+
+
+
+ -
+
+
+ Beamline:
+
+
+
+ -
+
+
+ true
+
+
+
+ -
+
+
+ After:
+
+
+
+ -
+
+
+ 0
+
+
-
+
+
+
+ 0
+ 0
+
+
+
+
+
+
+ true
+
+
+
+ -
+
+
+ yyyy-MM-dd HH.mm
+
+
+ true
+
+
+
+ -
+
+
+ Qt::Horizontal
+
+
+
+ 40
+ 20
+
+
+
+
+
+
+ -
+
+
+ Before:
+
+
+
+ -
+
+
+ 0
+
+
-
+
+
+
+ 0
+ 0
+
+
+
+
+
+
+
+ -
+
+
+ false
+
+
+ true
+
+
+
+ -
+
+
+ Qt::Horizontal
+
+
+
+ 40
+ 20
+
+
+
+
+
+
+ -
+
+
+ Full Text:
+
+
+
+ -
+
+
+ -
+
+
+ Standards only
+
+
+
+ -
+
+
+ 0
+
+
-
+
+
+ Qt::Horizontal
+
+
+
+ 40
+ 5
+
+
+
+
+ -
+
+
+ Reset the filters to their defaults.
+
+
+ Reset Defaults
+
+
+
+
+
+ -
+
+
+ Qt::Vertical
+
+
+
+ 20
+ 40
+
+
+
+
+
+
+
+
+
+
-
- -
-
-
- Qt::Horizontal
-
-
-
- 40
- 20
-
-
-
-
-
-
-
- -
-
-
- Qt::Horizontal
-
-
-
- 40
- 20
-
-
-
-
- -
-
-
- Refresh
-
-
-
-
+
+
+
+
+
+
+
+
+
-
-
-
- 4
-
-
-
-
-
- Sleep
-
-
-
+
-
-
+
- ⬅ Press the button
+ Stream:
-
-
-
- Qt::Horizontal
-
-
-
- 40
- 20
-
-
-
+
-
-
- -
-
-
-
- 0
-
-
- 0
-
-
- 0
-
-
- 0
-
-
-
-
-
- Qt::AlignRight|Qt::AlignTrailing|Qt::AlignVCenter
-
-
- 6
-
-
-
-
-
- Plan:
-
-
-
- -
-
-
- true
-
-
-
-
-
-
- -
-
-
- Sample:
-
-
-
- -
-
-
- The sample composition to look for. Supports regular expression. E.g. Sb.*Te
-
-
- true
-
-
-
-
-
-
- -
-
-
- Edge:
-
-
-
- -
-
-
- The X-ray absorption edge, or energy in electron-volts, that an energy scan was collected. Supports regular expressions. E.g. Ni_K
-
-
- true
-
-
-
-
-
-
- -
-
-
- User:
-
-
-
- -
-
-
- true
-
-
-
-
-
-
- -
-
-
- Proposal:
-
-
-
- -
-
-
-
-
-
-
- 0
- 0
-
-
-
- true
-
-
-
- -
-
-
- false
-
-
-
- 0
- 0
-
-
-
-
- 100
- 0
-
-
-
- Current
-
-
- true
-
-
-
-
-
- -
-
-
- ESAF:
-
-
-
- -
-
-
-
-
-
- true
-
-
-
- -
-
-
- false
-
-
- Current
-
-
- true
-
-
- false
-
-
-
-
-
- -
-
-
- Full Text:
-
-
-
- -
-
-
- -
-
-
- false
-
-
- true
-
-
- success
-
-
-
- -
-
-
- false
-
-
- Status:
-
-
-
-
-
-
-
-
-
-
-
-
- -
-
-
@@ -475,7 +664,7 @@
- 2
+ 1
@@ -566,8 +755,8 @@
0
0
- 864
- 638
+ 964
+ 824
@@ -1015,11 +1204,6 @@
-
- RevealButton
- QPushButton
-
-
FiltersWidget
QWidget
@@ -1046,32 +1230,11 @@
- run_tableview
- filters_button
- refresh_runs_button
filter_proposal_combobox
- filter_current_proposal_checkbox
filter_esaf_combobox
- filter_current_esaf_checkbox
-
- filters_button
- toggled(bool)
- filters_widget
- setVisible(bool)
-
-
- 50
- 341
-
-
- 149
- 365
-
-
-
filter_current_esaf_checkbox
toggled(bool)
@@ -1120,5 +1283,37 @@
+
+ filter_after_checkbox
+ toggled(bool)
+ filter_after_datetimeedit
+ setEnabled(bool)
+
+
+ 73
+ 699
+
+
+ 146
+ 699
+
+
+
+
+ filter_before_checkbox
+ toggled(bool)
+ filter_before_datetimeedit
+ setEnabled(bool)
+
+
+ 73
+ 726
+
+
+ 146
+ 726
+
+
+
diff --git a/src/firefly/run_browser/tests/test_client.py b/src/firefly/run_browser/tests/test_client.py
index 209b4008..55f6c35f 100644
--- a/src/firefly/run_browser/tests/test_client.py
+++ b/src/firefly/run_browser/tests/test_client.py
@@ -3,23 +3,40 @@
from firefly.run_browser.client import DatabaseWorker
+@pytest.fixture()
+async def worker(tiled_client):
+ worker = DatabaseWorker(tiled_client)
+ await worker.change_catalog("255id_testing")
+ return worker
+
+
@pytest.mark.asyncio
-async def test_filter_runs(catalog):
- worker = DatabaseWorker(catalog=catalog)
+async def test_catalog_names(worker):
+ assert (await worker.catalog_names()) == ["255id_testing", "255bm_testing"]
+
+
+@pytest.mark.asyncio
+async def test_filter_runs(worker):
runs = await worker.load_all_runs(filters={"plan": "xafs_scan"})
# Check that the runs were filtered
assert len(runs) == 1
@pytest.mark.asyncio
-async def test_distinct_fields(catalog):
- worker = DatabaseWorker(catalog=catalog)
+async def test_distinct_fields(worker):
distinct_fields = await worker.load_distinct_fields()
# Check that the dictionary has the right structure
for key in ["sample_name"]:
assert key in distinct_fields.keys()
+async def test_stream_names(worker):
+ uids = (await worker.catalog.client).keys()
+ await worker.load_selected_runs(uids)
+ stream_names = await worker.stream_names()
+ assert stream_names == ["primary"]
+
+
# -----------------------------------------------------------------------------
# :author: Mark Wolfman
# :email: wolfman@anl.gov
diff --git a/src/firefly/run_browser/tests/test_display.py b/src/firefly/run_browser/tests/test_display.py
index d8da645f..5082e3b2 100644
--- a/src/firefly/run_browser/tests/test_display.py
+++ b/src/firefly/run_browser/tests/test_display.py
@@ -1,16 +1,27 @@
import asyncio
-from unittest.mock import MagicMock
+import datetime as dt
+from functools import partial
+from unittest.mock import AsyncMock, MagicMock
import numpy as np
import pytest
+import time_machine
+from ophyd.sim import instantiate_fake_device
from pyqtgraph import ImageItem, ImageView, PlotItem, PlotWidget
from qtpy.QtWidgets import QFileDialog
from firefly.run_browser.display import RunBrowserDisplay
+from haven.devices.beamline_manager import EpicsBssDevice
@pytest.fixture()
-async def display(qtbot, catalog, mocker):
+def bss(sim_registry):
+ bss_ = instantiate_fake_device(EpicsBssDevice, prefix="apsbss:", name="bss")
+ return bss_
+
+
+@pytest.fixture()
+async def display(qtbot, tiled_client, catalog, mocker):
mocker.patch(
"firefly.run_browser.widgets.ExportDialog.exec_",
return_value=QFileDialog.Accepted,
@@ -20,21 +31,17 @@ async def display(qtbot, catalog, mocker):
return_value=["/net/s255data/export/test_file.nx"],
)
mocker.patch("firefly.run_browser.client.DatabaseWorker.export_runs")
- display = RunBrowserDisplay(root_node=catalog)
+ display = RunBrowserDisplay()
qtbot.addWidget(display)
display.clear_filters()
# Wait for the initial database load to process
- await display._running_db_tasks["init_load_runs"]
- await display._running_db_tasks["update_combobox_items"]
+ await display.setup_database(tiled_client, catalog_name="255id_testing")
+ display.db.stream_names = AsyncMock(return_value=["primary", "baseline"])
# Set up some fake data
run = [run async for run in catalog.values()][0]
display.db.selected_runs = [run]
await display.update_1d_signals()
- print(run.uid)
- run_data = await run.data()
- expected_xdata = run_data.energy_energy
- expected_ydata = np.log(run_data.I0_net_counts / run_data.It_net_counts)
- expected_ydata = np.gradient(expected_ydata, expected_xdata)
+ run_data = await run.data(stream="primary")
# Set the controls to describe the data we want to test
x_combobox = display.ui.signal_x_combobox
x_combobox.addItem("energy_energy")
@@ -48,7 +55,6 @@ async def display(qtbot, catalog, mocker):
return display
-@pytest.mark.asyncio
async def test_db_task(display):
async def test_coro():
return 15
@@ -57,7 +63,6 @@ async def test_coro():
assert result == 15
-@pytest.mark.asyncio
async def test_db_task_interruption(display):
async def test_coro(sleep_time):
await asyncio.sleep(sleep_time)
@@ -80,7 +85,6 @@ def test_load_runs(display):
assert display.ui.runs_total_label.text() == str(display.runs_model.rowCount())
-@pytest.mark.asyncio
async def test_update_selected_runs(display):
# Change the proposal item
item = display.runs_model.item(0, 1)
@@ -92,7 +96,6 @@ async def test_update_selected_runs(display):
assert len(display.db.selected_runs) > 0
-@pytest.mark.asyncio
async def test_update_selected_runs(display):
# Change the proposal item
item = display.runs_model.item(0, 1)
@@ -110,7 +113,6 @@ async def test_clear_plots(display):
assert display.plot_1d_view.clear_runs.called
-@pytest.mark.asyncio
async def test_metadata(display):
# Change the proposal item
display.ui.run_tableview.selectRow(0)
@@ -120,7 +122,6 @@ async def test_metadata(display):
assert "xafs_scan" in text
-@pytest.mark.asyncio
async def test_1d_plot_signals(catalog, display):
# Check that the 1D plot was created
plot_widget = display.ui.plot_1d_view
@@ -142,8 +143,7 @@ async def test_1d_plot_signals(catalog, display):
# Warns: Task was destroyed but it is pending!
-@pytest.mark.asyncio
-async def test_1d_plot_signal_memory(catalog, display):
+async def test_1d_plot_signal_memory(display):
"""Do we remember the signals that were previously selected."""
# Check that the 1D plot was created
plot_widget = display.ui.plot_1d_view
@@ -164,7 +164,6 @@ async def test_1d_plot_signal_memory(catalog, display):
# Warns: Task was destroyed but it is pending!
-@pytest.mark.asyncio
async def test_1d_hinted_signals(catalog, display):
display.ui.plot_1d_hints_checkbox.setChecked(True)
# Check that the 1D plot was created
@@ -186,7 +185,6 @@ async def test_1d_hinted_signals(catalog, display):
), f"unhinted signal found in {combobox.objectName()}."
-@pytest.mark.asyncio
async def test_update_1d_plot(catalog, display):
display.plot_1d_view.plot_runs = MagicMock()
display.plot_1d_view.autoRange = MagicMock()
@@ -223,7 +221,6 @@ async def test_update_running_scan(display):
# Warns: Task was destroyed but it is pending!
-@pytest.mark.asyncio
async def test_2d_plot_signals(catalog, display):
# Check that the 1D plot was created
plot_widget = display.ui.plot_2d_view
@@ -238,7 +235,6 @@ async def test_2d_plot_signals(catalog, display):
assert combobox.findText("It_net_counts") > -1
-@pytest.mark.asyncio
async def test_update_2d_plot(catalog, display):
display.plot_2d_item.setRect = MagicMock()
# Load test data
@@ -255,7 +251,7 @@ async def test_update_2d_plot(catalog, display):
# Update the plots
await display.update_2d_plot()
# Determine what the image data should look like
- expected_data = await run["It_net_counts"]
+ expected_data = await run.__getitem__("It_net_counts", stream="primary")
expected_data = expected_data.reshape((5, 21)).T
# Check that the data were added
image = display.plot_2d_item.image
@@ -269,11 +265,12 @@ async def test_update_2d_plot(catalog, display):
display.plot_2d_item.setRect.assert_called_with(-100, -80, 200, 160)
-@pytest.mark.asyncio
async def test_update_multi_plot(catalog, display):
run = await catalog["7d1daf1d-60c7-4aa7-a668-d1cd97e5335f"]
- expected_xdata = await run["energy_energy"]
- expected_ydata = np.log(await run["I0_net_counts"] / await run["It_net_counts"])
+ expected_xdata = await run.__getitem__("energy_energy", stream="primary")
+ I0 = await run.__getitem__("I0_net_counts", stream="primary")
+ It = await run.__getitem__("It_net_counts", stream="primary")
+ expected_ydata = np.log(I0 / It)
expected_ydata = np.gradient(expected_ydata, expected_xdata)
# Configure signals
display.ui.multi_signal_x_combobox.addItem("energy_energy")
@@ -343,13 +340,18 @@ def test_busy_hints_multiple(display):
assert display.ui.detail_tabwidget.isEnabled()
-@pytest.mark.asyncio
async def test_update_combobox_items(display):
"""Check that the comboboxes get the distinct filter fields."""
assert display.ui.filter_plan_combobox.count() > 0
+ assert display.ui.filter_sample_combobox.count() > 0
+ assert display.ui.filter_formula_combobox.count() > 0
+ assert display.ui.filter_edge_combobox.count() > 0
+ assert display.ui.filter_exit_status_combobox.count() > 0
+ assert display.ui.filter_proposal_combobox.count() > 0
+ assert display.ui.filter_esaf_combobox.count() > 0
+ assert display.ui.filter_beamline_combobox.count() > 0
-@pytest.mark.asyncio
async def test_export_button_enabled(catalog, display):
assert not display.export_button.isEnabled()
# Update the list with 1 run and see if the control gets enabled
@@ -363,7 +365,6 @@ async def test_export_button_enabled(catalog, display):
assert not display.export_button.isEnabled()
-@pytest.mark.asyncio
async def test_export_button_clicked(catalog, display, mocker, qtbot):
# Set up a run to be tested against
run = MagicMock()
@@ -388,6 +389,75 @@ async def test_export_button_clicked(catalog, display, mocker, qtbot):
assert display.db.export_runs.call_args.kwargs["formats"] == ["application/json"]
+fake_time = dt.datetime(2022, 8, 19, 19, 10, 51).astimezone()
+
+
+@time_machine.travel(fake_time, tick=False)
+def test_default_filters(display):
+ display.clear_filters()
+ display.reset_default_filters()
+ assert display.ui.filter_exit_status_combobox.currentText() == "success"
+ assert display.ui.filter_current_esaf_checkbox.checkState()
+ assert display.ui.filter_current_proposal_checkbox.checkState()
+ assert display.ui.filter_after_checkbox.checkState()
+ last_week = dt.datetime(2022, 8, 12, 19, 10, 51)
+ filter_time = display.ui.filter_after_datetimeedit.dateTime()
+ filter_time = dt.datetime.fromtimestamp(filter_time.toTime_t())
+ assert filter_time == last_week
+
+
+def test_time_filters(display):
+ """Check that the before and after datetime filters are activated."""
+ display.ui.filter_after_checkbox.setChecked(False)
+ display.ui.filter_before_checkbox.setChecked(False)
+ filters = display.filters()
+ assert "after" not in filters
+ assert "before" not in filters
+ display.ui.filter_after_checkbox.setChecked(True)
+ display.ui.filter_before_checkbox.setChecked(True)
+ filters = display.filters()
+ assert "after" in filters
+ assert "before" in filters
+
+
+def test_bss_channels(display, bss):
+ """Do the widgets get updated based on the BSS proposal ID, etc."""
+ display.setup_bss_channels(bss)
+ assert (
+ display.proposal_channel.address == f"haven://{bss.proposal.proposal_id.name}"
+ )
+ assert display.esaf_channel.address == f"haven://{bss.esaf.esaf_id.name}"
+
+
+def test_update_bss_filters(display):
+ checkbox = display.ui.filter_current_proposal_checkbox
+ combobox = display.ui.filter_proposal_combobox
+ update_slot = partial(
+ display.update_bss_filter, combobox=combobox, checkbox=checkbox
+ )
+ # Enable the "current" checkbox, and make sure the combobox updates
+ checkbox.setChecked(True)
+ update_slot("89321")
+ assert combobox.currentText() == "89321"
+ # Disable the "current" checkbox, and make sure the combobox doesn't update
+ checkbox.setChecked(False)
+ update_slot("99531")
+ assert combobox.currentText() == "89321"
+
+
+def test_catalog_choices(display, tiled_client):
+ combobox = display.ui.catalog_combobox
+ items = [combobox.itemText(idx) for idx in range(combobox.count())]
+ assert items == ["255id_testing", "255bm_testing"]
+
+
+async def test_stream_choices(display, tiled_client):
+ await display.update_streams()
+ combobox = display.ui.stream_combobox
+ items = [combobox.itemText(idx) for idx in range(combobox.count())]
+ assert items == ["primary", "baseline"]
+
+
# -----------------------------------------------------------------------------
# :author: Mark Wolfman
# :email: wolfman@anl.gov
diff --git a/src/haven/__init__.py b/src/haven/__init__.py
index 1b65a39d..cd8f8d56 100644
--- a/src/haven/__init__.py
+++ b/src/haven/__init__.py
@@ -36,8 +36,7 @@
from ._iconfig import load_config # noqa: F401
# Top-level imports
-# from .catalog import load_catalog, load_data, load_result, tiled_client # noqa: F401
-from .catalog import load_catalog, tiled_client # noqa: F401
+from .catalog import tiled_client # noqa: F401
from .constants import edge_energy # noqa: F401
from .devices import IonChamber, Monochromator, Robot, ion_chamber # noqa: F401
from .devices.motor import HavenMotor # noqa: F401
diff --git a/src/haven/catalog.py b/src/haven/catalog.py
index 7ec4185a..23623f68 100644
--- a/src/haven/catalog.py
+++ b/src/haven/catalog.py
@@ -1,4 +1,5 @@
import asyncio
+import functools
import logging
import os
import sqlite3
@@ -8,7 +9,6 @@
from functools import partial
from typing import Sequence
-import databroker
import numpy as np
from tiled.client import from_uri
from tiled.client.cache import Cache
@@ -18,6 +18,23 @@
log = logging.getLogger(__name__)
+def run_in_executor(_func):
+ """Decorator that makes the wrapped synchronous function asynchronous.
+
+ This is done by running the wrapped function in the default
+ asyncio executor.
+
+ """
+
+ @functools.wraps(_func)
+ def wrapped(*args, **kwargs):
+ loop = asyncio.get_running_loop()
+ func = functools.partial(_func, *args, **kwargs)
+ return loop.run_in_executor(None, func)
+
+ return wrapped
+
+
def unsnake(arr: np.ndarray, snaking: list) -> np.ndarray:
"""Unsnake a nump array.
@@ -48,83 +65,6 @@ def unsnake(arr: np.ndarray, snaking: list) -> np.ndarray:
return arr
-def load_catalog(name: str = "bluesky"):
- """Load a databroker catalog for retrieving data.
-
- To retrieve individual scans, consider the ``load_result`` and
- ``load_data`` methods.
-
- Parameters
- ==========
- name
- The name of the catalog as defined in the Intake file
- (e.g. ~/.local/share/intake/catalogs.yml)
-
- Returns
- =======
- catalog
- The databroker catalog.
- """
- return databroker.catalog[name]
-
-
-def load_result(uid: str, catalog_name: str = "bluesky", stream: str = "primary"):
- """Load a past experiment from the database.
-
- The result contains metadata and scan parameters. The data
- themselves are accessible from the result's *read()* method.
-
- Parameters
- ==========
- uid
- The universal identifier for this scan, as return by a bluesky
- RunEngine.
- catalog_name
- The name of the catalog as defined in the Intake file
- (e.g. ~/.local/share/intake/catalogs.yml)
- stream
- The data stream defined by the bluesky RunEngine.
-
- Returns
- =======
- result
- The experiment result, with data available via the *read()*
- method.
-
- """
- cat = load_catalog(name=catalog_name)
- result = cat[uid][stream]
- return result
-
-
-def load_data(uid, catalog_name="bluesky", stream="primary"):
- """Load a past experiment's data from the database.
-
- The result is an xarray with the data collected.
-
- Parameters
- ==========
- uid
- The universal identifier for this scan, as return by a bluesky
- RunEngine.
- catalog_name
- The name of the catalog as defined in the Intake file
- (e.g. ~/.local/share/intake/catalogs.yml)
- stream
- The data stream defined by the bluesky RunEngine.
-
- Returns
- =======
- data
- The experimental data, as an xarray.
-
- """
-
- res = load_result(uid=uid, catalog_name=catalog_name, stream=stream)
- data = res.read()
- return data
-
-
def with_thread_lock(fn):
"""Makes sure the function isn't accessed concurrently."""
@@ -171,9 +111,32 @@ def write_safe(self):
delete = with_thread_lock(Cache.delete)
+DEFAULT_NODE = object()
+
+
def tiled_client(
- entry_node=None, uri=None, cache_filepath=None, structure_clients="numpy"
+ catalog: str = DEFAULT_NODE,
+ uri: str = None,
+ cache_filepath=None,
+ structure_clients="numpy",
):
+    """Load a tiled client for retrieving data from databases.
+
+ Parameters
+ ==========
+ catalog
+ The node within the catalog to return, by default this will be
+ read from the config file. If ``None``, the root container will
+        be returned, containing all catalogs.
+ uri
+ The location of the tiled server, e.g. "http://localhost:8000".
+ cache_filepath
+ Where to keep a local cache of tiled nodes.
+ structure_clients
+ "numpy" for immediate retrieval of data, "dask" for just-in-time
+ retrieval.
+
+ """
config = load_config()
tiled_config = config["database"].get("tiled", {})
# Create a cache for saving local copies
@@ -189,9 +152,10 @@ def tiled_client(
uri = tiled_config["uri"]
api_key = tiled_config.get("api_key")
client_ = from_uri(uri, structure_clients, api_key=api_key)
- if entry_node is None:
- entry_node = tiled_config["entry_node"]
- client_ = client_[entry_node]
+ if catalog is DEFAULT_NODE:
+ client_ = client_[tiled_config["default_catalog"]]
+ elif catalog is not None:
+ client_ = client_[catalog]
return client_
@@ -207,9 +171,12 @@ def __init__(self, container, executor=None):
self.container = container
self.executor = executor
- def _read_data(
- self, signals: Sequence | None, dataset: str = "primary/internal/events"
- ):
+ @run_in_executor
+ def stream_names(self):
+ return list(self.container.keys())
+
+ @run_in_executor
+ def _read_data(self, signals: Sequence | None, dataset: str):
data = self.container[dataset]
if signals is None:
return data.read()
@@ -219,20 +186,10 @@ def _read_data(
signals = signals & available_signals
return data.read()
- def _read_metadata(self, keys=None):
- container = self.container
- if keys is not None:
- container = container[keys]
- return container.metadata
-
@property
def uid(self):
return self.container._item["id"]
- async def run(self, to_call, *args):
- """Run the given syncronous callable in an asynchronous context."""
- return await self.loop.run_in_executor(self.executor, to_call, *args)
-
async def export(self, filename: str, format: str):
target = partial(self.container.export, filename, format=format)
await self.loop.run_in_executor(None, target)
@@ -240,30 +197,32 @@ async def export(self, filename: str, format: str):
def formats(self):
return self.container.formats
- async def data(self, signals=None, stream="primary"):
- return await self.loop.run_in_executor(
- None, self._read_data, signals, f"{stream}/internal/events/"
- )
+ async def data(self, *, signals=None, stream: str = "primary"):
+ return await self._read_data(signals, f"{stream}/internal/events/")
@property
def loop(self):
return asyncio.get_running_loop()
- def _data_keys(self, stream):
- return self.container[stream]["internal/events"].columns
-
- async def data_keys(self, stream="primary"):
- return await self.run(self._data_keys, ("primary",))
+ @run_in_executor
+ def data_keys(self, stream: str = "primary"):
+ return self.container[f"{stream}/internal/events"].columns
- async def hints(self):
+ async def hints(self, stream: str = "primary"):
"""Retrieve the data hints for this scan.
+ Parameters
+ ==========
+ stream
+ The name of the Tiled data stream to look up hints for.
+
Returns
=======
independent
The hints for the independent scanning axis.
dependent
The hints for the dependent scanning axis.
+
"""
metadata = await self.metadata
# Get hints for the independent (X)
@@ -273,20 +232,30 @@ async def hints(self):
warnings.warn("Could not get independent hints")
# Get hints for the dependent (X)
dependent = []
- primary_metadata = await self.run(self._read_metadata, "primary")
+ primary_metadata = await self._read_metadata(stream)
hints = primary_metadata["hints"]
for device, dev_hints in hints.items():
dependent.extend(dev_hints["fields"])
return independent, dependent
+ @run_in_executor
+ def _read_metadata(self, keys=None):
+ assert keys != "", "Metadata keys cannot be ''."
+ container = self.container
+ if keys is not None:
+ print(f"{container=}, {keys=}")
+ container = container[keys]
+ return container.metadata
+
@property
async def metadata(self):
- metadata = await self.run(self._read_metadata)
- return metadata
+ return await self._read_metadata()
- async def __getitem__(self, signal):
+ async def __getitem__(self, signal, stream: str = "primary"):
"""Retrieve a signal from the dataset, with reshaping etc."""
- arr = await self.run(self._read_data, tuple([signal]))
+ arr = await self._read_data(
+ [f"{stream}/{signal}"], dataset=f"{stream}/internal/events"
+ )
arr = np.asarray(arr[signal])
# Re-shape to match the scan dimensions
metadata = await self.metadata
@@ -309,6 +278,11 @@ class Catalog:
are structured, so can make some assumptions and takes care of
boiler-plate code (e.g. reshaping maps, etc).
+ Parameters
+ ==========
+ client
+ A Tiled client that has scan UIDs as its keys.
+
"""
_client = None
@@ -320,10 +294,6 @@ def __init__(self, client=None):
def __del__(self):
self.executor.shutdown(wait=True, cancel_futures=True)
- async def run(self, to_call, *args):
- """Run the given syncronous callable in an asynchronous context."""
- return await self.loop.run_in_executor(self.executor, to_call, *args)
-
@property
def loop(self):
return asyncio.get_running_loop()
@@ -331,29 +301,29 @@ def loop(self):
@property
async def client(self):
if self._client is None:
- self._client = await self.run(tiled_client)
+ self._client = await run_in_executor(tiled_client)()
return self._client
async def __getitem__(self, uid) -> CatalogScan:
client = await self.client
- container = await self.run(client.__getitem__, uid)
+ container = await run_in_executor(client.__getitem__)(uid)
scan = CatalogScan(container=container, executor=self.executor)
return scan
async def items(self):
client = await self.client
- for key, value in await self.run(client.items):
+ for key, value in await run_in_executor(client.items)():
yield key, CatalogScan(container=value, executor=self.executor)
async def values(self):
client = await self.client
- containers = await self.run(client.values)
+ containers = await run_in_executor(client.values)()
for container in containers:
yield CatalogScan(container, executor=self.executor)
async def __len__(self):
client = await self.client
- length = await self.run(client.__len__)
+ length = await run_in_executor(client.__len__)()
return length
async def search(self, query):
diff --git a/src/haven/iconfig_testing.toml b/src/haven/iconfig_testing.toml
index 361a9273..7d611070 100644
--- a/src/haven/iconfig_testing.toml
+++ b/src/haven/iconfig_testing.toml
@@ -4,9 +4,6 @@ area_detector_root_path = "/tmp"
# General name for the beamline, used for metadata.
name = "SPC Beamline (sector unknown)"
-[database.databroker]
-catalog = "bluesky"
-
[xray_source]
type = "undulator"
prefix = "ID255ds:"
@@ -16,13 +13,14 @@ prefix = "255idc:bss"
beamline = "255-ID-C"
-#####################
-# Queueserver
-#####################
+##############
+# Acquisition
+##############
-# This section describes how to connect to the queueserver. It does
-# not generate any devices, but is intended to be read by the Firefly
-# GUI application to determine how to interact with the queue.
+# This section describes how to connect to the queueserver and how
+# queueserver data reaches the database. It does not generate any
+# devices, but is intended to be read by the Firefly GUI application
+# to determine how to interact with the queue.
[queueserver]
kafka_topic = "s255idc_queueserver"
@@ -34,10 +32,13 @@ redis_addr = "localhost:6379"
[database.tiled]
uri = "http://localhost:8337/"
-entry_node = "255id_testing"
-cache_filterpath = "/tmp/tiled_cache/http_response_cache.db"
+default_catalog = "255id_testing"
+cache_filepath = "/tmp/tiled_cache/http_response_cache.db"
api_key = ""
+[database.databroker]
+catalog = "bluesky"
+
#################
# Device support
#################
diff --git a/src/haven/tests/test_catalog.py b/src/haven/tests/test_catalog.py
index 17681fea..d484753e 100644
--- a/src/haven/tests/test_catalog.py
+++ b/src/haven/tests/test_catalog.py
@@ -8,13 +8,13 @@
@pytest.fixture()
def scan(tiled_client):
uid = "7d1daf1d-60c7-4aa7-a668-d1cd97e5335f"
- return CatalogScan(tiled_client[uid])
+ return CatalogScan(tiled_client["255id_testing"][uid])
@pytest.fixture()
def grid_scan(tiled_client):
uid = "85573831-f4b4-4f64-b613-a6007bf03a8d"
- return CatalogScan(tiled_client[uid])
+ return CatalogScan(tiled_client["255id_testing"][uid])
def test_unsnake():
@@ -52,7 +52,7 @@ async def test_load_nd_data(grid_scan):
@pytest.mark.asyncio
async def test_distinct(catalog, tiled_client):
- distinct = tiled_client.distinct("plan_name")
+ distinct = tiled_client["255id_testing"].distinct("plan_name")
assert await catalog.distinct("plan_name") == distinct
@@ -60,7 +60,7 @@ async def test_distinct(catalog, tiled_client):
async def test_search(catalog, tiled_client):
"""Make sure we can query to database properly."""
query = queries.Regex("plan_name", "xafs_scan")
- expected = tiled_client.search(query)
+ expected = tiled_client["255id_testing"].search(query)
response = await catalog.search(query)
assert len(expected) == await response.__len__()
@@ -68,7 +68,7 @@ async def test_search(catalog, tiled_client):
@pytest.mark.asyncio
async def test_values(catalog, tiled_client):
"""Get the individual scans in the catalog."""
- expected = [uid for uid in tiled_client.keys()]
+ expected = [uid for uid in tiled_client["255id_testing"].keys()]
response = [val.uid async for val in catalog.values()]
assert expected == response