From 329c0e2a2344f02771e75bc52f1a95f398e8560c Mon Sep 17 00:00:00 2001 From: yannachen Date: Mon, 16 Dec 2024 09:58:15 -0600 Subject: [PATCH 01/13] Added a tiled writer to the run engine factory. --- src/haven/catalog.py | 12 +++++++----- src/haven/ipython_startup.ipy | 2 +- src/haven/run_engine.py | 28 +++++++++++++++++++++++++++- 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/src/haven/catalog.py b/src/haven/catalog.py index 959ae8d7..e2cfa2d8 100644 --- a/src/haven/catalog.py +++ b/src/haven/catalog.py @@ -172,12 +172,13 @@ def write_safe(self): def tiled_client( - entry_node=None, uri=None, cache_filepath=None, structure_clients="dask" + entry_node=None, uri=None, cache_filepath=None, structure_clients="numpy" ): config = load_config() + tiled_config = config["database"].get("tiled", {}) # Create a cache for saving local copies if cache_filepath is None: - cache_filepath = config["database"].get("tiled", {}).get("cache_filepath", "") + cache_filepath = tiled_config.get("cache_filepath", "") cache_filepath = cache_filepath or None if os.access(cache_filepath, os.W_OK): cache = ThreadSafeCache(filepath=cache_filepath) @@ -186,10 +187,11 @@ def tiled_client( cache = None # Create the client if uri is None: - uri = config["database"]["tiled"]["uri"] - client_ = from_uri(uri, structure_clients) + uri = tiled_config["uri"] + api_key = tiled_config.get("api_key") + client_ = from_uri(uri, structure_clients, api_key=api_key) if entry_node is None: - entry_node = config["database"]["tiled"]["entry_node"] + entry_node = tiled_config["entry_node"] client_ = client_[entry_node] return client_ diff --git a/src/haven/ipython_startup.ipy b/src/haven/ipython_startup.ipy index 080ed397..071de06a 100644 --- a/src/haven/ipython_startup.ipy +++ b/src/haven/ipython_startup.ipy @@ -31,7 +31,7 @@ log = logging.getLogger(__name__) RE = haven.run_engine( connect_databroker=True, call_returns_result=True, - use_bec=True, + use_bec=False, ) # Add metadata to the run engine diff --git a/src/haven/run_engine.py b/src/haven/run_engine.py index 56378323..79047784 100644 --- a/src/haven/run_engine.py +++ b/src/haven/run_engine.py @@ -4,8 +4,10 @@ import IPython from bluesky import RunEngine as BlueskyRunEngine from bluesky.callbacks.best_effort import BestEffortCallback +from bluesky.callbacks.tiled_writer import TiledWriter from bluesky.utils import ProgressBarManager, register_transform +from .catalog import tiled_client from .exceptions import ComponentNotFound from .instrument import beamline from .preprocessors import inject_haven_md_wrapper @@ -27,7 +29,29 @@ def save_data(name, doc): catalog.v1.insert(name, doc) -def run_engine(connect_databroker=True, use_bec=True, **kwargs) -> BlueskyRunEngine: +client = tiled_client() +client.include_data_sources() +tiled_writer = TiledWriter(client) + + +def run_engine( + *, connect_tiled=True, connect_databroker=False, use_bec=False, **kwargs +) -> BlueskyRunEngine: + """Build a bluesky RunEngine() for Haven. + + Parameters + ========== + connect_tiled + The run engine will have a callback for writing to the default + tiled client. + connect_databroker + The run engine will have a callback for writing to the default + databroker catalog. + use_bec + The run engine will have the bluesky BestEffortCallback + subscribed to it. 
+ + """ RE = BlueskyRunEngine(**kwargs) # Add the best-effort callback if use_bec: @@ -57,6 +81,8 @@ def run_engine(connect_databroker=True, use_bec=True, **kwargs) -> BlueskyRunEng # Install databroker connection if connect_databroker: RE.subscribe(save_data) + if connect_tiled: + RE.subscribe(tiled_writer) # Add preprocessors RE.preprocessors.append(inject_haven_md_wrapper) return RE From 8a7bf6c0a37fcf8ddf20aff8588bbff024fbf517 Mon Sep 17 00:00:00 2001 From: yannachen Date: Tue, 17 Dec 2024 20:53:02 -0600 Subject: [PATCH 02/13] Enabled both tiled and databroker in the run engine by default. --- src/haven/run_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/haven/run_engine.py b/src/haven/run_engine.py index 79047784..c3bf358d 100644 --- a/src/haven/run_engine.py +++ b/src/haven/run_engine.py @@ -35,7 +35,7 @@ def save_data(name, doc): def run_engine( - *, connect_tiled=True, connect_databroker=False, use_bec=False, **kwargs + *, connect_tiled=True, connect_databroker=True, use_bec=False, **kwargs ) -> BlueskyRunEngine: """Build a bluesky RunEngine() for Haven. From 70cdc0bed76a37f911fecb97e23c376c9db2bc13 Mon Sep 17 00:00:00 2001 From: yannachen Date: Thu, 19 Dec 2024 13:11:35 -0600 Subject: [PATCH 03/13] Partially updated the run browser support to use the new SQL interface. --- src/firefly/run_browser/client.py | 28 +++++++++++++--------------- src/haven/catalog.py | 27 ++++++++------------------- src/haven/run_engine.py | 12 +++++------- 3 files changed, 26 insertions(+), 41 deletions(-) diff --git a/src/firefly/run_browser/client.py b/src/firefly/run_browser/client.py index 04c55692..3bb92bef 100644 --- a/src/firefly/run_browser/client.py +++ b/src/firefly/run_browser/client.py @@ -28,22 +28,20 @@ async def filtered_nodes(self, filters: Mapping): log.debug(f"Filtering nodes: {filters}") filter_params = [ # (filter_name, query type, metadata key) - ("user", queries.Regex, "proposal_users"), - ("proposal", queries.Regex, "proposal_id"), - ("esaf", queries.Regex, "esaf_id"), - ("sample", queries.Regex, "sample_name"), - # ('exit_status', queries.Regex, "exit_status"), - ("plan", queries.Regex, "plan_name"), - ("edge", queries.Regex, "edge"), + ("user", queries.Contains, "start.proposal_users"), + ("proposal", queries.Eq, "start.proposal_id"), + ("esaf", queries.Eq, "start.esaf_id"), + ("sample", queries.Contains, "start.sample_name"), + ('exit_status', queries.Eq, "stop.exit_status"), + ("plan", queries.Eq, "start.plan_name"), + ("edge", queries.Contains, "start.edge"), ] # Apply filters runs = self.catalog for filter_name, Query, md_name in filter_params: val = filters.get(filter_name, "") if val != "": - runs = await runs.search( - Query(md_name, val, case_sensitive=case_sensitive) - ) + runs = await runs.search(Query(md_name, val)) full_text = filters.get("full_text", "") if full_text != "": runs = await runs.search( @@ -135,7 +133,7 @@ async def signal_names(self, hinted_only: bool = False): if hinted_only: xsig, ysig = await run.hints() else: - df = await run.to_dataframe() + df = await run.data() xsig = ysig = df.columns xsignals.extend(xsig) ysignals.extend(ysig) @@ -176,8 +174,8 @@ async def images(self, signal): images[run.uid] = image return images - async def all_signals(self, hinted_only=False): - """Produce dataframe with all signals for each run. + async def all_signals(self, hinted_only=False) -> dict: + """Produce dataframes with all signals for each run. 
The keys of the dictionary are the labels for each curve, and the corresponding value is a pandas dataframe with the scan data. @@ -188,7 +186,7 @@ async def all_signals(self, hinted_only=False): dfs = OrderedDict() for run in self.selected_runs: # Get data from the database - df = await run.to_dataframe(signals=xsignals + ysignals) + df = await run.data(signals=xsignals + ysignals) dfs[run.uid] = df return dfs @@ -236,7 +234,7 @@ async def signals( if uids is not None and run.uid not in uids: break # Get data from the database - df = await run.to_dataframe(signals=signals) + df = await run.data(signals=signals) # Check for missing signals missing_x = x_signal not in df.columns and df.index.name != x_signal missing_y = y_signal not in df.columns diff --git a/src/haven/catalog.py b/src/haven/catalog.py index e2cfa2d8..a85cb258 100644 --- a/src/haven/catalog.py +++ b/src/haven/catalog.py @@ -208,7 +208,7 @@ def __init__(self, container, executor=None): self.container = container self.executor = executor - def _read_data(self, signals, dataset="primary/data"): + def _read_data(self, signals, dataset="primary/internal/events"): # Fetch data if needed data = self.container[dataset] return data.read(signals) @@ -234,31 +234,20 @@ async def export(self, filename: str, format: str): def formats(self): return self.container.formats - async def data(self, stream="primary"): + async def data(self, signals=None, stream="primary"): return await self.loop.run_in_executor( - None, self._read_data, None, f"{stream}/data" + None, self._read_data, signals, f"{stream}/internal/events/" ) - async def to_dataframe(self, signals=None): - """Convert the dataset into a pandas dataframe.""" - xarray = await self.run(self._read_data, signals) - if len(xarray) > 0: - df = xarray.to_dataframe() - # Add a copy of the index to the dataframe itself - if df.index.name is not None: - df[df.index.name] = df.index - else: - df = pd.DataFrame() - return df - @property def loop(self): return asyncio.get_running_loop() + def _data_keys(self, stream): + return self.container[stream]['internal/events'].columns + async def data_keys(self, stream="primary"): - stream_md = await self.loop.run_in_executor(None, self._read_metadata, stream) - # Assumes the 0-th descriptor is for the primary stream - return stream_md["descriptors"][0]["data_keys"] + return await self.run(self._data_keys, ("primary",)) async def hints(self): """Retrieve the data hints for this scan. @@ -279,7 +268,7 @@ async def hints(self): # Get hints for the dependent (X) dependent = [] primary_metadata = await self.run(self._read_metadata, "primary") - hints = primary_metadata["descriptors"][0]["hints"] + hints = primary_metadata["hints"] for device, dev_hints in hints.items(): dependent.extend(dev_hints["fields"]) return independent, dependent diff --git a/src/haven/run_engine.py b/src/haven/run_engine.py index c3bf358d..dc6ebbf5 100644 --- a/src/haven/run_engine.py +++ b/src/haven/run_engine.py @@ -18,7 +18,7 @@ catalog = None -def save_data(name, doc): +def save_to_databroker(name, doc): # This is a hack around a problem with garbage collection # Has been fixed in main, maybe released in databroker v2? 
# Create the databroker callback if necessary @@ -29,11 +29,6 @@ def save_data(name, doc): catalog.v1.insert(name, doc) -client = tiled_client() -client.include_data_sources() -tiled_writer = TiledWriter(client) - - def run_engine( *, connect_tiled=True, connect_databroker=True, use_bec=False, **kwargs ) -> BlueskyRunEngine: @@ -80,8 +75,11 @@ def run_engine( register_transform("RE", prefix="<", ip=ip) # Install databroker connection if connect_databroker: - RE.subscribe(save_data) + RE.subscribe(save_to_databroker) if connect_tiled: + client = tiled_client() + client.include_data_sources() + tiled_writer = TiledWriter(client) RE.subscribe(tiled_writer) # Add preprocessors RE.preprocessors.append(inject_haven_md_wrapper) From f581a5e12a5dcc27914a47a32e734b82baa2a402 Mon Sep 17 00:00:00 2001 From: yannachen Date: Fri, 20 Dec 2024 11:54:45 -0600 Subject: [PATCH 04/13] Updated bluesky minimum version to 1.13.1rc1 to get proper consolidator support for ADs. --- environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/environment.yml b/environment.yml index 325f1dc2..e8de987b 100644 --- a/environment.yml +++ b/environment.yml @@ -57,7 +57,7 @@ dependencies: - bluesky-queueserver-api - bluesky-widgets - bluesky-adaptive - - bluesky >=1.8.1 + - bluesky >=1.13.1rc1 - ophyd >=1.6.3 - ophyd-async >=0.9.0a1 - apstools == 1.6.20 # Leave at 1.6.20 until this is fixed: https://github.com/BCDA-APS/apstools/issues/1022 From 7a117cf754a2486a4cc1ae0126813dc408e6b713 Mon Sep 17 00:00:00 2001 From: yannachen Date: Mon, 6 Jan 2025 10:19:00 -0600 Subject: [PATCH 05/13] Updated the run browser to work properly with the new database. Developed on the beamline, tests fail. --- src/firefly/run_browser/client.py | 5 ++--- src/firefly/run_browser/display.py | 25 ++++++++++++++----------- src/firefly/run_browser/widgets.py | 2 +- src/haven/catalog.py | 5 +++++ src/haven/devices/labjack.py | 2 +- src/haven/devices/motor.py | 3 --- src/haven/preprocessors.py | 2 +- 7 files changed, 24 insertions(+), 20 deletions(-) diff --git a/src/firefly/run_browser/client.py b/src/firefly/run_browser/client.py index 3bb92bef..2a99ea6a 100644 --- a/src/firefly/run_browser/client.py +++ b/src/firefly/run_browser/client.py @@ -154,7 +154,6 @@ async def metadata(self): async def load_selected_runs(self, uids): # Prepare the query for finding the runs uids = list(dict.fromkeys(uids)) - print(f"Loading runs: {uids}") # Retrieve runs from the database runs = [await self.catalog[uid] for uid in uids] # runs = await asyncio.gather(*run_coros) @@ -168,8 +167,8 @@ async def images(self, signal): # Load datasets from the database try: image = await run[signal] - except KeyError: - log.warning(f"Signal {signal} not found in run {run}.") + except KeyError as exc: + log.exception(exc) else: images[run.uid] = image return images diff --git a/src/firefly/run_browser/display.py b/src/firefly/run_browser/display.py index cf6fb1de..4ea39ab1 100644 --- a/src/firefly/run_browser/display.py +++ b/src/firefly/run_browser/display.py @@ -180,10 +180,12 @@ def customize_ui(self): self.ui.plot_1d_hints_checkbox.stateChanged.connect(self.update_1d_signals) self.ui.autorange_1d_button.clicked.connect(self.auto_range) # Respond to changes in displaying the 2d plot - self.ui.plot_multi_hints_checkbox.stateChanged.connect( - self.update_multi_signals - ) - self.ui.plot_multi_hints_checkbox.stateChanged.connect(self.update_multi_plot) + for signal in [ + self.ui.plot_multi_hints_checkbox.stateChanged, + 
self.ui.multi_signal_x_combobox.currentTextChanged, + ]: + signal.connect(self.update_multi_signals) + signal.connect(self.update_multi_plot) # Respond to changes in displaying the 2d plot self.ui.signal_value_combobox.currentTextChanged.connect(self.update_2d_plot) self.ui.logarithm_checkbox_2d.stateChanged.connect(self.update_2d_plot) @@ -288,10 +290,12 @@ async def update_multi_signals(self, *args): ) xcols, ycols = await signals_task # Update the comboboxes with new signals - combobox.clear() - combobox.addItems(xcols) - # Restore previous value - combobox.setCurrentText(old_value) + old_cols = [combobox.itemText(idx) for idx in range(combobox.count())] + if xcols != old_cols: + combobox.clear() + combobox.addItems(xcols) + # Restore previous value + combobox.setCurrentText(old_value) @asyncSlot() @cancellable @@ -376,7 +380,7 @@ async def export_runs(self): @asyncSlot(str) @cancellable async def update_running_scan(self, uid: str): - print(f"Updating running scan: {uid=}") + log.debug(f"Updating running scan: {uid=}") await self.update_1d_plot(uids=[uid]) @asyncSlot() @@ -432,7 +436,6 @@ async def update_1d_plot(self, *args, uids: Sequence[str] = None): if use_grad: ylabel = f"∇ {ylabel}" # Do the plotting - print("RUNNING", self.ui.plot_1d_view.plot_runs) self.ui.plot_1d_view.plot_runs(runs, xlabel=xlabel, ylabel=ylabel) if self.ui.autorange_1d_checkbox.isChecked(): self.ui.plot_1d_view.autoRange() @@ -448,7 +451,7 @@ async def update_2d_plot(self): use_grad = self.ui.gradient_checkbox_2d.isChecked() images = await self.db_task(self.db.images(value_signal), "2D plot") # Get axis labels - # Eventually this will be replaced with robus choices for plotting multiple images + # Eventually this will be replaced with robust choices for plotting multiple images metadata = await self.db_task(self.db.metadata(), "2D plot") metadata = list(metadata.values())[0] dimensions = metadata["start"]["hints"]["dimensions"] diff --git a/src/firefly/run_browser/widgets.py b/src/firefly/run_browser/widgets.py index 66ad4356..ffbc0e93 100644 --- a/src/firefly/run_browser/widgets.py +++ b/src/firefly/run_browser/widgets.py @@ -101,6 +101,7 @@ def plot_runs(self, runs: Mapping, xsignal: str): ysignals = [] for run in runs.values(): ysignals.extend(run.columns) + print(xsignal, ysignals) # Remove the x-signal from the list of y signals ysignals = sorted(list(dict.fromkeys(ysignals))) # Plot the runs @@ -172,7 +173,6 @@ def plot_runs(self, runs: Mapping, ylabel="", xlabel=""): ) # Cursor to drag around on the data if self.cursor_line is None: - print("CURSOR LINE: ", np.median(series.index), series.index) self.cursor_line = plot_item.addLine( x=np.median(series.index), movable=True, label="{value:.3f}" ) diff --git a/src/haven/catalog.py b/src/haven/catalog.py index a85cb258..4573e9ea 100644 --- a/src/haven/catalog.py +++ b/src/haven/catalog.py @@ -211,6 +211,11 @@ def __init__(self, container, executor=None): def _read_data(self, signals, dataset="primary/internal/events"): # Fetch data if needed data = self.container[dataset] + try: + # Remove duplicates + signals = list(set(signals)) + except TypeError: + pass return data.read(signals) def _read_metadata(self, keys=None): diff --git a/src/haven/devices/labjack.py b/src/haven/devices/labjack.py index 9fbfd6a0..d4e83ee9 100644 --- a/src/haven/devices/labjack.py +++ b/src/haven/devices/labjack.py @@ -495,7 +495,7 @@ def __init__( self.ljm_version = epics_signal_r(str, f"{prefix}LJMVersion") self.driver_version = epics_signal_r(str, f"{prefix}DriverVersion") 
self.last_error_message = epics_signal_r( - Array1D[np.uint8], f"{prefix}LastErrorMessage" + str, f"{prefix}LastErrorMessage" ) self.poll_sleep_ms = epics_signal_rw(float, f"{prefix}PollSleepMS") self.analog_in_settling_time_all = epics_signal_rw( diff --git a/src/haven/devices/motor.py b/src/haven/devices/motor.py index b2813a64..64198476 100644 --- a/src/haven/devices/motor.py +++ b/src/haven/devices/motor.py @@ -63,9 +63,6 @@ def __init__( self.soft_limit_violation = epics_signal_r(int, f"{prefix}.LVIO") # Load all the parent signals super().__init__(prefix=prefix, name=name) - # Override the motor stop signal to use the right trigger value - self.motor_stop = epics_signal_xval(f"{prefix}.STOP") - self.set_name(self.name) async def connect( self, diff --git a/src/haven/preprocessors.py b/src/haven/preprocessors.py index f3eff498..1acbf493 100644 --- a/src/haven/preprocessors.py +++ b/src/haven/preprocessors.py @@ -88,7 +88,7 @@ def _inject_md(msg): "EPICS_CA_MAX_ARRAY_BYTES": os.environ.get("EPICS_CA_MAX_ARRAY_BYTES"), # Facility "beamline_id": config["beamline"]["name"], - "facility_id": ", ".join(cfg["name"] for cfg in config["synchrotron"]), + "facility_id": ", ".join(cfg["name"] for cfg in config.get("synchrotron", [])), "xray_source": config["xray_source"]["type"], # Computer "login_id": f"{getpass.getuser()}@{socket.gethostname()}", From b5a56f5ec3533007d753c650c85a27617ed35348 Mon Sep 17 00:00:00 2001 From: Mark Wolfman Date: Mon, 6 Jan 2025 11:59:00 -0600 Subject: [PATCH 06/13] Updated tests (and a few code spots) to match the new relational database structure. --- src/conftest.py | 55 +++++++++++-------- src/firefly/run_browser/tests/test_display.py | 3 +- src/haven/catalog.py | 18 +++--- src/haven/motor_position.py | 11 ++-- src/haven/tests/test_catalog.py | 9 --- src/haven/tests/test_motor.py | 9 --- src/haven/tests/test_preprocessors.py | 17 +++--- src/haven/tests/test_run_engine.py | 6 +- src/haven/tests/test_save_motor_positions.py | 35 +++++++----- 9 files changed, 81 insertions(+), 82 deletions(-) diff --git a/src/conftest.py b/src/conftest.py index 11769fb6..30ddf915 100644 --- a/src/conftest.py +++ b/src/conftest.py @@ -11,6 +11,7 @@ from ophyd.sim import instantiate_fake_device, make_fake_device from tiled.adapters.mapping import MapAdapter from tiled.adapters.xarray import DatasetAdapter +from tiled.adapters.table import TableAdapter from tiled.client import Context, from_context from tiled.server.app import build_app @@ -260,10 +261,11 @@ def filters(sim_registry): run1 = pd.DataFrame( { "energy_energy": np.linspace(8300, 8400, num=100), + "energy_id_energy_readback": np.linspace(8.3, 8.4, num=100), "It_net_counts": np.abs(np.sin(np.linspace(0, 4 * np.pi, num=100))), "I0_net_counts": np.linspace(1, 2, num=100), } -).to_xarray() +) grid_scan = pd.DataFrame( { @@ -272,7 +274,7 @@ def filters(sim_registry): "aerotech_horiz": np.linspace(0, 104, num=105), "aerotech_vert": np.linspace(0, 104, num=105), } -).to_xarray() +) hints = { "energy": {"fields": ["energy_energy", "energy_id_energy_readback"]}, @@ -283,9 +285,13 @@ def filters(sim_registry): { "primary": MapAdapter( { - "data": DatasetAdapter.from_dataset(run1), + "internal": MapAdapter( + { + "events": TableAdapter.from_pandas(run1), + } + ), }, - metadata={"descriptors": [{"hints": hints}]}, + metadata={"hints": hints}, ), }, metadata={ @@ -301,12 +307,17 @@ def filters(sim_registry): { "primary": MapAdapter( { - "data": DatasetAdapter.from_dataset(run1), + "internal": MapAdapter( + { + "events": 
TableAdapter.from_pandas(run1),
+                    }
+                ),
             },
