Skip to content

Commit

Permalink
Updated the Haven run engine to use a Kafka publisher by default inst…
Browse files Browse the repository at this point in the history
…ead of individual mongo/tiled callbacks.
  • Loading branch information
s25idcuser committed Jan 11, 2025
1 parent 84ded81 commit 819edc3
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 30 deletions.
6 changes: 5 additions & 1 deletion src/haven/iconfig_testing.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ beamline = "255-ID-C"
# not generate any devices, but is intended to be read by the Firefly
# GUI application to determine how to interact with the queue.

[kafka]
servers = ["fedorov.xray.aps.anl.gov:9092"]
topic = "bluesky.documents.25idc-dev"

[queueserver]
kafka_topic = "s255idc_queueserver"
kafka_topic = "bluesky.documents.255idc"
control_host = "localhost"
control_port = "60615"
info_host = "localhost"
Expand Down
2 changes: 1 addition & 1 deletion src/haven/ipython_startup.ipy
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ log = logging.getLogger(__name__)

# Create a run engine
RE = haven.run_engine(
connect_databroker=True,
connect_kafka=True,
call_returns_result=True,
use_bec=False,
)
Expand Down
24 changes: 23 additions & 1 deletion src/haven/run_engine.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from uuid import uuid4 as uuid
import logging

import databroker
Expand All @@ -6,7 +7,9 @@
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky.callbacks.tiled_writer import TiledWriter
from bluesky.utils import ProgressBarManager, register_transform
from bluesky_kafka import Publisher

from haven import load_config
from .catalog import tiled_client
from .exceptions import ComponentNotFound
from .instrument import beamline
Expand All @@ -29,8 +32,25 @@ def save_to_databroker(name, doc):
catalog.v1.insert(name, doc)


def kafka_publisher():
config = load_config()
publisher = Publisher(
topic=config["kafka"]["topic"],
bootstrap_servers=",".join(config["kafka"]["servers"]),
producer_config={"enable.idempotence": True},
flush_on_stop_doc=True,
key=str(uuid()),
)
return publisher


def run_engine(
*, connect_tiled=True, connect_databroker=True, use_bec=False, **kwargs
*,
connect_tiled=False,
connect_databroker=False,
connect_kafka=True,
use_bec=False,
**kwargs,
) -> BlueskyRunEngine:
"""Build a bluesky RunEngine() for Haven.
Expand Down Expand Up @@ -81,6 +101,8 @@ def run_engine(
client.include_data_sources()
tiled_writer = TiledWriter(client)
RE.subscribe(tiled_writer)
if connect_kafka:
RE.subscribe(kafka_publisher())
# Add preprocessors
RE.preprocessors.append(inject_haven_md_wrapper)
return RE
Expand Down
6 changes: 1 addition & 5 deletions src/queueserver/launch_queueserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ def launch_queueserver():
config = load_config(file_paths=config_files)
control_port = config["queueserver"]["control_port"]
info_port = config["queueserver"]["info_port"]
kafka_topic = config["queueserver"]["kafka_topic"]
redis_addr = config["queueserver"]["redis_addr"]
# Launch the queueserver
args = [
Expand All @@ -36,12 +35,9 @@ def launch_queueserver():
"--redis-addr",
redis_addr,
"--keep-re",
"--kafka-topic",
kafka_topic,
"--update-existing-plans-devices",
"ENVIRONMENT_OPEN",
"--use-ipython-kernel=ON",
]
print("Starting queueserver with command:")
print(" ", " ".join(args))
log.info(f"Starting queueserver with command: {' '.join(args)}")
subprocess.run(args)
20 changes: 0 additions & 20 deletions src/queueserver/queueserver.sh

This file was deleted.

9 changes: 7 additions & 2 deletions src/queueserver/queueserver_startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@

log = logging.getLogger(__name__)

# Create a run engine without all the bells and whistles
RE = run_engine(connect_databroker=False, use_bec=False)
# Create a run engine
RE = run_engine(
connect_databroker=False,
connect_tiled=False,
connect_kafka=True,
use_bec=False
)

# Import devices
beamline.load()
Expand Down

0 comments on commit 819edc3

Please sign in to comment.