From 819edc3bfbacd10ec3318570235f4dd93204850b Mon Sep 17 00:00:00 2001 From: s25idcuser Date: Fri, 10 Jan 2025 22:54:15 -0600 Subject: [PATCH] Updated the Haven run engine to use a Kafka publisher by default instead of individual mongo/tiled callbacks. --- src/haven/iconfig_testing.toml | 6 +++++- src/haven/ipython_startup.ipy | 2 +- src/haven/run_engine.py | 24 +++++++++++++++++++++++- src/queueserver/launch_queueserver.py | 6 +----- src/queueserver/queueserver.sh | 20 -------------------- src/queueserver/queueserver_startup.py | 9 +++++++-- 6 files changed, 37 insertions(+), 30 deletions(-) delete mode 100755 src/queueserver/queueserver.sh diff --git a/src/haven/iconfig_testing.toml b/src/haven/iconfig_testing.toml index 2b9651e1..f7bc4deb 100644 --- a/src/haven/iconfig_testing.toml +++ b/src/haven/iconfig_testing.toml @@ -24,8 +24,12 @@ beamline = "255-ID-C" # not generate any devices, but is intended to be read by the Firefly # GUI application to determine how to interact with the queue. +[kafka] +servers = ["fedorov.xray.aps.anl.gov:9092"] +topic = "bluesky.documents.25idc-dev" + [queueserver] -kafka_topic = "s255idc_queueserver" +kafka_topic = "bluesky.documents.255idc" control_host = "localhost" control_port = "60615" info_host = "localhost" diff --git a/src/haven/ipython_startup.ipy b/src/haven/ipython_startup.ipy index 071de06a..5d678b66 100644 --- a/src/haven/ipython_startup.ipy +++ b/src/haven/ipython_startup.ipy @@ -29,7 +29,7 @@ log = logging.getLogger(__name__) # Create a run engine RE = haven.run_engine( - connect_databroker=True, + connect_kafka=True, call_returns_result=True, use_bec=False, ) diff --git a/src/haven/run_engine.py b/src/haven/run_engine.py index dc6ebbf5..23615a93 100644 --- a/src/haven/run_engine.py +++ b/src/haven/run_engine.py @@ -1,3 +1,4 @@ +from uuid import uuid4 as uuid import logging import databroker @@ -6,7 +7,9 @@ from bluesky.callbacks.best_effort import BestEffortCallback from bluesky.callbacks.tiled_writer import TiledWriter from bluesky.utils import ProgressBarManager, register_transform +from bluesky_kafka import Publisher +from haven import load_config from .catalog import tiled_client from .exceptions import ComponentNotFound from .instrument import beamline @@ -29,8 +32,25 @@ def save_to_databroker(name, doc): catalog.v1.insert(name, doc) +def kafka_publisher(): + config = load_config() + publisher = Publisher( + topic=config["kafka"]["topic"], + bootstrap_servers=",".join(config["kafka"]["servers"]), + producer_config={"enable.idempotence": True}, + flush_on_stop_doc=True, + key=str(uuid()), + ) + return publisher + + def run_engine( - *, connect_tiled=True, connect_databroker=True, use_bec=False, **kwargs + *, + connect_tiled=False, + connect_databroker=False, + connect_kafka=True, + use_bec=False, + **kwargs, ) -> BlueskyRunEngine: """Build a bluesky RunEngine() for Haven. @@ -81,6 +101,8 @@ def run_engine( client.include_data_sources() tiled_writer = TiledWriter(client) RE.subscribe(tiled_writer) + if connect_kafka: + RE.subscribe(kafka_publisher()) # Add preprocessors RE.preprocessors.append(inject_haven_md_wrapper) return RE diff --git a/src/queueserver/launch_queueserver.py b/src/queueserver/launch_queueserver.py index 245a9d70..c4dcd9d5 100644 --- a/src/queueserver/launch_queueserver.py +++ b/src/queueserver/launch_queueserver.py @@ -18,7 +18,6 @@ def launch_queueserver(): config = load_config(file_paths=config_files) control_port = config["queueserver"]["control_port"] info_port = config["queueserver"]["info_port"] - kafka_topic = config["queueserver"]["kafka_topic"] redis_addr = config["queueserver"]["redis_addr"] # Launch the queueserver args = [ @@ -36,12 +35,9 @@ def launch_queueserver(): "--redis-addr", redis_addr, "--keep-re", - "--kafka-topic", - kafka_topic, "--update-existing-plans-devices", "ENVIRONMENT_OPEN", "--use-ipython-kernel=ON", ] - print("Starting queueserver with command:") - print(" ", " ".join(args)) + log.info(f"Starting queueserver with command: {' '.join(args)}") subprocess.run(args) diff --git a/src/queueserver/queueserver.sh b/src/queueserver/queueserver.sh deleted file mode 100755 index 3cfe8451..00000000 --- a/src/queueserver/queueserver.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/bin/bash - -# Set up configuration -THIS_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) -export HAVEN_CONFIG_FILES="${BLUESKY_DIR}/iconfig.toml" -KAFKA_TOPIC=`haven_config queueserver.kafka_topic` -ZMQ_CONTROL_ADDR="tcp://*:`haven_config queueserver.control_port`" -ZMQ_INFO_ADDR="tcp://*:`haven_config queueserver.info_port`" - -# Lauch -start-re-manager \ - --startup-script ${THIS_DIR}/queueserver_startup.py \ - --existing-plans-devices ${BLUESKY_DIR}/queueserver_existing_plans_and_devices.yaml \ - --user-group-permissions ${THIS_DIR}/queueserver_user_group_permissions.yaml \ - --zmq-control-addr ${ZMQ_CONTROL_ADDR} \ - --zmq-info-addr ${ZMQ_INFO_ADDR} \ - --redis-addr ${REDIS_ADDR} \ - --keep-re \ - --kafka-topic ${KAFKA_TOPIC} \ - --update-existing-plans-devices ENVIRONMENT_OPEN diff --git a/src/queueserver/queueserver_startup.py b/src/queueserver/queueserver_startup.py index ef6e912a..f8660658 100755 --- a/src/queueserver/queueserver_startup.py +++ b/src/queueserver/queueserver_startup.py @@ -37,8 +37,13 @@ log = logging.getLogger(__name__) -# Create a run engine without all the bells and whistles -RE = run_engine(connect_databroker=False, use_bec=False) +# Create a run engine +RE = run_engine( + connect_databroker=False, + connect_tiled=False, + connect_kafka=True, + use_bec=False +) # Import devices beamline.load()