-            metadata={"descriptors": [{"hints": hints}]},
+            metadata={"hints": hints},
         ),
     },
     metadata={
+        "plan_name": "rel_scan",
         "start": {
             "plan_name": "rel_scan",
             "uid": "9d33bf66-9701-4ee3-90f4-3be730bc226c",
@@ -319,24 +330,24 @@
     {
         "primary": MapAdapter(
             {
-                "data": DatasetAdapter.from_dataset(grid_scan),
+                "internal": MapAdapter(
+                    {
+                        "events": TableAdapter.from_pandas(grid_scan),
+                    },
+                ),
             },
             metadata={
-                "descriptors": [
-                    {
-                        "hints": {
-                            "Ipreslit": {"fields": ["Ipreslit_net_counts"]},
-                            "CdnIPreKb": {"fields": ["CdnIPreKb_net_counts"]},
-                            "I0": {"fields": ["I0_net_counts"]},
-                            "CdnIt": {"fields": ["CdnIt_net_counts"]},
-                            "aerotech_vert": {"fields": ["aerotech_vert"]},
-                            "aerotech_horiz": {"fields": ["aerotech_horiz"]},
-                            "Ipre_KB": {"fields": ["Ipre_KB_net_counts"]},
-                            "CdnI0": {"fields": ["CdnI0_net_counts"]},
-                            "It": {"fields": ["It_net_counts"]},
-                        }
-                    }
-                ]
+                "hints": {
+                    "Ipreslit": {"fields": ["Ipreslit_net_counts"]},
+                    "CdnIPreKb": {"fields": ["CdnIPreKb_net_counts"]},
+                    "I0": {"fields": ["I0_net_counts"]},
+                    "CdnIt": {"fields": ["CdnIt_net_counts"]},
+                    "aerotech_vert": {"fields": ["aerotech_vert"]},
+                    "aerotech_horiz": {"fields": ["aerotech_horiz"]},
+                    "Ipre_KB": {"fields": ["Ipre_KB_net_counts"]},
+                    "CdnI0": {"fields": ["CdnI0_net_counts"]},
+                    "It": {"fields": ["It_net_counts"]},
+                },
             },
         ),
     },
diff --git a/src/firefly/run_browser/tests/test_display.py b/src/firefly/run_browser/tests/test_display.py
index 429c801f..d8da645f 100644
--- a/src/firefly/run_browser/tests/test_display.py
+++ b/src/firefly/run_browser/tests/test_display.py
@@ -30,7 +30,8 @@ async def display(qtbot, catalog, mocker):
     run = [run async for run in catalog.values()][0]
     display.db.selected_runs = [run]
     await display.update_1d_signals()
-    run_data = await run.to_dataframe()
+    print(run.uid)
+    run_data = await run.data()
     expected_xdata = run_data.energy_energy
     expected_ydata = np.log(run_data.I0_net_counts / run_data.It_net_counts)
     expected_ydata = np.gradient(expected_ydata, expected_xdata)
diff --git a/src/haven/catalog.py b/src/haven/catalog.py
index 4573e9ea..2d2b2375 100644
--- a/src/haven/catalog.py
+++ b/src/haven/catalog.py
@@ -3,6 +3,7 @@
 import os
 import sqlite3
 import threading
+from typing import Sequence
 import warnings
 from concurrent.futures import ThreadPoolExecutor
 from functools import partial
@@ -208,15 +208,15 @@ def __init__(self, container, executor=None):
         self.container = container
         self.executor = executor
 
-    def _read_data(self, signals, dataset="primary/internal/events"):
-        # Fetch data if needed
+    def _read_data(
+        self, signals: Sequence | None, dataset: str="primary/internal/events"
+    ):
         data = self.container[dataset]
-        try:
-            # Remove duplicates
-            signals = list(set(signals))
-        except TypeError:
-            pass
-        return data.read(signals)
+        if signals is None:
+            return data.read()
+        # Remove duplicates and missing signals
+        signals = set(signals)
+        available_signals = set(data.columns)
+        signals = signals & available_signals
+        return data.read(list(signals))
diff --git a/src/haven/motor_position.py b/src/haven/motor_position.py
index b77dad75..4236f090 100644
--- 
a/src/haven/motor_position.py +++ b/src/haven/motor_position.py @@ -41,16 +41,15 @@ class MotorPosition(BaseModel): @classmethod def _load(Cls, run_md, data_keys, data): - """Common routines for synch and async loading.""" + """Common routines for sync and async loading.""" if run_md["start"]["plan_name"] != "save_motor_position": raise ValueError(f"Run {run_md['start']['uid']} is not a motor position.") # Extract motor positions from the run motor_axes = [] - for data_key in data_keys.values(): - mname = data_key["object_name"] + for axis_name in data_keys: axis = MotorAxis( - name=mname, - readback=data[mname][0], + name=axis_name, + readback=data[axis_name][0], ) motor_axes.append(axis) # Create the motor position object @@ -68,7 +67,7 @@ def load(Cls, run): run_md=run.metadata, # Assumes the 0-th descriptor is for the primary stream data_keys=run["primary"].metadata["descriptors"][0]["data_keys"], - data=run["primary"]["data"].read(), + data=run["primary/internal/events"].read(), ) @classmethod diff --git a/src/haven/tests/test_catalog.py b/src/haven/tests/test_catalog.py index f6918952..529b7ab5 100644 --- a/src/haven/tests/test_catalog.py +++ b/src/haven/tests/test_catalog.py @@ -43,15 +43,6 @@ async def test_load_scan(catalog): assert isinstance(scan, CatalogScan) -@pytest.mark.asyncio -async def test_dataframe(scan): - """Check that the catalogscan can produce a pandas dataframe.""" - df = await scan.to_dataframe() - assert isinstance(df, pd.DataFrame) - # Check that the index is added as a column - assert df.index.name in df.columns - - @pytest.mark.asyncio async def test_load_nd_data(grid_scan): """Check that the catalog scan can convert e.g. grid_scan results.""" diff --git a/src/haven/tests/test_motor.py b/src/haven/tests/test_motor.py index 935585b2..3b87a1d9 100644 --- a/src/haven/tests/test_motor.py +++ b/src/haven/tests/test_motor.py @@ -60,15 +60,6 @@ def test_motor_flyer(motor): assert isinstance(motor, Flyable) -async def test_stop_button(motor): - # Check that it got set up properly during __init__ - assert motor.motor_stop.name == "motor_1-motor_stop" - assert motor.motor_stop.parent is motor - await motor.motor_stop.trigger() - mock = get_mock_put(motor.motor_stop) - mock.assert_called_once_with(1, wait=True) - - @pytest.mark.asyncio async def test_auto_naming_default(monkeypatch): motor = AsyncMotor(prefix="255idVME:m1") diff --git a/src/haven/tests/test_preprocessors.py b/src/haven/tests/test_preprocessors.py index 6ade7be4..0337825a 100644 --- a/src/haven/tests/test_preprocessors.py +++ b/src/haven/tests/test_preprocessors.py @@ -11,12 +11,16 @@ from haven.preprocessors import shutter_suspend_decorator, shutter_suspend_wrapper +@pytest.fixture() +def RE(): + return run_engine(use_bec=False, connect_databroker=False, connect_tiled=False) + + @pytest.mark.xfail -def test_shutter_suspend_wrapper(aps, shutters, sim_registry): +def test_shutter_suspend_wrapper(aps, shutters, sim_registry, RE): # Check that the run engine does not have any shutter suspenders # Currently this test is fragile since we might add non-shutter # suspenders in the future. 
- RE = run_engine(use_bec=False, connect_databroker=False) assert len(RE.suspenders) == 1 # Check that the shutter suspenders get added plan = bp.count([det]) @@ -41,7 +45,7 @@ def test_shutter_suspend_wrapper(aps, shutters, sim_registry): assert len(unsub_msgs) == 2 -def test_baseline_wrapper(sim_registry, aps, event_loop): +def test_baseline_wrapper(sim_registry, aps, event_loop, RE): # Create a test device motor_baseline = SynAxis(name="baseline_motor", labels={"motors", "baseline"}) sim_registry.register(motor_baseline) @@ -51,7 +55,6 @@ def test_baseline_wrapper(sim_registry, aps, event_loop): cb.descriptor = MagicMock() cb.event = MagicMock() cb.stop = MagicMock() - RE = run_engine(use_bec=False, connect_databroker=False) plan = bp.count([det], num=1) plan = baseline_wrapper(plan, devices="baseline") RE(plan, cb) @@ -64,7 +67,7 @@ def test_baseline_wrapper(sim_registry, aps, event_loop): assert "baseline_motor" in baseline_doc["data_keys"].keys() -def test_baseline_decorator(sim_registry, aps): +def test_baseline_decorator(sim_registry, aps, RE): """Similar to baseline wrapper test, but used as a decorator.""" # Create the decorated function before anything else func = baseline_decorator(devices="motors")(bp.count) @@ -77,7 +80,6 @@ def test_baseline_decorator(sim_registry, aps): cb.descriptor = MagicMock() cb.event = MagicMock() cb.stop = MagicMock() - RE = run_engine(use_bec=False, connect_databroker=False) plan = func([det], num=1) RE(plan, cb) # Check that the callback has the baseline stream inserted @@ -89,7 +91,7 @@ def test_baseline_decorator(sim_registry, aps): assert "baseline_motor" in baseline_doc["data_keys"].keys() -def test_metadata(sim_registry, aps, monkeypatch): +def test_metadata(sim_registry, aps, monkeypatch, RE): """Similar to baseline wrapper test, but used as a decorator.""" # Load devices bss = instantiate_fake_device(BSS, name="bss", prefix="255id:bss:") @@ -116,7 +118,6 @@ def test_metadata(sim_registry, aps, monkeypatch): cb.descriptor = MagicMock() cb.event = MagicMock() cb.stop = MagicMock() - RE = run_engine(use_bec=False, connect_databroker=False) plan = bp.count([det], num=1) RE(plan, cb) # Check that the callback has the correct metadata diff --git a/src/haven/tests/test_run_engine.py b/src/haven/tests/test_run_engine.py index 2a0f03f2..88e5b328 100644 --- a/src/haven/tests/test_run_engine.py +++ b/src/haven/tests/test_run_engine.py @@ -23,19 +23,19 @@ def test_subscribers_garbage_collection(monkeypatch, aps): """ monkeypatch.setattr(databroker, "catalog", {"bluesky": databroker.temp()}) - RE = run_engine(use_bec=False) + RE = run_engine(use_bec=False, connect_tiled=False) assert len(RE.dispatcher.cb_registry.callbacks) == 12 gc.collect() assert len(RE.dispatcher.cb_registry.callbacks) == 12 def test_run_engine_preprocessors(aps): - RE = run_engine(use_bec=False) + RE = run_engine(use_bec=False, connect_databroker=False, connect_tiled=False) assert len(RE.preprocessors) > 0 def test_run_engine_created(aps): - RE = run_engine(use_bec=False) + RE = run_engine(use_bec=False, connect_databroker=False, connect_tiled=False) assert isinstance(RE, RunEngine) diff --git a/src/haven/tests/test_save_motor_positions.py b/src/haven/tests/test_save_motor_positions.py index 22f5b40f..93dc15b7 100644 --- a/src/haven/tests/test_save_motor_positions.py +++ b/src/haven/tests/test_save_motor_positions.py @@ -9,6 +9,7 @@ import time_machine from ophyd_async.testing import set_mock_value from tiled.adapters.mapping import MapAdapter +from tiled.adapters.table import 
TableAdapter from tiled.adapters.xarray import DatasetAdapter from tiled.client import Context, from_context from tiled.server.app import build_app @@ -34,14 +35,16 @@ { "primary": MapAdapter( { - "data": DatasetAdapter.from_dataset( - pd.DataFrame( - { - "motor_A": [12.0], - "motor_B": [-113.25], - } - ).to_xarray() - ), + "internal": MapAdapter({ + "events": TableAdapter.from_pandas( + pd.DataFrame( + { + "motor_A": [12.0], + "motor_B": [-113.25], + } + ) + ), + }), }, metadata={ "descriptors": [ @@ -73,13 +76,15 @@ { "primary": MapAdapter( { - "data": DatasetAdapter.from_dataset( - pd.DataFrame( - { - "motorC": [11250.0], - } - ).to_xarray() - ), + "internal": MapAdapter({ + "events": TableAdapter.from_pandas( + pd.DataFrame( + { + "motorC": [11250.0], + } + ) + ), + }), }, metadata={ "descriptors": [ From 348b727a791d7c293cd81db26e6476112d7bd89e Mon Sep 17 00:00:00 2001 From: Mark Wolfman Date: Mon, 6 Jan 2025 12:07:24 -0600 Subject: [PATCH 07/13] Linting (isort, flake8, black) and removed stray print statements. --- src/conftest.py | 3 +- src/firefly/run_browser/client.py | 2 +- src/firefly/run_browser/display.py | 4 +- src/haven/catalog.py | 9 ++-- src/haven/devices/labjack.py | 4 +- src/haven/devices/motor.py | 1 - src/haven/plans/fly.py | 1 - src/haven/plans/record_dark_current.py | 1 - src/haven/plans/robot_transfer_sample.py | 1 - src/haven/preprocessors.py | 4 +- src/haven/tests/test_catalog.py | 1 - src/haven/tests/test_iconfig.py | 1 - src/haven/tests/test_labjack.py | 5 --- src/haven/tests/test_motor.py | 1 - .../tests/test_record_dark_current_plan.py | 3 -- src/haven/tests/test_save_motor_positions.py | 43 ++++++++++--------- src/haven/tests/test_scaler.py | 3 -- 17 files changed, 36 insertions(+), 51 deletions(-) diff --git a/src/conftest.py b/src/conftest.py index 30ddf915..88acf19f 100644 --- a/src/conftest.py +++ b/src/conftest.py @@ -10,7 +10,6 @@ from ophyd import Kind from ophyd.sim import instantiate_fake_device, make_fake_device from tiled.adapters.mapping import MapAdapter -from tiled.adapters.xarray import DatasetAdapter from tiled.adapters.table import TableAdapter from tiled.client import Context, from_context from tiled.server.app import build_app @@ -322,7 +321,7 @@ def filters(sim_registry): "plan_name": "rel_scan", "uid": "9d33bf66-9701-4ee3-90f4-3be730bc226c", "hints": {"dimensions": [[["pitch2"], "primary"]]}, - } + }, }, ), # 2D grid scan map data diff --git a/src/firefly/run_browser/client.py b/src/firefly/run_browser/client.py index 2a99ea6a..fb579258 100644 --- a/src/firefly/run_browser/client.py +++ b/src/firefly/run_browser/client.py @@ -32,7 +32,7 @@ async def filtered_nodes(self, filters: Mapping): ("proposal", queries.Eq, "start.proposal_id"), ("esaf", queries.Eq, "start.esaf_id"), ("sample", queries.Contains, "start.sample_name"), - ('exit_status', queries.Eq, "stop.exit_status"), + ("exit_status", queries.Eq, "stop.exit_status"), ("plan", queries.Eq, "start.plan_name"), ("edge", queries.Contains, "start.edge"), ] diff --git a/src/firefly/run_browser/display.py b/src/firefly/run_browser/display.py index 4ea39ab1..039e026e 100644 --- a/src/firefly/run_browser/display.py +++ b/src/firefly/run_browser/display.py @@ -181,8 +181,8 @@ def customize_ui(self): self.ui.autorange_1d_button.clicked.connect(self.auto_range) # Respond to changes in displaying the 2d plot for signal in [ - self.ui.plot_multi_hints_checkbox.stateChanged, - self.ui.multi_signal_x_combobox.currentTextChanged, + self.ui.plot_multi_hints_checkbox.stateChanged, + 
self.ui.multi_signal_x_combobox.currentTextChanged, ]: signal.connect(self.update_multi_signals) signal.connect(self.update_multi_plot) diff --git a/src/haven/catalog.py b/src/haven/catalog.py index 2d2b2375..7ec4185a 100644 --- a/src/haven/catalog.py +++ b/src/haven/catalog.py @@ -3,14 +3,13 @@ import os import sqlite3 import threading -from typing import Sequence import warnings from concurrent.futures import ThreadPoolExecutor from functools import partial +from typing import Sequence import databroker import numpy as np -import pandas as pd from tiled.client import from_uri from tiled.client.cache import Cache @@ -208,7 +207,9 @@ def __init__(self, container, executor=None): self.container = container self.executor = executor - def _read_data(self, signals: Sequence | None, dataset: str="primary/internal/events"): + def _read_data( + self, signals: Sequence | None, dataset: str = "primary/internal/events" + ): data = self.container[dataset] if signals is None: return data.read() @@ -249,7 +250,7 @@ def loop(self): return asyncio.get_running_loop() def _data_keys(self, stream): - return self.container[stream]['internal/events'].columns + return self.container[stream]["internal/events"].columns async def data_keys(self, stream="primary"): return await self.run(self._data_keys, ("primary",)) diff --git a/src/haven/devices/labjack.py b/src/haven/devices/labjack.py index d4e83ee9..ae4284d4 100644 --- a/src/haven/devices/labjack.py +++ b/src/haven/devices/labjack.py @@ -494,9 +494,7 @@ def __init__( ) self.ljm_version = epics_signal_r(str, f"{prefix}LJMVersion") self.driver_version = epics_signal_r(str, f"{prefix}DriverVersion") - self.last_error_message = epics_signal_r( - str, f"{prefix}LastErrorMessage" - ) + self.last_error_message = epics_signal_r(str, f"{prefix}LastErrorMessage") self.poll_sleep_ms = epics_signal_rw(float, f"{prefix}PollSleepMS") self.analog_in_settling_time_all = epics_signal_rw( float, f"{prefix}AiAllSettlingUS" diff --git a/src/haven/devices/motor.py b/src/haven/devices/motor.py index 64198476..ba39b7db 100644 --- a/src/haven/devices/motor.py +++ b/src/haven/devices/motor.py @@ -14,7 +14,6 @@ from ophydregistry import Registry from .motor_flyer import MotorFlyer -from .signal import epics_signal_xval log = logging.getLogger(__name__) diff --git a/src/haven/plans/fly.py b/src/haven/plans/fly.py index 6a3a7b20..0580ef77 100644 --- a/src/haven/plans/fly.py +++ b/src/haven/plans/fly.py @@ -154,7 +154,6 @@ def fly_line_scan(detectors: list, *args, num, dwell_time): flyers = [*motors, *detectors] flyers = [flyer for flyer in flyers if isinstance(flyer, EventPageCollectable)] for flyer_ in flyers: - print(f"Collecting from {flyer_}") yield from bps.collect(flyer_) diff --git a/src/haven/plans/record_dark_current.py b/src/haven/plans/record_dark_current.py index 06cffdb1..e790cbf5 100644 --- a/src/haven/plans/record_dark_current.py +++ b/src/haven/plans/record_dark_current.py @@ -15,7 +15,6 @@ def count_is_complete(*, old_value, value, **kwargs): was_running = old_value == 1 is_running_now = value == 1 is_done = was_running and not is_running_now - print(is_done) return is_done diff --git a/src/haven/plans/robot_transfer_sample.py b/src/haven/plans/robot_transfer_sample.py index 03145582..7fe1bc0b 100644 --- a/src/haven/plans/robot_transfer_sample.py +++ b/src/haven/plans/robot_transfer_sample.py @@ -75,7 +75,6 @@ def robot_transfer_sample(robot, sampleN, *args): sample = getattr( robot.samples, f"sample{sampleN}" ) # Access the Sample device corresponding to sampleN - 
print(sample.load) yield from bps.mv(sample.load, ON) # Assuming '1' initiates the loading action # Return to the initial position diff --git a/src/haven/preprocessors.py b/src/haven/preprocessors.py index 1acbf493..d5c3c18f 100644 --- a/src/haven/preprocessors.py +++ b/src/haven/preprocessors.py @@ -88,7 +88,9 @@ def _inject_md(msg): "EPICS_CA_MAX_ARRAY_BYTES": os.environ.get("EPICS_CA_MAX_ARRAY_BYTES"), # Facility "beamline_id": config["beamline"]["name"], - "facility_id": ", ".join(cfg["name"] for cfg in config.get("synchrotron", [])), + "facility_id": ", ".join( + cfg["name"] for cfg in config.get("synchrotron", []) + ), "xray_source": config["xray_source"]["type"], # Computer "login_id": f"{getpass.getuser()}@{socket.gethostname()}", diff --git a/src/haven/tests/test_catalog.py b/src/haven/tests/test_catalog.py index 529b7ab5..17681fea 100644 --- a/src/haven/tests/test_catalog.py +++ b/src/haven/tests/test_catalog.py @@ -1,5 +1,4 @@ import numpy as np -import pandas as pd import pytest from tiled import queries diff --git a/src/haven/tests/test_iconfig.py b/src/haven/tests/test_iconfig.py index ae0a84a6..ed11f16f 100644 --- a/src/haven/tests/test_iconfig.py +++ b/src/haven/tests/test_iconfig.py @@ -43,7 +43,6 @@ def test_merging_dicts(): ] test_file = this_dir / "test_iconfig.toml" config = load_config(file_paths=(*default_files, test_file)) - print(config) assert "prefix" in config["area_detector"][0].keys() diff --git a/src/haven/tests/test_labjack.py b/src/haven/tests/test_labjack.py index 4202bf2d..4cd5d8d9 100644 --- a/src/haven/tests/test_labjack.py +++ b/src/haven/tests/test_labjack.py @@ -58,10 +58,6 @@ async def test_base_signals_device(labjack): "labjack-analog_outputs-1-scanning_rate", } desc = await labjack.describe_configuration() - from pprint import pprint - - pprint(await labjack.analog_outputs[0].describe_configuration()) - # pprint(desc) assert set(desc.keys()) == cfg_names @@ -90,7 +86,6 @@ async def test_analog_inputs(LabJackDevice, num_ais): # Check read attrs read_attrs = ["final_value"] description = await device.describe() - print(description) for n in range(num_ais): for attr in read_attrs: full_attr = f"{device.name}-analog_inputs-{n}-{attr}" diff --git a/src/haven/tests/test_motor.py b/src/haven/tests/test_motor.py index 3b87a1d9..4c6e9b07 100644 --- a/src/haven/tests/test_motor.py +++ b/src/haven/tests/test_motor.py @@ -2,7 +2,6 @@ import pytest from bluesky.protocols import Flyable -from ophyd_async.testing import get_mock_put from haven.devices.motor import HavenMotor from haven.devices.motor import Motor as AsyncMotor diff --git a/src/haven/tests/test_record_dark_current_plan.py b/src/haven/tests/test_record_dark_current_plan.py index 7e67f2c9..df7b0605 100644 --- a/src/haven/tests/test_record_dark_current_plan.py +++ b/src/haven/tests/test_record_dark_current_plan.py @@ -20,9 +20,6 @@ def test_shutters_get_reset(shutters, ion_chamber): def test_messages(shutters, ion_chamber): shutter = shutters[0] msgs = list(record_dark_current(ion_chambers=[ion_chamber], shutters=[shutter])) - from pprint import pprint - - pprint(msgs) # Check the shutters get closed trigger_msg = msgs[3] assert trigger_msg.obj is ion_chamber diff --git a/src/haven/tests/test_save_motor_positions.py b/src/haven/tests/test_save_motor_positions.py index 93dc15b7..b830b8a9 100644 --- a/src/haven/tests/test_save_motor_positions.py +++ b/src/haven/tests/test_save_motor_positions.py @@ -35,16 +35,18 @@ { "primary": MapAdapter( { - "internal": MapAdapter({ - "events": 
TableAdapter.from_pandas(
-                        pd.DataFrame(
-                            {
-                                "motor_A": [12.0],
-                                "motor_B": [-113.25],
-                            }
-                        )
-                    ),
-                }),
+                "internal": MapAdapter(
+                    {
+                        "events": TableAdapter.from_pandas(
+                            pd.DataFrame(
+                                {
+                                    "motor_A": [12.0],
+                                    "motor_B": [-113.25],
+                                }
+                            )
+                        ),
+                    }
+                ),
             },
             metadata={
                 "descriptors": [
@@ -76,15 +78,17 @@
     {
         "primary": MapAdapter(
             {
-                "internal": MapAdapter({
-                    "events": TableAdapter.from_pandas(
-                        pd.DataFrame(
-                            {
-                                "motorC": [11250.0],
-                            }
-                        )
-                    ),
-                }),
+                "internal": MapAdapter(
+                    {
+                        "events": TableAdapter.from_pandas(
+                            pd.DataFrame(
+                                {
+                                    "motorC": [11250.0],
+                                }
+                            )
+                        ),
+                    }
+                ),
             },
             metadata={
                 "descriptors": [
diff --git a/src/haven/tests/test_scaler.py b/src/haven/tests/test_scaler.py
index 7dae2c54..135b3078 100644
--- a/src/haven/tests/test_scaler.py
+++ b/src/haven/tests/test_scaler.py
@@ -51,9 +51,6 @@ def test_mcs_signals(mcs):
 async def test_mcs_reading(mcs):
     await mcs.connect(mock=True)
     reading = await mcs.read()
-    from pprint import pprint
-
-    pprint(list(reading.keys()))
     # Check that the correct readings are included
     assert mcs.elapsed_time.name in reading
     assert mcs.current_channel.name in reading

From 5b247bbcec60ef47f12180a194e1382a751eb220 Mon Sep 17 00:00:00 2001
From: Mark Wolfman
Date: Mon, 6 Jan 2025 12:47:11 -0600
Subject: [PATCH 08/13] Included tiled writer info in example iconfig.toml.

---
 src/haven/iconfig_testing.toml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/haven/iconfig_testing.toml b/src/haven/iconfig_testing.toml
index b7f40879..2b9651e1 100644
--- a/src/haven/iconfig_testing.toml
+++ b/src/haven/iconfig_testing.toml
@@ -35,7 +35,8 @@ redis_addr = "localhost:6379"
 [database.tiled]
 uri = "http://localhost:8337/"
 entry_node = "255id_testing"
-
+cache_filepath = "/tmp/tiled_cache/http_response_cache.db"
+api_key = ""
 
 #################
 # Device support

From 78ff1f1c8654f6be1133d9f3d2a7620f166deb54 Mon Sep 17 00:00:00 2001
From: Mark Wolfman
Date: Mon, 6 Jan 2025 12:47:31 -0600
Subject: [PATCH 09/13] Added latest bluesky RC to the environment file for CI.

---
 environment.yml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/environment.yml b/environment.yml
index 5b27c86f..54a9730a 100644
--- a/environment.yml
+++ b/environment.yml
@@ -57,7 +57,8 @@ dependencies:
   - bluesky-queueserver-api
   - bluesky-widgets
   - bluesky-adaptive
-  - bluesky >=1.13.1rc1
+  # - bluesky >=1.13.1
+  - git+https://github.com/bluesky/bluesky.git@v1.13.1rc1 # Replace with PyPI version once released
   - ophyd >=1.6.3
   - ophyd-async >=0.9.0a1
   - apstools == 1.6.20 # Leave at 1.6.20 until this is fixed: https://github.com/BCDA-APS/apstools/issues/1022

From a8235951e3270553d05ed422785e853219fd59e7 Mon Sep 17 00:00:00 2001
From: Mark Wolfman
Date: Sun, 5 Jan 2025 00:31:37 -0600
Subject: [PATCH 10/13] Fixed a numpy precision issue in a test.

---
 src/haven/tests/test_energy_xafs_scan.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/haven/tests/test_energy_xafs_scan.py b/src/haven/tests/test_energy_xafs_scan.py
index d9a2e8af..3f011736 100644
--- a/src/haven/tests/test_energy_xafs_scan.py
+++ b/src/haven/tests/test_energy_xafs_scan.py
@@ -183,12 +183,12 @@ def test_exafs_k_range(mono_motor, exposure_motor, I0):
     real_energies = [
         i.args[0] for i in scan_list if i[0] == "set" and i.obj.name == "mono_energy"
     ]
-    np.testing.assert_equal(real_energies, expected_energies)
+    np.testing.assert_almost_equal(real_energies, expected_energies)
     # Check that the exposure is set correctly
     real_exposures = [
         i.args[0] for i in scan_list if i[0] == "set" and i.obj.name == "exposure"
     ]
-    np.testing.assert_equal(real_exposures, expected_exposures)
+    np.testing.assert_almost_equal(real_exposures, expected_exposures)
 
 
 def test_named_E0(mono_motor, exposure_motor, I0):

From 4421103f106c05fbaa1e5020031fb44d1fa137c4 Mon Sep 17 00:00:00 2001
From: Mark Wolfman
Date: Tue, 7 Jan 2025 13:29:52 -0600
Subject: [PATCH 11/13] Added a Tiled consumer for streaming Bluesky docs from
 Kafka to a Tiled writer.

---
 src/queueserver/tiled_consumer.py | 125 ++++++++++++++++++++++++++++++
 1 file changed, 125 insertions(+)
 create mode 100755 src/queueserver/tiled_consumer.py

diff --git a/src/queueserver/tiled_consumer.py b/src/queueserver/tiled_consumer.py
new file mode 100755
index 00000000..237be2c5
--- /dev/null
+++ b/src/queueserver/tiled_consumer.py
@@ -0,0 +1,125 @@
+import os
+import sys
+import logging
+from functools import partial
+from typing import Mapping, Sequence
+
+import msgpack
+import msgpack_numpy as mpn
+from bluesky.callbacks.tiled_writer import TiledWriter
+from bluesky_kafka import RemoteDispatcher, BlueskyConsumer
+from tiled.client.base import BaseClient
+from tiled.client import from_uri
+
+import haven
+
+log = logging.getLogger(__name__)
+
+
+class TiledConsumer(BlueskyConsumer):
+    """Send Bluesky documents received from a Kafka broker to Tiled for writing.
+
+    There is no default configuration. A reasonable configuration for production is:
+        consumer_config={
+            "auto.offset.reset": "latest"
+        }
+
+    Parameters
+    ----------
+    tiled_client
+        The top-level Tiled client object to use for writing.
+    topic_catalog_map
+        Translates Kafka topic names to Tiled catalogs. Each value
+        should be the name of a catalog available directly under
+        *tiled_client*.
+    bootstrap_servers : list of str
+        Kafka server addresses as strings such as
+        ``[broker1:9092, broker2:9092, 127.0.0.1:9092]``
+    group_id : str
+        Required string identifier for the consumer's Kafka Consumer group.
+    consumer_config : dict
+        Override default configuration or specify additional configuration
+        options to confluent_kafka.Consumer.
+    polling_duration : float
+        Time in seconds to wait for a message before running function work_during_wait
+        in the _poll method. Default is 0.05.
+    deserializer : function, optional
+        Function to deserialize data. Default is msgpack.loads.
+
+    """
+    def __init__(
+        self,
+        tiled_client: BaseClient,
+        topic_catalog_map: Mapping,
+        bootstrap_servers: Sequence[str],
+        group_id: str,
+        consumer_config: Mapping | None = None,
+        polling_duration: float | int = 0.05,
+        deserializer=msgpack.loads,
+    ):
+        self.topic_catalog_map = topic_catalog_map
+        # Create writers for each Tiled catalog
+        catalog_names = set(topic_catalog_map.values())
+        self.writers = {catalog: TiledWriter(tiled_client[catalog]) for catalog in catalog_names}
+        super().__init__(
+            topics=list(topic_catalog_map.keys()),
+            bootstrap_servers=",".join(bootstrap_servers),
+            group_id=group_id,
+            consumer_config=consumer_config,
+            polling_duration=polling_duration,
+            deserializer=deserializer,
+        )
+
+    def process_document(self, topic: str, name: str, doc: Mapping) -> bool:
+        """Write the Bluesky document to the associated Tiled catalog.
+
+        Parameters
+        ----------
+        topic
+            the Kafka topic of the message containing name and doc
+        name
+            bluesky document name: `start`, `descriptor`, `event`, etc.
+        doc
+            bluesky document
+
+        Returns
+        -------
+        continue_polling
+            return False to break out of the polling loop, return True to continue polling
+        """
+        catalog = self.topic_catalog_map[topic]
+        log.info(f"Writing {name} doc from {topic=} to {catalog=}.")
+        writer = self.writers[catalog]
+        writer(name, doc)
+        return True
+
+
+def main():
+    """Launch the mongo consumer."""
+    logging.basicConfig(level=logging.WARNING)
+    config = haven.load_config()
+    bootstrap_servers = ["localhost:9092"]
+    topic_catalog_map = {
+        "s25idc_queueserver-dev": "haven-dev",
+        "s25idd_queueserver-dev": "haven-dev",
+    }
+    # Create a tiled writer that will write documents to tiled
+    tiled_uri = config['database']['tiled']['uri']
+    tiled_api_key = config['database']['tiled']['api_key']
+    client = from_uri(tiled_uri, api_key=tiled_api_key)
+    client.include_data_sources()
+
+    # Create a MongoConsumer that will listen for new documents.
+    consumer = TiledConsumer(
+        tiled_client=client,
+        topic_catalog_map=topic_catalog_map,
+        bootstrap_servers=bootstrap_servers,
+        group_id="tiled_writer",
+        consumer_config={"auto.offset.reset": "latest"},
+        polling_duration=1.0,
+    )
+    consumer.start()
+
+
+if __name__ == "__main__":
+    sys.exit(main())

From 5df0837093f2611542b30330bff960380d9540fb Mon Sep 17 00:00:00 2001
From: Mark Wolfman
Date: Tue, 7 Jan 2025 14:18:49 -0600
Subject: [PATCH 12/13] Black, isort, and flake8.

---
 src/queueserver/tiled_consumer.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/src/queueserver/tiled_consumer.py b/src/queueserver/tiled_consumer.py
index 237be2c5..741c8b8f 100755
--- a/src/queueserver/tiled_consumer.py
+++ b/src/queueserver/tiled_consumer.py
@@ -1,15 +1,12 @@
-import os
-import sys
 import logging
-from functools import partial
+import sys
 from typing import Mapping, Sequence
 
 import msgpack
-import msgpack_numpy as mpn
 from bluesky.callbacks.tiled_writer import TiledWriter
-from bluesky_kafka import RemoteDispatcher, BlueskyConsumer
-from tiled.client.base import BaseClient
+from bluesky_kafka import BlueskyConsumer
 from tiled.client import from_uri
+from tiled.client.base import BaseClient
 
 import haven
 
@@ -46,21 +43,24 @@ class TiledConsumer(BlueskyConsumer):
     deserializer : function, optional
         Function to deserialize data. Default is msgpack.loads.
 
""" + def __init__( self, tiled_client: BaseClient, @@ -60,7 +58,9 @@ def __init__( self.topic_catalog_map = topic_catalog_map # Create writers for each Tiled catalog catalog_names = set(topic_catalog_map.values()) - self.writers = {catalog: TiledWriter(tiled_client[catalog]) for catalog in catalog_names} + self.writers = { + catalog: TiledWriter(tiled_client[catalog]) for catalog in catalog_names + } super().__init__( topics=list(topic_catalog_map.keys()), bootstrap_servers=",".join(bootstrap_servers), @@ -103,8 +103,8 @@ def main(): "s25idd_queueserver-dev": "haven-dev", } # Create a tiled writer that will write documents to tiled - tiled_uri = config['database']['tiled']['uri'] - tiled_api_key = config['database']['tiled']['api_key'] + tiled_uri = config["database"]["tiled"]["uri"] + tiled_api_key = config["database"]["tiled"]["api_key"] client = from_uri(tiled_uri, api_key=tiled_api_key) client.include_data_sources() From c04daeefd31863fd3cb227d9f292c5eff6732526 Mon Sep 17 00:00:00 2001 From: Mark Wolfman Date: Tue, 7 Jan 2025 18:21:05 -0600 Subject: [PATCH 13/13] Cleaned up stray print, and corrected 'mongo' -> 'tiled' in tiled consumer docstrings. --- src/firefly/run_browser/widgets.py | 1 - src/queueserver/tiled_consumer.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/firefly/run_browser/widgets.py b/src/firefly/run_browser/widgets.py index ffbc0e93..2fcf74ba 100644 --- a/src/firefly/run_browser/widgets.py +++ b/src/firefly/run_browser/widgets.py @@ -101,7 +101,6 @@ def plot_runs(self, runs: Mapping, xsignal: str): ysignals = [] for run in runs.values(): ysignals.extend(run.columns) - print(xsignal, ysignals) # Remove the x-signal from the list of y signals ysignals = sorted(list(dict.fromkeys(ysignals))) # Plot the runs diff --git a/src/queueserver/tiled_consumer.py b/src/queueserver/tiled_consumer.py index 741c8b8f..ea31b00f 100755 --- a/src/queueserver/tiled_consumer.py +++ b/src/queueserver/tiled_consumer.py @@ -94,7 +94,7 @@ def process_document(self, topic: str, name: str, doc: Mapping) -> bool: def main(): - """Launch the mongo consumer.""" + """Launch the tiled consumer.""" logging.basicConfig(level=logging.WARNING) config = haven.load_config() bootstrap_servers = ["localhost:9092"] @@ -108,7 +108,7 @@ def main(): client = from_uri(tiled_uri, api_key=tiled_api_key) client.include_data_sources() - # Create a MongoConsumer that will listen for new documents. + # Create a Tiled consumer that will listen for new documents. consumer = TiledConsumer( tiled_client=client, topic_catalog_map=topic_catalog_map,