Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flyable area detectors #268

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/topic_guides/fluorescence_detectors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,28 @@ The device can then be retrieved by its name for use in Bluesky plans.
detectors = haven.beamline.devices.findall(label="fluorescence_detectors")


Adding NDAttributes (Xspress3)
==============================

The EPICS support for an Xspress3 detector provides several additional
values besides the spectra, many of them useful for dead-time
correction. These can be saved using the Ophyd-async NDAttribute
support. Haven includes a function for generating these parameters so
they can be set on the IOC in a way that allows Tiled to read the
resulting values from the HDF5 file. This only needs to be done once
for each detector IOC.

.. code-block:: python

from haven.devices.detectors.xspress import Xspress3Detector, ndattribute_params
from ophyd_async.plan_stubs import setup_ndattributes

# Assuming a 4-element detector
detector = Xspress3Detector(...)
params = ndattribute_params(device_name=detector.name, elements=range(4))
setup_ndattributes(detector, params)


Why can't I…
############

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ HavenPlugin = "firefly.pydm_plugin:HavenPlugin"
haven_config = "haven._iconfig:print_config_value"
haven_queueserver = "queueserver.launch_queueserver:launch_queueserver"
mongo_consumer = "queueserver.mongo_consumer:main"
tiled_consumer = "queueserver.tiled_consumer:main"

[project.gui-scripts]
firefly = "firefly.launcher:main"
Expand Down
1 change: 1 addition & 0 deletions src/haven/devices/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .detectors.aravis import AravisDetector # noqa: F401
from .detectors.sim_detector import SimDetector # noqa: F401
from .detectors.xspress import Xspress3Detector # noqa: F401
from .ion_chamber import IonChamber # noqa: F401
from .monochromator import Monochromator # noqa: F401
from .motor import HavenMotor, Motor # noqa: F401
Expand Down
85 changes: 84 additions & 1 deletion src/haven/devices/detectors/xspress.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
TriggerInfo,
)
from ophyd_async.epics import adcore
from ophyd_async.epics.adcore._utils import ADBaseDataType, convert_ad_dtype_to_np
from ophyd_async.epics.adcore._utils import ADBaseDataType, convert_ad_dtype_to_np, NDAttributeParam, NDAttributeDataType
from ophyd_async.epics.core import epics_signal_rw, epics_signal_x

from .area_detectors import HavenDetector, default_path_provider
Expand Down Expand Up @@ -113,6 +113,7 @@ def __init__(
path_provider,
lambda: self.name,
XspressDatasetDescriber(self.drv),
self.drv, # <- for DT ndattributes
),
config_sigs=(self.drv.acquire_period, self.drv.acquire_time, *config_sigs),
name=name,
Expand All @@ -125,3 +126,85 @@ async def stage(self) -> None:
self.drv.erase_on_start.set(False),
self.drv.erase.trigger(),
)


def ndattribute_params(
    device_name: str, elements: Sequence[int]
) -> Sequence[NDAttributeParam]:
    """Create a set of ndattribute params that can be written to the AD's
    HDF5 file.

    These parameters can then be used with something like
    :py:func:`ophyd_async.plan_stubs.setup_ndattributes` to build the
    XML.

    Parameters
    ----------
    device_name
      Prefix used for each attribute name, typically the ophyd device's
      *name* attribute.
    elements
      Channel indices (EPICS *addr* values) to generate parameters for,
      e.g. ``range(4)`` for a 4-element detector.

    Returns
    -------
    A flat list with one :py:class:`NDAttributeParam` per (element,
    attribute) pair, ordered element-by-element.

    """
    # (name suffix, EPICS driver parameter, human-readable label) for
    # each per-element attribute, mostly used for dead-time correction.
    attribute_specs = [
        ("deadtime_factor", "XSP3_CHAN_DTFACTOR", "DTC Factor"),
        ("deadtime_percent", "XSP3_CHAN_DTPERCENT", "DTC Percent"),
        ("event_width", "XSP3_EVENT_WIDTH", "Event Width"),
        ("clock_ticks", "XSP3_CHAN_SCA0", "ClockTicks"),
        ("reset_ticks", "XSP3_CHAN_SCA1", "ResetTicks"),
        ("reset_counts", "XSP3_CHAN_SCA2", "ResetCounts"),
        ("all_event", "XSP3_CHAN_SCA3", "AllEvent"),
        ("all_good", "XSP3_CHAN_SCA4", "AllGood"),
        ("pileup", "XSP3_CHAN_SCA7", "Pileup"),
    ]
    return [
        NDAttributeParam(
            name=f"{device_name}-element{idx}-{suffix}",
            param=param,
            datatype=NDAttributeDataType.DOUBLE,
            addr=idx,
            description=f"Chan {idx} {label}",
        )
        for idx in elements
        for suffix, param, label in attribute_specs
    ]
