Skip to content

Commit

Permalink
Add group presence recording.
Browse files Browse the repository at this point in the history
  • Loading branch information
ktlim committed Apr 1, 2024
1 parent cf8669c commit a7cf733
Showing 1 changed file with 44 additions and 0 deletions.
44 changes: 44 additions & 0 deletions src/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
"""
Service to ingest images or LFA objects into per-bucket Butler repos.
"""
import json
import os
import socket
import time

import astropy.io.fits
import requests
from lsst.daf.butler import Butler
from lsst.obs.base import DefineVisitsTask, RawIngestTask
Expand All @@ -50,6 +52,7 @@
butler_repo = os.environ["BUTLER_REPO"]
webhook_uri = os.environ.get("WEBHOOK_URI", None)
is_lfa = "rubinobs-lfa-" in bucket
group_lifetime = int(os.environ.get("GROUP_LIFETIME", 86400))

worker_name = socket.gethostname()
worker_queue = f"WORKER:{bucket}:{worker_name}"
Expand Down Expand Up @@ -134,6 +137,42 @@ def on_metadata_failure(dataset, exc):
pipe.execute()


def record_groups(resources: list[ResourcePath]) -> None:
"""Record the group ids from received FITS files in Redis.
Parameters
----------
resources: `list` [`ResourcePath`]
The resources to record group ids from.
"""

global r, group_lifetime, logger
with r.pipeline() as pipe:
for res in resources:
json_file = res.updatedExtension("json")
header = {}
try:
with json_file.open("rb") as f:
header = json.load(f)
except Exception:
try:
with res.open("rb") as f:
header = astropy.io.fits.open(f)[0].header
except Exception:
logger.exception("Error reading group for %s", res)
try:
instrument = header["INSTRUME"]
groupid = header["GROUPID"]
snap_number = int(header["CURINDEX"]) - 1
detector = header["RAFTBAY"] + "_" + header["CCDSLOT"]
key = f"GROUP:{instrument}:{groupid}:{snap_number}:{detector}"
pipe.set(key, str(res))
pipe.expire(key, group_lifetime)
except Exception:
logger.exception("Error reading group for %s", res)
pipe.execute()


def main():
"""Ingest FITS files from a Redis queue."""
logger.info("Initializing Butler from %s", butler_repo)
Expand Down Expand Up @@ -162,12 +201,17 @@ def main():

# Ingest if we have resources
if resources:

if not is_lfa:
record_groups(resources)

logger.info("Ingesting %s", resources)
refs = None
try:
refs = ingester.run(resources)
except Exception:
logger.exception("Error while ingesting %s", resources)
continue

# Define visits if we ingested anything
if not is_lfa and refs:
Expand Down

0 comments on commit a7cf733

Please sign in to comment.