Skip to content

Commit

Permalink
Merge pull request #53 from lsst-dm/tickets/DM-43558
Browse files Browse the repository at this point in the history
DM-43558: Ingest LFA data.
  • Loading branch information
ktlim authored Apr 18, 2024
2 parents 88ae13f + cf8669c commit 9699666
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 131 deletions.
4 changes: 3 additions & 1 deletion Dockerfile.enqueue
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
FROM python:3.11
RUN pip install redis gunicorn flask
WORKDIR /enqueue
COPY src/enqueue.py src/exposure_info.py src/utils.py /enqueue/
COPY src/enqueue.py src/info.py src/utils.py /enqueue/
# environment variables that must be set:
# REDIS_HOST REDIS_PASSWORD NOTIFICATION_SECRET
# optional:
# DATASET_REGEXP
ENTRYPOINT [ "gunicorn", "-b", "0.0.0.0:8000", "-w", "2", "enqueue:app" ]
10 changes: 5 additions & 5 deletions Dockerfile.ingest
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@

# Dockerfile for ingest service.

ARG RUBINENV_VERSION=7.0.1
ARG RUBINENV_VERSION=8.0.0
FROM lsstsqre/newinstall:${RUBINENV_VERSION}
ARG OBS_LSST_VERSION
ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2023_41}
ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2024_12}
USER lsst
RUN source loadLSST.bash && mamba install redis-py rucio-clients
RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" obs_lsst
COPY src/ingest.py src/exposure_info.py src/utils.py src/rucio_interface.py ./ingest/
RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" lsst_obs
COPY src/ingest.py src/info.py src/utils.py src/rucio_interface.py ./ingest/
# Environment variables that must be set:
# REDIS_HOST REDIS_PASSWORD BUCKET BUTLER_REPO
# For Rucio (all must be set if RUCIO_RSE is set):
# RUCIO_RSE RUCIO_DTN RUCIO_SCOPE RUCIO_CONFIG
# Optional:
# WEBHOOK_URI
ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python ingest/ingest.py" ]
ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup lsst_obs; python ingest/ingest.py" ]
42 changes: 23 additions & 19 deletions src/enqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
Enqueue service to post notifications to per-bucket queues.
"""
import os
import re
import time
import urllib.parse

import redis
from flask import Flask, request

from exposure_info import ExposureInfo
from info import ExposureInfo, Info
from utils import setup_logging, setup_redis

FILE_RETENTION: float = 7 * 24 * 60 * 60
Expand All @@ -38,30 +39,32 @@
logger = setup_logging(__name__)
r = setup_redis()
notification_secret = os.environ["NOTIFICATION_SECRET"]
regexp = re.compile(os.environ.get("DATASET_REGEXP", r"fits$"))


def enqueue_objects(objects):
"""Enqueue FITS objects onto per-bucket queues.
"""Enqueue objects onto per-bucket queues.
Compute the `ExposureInfo` for each FITS object and return the list.
Compute the `Info` for each object with a selected extension and return
the list.
Parameters
----------
objects: `list` [`str`]
Returns
-------
info_list: `list` [`ExposureInfo`]
info_list: `list` [`Info`]
"""
info_list = []
# Use a pipeline for efficiency.
with r.pipeline() as pipe:
for o in objects:
if o.endswith(".fits"):
e = ExposureInfo(o)
pipe.lpush(f"QUEUE:{e.bucket}", o)
logger.info("Enqueued %s to %s", o, e.bucket)
info_list.append(e)
if regexp.search(o):
info = Info.from_path(o)
pipe.lpush(f"QUEUE:{info.bucket}", o)
logger.info("Enqueued %s to %s", o, info.bucket)
info_list.append(info)
pipe.execute()
return info_list

