diff --git a/src/queueserver/tiled_consumer.py b/src/queueserver/tiled_consumer.py index 237be2c5..741c8b8f 100755 --- a/src/queueserver/tiled_consumer.py +++ b/src/queueserver/tiled_consumer.py @@ -1,15 +1,12 @@ -import os -import sys import logging -from functools import partial +import sys from typing import Mapping, Sequence import msgpack -import msgpack_numpy as mpn from bluesky.callbacks.tiled_writer import TiledWriter -from bluesky_kafka import RemoteDispatcher, BlueskyConsumer -from tiled.client.base import BaseClient +from bluesky_kafka import BlueskyConsumer from tiled.client import from_uri +from tiled.client.base import BaseClient import haven @@ -47,6 +44,7 @@ class TiledConsumer(BlueskyConsumer): Function to deserialize data. Default is msgpack.loads. """ + def __init__( self, tiled_client: BaseClient, @@ -60,7 +58,9 @@ def __init__( self.topic_catalog_map = topic_catalog_map # Create writers for each Tiled catalog catalog_names = set(topic_catalog_map.values()) - self.writers = {catalog: TiledWriter(tiled_client[catalog]) for catalog in catalog_names} + self.writers = { + catalog: TiledWriter(tiled_client[catalog]) for catalog in catalog_names + } super().__init__( topics=list(topic_catalog_map.keys()), bootstrap_servers=",".join(bootstrap_servers), @@ -103,8 +103,8 @@ def main(): "s25idd_queueserver-dev": "haven-dev", } # Create a tiled writer that will write documents to tiled - tiled_uri = config['database']['tiled']['uri'] - tiled_api_key = config['database']['tiled']['api_key'] + tiled_uri = config["database"]["tiled"]["uri"] + tiled_api_key = config["database"]["tiled"]["api_key"] client = from_uri(tiled_uri, api_key=tiled_api_key) client.include_data_sources()