6 changes: 5 additions & 1 deletion src/haven/iconfig_testing.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ beamline = "255-ID-C"
# not generate any devices, but is intended to be read by the Firefly
# GUI application to determine how to interact with the queue.

[kafka]
servers = ["fedorov.xray.aps.anl.gov:9092"]
topic = "bluesky.documents.25idc-dev"

[queueserver]
kafka_topic = "s255idc_queueserver"
kafka_topic = "bluesky.documents.255idc"
control_host = "localhost"
control_port = "60615"
info_host = "localhost"
Expand Down
2 changes: 1 addition & 1 deletion src/haven/ipython_startup.ipy
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ log = logging.getLogger(__name__)

# Create a run engine
RE = haven.run_engine(
connect_databroker=True,
connect_kafka=True,
call_returns_result=True,
use_bec=False,
)
Expand Down
24 changes: 23 additions & 1 deletion src/haven/run_engine.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from uuid import uuid4 as uuid
import logging

import databroker
Expand All @@ -6,7 +7,9 @@
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky.callbacks.tiled_writer import TiledWriter
from bluesky.utils import ProgressBarManager, register_transform
from bluesky_kafka import Publisher

from haven import load_config
from .catalog import tiled_client
from .exceptions import ComponentNotFound
from .instrument import beamline
Expand All @@ -29,8 +32,25 @@ def save_to_databroker(name, doc):
catalog.v1.insert(name, doc)


def kafka_publisher():
    """Build a Kafka publisher for broadcasting bluesky documents.

    Broker addresses and the topic name are read from the ``[kafka]``
    section of the Haven configuration file. Each publisher gets a
    fresh random key, and documents are flushed whenever a stop
    document arrives.
    """
    kafka_config = load_config()["kafka"]
    return Publisher(
        topic=kafka_config["topic"],
        bootstrap_servers=",".join(kafka_config["servers"]),
        producer_config={"enable.idempotence": True},
        flush_on_stop_doc=True,
        key=str(uuid()),
    )


