Skip to content

Commit

Permalink
Merge pull request #54 from lsst-dm/tickets/DM-39022
Browse files Browse the repository at this point in the history
DM-39022: Add group name presence server.
  • Loading branch information
ktlim authored May 28, 2024
2 parents 9699666 + 5a66a64 commit 41deacf
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 3 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-manually.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ on:
rubinenvVersion:
description: 'rubin-env version'
required: true
default: '7.0.1'
default: '8.0.0'
obsLsstVersion:
description: 'Science Pipelines release'
required: true
default: 'w_2023_41'
default: 'w_2024_12'


env:
Expand Down
14 changes: 13 additions & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ env:
ENQUEUE_NAME: embargo-butler-enqueue
INGEST_NAME: embargo-butler-ingest
IDLE_NAME: embargo-butler-idle
PRESENCE_NAME: embargo-butler-presence

jobs:
push:
Expand All @@ -20,7 +21,7 @@ jobs:
contents: read
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Build enqueue image
run: |
Expand All @@ -43,6 +44,13 @@ jobs:
--tag $IDLE_NAME \
--label "runnumber=${GITHUB_RUN_ID}"
- name: Build presence image
run: |
docker build . \
-f Dockerfile.presence \
--tag $PRESENCE_NAME \
--label "runnumber=${GITHUB_RUN_ID}"
- name: Log in to GitHub Container Registry
run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u $ --password-stdin

Expand All @@ -51,6 +59,7 @@ jobs:
ENQUEUE_ID=ghcr.io/${{ github.repository_owner }}/$ENQUEUE_NAME
INGEST_ID=ghcr.io/${{ github.repository_owner }}/$INGEST_NAME
IDLE_ID=ghcr.io/${{ github.repository_owner }}/$IDLE_NAME
PRESENCE_ID=ghcr.io/${{ github.repository_owner }}/$PRESENCE_NAME
if [[ "${{ github.ref }}" == "refs/pull/"* ]]; then
VERSION=$(echo "${{ github.head_ref }}" | sed -e 's|.*/||')
Expand All @@ -62,10 +71,13 @@ jobs:
echo ENQUEUE_ID=$ENQUEUE_ID
echo INGEST_ID=$INGEST_ID
echo IDLE_ID=$IDLE_ID
echo PRESENCE_ID=$PRESENCE_ID
echo VERSION=$VERSION
docker tag $ENQUEUE_NAME $ENQUEUE_ID:$VERSION
docker push $ENQUEUE_ID:$VERSION
docker tag $INGEST_NAME $INGEST_ID:$VERSION
docker push $INGEST_ID:$VERSION
docker tag $IDLE_NAME $IDLE_ID:$VERSION
docker push $IDLE_ID:$VERSION
docker tag $PRESENCE_NAME $PRESENCE_ID:$VERSION
docker push $PRESENCE_ID:$VERSION
32 changes: 32 additions & 0 deletions Dockerfile.presence
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# This file is part of embargo_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Dockerfile for presence service.

FROM python:3.11
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir redis gunicorn flask
WORKDIR /presence
COPY src/presence.py src/utils.py /presence/
# Environment variables that must be set:
#   REDIS_HOST REDIS_PASSWORD
# Optional:
#   DELETE_SEEN  (presence.py enables delete-on-read whenever this variable
#                 is defined, regardless of its value)
# Serve the Flask `app` object from presence.py with two gunicorn workers,
# listening on port 8000 on all interfaces.
ENTRYPOINT [ "gunicorn", "-b", "0.0.0.0:8000", "-w", "2", "presence:app" ]
46 changes: 46 additions & 0 deletions src/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
"""
Service to ingest images or LFA objects into per-bucket Butler repos.
"""
import json
import os
import socket
import time

import astropy.io.fits
import requests
from lsst.daf.butler import Butler
from lsst.obs.base import DefineVisitsTask, RawIngestTask
Expand All @@ -50,6 +52,7 @@
butler_repo = os.environ["BUTLER_REPO"]
webhook_uri = os.environ.get("WEBHOOK_URI", None)
is_lfa = "rubinobs-lfa-" in bucket
group_lifetime = int(os.environ.get("GROUP_LIFETIME", 86400))

worker_name = socket.gethostname()
worker_queue = f"WORKER:{bucket}:{worker_name}"
Expand Down Expand Up @@ -134,6 +137,44 @@ def on_metadata_failure(dataset, exc):
pipe.execute()


def _read_group_header(res: ResourcePath):
    """Return the header mapping for ``res``, or `None` if it is unreadable.

    The JSON sidecar (same path with a ``json`` extension) is tried first;
    if that fails for any reason, the primary HDU header of the FITS file
    itself is read instead.

    Parameters
    ----------
    res: `ResourcePath`
        The FITS resource whose header is wanted.

    Returns
    -------
    header: mapping or `None`
        The parsed JSON content or the primary FITS header, or `None` when
        neither source could be read (the failure is logged).
    """
    try:
        with res.updatedExtension("json").open("rb") as f:
            return json.load(f)
    except Exception:
        # Any sidecar problem (missing, unreadable, malformed) falls back to
        # the FITS file itself.
        pass
    try:
        # Close the HDUList explicitly; the parsed header object stays usable
        # after the underlying file handle is released.
        with res.open("rb") as f, astropy.io.fits.open(f) as hdus:
            return hdus[0].header
    except Exception:
        logger.exception("Error reading header for %s", res)
        return None


