From a7cf73349b9a880078e329f2a2b550fa71dd29ba Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 27 Mar 2024 13:42:46 -0700 Subject: [PATCH] Add group presence recording. --- src/ingest.py | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/src/ingest.py b/src/ingest.py index 08a4896..e2edd90 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -22,10 +22,12 @@ """ Service to ingest images or LFA objects into per-bucket Butler repos. """ +import json import os import socket import time +import astropy.io.fits import requests from lsst.daf.butler import Butler from lsst.obs.base import DefineVisitsTask, RawIngestTask @@ -50,6 +52,7 @@ butler_repo = os.environ["BUTLER_REPO"] webhook_uri = os.environ.get("WEBHOOK_URI", None) is_lfa = "rubinobs-lfa-" in bucket +group_lifetime = int(os.environ.get("GROUP_LIFETIME", 86400)) worker_name = socket.gethostname() worker_queue = f"WORKER:{bucket}:{worker_name}" @@ -134,6 +137,42 @@ def on_metadata_failure(dataset, exc): pipe.execute() +def record_groups(resources: list[ResourcePath]) -> None: + """Record the group ids from received FITS files in Redis. + + Parameters + ---------- + resources: `list` [`ResourcePath`] + The resources to record group ids from. + """ + + global r, group_lifetime, logger + with r.pipeline() as pipe: + for res in resources: + json_file = res.updatedExtension("json") + header = {} + try: + with json_file.open("rb") as f: + header = json.load(f) + except Exception: + try: + with res.open("rb") as f: + header = astropy.io.fits.open(f)[0].header + except Exception: + logger.exception("Error reading group for %s", res) + try: + instrument = header["INSTRUME"] + groupid = header["GROUPID"] + snap_number = int(header["CURINDEX"]) - 1 + detector = header["RAFTBAY"] + "_" + header["CCDSLOT"] + key = f"GROUP:{instrument}:{groupid}:{snap_number}:{detector}" + pipe.set(key, str(res)) + pipe.expire(key, group_lifetime) + except Exception: + logger.exception("Error reading group for %s", res) + pipe.execute() + + def main(): """Ingest FITS files from a Redis queue.""" logger.info("Initializing Butler from %s", butler_repo) @@ -162,12 +201,17 @@ def main(): # Ingest if we have resources if resources: + + if not is_lfa: + record_groups(resources) + logger.info("Ingesting %s", resources) refs = None try: refs = ingester.run(resources) except Exception: logger.exception("Error while ingesting %s", resources) + continue # Define visits if we ingested anything if not is_lfa and refs: