Skip to content

Commit

Permalink
Basic implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
lpsinger committed May 15, 2024
1 parent b04338a commit c59a3ba
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# gcn-classic-to-json

A Kafka client instrumented with Prometheus.
Convert GCN Classic notices to JSON.

## Configuration

Expand Down
33 changes: 26 additions & 7 deletions gcn_classic_to_json/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,45 @@
#
# SPDX-License-Identifier: Apache-2.0
#
"""Monitor Kafka consumer connectivity."""
"""Convert GCN Classic notices to JSON."""
import json
import logging
import struct

import gcn_kafka

from . import metrics
from . import notices

log = logging.getLogger(__name__)


def stats_cb(data):
stats = json.loads(data)
for broker in stats["brokers"].values():
metrics.broker_state.labels(broker["name"]).state(broker["state"])
def kafka_delivered_cb(err, msg):
successful = not err
metrics.delivered.labels(msg.topic(), msg.partition(), successful).inc()


def run():
binary_topic_prefix = 'gcn.classic.binary.'
json_topic_prefix = 'gcn.classic.json.'
int4 = struct.Struct("!l")
funcs = {
key: value
for key, value in notices.__dict__.items()
if key.isupper()
}

log.info("Creating consumer")
config = gcn_kafka.config_from_env()
consumer = gcn_kafka.Consumer(config)

log.info("Creating producer")
config["client.id"] = __package__
config["on_delivery"] = kafka_delivered_cb
producer = gcn_kafka.Producer(config)

log.info("Subscribing")
topics = list(consumer.list_topics().topics.keys())
consumer.subscribe(topics)
consumer.subscribe([binary_topic_prefix + key for key in funcs])

log.info("Entering consume loop")
while True:
Expand All @@ -39,3 +53,8 @@ def run():
log.error("topic %s: got error %s", topic, error)
else:
log.info("topic %s: got message", topic)
ints = int4.iter_unpack(message.value())
key = topic[len(binary_topic_prefix):]
func = funcs[key]
json_data = json.dumps(func(*ints))
producer.produce(json_topic_prefix + key, json_data)
14 changes: 14 additions & 0 deletions gcn_classic_to_json/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,17 @@
#
"""Prometheus metrics."""
import prometheus_client

received = prometheus_client.Counter(
"received",
"Kafka messages received",
labelnames=["topic", "partition"],
namespace=__package__,
)

delivered = prometheus_client.Counter(
"delivered",
"Kafka messages delivered",
labelnames=["topic", "partition", "successful"],
namespace=__package__,
)
10 changes: 10 additions & 0 deletions gcn_classic_to_json/notices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
Translation from binary formats to JSON.
See https://gcn.gsfc.nasa.gov/sock_pkt_def_doc.html,
https://github.com/nasa-gcn/gcn-schema
"""


def SWIFT_BAT_GRB_POS_ACK(bin):
return dict(ra=1e4 * bin[7], dec=1e4 * bin[8])

0 comments on commit c59a3ba

Please sign in to comment.