Expand All @@ -71,18 +74,19 @@ def update_stats(info_list):
Parameters
----------
info_list: `list` [`ExposureInfo`]
info_list: `list` [`Info`]
"""
max_seqnum = {}
with r.pipeline() as pipe:
max_seqnum = {}
for e in info_list:
pipe.hincrby(f"REC:{e.bucket}", e.obs_day, 1)
bucket_instrument = f"{e.bucket}:{e.instrument}"
pipe.hincrby(f"RECINSTR:{bucket_instrument}", e.obs_day, 1)
pipe.hset(f"FILE:{e.path}", "recv_time", str(time.time()))
pipe.expire(f"FILE:{e.path}", FILE_RETENTION)
seqnum_key = f"MAXSEQ:{bucket_instrument}:{e.obs_day}"
max_seqnum[seqnum_key] = max(int(e.seq_num), max_seqnum.get(seqnum_key, 0))
for info in info_list:
pipe.hincrby(f"REC:{info.bucket}", info.obs_day, 1)
bucket_instrument = f"{info.bucket}:{info.instrument}"
pipe.hincrby(f"RECINSTR:{bucket_instrument}", info.obs_day, 1)
pipe.hset(f"FILE:{info.path}", "recv_time", str(time.time()))
pipe.expire(f"FILE:{info.path}", FILE_RETENTION)
if isinstance(info, ExposureInfo):
seqnum_key = f"MAXSEQ:{bucket_instrument}:{info.obs_day}"
max_seqnum[seqnum_key] = max(int(info.seq_num), max_seqnum.get(seqnum_key, 0))
pipe.execute()

for seqnum_key in max_seqnum:
Expand Down
67 changes: 0 additions & 67 deletions src/exposure_info.py

This file was deleted.

154 changes: 154 additions & 0 deletions src/info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# This file is part of embargo_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
from dataclasses import dataclass
from typing import Self

__all__ = ("Info", "ExposureInfo", "LfaInfo")

logger = logging.getLogger(__name__)


@dataclass
class Info:
"""Base class for information extracted from notification messages."""

path: str
"""Path component of the S3 URL.
"""

bucket: str
"""Bucket component of the S3 URL.
"""

instrument: str
"""Instrument name.
"""

filename: str
"""Filename component of the S3 URL.
"""

obs_day: str
"""Observation day (in timezone UTC-12).
"""

@staticmethod
def from_path(url: str) -> Self:
"""Create an `Info` of the proper subclass from an S3 URL.
Parameters
----------
url: `str`
S3 URL (including bucket).
Returns
-------
info: `Info`
Either an `LfaInfo` or an `ExposureInfo` as appropriate.
"""

if url.startswith("s3://"):
url = url[len("s3://") :]
if "rubinobs-lfa-" in url:
return LfaInfo(url)
else:
return ExposureInfo(url)


@dataclass
class ExposureInfo(Info):
"""Class used to extract exposure information from notification messages.
Parameters
----------
path: `str`
Path portion of S3 URL (after bucket).
"""

exp_id: str
"""Exposure ID.
"""

instrument_code: str
"""Instrument code (two characters).
"""

controller: str
"""Controller code (one character).
"""

seq_num: str
"""Sequence number within an observation day.
"""

def __init__(self, path):
try:
self.path = path
(
self.bucket,
self.instrument,
self.obs_day,
self.exp_id,
self.filename,
) = path.split("/")
(
self.instrument_code,
self.controller,
obs_day,
self.seq_num,
) = self.exp_id.split("_")
if obs_day != self.obs_day:
logger.warn("Mismatched observation dates: %s", path)
except Exception:
logger.exception("Unable to parse: %s", path)
raise


@dataclass
class LfaInfo(Info):
"""Class used to extract LFA information from notification messages.
Parameters
----------
path: `str`
Path portion of S3 URL (after bucket).
"""

def __init__(self, path):
try:
self.path = path
components = path.split("/")
if len(components) == 8:
self.bucket, csc, generator, year, month, day, directory, self.filename = components
self.instrument = f"{csc}/{generator}"
elif len(components) == 7:
self.bucket, csc, generator, year, month, day, self.filename = components
self.instrument = f"{csc}/{generator}"
elif len(components) == 6:
self.bucket, self.instrument, year, month, day, self.filename = components
else:
raise ValueError(f"Unrecognized number of components: {len(components)}")
self.obs_day = f"{year}{month}{day}"
except Exception:
logger.exception("Unable to parse: %s", path)
raise
Loading

0 comments on commit 9699666

Please sign in to comment.