From d4366f5a4855c3f54d35877e377bc7f712257f61 Mon Sep 17 00:00:00 2001
From: s25idcuser
Date: Thu, 9 Jan 2025 14:54:04 -0600
Subject: [PATCH 1/5] Include xspress detector driver as a plugin to allow ndattributes reading.

---
 src/haven/devices/__init__.py          | 1 +
 src/haven/devices/detectors/xspress.py | 1 +
 2 files changed, 2 insertions(+)

diff --git a/src/haven/devices/__init__.py b/src/haven/devices/__init__.py
index 36fc2038..94695355 100644
--- a/src/haven/devices/__init__.py
+++ b/src/haven/devices/__init__.py
@@ -1,5 +1,6 @@
 from .detectors.aravis import AravisDetector  # noqa: F401
 from .detectors.sim_detector import SimDetector  # noqa: F401
+from .detectors.xspress import Xspress3Detector  # noqa: F401
 from .ion_chamber import IonChamber  # noqa: F401
 from .monochromator import Monochromator  # noqa: F401
 from .motor import HavenMotor, Motor  # noqa: F401
diff --git a/src/haven/devices/detectors/xspress.py b/src/haven/devices/detectors/xspress.py
index fa58a42b..12485a3f 100644
--- a/src/haven/devices/detectors/xspress.py
+++ b/src/haven/devices/detectors/xspress.py
@@ -113,6 +113,7 @@ def __init__(
                 path_provider,
                 lambda: self.name,
                 XspressDatasetDescriber(self.drv),
+                self.drv,  # <- for deadtime (DT) ndattributes
             ),
             config_sigs=(self.drv.acquire_period, self.drv.acquire_time, *config_sigs),
             name=name,

From 83a87d5c1f6e4ef945cc09ea640957f636b6ea19 Mon Sep 17 00:00:00 2001
From: Mark Wolfman
Date: Thu, 9 Jan 2025 16:40:09 -0600
Subject: [PATCH 2/5] Added a function for generating NDAttributeParam objects for an xspress3 detector.

---
 docs/topic_guides/fluorescence_detectors.rst | 22 ++++++
 src/haven/devices/detectors/xspress.py       | 83 ++++++++++++++++++++
 src/haven/tests/test_xspress.py              |  9 ++-
 3 files changed, 113 insertions(+), 1 deletion(-)

diff --git a/docs/topic_guides/fluorescence_detectors.rst b/docs/topic_guides/fluorescence_detectors.rst
index d9d6fff1..b6f31917 100644
--- a/docs/topic_guides/fluorescence_detectors.rst
+++ b/docs/topic_guides/fluorescence_detectors.rst
@@ -68,6 +68,28 @@ The device can then be retrieved by its name for use in Bluesky plans.
 
    detectors = haven.beamline.devices.findall(label="fluorescence_detectors")
 
+Adding NDAttributes (Xspress3)
+==============================
+
+The EPICS support for an Xspress3 detector provides several additional
+values besides the spectra, many of them useful for dead-time
+correction. These can be saved using the Ophyd-async NDAttribute
+support. Haven includes a function for generating these parameters so
+they can be set on the IOC in a way that allows Tiled to read the
+resulting values from the HDF5 file. This only needs to be done once
+for each detector IOC.
+
+.. code-block:: python
+
+   from haven.devices.detectors.xspress import Xspress3Detector, ndattribute_params
+   from ophyd_async.plan_stubs import setup_ndattributes
+
+   # Assuming a 4-element detector
+   detector = Xspress3Detector(...)
+ params = ndattribute_params(device_name=detector.name, elements=range(4)) + setup_ndattributes(detector, params) + + Why can't I… ############ diff --git a/src/haven/devices/detectors/xspress.py b/src/haven/devices/detectors/xspress.py index 12485a3f..9438221d 100644 --- a/src/haven/devices/detectors/xspress.py +++ b/src/haven/devices/detectors/xspress.py @@ -11,6 +11,7 @@ TriggerInfo, ) from ophyd_async.epics import adcore +from ophyd_async.epics.adcore import NDAttributeParam from ophyd_async.epics.adcore._utils import ADBaseDataType, convert_ad_dtype_to_np from ophyd_async.epics.core import epics_signal_rw, epics_signal_x @@ -126,3 +127,85 @@ async def stage(self) -> None: self.drv.erase_on_start.set(False), self.drv.erase.trigger(), ) + + +def ndattribute_params( + device_name: str, elements: Sequence[int] +) -> Sequence[NDAttributeParam]: + """Create a set of ndattribute params that can be written to the AD's + HDF5 file. + + These parameters can then be used with something like + :py:func:`ophyd_async.plan_stubs.setup_ndattributes` to build the + XML. + + """ + params = [] + for idx in elements: + new_params = [ + NDAttributeParam( + name=f"{device_name}-element{idx}-deadtime_factor", + param="XSP3_CHAN_DTFACTOR", + datatype="DOUBLE", + addr=idx, + description=f"Chan {idx} DTC Factor", + ), + NDAttributeParam( + name=f"{device_name}-element{idx}-deadtime_percent", + param="XSP3_CHAN_DTPERCENT", + datatype="DOUBLE", + addr=idx, + description=f"Chan {idx} DTC Percent", + ), + NDAttributeParam( + name=f"{device_name}-element{idx}-event_width", + param="XSP3_EVENT_WIDTH", + datatype="DOUBLE", + addr=idx, + description=f"Chan {idx} Event Width", + ), + NDAttributeParam( + name=f"{device_name}-element{idx}-clock_ticks", + param="XSP3_CHAN_SCA0", + datatype="DOUBLE", + addr=idx, + description=f"Chan {idx} ClockTicks", + ), + NDAttributeParam( + name=f"{device_name}-element{idx}-reset_ticks", + param="XSP3_CHAN_SCA1", + datatype="DOUBLE", + addr=idx, + description=f"Chan {idx} ResetTicks", + ), + NDAttributeParam( + name=f"{device_name}-element{idx}-reset_counts", + param="XSP3_CHAN_SCA2", + datatype="DOUBLE", + addr=idx, + description=f"Chan {idx} ResetCounts", + ), + NDAttributeParam( + name=f"{device_name}-element{idx}-all_event", + param="XSP3_CHAN_SCA3", + datatype="DOUBLE", + addr=idx, + description=f"Chan {idx} AllEvent", + ), + NDAttributeParam( + name=f"{device_name}-element{idx}-all_good", + param="XSP3_CHAN_SCA4", + datatype="DOUBLE", + addr=idx, + description=f"Chan {idx} AllGood", + ), + NDAttributeParam( + name=f"{device_name}-element{idx}-pileup", + param="XSP3_CHAN_SCA7", + datatype="DOUBLE", + addr=idx, + description=f"Chan {idx} Pileup", + ), + ] + params.extend(new_params) + return params diff --git a/src/haven/tests/test_xspress.py b/src/haven/tests/test_xspress.py index cd52a705..0fa218a1 100644 --- a/src/haven/tests/test_xspress.py +++ b/src/haven/tests/test_xspress.py @@ -5,7 +5,7 @@ from ophyd_async.core import TriggerInfo from ophyd_async.testing import get_mock_put, set_mock_value -from haven.devices.detectors.xspress import Xspress3Detector +from haven.devices.detectors.xspress import Xspress3Detector, ndattribute_params this_dir = Path(__file__).parent @@ -73,6 +73,13 @@ async def test_deadtime_correction(detector): assert not await detector.drv.deadtime_correction.get_value() +async def test_ndattribute_params(): + n_elem = 8 + n_params = 9 + params = ndattribute_params(device_name="xsp3", elements=range(n_elem)) + assert len(params) == n_elem * n_params + + # 
----------------------------------------------------------------------------- # :author: Mark Wolfman # :email: wolfman@anl.gov From c163f0bc93e2317d398383c9cf426a642f74d4d8 Mon Sep 17 00:00:00 2001 From: 25-ID Staff Account <25-id@list.anl.gov> Date: Fri, 10 Jan 2025 22:33:16 -0600 Subject: [PATCH 3/5] Updated queueserver for new kafka configuration. --- pyproject.toml | 1 + src/queueserver/mongo_consumer.py | 5 ++--- .../systemd_units/mongo_consumer.service | 4 +++- .../systemd_units/tiled_consumer.service | 17 +++++++++++++++++ src/queueserver/tiled_consumer.py | 14 +++++++------- 5 files changed, 30 insertions(+), 11 deletions(-) create mode 100644 src/queueserver/systemd_units/tiled_consumer.service diff --git a/pyproject.toml b/pyproject.toml index 1892adb0..415860ad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ HavenPlugin = "firefly.pydm_plugin:HavenPlugin" haven_config = "haven._iconfig:print_config_value" haven_queueserver = "queueserver.launch_queueserver:launch_queueserver" mongo_consumer = "queueserver.mongo_consumer:main" +tiled_consumer = "queueserver.tiled_consumer:main" [project.gui-scripts] firefly = "firefly.launcher:main" diff --git a/src/queueserver/mongo_consumer.py b/src/queueserver/mongo_consumer.py index 12b08bac..7ed7c02f 100755 --- a/src/queueserver/mongo_consumer.py +++ b/src/queueserver/mongo_consumer.py @@ -27,12 +27,11 @@ def main(): kafka_deserializer = partial(msgpack.loads, object_hook=mpn.decode) auto_offset_reset = "latest" - topics = ["s25idc_queueserver", "s25idd_queueserver"] topic_database_map = { - "s25idc_queueserver": "25idc-bluesky", - "s25idd_queueserver": "25idd-bluesky", + "bluesky.documents.haven": "25idc-bluesky", } + topics = topic_database_map.keys() # Create a MongoConsumer that will automatically listen to new beamline topics. 
# The parameter metadata.max.age.ms determines how often the consumer will check for diff --git a/src/queueserver/systemd_units/mongo_consumer.service b/src/queueserver/systemd_units/mongo_consumer.service index 911f632e..e53482f0 100644 --- a/src/queueserver/systemd_units/mongo_consumer.service +++ b/src/queueserver/systemd_units/mongo_consumer.service @@ -5,7 +5,9 @@ Wants=kafka.service After=kafka.service [Service] -ExecStart=/bin/bash -l -c 'mamba activate haven && env python %h/src/queueserver/mongo_consumer.py' +ExecStart=/bin/bash -l -c 'mamba activate haven && mongo_consumer' +Restart=always +RestartSec=60 # ExecStopPost=/APSshare/epics/base-7.0.7/bin/rhel8-x86_64/caput 100id:bluesky:mongo_consumer_state 1 # ExecStartPost=/APSshare/epics/base-7.0.7/bin/rhel8-x86_64/caput 100id:bluesky:mongo_consumer_state 2 diff --git a/src/queueserver/systemd_units/tiled_consumer.service b/src/queueserver/systemd_units/tiled_consumer.service new file mode 100644 index 00000000..16eddc4e --- /dev/null +++ b/src/queueserver/systemd_units/tiled_consumer.service @@ -0,0 +1,17 @@ +[Unit] +Description=consumer for saving bluesky documents to Tiled via kafka +After=syslog.target network.target +Wants=kafka.service +After=kafka.service + +[Service] +ExecStart=/bin/bash -l -c 'mamba activate haven && tiled_consumer' +Restart=always +RestartSec=60 +# ExecStopPost=/APSshare/epics/base-7.0.7/bin/rhel9-x86_64/caput 25idc:bluesky:tiled_consumer_state 1 +# ExecStopPost=/APSshare/epics/base-7.0.7/bin/rhel9-x86_64/caput 25idd:bluesky:tiled_consumer_state 1 +# ExecStartPost=/APSshare/epics/base-7.0.7/bin/rhel9-x86_64/caput 25idc:bluesky:tiled_consumer_state 2 +# ExecStartPost=/APSshare/epics/base-7.0.7/bin/rhel9-x86_64/caput 25idd:bluesky:tiled_consumer_state 2 + +[Install] +WantedBy=default.target diff --git a/src/queueserver/tiled_consumer.py b/src/queueserver/tiled_consumer.py index ea31b00f..1fb0bebf 100755 --- a/src/queueserver/tiled_consumer.py +++ b/src/queueserver/tiled_consumer.py @@ -29,9 +29,9 @@ class TiledConsumer(BlueskyConsumer): Translates Kafka topic names to Tiled catalogs. Each value should be the name of a catalog available directly under *tiled_client*. - bootstrap_servers : str + bootstrap_servers Kafka server addresses as strings such as - ``[broker1:9092, broker2:9092, 127.0.0.1:9092]`` + ``["broker1:9092", "broker2:9092", "127.0.0.1:9092"]`` group_id : str Required string identifier for the consumer's Kafka Consumer group. consumer_config : dict @@ -95,18 +95,17 @@ def process_document(self, topic: str, name: str, doc: Mapping) -> bool: def main(): """Launch the tiled consumer.""" - logging.basicConfig(level=logging.WARNING) + logging.basicConfig(level=logging.INFO) config = haven.load_config() bootstrap_servers = ["localhost:9092"] topic_catalog_map = { - "s25idc_queueserver-dev": "haven-dev", - "s25idd_queueserver-dev": "haven-dev", + "bluesky.documents.haven": "haven", + "bluesky.documents.haven-dev": "haven-dev", } # Create a tiled writer that will write documents to tiled tiled_uri = config["database"]["tiled"]["uri"] tiled_api_key = config["database"]["tiled"]["api_key"] - client = from_uri(tiled_uri, api_key=tiled_api_key) - client.include_data_sources() + client = from_uri(tiled_uri, api_key=tiled_api_key, include_data_sources=True) # Create a Tiled consumer that will listen for new documents. 
consumer = TiledConsumer( @@ -117,6 +116,7 @@ def main(): consumer_config={"auto.offset.reset": "latest"}, polling_duration=1.0, ) + log.info("Starting Tiled consumer") consumer.start() From 84ded81242ff9ff3a2e81c1251e19a10790d17ae Mon Sep 17 00:00:00 2001 From: s25idcuser Date: Fri, 10 Jan 2025 22:39:36 -0600 Subject: [PATCH 4/5] Fixes to the xspress NDAttributes support. --- src/haven/devices/detectors/xspress.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/haven/devices/detectors/xspress.py b/src/haven/devices/detectors/xspress.py index 9438221d..882419b9 100644 --- a/src/haven/devices/detectors/xspress.py +++ b/src/haven/devices/detectors/xspress.py @@ -11,8 +11,7 @@ TriggerInfo, ) from ophyd_async.epics import adcore -from ophyd_async.epics.adcore import NDAttributeParam -from ophyd_async.epics.adcore._utils import ADBaseDataType, convert_ad_dtype_to_np +from ophyd_async.epics.adcore._utils import ADBaseDataType, convert_ad_dtype_to_np, NDAttributeParam, NDAttributeDataType from ophyd_async.epics.core import epics_signal_rw, epics_signal_x from .area_detectors import HavenDetector, default_path_provider @@ -146,63 +145,63 @@ def ndattribute_params( NDAttributeParam( name=f"{device_name}-element{idx}-deadtime_factor", param="XSP3_CHAN_DTFACTOR", - datatype="DOUBLE", + datatype=NDAttributeDataType.DOUBLE, addr=idx, description=f"Chan {idx} DTC Factor", ), NDAttributeParam( name=f"{device_name}-element{idx}-deadtime_percent", param="XSP3_CHAN_DTPERCENT", - datatype="DOUBLE", + datatype=NDAttributeDataType.DOUBLE, addr=idx, description=f"Chan {idx} DTC Percent", ), NDAttributeParam( name=f"{device_name}-element{idx}-event_width", param="XSP3_EVENT_WIDTH", - datatype="DOUBLE", + datatype=NDAttributeDataType.DOUBLE, addr=idx, description=f"Chan {idx} Event Width", ), NDAttributeParam( name=f"{device_name}-element{idx}-clock_ticks", param="XSP3_CHAN_SCA0", - datatype="DOUBLE", + datatype=NDAttributeDataType.DOUBLE, addr=idx, description=f"Chan {idx} ClockTicks", ), NDAttributeParam( name=f"{device_name}-element{idx}-reset_ticks", param="XSP3_CHAN_SCA1", - datatype="DOUBLE", + datatype=NDAttributeDataType.DOUBLE, addr=idx, description=f"Chan {idx} ResetTicks", ), NDAttributeParam( name=f"{device_name}-element{idx}-reset_counts", param="XSP3_CHAN_SCA2", - datatype="DOUBLE", + datatype=NDAttributeDataType.DOUBLE, addr=idx, description=f"Chan {idx} ResetCounts", ), NDAttributeParam( name=f"{device_name}-element{idx}-all_event", param="XSP3_CHAN_SCA3", - datatype="DOUBLE", + datatype=NDAttributeDataType.DOUBLE, addr=idx, description=f"Chan {idx} AllEvent", ), NDAttributeParam( name=f"{device_name}-element{idx}-all_good", param="XSP3_CHAN_SCA4", - datatype="DOUBLE", + datatype=NDAttributeDataType.DOUBLE, addr=idx, description=f"Chan {idx} AllGood", ), NDAttributeParam( name=f"{device_name}-element{idx}-pileup", param="XSP3_CHAN_SCA7", - datatype="DOUBLE", + datatype=NDAttributeDataType.DOUBLE, addr=idx, description=f"Chan {idx} Pileup", ), From 819edc3bfbacd10ec3318570235f4dd93204850b Mon Sep 17 00:00:00 2001 From: s25idcuser Date: Fri, 10 Jan 2025 22:54:15 -0600 Subject: [PATCH 5/5] Updated the Haven run engine to use a Kafka publisher by default instead of individual mongo/tiled callbacks. 
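
The run engine now publishes documents to a Kafka topic and leaves the
database writing to the mongo/tiled consumers listening on that topic.
A minimal sketch of the new default behavior (this assumes an
iconfig.toml that defines the [kafka] servers and topic, as in
iconfig_testing.toml):

    import haven

    # connect_kafka defaults to True, so documents emitted by the run
    # engine are forwarded to the configured Kafka topic.
    RE = haven.run_engine(use_bec=False)
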
--- src/haven/iconfig_testing.toml | 6 +++++- src/haven/ipython_startup.ipy | 2 +- src/haven/run_engine.py | 24 +++++++++++++++++++++++- src/queueserver/launch_queueserver.py | 6 +----- src/queueserver/queueserver.sh | 20 -------------------- src/queueserver/queueserver_startup.py | 9 +++++++-- 6 files changed, 37 insertions(+), 30 deletions(-) delete mode 100755 src/queueserver/queueserver.sh diff --git a/src/haven/iconfig_testing.toml b/src/haven/iconfig_testing.toml index 2b9651e1..f7bc4deb 100644 --- a/src/haven/iconfig_testing.toml +++ b/src/haven/iconfig_testing.toml @@ -24,8 +24,12 @@ beamline = "255-ID-C" # not generate any devices, but is intended to be read by the Firefly # GUI application to determine how to interact with the queue. +[kafka] +servers = ["fedorov.xray.aps.anl.gov:9092"] +topic = "bluesky.documents.25idc-dev" + [queueserver] -kafka_topic = "s255idc_queueserver" +kafka_topic = "bluesky.documents.255idc" control_host = "localhost" control_port = "60615" info_host = "localhost" diff --git a/src/haven/ipython_startup.ipy b/src/haven/ipython_startup.ipy index 071de06a..5d678b66 100644 --- a/src/haven/ipython_startup.ipy +++ b/src/haven/ipython_startup.ipy @@ -29,7 +29,7 @@ log = logging.getLogger(__name__) # Create a run engine RE = haven.run_engine( - connect_databroker=True, + connect_kafka=True, call_returns_result=True, use_bec=False, ) diff --git a/src/haven/run_engine.py b/src/haven/run_engine.py index dc6ebbf5..23615a93 100644 --- a/src/haven/run_engine.py +++ b/src/haven/run_engine.py @@ -1,3 +1,4 @@ +from uuid import uuid4 as uuid import logging import databroker @@ -6,7 +7,9 @@ from bluesky.callbacks.best_effort import BestEffortCallback from bluesky.callbacks.tiled_writer import TiledWriter from bluesky.utils import ProgressBarManager, register_transform +from bluesky_kafka import Publisher +from haven import load_config from .catalog import tiled_client from .exceptions import ComponentNotFound from .instrument import beamline @@ -29,8 +32,25 @@ def save_to_databroker(name, doc): catalog.v1.insert(name, doc) +def kafka_publisher(): + config = load_config() + publisher = Publisher( + topic=config["kafka"]["topic"], + bootstrap_servers=",".join(config["kafka"]["servers"]), + producer_config={"enable.idempotence": True}, + flush_on_stop_doc=True, + key=str(uuid()), + ) + return publisher + + def run_engine( - *, connect_tiled=True, connect_databroker=True, use_bec=False, **kwargs + *, + connect_tiled=False, + connect_databroker=False, + connect_kafka=True, + use_bec=False, + **kwargs, ) -> BlueskyRunEngine: """Build a bluesky RunEngine() for Haven. 
@@ -81,6 +101,8 @@ def run_engine( client.include_data_sources() tiled_writer = TiledWriter(client) RE.subscribe(tiled_writer) + if connect_kafka: + RE.subscribe(kafka_publisher()) # Add preprocessors RE.preprocessors.append(inject_haven_md_wrapper) return RE diff --git a/src/queueserver/launch_queueserver.py b/src/queueserver/launch_queueserver.py index 245a9d70..c4dcd9d5 100644 --- a/src/queueserver/launch_queueserver.py +++ b/src/queueserver/launch_queueserver.py @@ -18,7 +18,6 @@ def launch_queueserver(): config = load_config(file_paths=config_files) control_port = config["queueserver"]["control_port"] info_port = config["queueserver"]["info_port"] - kafka_topic = config["queueserver"]["kafka_topic"] redis_addr = config["queueserver"]["redis_addr"] # Launch the queueserver args = [ @@ -36,12 +35,9 @@ def launch_queueserver(): "--redis-addr", redis_addr, "--keep-re", - "--kafka-topic", - kafka_topic, "--update-existing-plans-devices", "ENVIRONMENT_OPEN", "--use-ipython-kernel=ON", ] - print("Starting queueserver with command:") - print(" ", " ".join(args)) + log.info(f"Starting queueserver with command: {' '.join(args)}") subprocess.run(args) diff --git a/src/queueserver/queueserver.sh b/src/queueserver/queueserver.sh deleted file mode 100755 index 3cfe8451..00000000 --- a/src/queueserver/queueserver.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/bin/bash - -# Set up configuration -THIS_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) -export HAVEN_CONFIG_FILES="${BLUESKY_DIR}/iconfig.toml" -KAFKA_TOPIC=`haven_config queueserver.kafka_topic` -ZMQ_CONTROL_ADDR="tcp://*:`haven_config queueserver.control_port`" -ZMQ_INFO_ADDR="tcp://*:`haven_config queueserver.info_port`" - -# Lauch -start-re-manager \ - --startup-script ${THIS_DIR}/queueserver_startup.py \ - --existing-plans-devices ${BLUESKY_DIR}/queueserver_existing_plans_and_devices.yaml \ - --user-group-permissions ${THIS_DIR}/queueserver_user_group_permissions.yaml \ - --zmq-control-addr ${ZMQ_CONTROL_ADDR} \ - --zmq-info-addr ${ZMQ_INFO_ADDR} \ - --redis-addr ${REDIS_ADDR} \ - --keep-re \ - --kafka-topic ${KAFKA_TOPIC} \ - --update-existing-plans-devices ENVIRONMENT_OPEN diff --git a/src/queueserver/queueserver_startup.py b/src/queueserver/queueserver_startup.py index ef6e912a..f8660658 100755 --- a/src/queueserver/queueserver_startup.py +++ b/src/queueserver/queueserver_startup.py @@ -37,8 +37,13 @@ log = logging.getLogger(__name__) -# Create a run engine without all the bells and whistles -RE = run_engine(connect_databroker=False, use_bec=False) +# Create a run engine +RE = run_engine( + connect_databroker=False, + connect_tiled=False, + connect_kafka=True, + use_bec=False +) # Import devices beamline.load()
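
A quick way to exercise the new pipeline end to end is to watch the
Kafka topic directly. The snippet below is only a sketch: it assumes
the RemoteDispatcher API from bluesky_kafka, reuses the broker address
and topic name used by tiled_consumer.py above, and the group_id is
just a placeholder.

    from bluesky_kafka import RemoteDispatcher

    # Print the name of every document published by the run engine to
    # the topic that mongo_consumer/tiled_consumer subscribe to.
    dispatcher = RemoteDispatcher(
        topics=["bluesky.documents.haven"],
        bootstrap_servers="localhost:9092",
        group_id="kafka-smoke-test",
        consumer_config={"auto.offset.reset": "latest"},
    )
    dispatcher.subscribe(lambda name, doc: print(name))
    dispatcher.start()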