diff --git a/README.md b/README.md index 05a2218..bff406e 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # gcn-classic-to-json -A Kafka client instrumented with Prometheus. +Convert GCN Classic notices to JSON. ## Configuration diff --git a/gcn_classic_to_json/kafka.py b/gcn_classic_to_json/kafka.py index 0c7b7db..e04a6cb 100644 --- a/gcn_classic_to_json/kafka.py +++ b/gcn_classic_to_json/kafka.py @@ -5,31 +5,45 @@ # # SPDX-License-Identifier: Apache-2.0 # -"""Monitor Kafka consumer connectivity.""" +"""Convert GCN Classic notices to JSON.""" import json import logging +import struct import gcn_kafka from . import metrics +from . import notices log = logging.getLogger(__name__) -def stats_cb(data): - stats = json.loads(data) - for broker in stats["brokers"].values(): - metrics.broker_state.labels(broker["name"]).state(broker["state"]) +def kafka_delivered_cb(err, msg): + successful = not err + metrics.delivered.labels(msg.topic(), msg.partition(), successful).inc() def run(): + binary_topic_prefix = 'gcn.classic.binary.' + json_topic_prefix = 'gcn.classic.json.' + int4 = struct.Struct("!l") + funcs = { + key: value + for key, value in notices.__dict__.items() + if key.isupper() + } + log.info("Creating consumer") config = gcn_kafka.config_from_env() consumer = gcn_kafka.Consumer(config) + log.info("Creating producer") + config["client.id"] = __package__ + config["on_delivery"] = kafka_delivered_cb + producer = gcn_kafka.Producer(config) + log.info("Subscribing") - topics = list(consumer.list_topics().topics.keys()) - consumer.subscribe(topics) + consumer.subscribe([binary_topic_prefix + key for key in funcs]) log.info("Entering consume loop") while True: @@ -39,3 +53,8 @@ def run(): log.error("topic %s: got error %s", topic, error) else: log.info("topic %s: got message", topic) + ints = int4.iter_unpack(message.value()) + key = topic[len(binary_topic_prefix):] + func = funcs[key] + json_data = json.dumps(func(*ints)) + producer.produce(json_topic_prefix + key, json_data) diff --git a/gcn_classic_to_json/metrics.py b/gcn_classic_to_json/metrics.py index 003d406..290f477 100644 --- a/gcn_classic_to_json/metrics.py +++ b/gcn_classic_to_json/metrics.py @@ -7,3 +7,17 @@ # """Prometheus metrics.""" import prometheus_client + +received = prometheus_client.Counter( + "received", + "Kafka messages received", + labelnames=["topic", "partition"], + namespace=__package__, +) + +delivered = prometheus_client.Counter( + "delivered", + "Kafka messages delivered", + labelnames=["topic", "partition", "successful"], + namespace=__package__, +) diff --git a/gcn_classic_to_json/notices.py b/gcn_classic_to_json/notices.py new file mode 100644 index 0000000..d1f9e6e --- /dev/null +++ b/gcn_classic_to_json/notices.py @@ -0,0 +1,10 @@ +""" +Translation from binary formats to JSON. + +See https://gcn.gsfc.nasa.gov/sock_pkt_def_doc.html, +https://github.com/nasa-gcn/gcn-schema +""" + + +def SWIFT_BAT_GRB_POS_ACK(bin): + return dict(ra=1e4 * bin[7], dec=1e4 * bin[8])