def run_engine(
*, connect_tiled=True, connect_databroker=True, use_bec=False, **kwargs
*,
connect_tiled=False,
connect_databroker=False,
connect_kafka=True,
use_bec=False,
**kwargs,
) -> BlueskyRunEngine:
"""Build a bluesky RunEngine() for Haven.

Expand Down Expand Up @@ -81,6 +101,8 @@ def run_engine(
client.include_data_sources()
tiled_writer = TiledWriter(client)
RE.subscribe(tiled_writer)
if connect_kafka:
RE.subscribe(kafka_publisher())
# Add preprocessors
RE.preprocessors.append(inject_haven_md_wrapper)
return RE
Expand Down
9 changes: 8 additions & 1 deletion src/haven/tests/test_xspress.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from ophyd_async.core import TriggerInfo
from ophyd_async.testing import get_mock_put, set_mock_value

from haven.devices.detectors.xspress import Xspress3Detector
from haven.devices.detectors.xspress import Xspress3Detector, ndattribute_params

this_dir = Path(__file__).parent

Expand Down Expand Up @@ -73,6 +73,13 @@ async def test_deadtime_correction(detector):
assert not await detector.drv.deadtime_correction.get_value()


async def test_ndattribute_params():
    """The generator should emit 9 uniquely-named params per element."""
    n_elem = 8
    n_params = 9  # number of distinct attributes generated per element
    params = ndattribute_params(device_name="xsp3", elements=range(n_elem))
    assert len(params) == n_elem * n_params
    # Names must be unique, otherwise attributes would clobber each
    # other when written to the HDF5 file.
    names = [param.name for param in params]
    assert len(set(names)) == len(names)


# -----------------------------------------------------------------------------
# :author: Mark Wolfman
# :email: [email protected]
Expand Down
6 changes: 1 addition & 5 deletions src/queueserver/launch_queueserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ def launch_queueserver():
config = load_config(file_paths=config_files)
control_port = config["queueserver"]["control_port"]
info_port = config["queueserver"]["info_port"]
kafka_topic = config["queueserver"]["kafka_topic"]
redis_addr = config["queueserver"]["redis_addr"]
# Launch the queueserver
args = [
Expand All @@ -36,12 +35,9 @@ def launch_queueserver():
"--redis-addr",
redis_addr,
"--keep-re",
"--kafka-topic",
kafka_topic,
"--update-existing-plans-devices",
"ENVIRONMENT_OPEN",
"--use-ipython-kernel=ON",
]
print("Starting queueserver with command:")
print(" ", " ".join(args))
log.info(f"Starting queueserver with command: {' '.join(args)}")
subprocess.run(args)
5 changes: 2 additions & 3 deletions src/queueserver/mongo_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ def main():

kafka_deserializer = partial(msgpack.loads, object_hook=mpn.decode)
auto_offset_reset = "latest"
topics = ["s25idc_queueserver", "s25idd_queueserver"]

topic_database_map = {
"s25idc_queueserver": "25idc-bluesky",
"s25idd_queueserver": "25idd-bluesky",
"bluesky.documents.haven": "25idc-bluesky",
}
topics = topic_database_map.keys()

# Create a MongoConsumer that will automatically listen to new beamline topics.
# The parameter metadata.max.age.ms determines how often the consumer will check for
Expand Down
20 changes: 0 additions & 20 deletions src/queueserver/queueserver.sh

This file was deleted.

9 changes: 7 additions & 2 deletions src/queueserver/queueserver_startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@

log = logging.getLogger(__name__)

# Create a run engine without all the bells and whistles
RE = run_engine(connect_databroker=False, use_bec=False)
# Create a run engine
RE = run_engine(
connect_databroker=False,
connect_tiled=False,
connect_kafka=True,
use_bec=False
)

# Import devices
beamline.load()
Expand Down
4 changes: 3 additions & 1 deletion src/queueserver/systemd_units/mongo_consumer.service
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ Wants=kafka.service
After=kafka.service

[Service]
ExecStart=/bin/bash -l -c 'mamba activate haven && env python %h/src/queueserver/mongo_consumer.py'
ExecStart=/bin/bash -l -c 'mamba activate haven && mongo_consumer'
Restart=always
RestartSec=60
# ExecStopPost=/APSshare/epics/base-7.0.7/bin/rhel8-x86_64/caput 100id:bluesky:mongo_consumer_state 1
# ExecStartPost=/APSshare/epics/base-7.0.7/bin/rhel8-x86_64/caput 100id:bluesky:mongo_consumer_state 2

Expand Down
17 changes: 17 additions & 0 deletions src/queueserver/systemd_units/tiled_consumer.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[Unit]
Description=consumer for saving bluesky documents to Tiled via kafka
After=syslog.target network.target
Wants=kafka.service
After=kafka.service

[Service]
ExecStart=/bin/bash -l -c 'mamba activate haven && tiled_consumer'
Restart=always
RestartSec=60
# ExecStopPost=/APSshare/epics/base-7.0.7/bin/rhel9-x86_64/caput 25idc:bluesky:tiled_consumer_state 1
# ExecStopPost=/APSshare/epics/base-7.0.7/bin/rhel9-x86_64/caput 25idd:bluesky:tiled_consumer_state 1
# ExecStartPost=/APSshare/epics/base-7.0.7/bin/rhel9-x86_64/caput 25idc:bluesky:tiled_consumer_state 2
# ExecStartPost=/APSshare/epics/base-7.0.7/bin/rhel9-x86_64/caput 25idd:bluesky:tiled_consumer_state 2

[Install]
WantedBy=default.target
14 changes: 7 additions & 7 deletions src/queueserver/tiled_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class TiledConsumer(BlueskyConsumer):
Translates Kafka topic names to Tiled catalogs. Each value
should be the name of a catalog available directly under
*tiled_client*.
bootstrap_servers : str
bootstrap_servers
Kafka server addresses as strings such as
``[broker1:9092, broker2:9092, 127.0.0.1:9092]``
``["broker1:9092", "broker2:9092", "127.0.0.1:9092"]``
group_id : str
Required string identifier for the consumer's Kafka Consumer group.
consumer_config : dict
Expand Down Expand Up @@ -95,18 +95,17 @@ def process_document(self, topic: str, name: str, doc: Mapping) -> bool:

def main():
"""Launch the tiled consumer."""
logging.basicConfig(level=logging.WARNING)
logging.basicConfig(level=logging.INFO)
config = haven.load_config()
bootstrap_servers = ["localhost:9092"]
topic_catalog_map = {
"s25idc_queueserver-dev": "haven-dev",
"s25idd_queueserver-dev": "haven-dev",
"bluesky.documents.haven": "haven",
"bluesky.documents.haven-dev": "haven-dev",
}
# Create a tiled writer that will write documents to tiled
tiled_uri = config["database"]["tiled"]["uri"]
tiled_api_key = config["database"]["tiled"]["api_key"]
client = from_uri(tiled_uri, api_key=tiled_api_key)
client.include_data_sources()
client = from_uri(tiled_uri, api_key=tiled_api_key, include_data_sources=True)

# Create a Tiled consumer that will listen for new documents.
consumer = TiledConsumer(
Expand All @@ -117,6 +116,7 @@ def main():
consumer_config={"auto.offset.reset": "latest"},
polling_duration=1.0,
)
log.info("Starting Tiled consumer")
consumer.start()


Expand Down
Loading