def record_groups(resources: list[ResourcePath]) -> None:
    """Record the group ids from received FITS files in Redis.

    For each existing resource a key of the form
    ``GROUP:{instrument}:{groupid}:{snap_number}:{detector}`` is set to the
    resource URI with a lifetime of ``group_lifetime`` seconds.  Resources
    whose header cannot be read, or lacks the needed keywords, are logged
    and skipped; they do not prevent the remaining resources from being
    recorded.

    Parameters
    ----------
    resources: `list` [`ResourcePath`]
        The resources to record group ids from.
    """
    with r.pipeline() as pipe:
        for res in resources:
            if not res.exists():
                continue
            header = _read_group_header(res)
            if header is None:
                # Already logged; don't emit a second, misleading traceback
                # from the keyword lookups below.
                continue
            try:
                instrument = header["INSTRUME"]
                groupid = header["GROUPID"]
                # CURINDEX is one-based in the header; the key uses a
                # zero-based snap number.
                snap_number = int(header["CURINDEX"]) - 1
                detector = header["RAFTBAY"] + "_" + header["CCDSLOT"]
                key = f"GROUP:{instrument}:{groupid}:{snap_number}:{detector}"
                # Set the value and its expiry in a single command.
                pipe.set(key, str(res), ex=group_lifetime)
            except Exception:
                logger.exception("Error reading group for %s", res)
        pipe.execute()


def main():
"""Ingest FITS files from a Redis queue."""
logger.info("Initializing Butler from %s", butler_repo)
Expand Down Expand Up @@ -162,12 +203,17 @@ def main():

# Ingest if we have resources
if resources:

if not is_lfa:
record_groups(resources)

logger.info("Ingesting %s", resources)
refs = None
try:
refs = ingester.run(resources)
except Exception:
logger.exception("Error while ingesting %s", resources)
continue

# Define visits if we ingested anything
if not is_lfa and refs:
Expand Down
78 changes: 78 additions & 0 deletions src/presence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# This file is part of embargo_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""
Presence service to translate group names to image URIs.
"""
import os
import re

from flask import Flask

from utils import setup_logging, setup_redis

# Module-level logger for this service.
logger = setup_logging(__name__)
# Redis client used by the request handler to look up (and optionally delete)
# GROUP keys.
r = setup_redis()

# Delete-on-read switch: enabled whenever DELETE_SEEN is defined in the
# environment, whatever its value.  Removing a key once it has been reported
# frees its space before the expiry would.
delete_seen = "DELETE_SEEN" in os.environ

# WSGI application object (referenced as "presence:app" by the server).
app = Flask(__name__)


@app.get("/presence/<instrument>/<group_name>/<int:snap_index>/<detector_name>")
def presence(instrument: str, group_name: str, snap_index: int, detector_name: str) -> dict | tuple:
    """Return the presence and URI of an image matching the parameters.

    Parameters
    ----------
    instrument: `str`
        Name of the instrument taking the image.
    group_name: `str`
        Name of the group (from the GROUPID FITS header).
    snap_index: `int`
        Number of the snap (zero-based).
    detector_name: `str`
        Name of the detector ("RNN_SNN").

    Returns
    -------
    json: `dict` or `tuple` [`dict`, `int`]
        JSON with "error", "present", "uri", and/or "message" keys.  Error
        responses are returned as a ``(body, status)`` tuple with status 400
        for invalid parameters and 500 for unexpected failures.
    """
    try:
        if instrument not in ("LATISS", "LSSTComCam", "LSSTComCamSim", "LSSTCam", "LSST-TS8"):
            return ({"error": True, "message": f"Unknown instrument {instrument}"}, 400)
        # fullmatch: the whole name must be RNN_SNN; a bare match() would
        # accept any string that merely starts with that pattern.
        if not re.fullmatch(r"R\d\d_S\d\d", detector_name):
            return ({"error": True, "message": f"Unrecognized detector name {detector_name}"}, 400)

        key = f"GROUP:{instrument}:{group_name}:{snap_index}:{detector_name}"
        result = r.get(key)
        if not result:
            logger.debug(f"No key {key}")
            return {"error": False, "present": False}

        logger.info(f"Found key {key}")
        if delete_seen:
            # Free the key now rather than waiting for it to expire.
            r.delete(key)
        return {"error": False, "present": True, "uri": result.decode()}
    except Exception as e:
        # Keep the traceback in the service log; the client only gets the
        # message text.
        logger.exception("Error handling presence request")
        return ({"error": True, "message": str(e)}, 500)

0 comments on commit 41deacf

Please sign in to comment.