diff --git a/Dockerfile.enqueue b/Dockerfile.enqueue
index 1ce3469..b678b04 100644
--- a/Dockerfile.enqueue
+++ b/Dockerfile.enqueue
@@ -24,7 +24,9 @@
FROM python:3.11
RUN pip install redis gunicorn flask
WORKDIR /enqueue
-COPY src/enqueue.py src/exposure_info.py src/utils.py /enqueue/
+COPY src/enqueue.py src/info.py src/utils.py /enqueue/
# environment variables that must be set:
# REDIS_HOST REDIS_PASSWORD NOTIFICATION_SECRET
+# optional:
+# DATASET_REGEXP
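+# e.g. DATASET_REGEXP='\.(fits|json)$' (illustrative value; default is "fits$")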
ENTRYPOINT [ "gunicorn", "-b", "0.0.0.0:8000", "-w", "2", "enqueue:app" ]
diff --git a/Dockerfile.ingest b/Dockerfile.ingest
index 859f912..289177d 100644
--- a/Dockerfile.ingest
+++ b/Dockerfile.ingest
@@ -21,18 +21,18 @@
# Dockerfile for ingest service.
-ARG RUBINENV_VERSION=7.0.1
+ARG RUBINENV_VERSION=8.0.0
FROM lsstsqre/newinstall:${RUBINENV_VERSION}
ARG OBS_LSST_VERSION
-ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2023_41}
+ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2024_12}
USER lsst
RUN source loadLSST.bash && mamba install redis-py rucio-clients
RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" obs_lsst
-COPY src/ingest.py src/exposure_info.py src/utils.py src/rucio_interface.py ./ingest/
+COPY src/ingest.py src/info.py src/utils.py src/rucio_interface.py ./ingest/
# Environment variables that must be set:
# REDIS_HOST REDIS_PASSWORD BUCKET BUTLER_REPO
# For Rucio (all must be set if RUCIO_RSE is set):
# RUCIO_RSE RUCIO_DTN RUCIO_SCOPE RUCIO_CONFIG
# Optional:
# WEBHOOK_URI
-ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python ingest/ingest.py" ]
+ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup lsst_obs; python ingest/ingest.py" ]
diff --git a/src/enqueue.py b/src/enqueue.py
index 12a9ac2..a44e6c8 100644
--- a/src/enqueue.py
+++ b/src/enqueue.py
@@ -23,13 +23,14 @@
Enqueue service to post notifications to per-bucket queues.
"""
import os
+import re
import time
import urllib.parse
import redis
from flask import Flask, request
-from exposure_info import ExposureInfo
+from info import ExposureInfo, Info
from utils import setup_logging, setup_redis
FILE_RETENTION: float = 7 * 24 * 60 * 60
@@ -38,12 +39,14 @@
logger = setup_logging(__name__)
r = setup_redis()
notification_secret = os.environ["NOTIFICATION_SECRET"]
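+# Only object keys matching this pattern are enqueued; the default selects
+# keys ending in "fits".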
+regexp = re.compile(os.environ.get("DATASET_REGEXP", r"fits$"))
def enqueue_objects(objects):
- """Enqueue FITS objects onto per-bucket queues.
+ """Enqueue objects onto per-bucket queues.
- Compute the `ExposureInfo` for each FITS object and return the list.
+    Compute the `Info` for each object whose key matches the configured
+    pattern and return the list.
Parameters
----------
@@ -51,17 +54,17 @@ def enqueue_objects(objects):
Returns
-------
- info_list: `list` [`ExposureInfo`]
+ info_list: `list` [`Info`]
"""
info_list = []
# Use a pipeline for efficiency.
with r.pipeline() as pipe:
for o in objects:
- if o.endswith(".fits"):
- e = ExposureInfo(o)
- pipe.lpush(f"QUEUE:{e.bucket}", o)
- logger.info("Enqueued %s to %s", o, e.bucket)
- info_list.append(e)
+ if regexp.search(o):
+ info = Info.from_path(o)
+ pipe.lpush(f"QUEUE:{info.bucket}", o)
+ logger.info("Enqueued %s to %s", o, info.bucket)
+ info_list.append(info)
pipe.execute()
return info_list
@@ -71,18 +74,19 @@ def update_stats(info_list):
Parameters
----------
- info_list: `list` [`ExposureInfo`]
+ info_list: `list` [`Info`]
"""
+ max_seqnum = {}
with r.pipeline() as pipe:
- max_seqnum = {}
- for e in info_list:
- pipe.hincrby(f"REC:{e.bucket}", e.obs_day, 1)
- bucket_instrument = f"{e.bucket}:{e.instrument}"
- pipe.hincrby(f"RECINSTR:{bucket_instrument}", e.obs_day, 1)
- pipe.hset(f"FILE:{e.path}", "recv_time", str(time.time()))
- pipe.expire(f"FILE:{e.path}", FILE_RETENTION)
- seqnum_key = f"MAXSEQ:{bucket_instrument}:{e.obs_day}"
- max_seqnum[seqnum_key] = max(int(e.seq_num), max_seqnum.get(seqnum_key, 0))
+ for info in info_list:
+ pipe.hincrby(f"REC:{info.bucket}", info.obs_day, 1)
+ bucket_instrument = f"{info.bucket}:{info.instrument}"
+ pipe.hincrby(f"RECINSTR:{bucket_instrument}", info.obs_day, 1)
+ pipe.hset(f"FILE:{info.path}", "recv_time", str(time.time()))
+ pipe.expire(f"FILE:{info.path}", FILE_RETENTION)
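+            # Only exposures carry sequence numbers; LFA objects are skipped.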
+ if isinstance(info, ExposureInfo):
+ seqnum_key = f"MAXSEQ:{bucket_instrument}:{info.obs_day}"
+ max_seqnum[seqnum_key] = max(int(info.seq_num), max_seqnum.get(seqnum_key, 0))
pipe.execute()
for seqnum_key in max_seqnum:
diff --git a/src/exposure_info.py b/src/exposure_info.py
deleted file mode 100644
index f4ce41e..0000000
--- a/src/exposure_info.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# This file is part of embargo_butler.
-#
-# Developed for the LSST Data Management System.
-# This product includes software developed by the LSST Project
-# (http://www.lsst.org).
-# See the COPYRIGHT file at the top-level directory of this distribution
-# for details of code ownership.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-"""
-ExposureInfo class used to extract information from notification messages.
-"""
-import logging
-from dataclasses import dataclass
-
-__all__ = ("ExposureInfo",)
-
-logger = logging.getLogger(__name__)
-
-
-@dataclass
-class ExposureInfo:
- path: str
- bucket: str
- instrument: str
- filename: str
- exp_id: str
- instrument_code: str
- controller: str
- obs_day: str
- seq_num: str
-
- def __init__(self, path):
- try:
- if path.startswith("s3://"):
- path = path[len("s3://") :]
- self.path = path
- (
- self.bucket,
- self.instrument,
- self.obs_day,
- self.exp_id,
- self.filename,
- ) = path.split("/")
- (
- self.instrument_code,
- self.controller,
- obs_day,
- self.seq_num,
- ) = self.exp_id.split("_")
- if obs_day != self.obs_day:
- logger.warn("Mismatched observation dates: %s", path)
- except Exception:
- logger.exception("Unable to parse: %s", path)
- raise
diff --git a/src/info.py b/src/info.py
new file mode 100644
index 0000000..1867611
--- /dev/null
+++ b/src/info.py
@@ -0,0 +1,154 @@
+# This file is part of embargo_butler.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (http://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import logging
+from dataclasses import dataclass
+from typing import Self
+
+__all__ = ("Info", "ExposureInfo", "LfaInfo")
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class Info:
+ """Base class for information extracted from notification messages."""
+
+ path: str
+ """Path component of the S3 URL.
+ """
+
+ bucket: str
+ """Bucket component of the S3 URL.
+ """
+
+ instrument: str
+ """Instrument name.
+ """
+
+ filename: str
+ """Filename component of the S3 URL.
+ """
+
+ obs_day: str
+ """Observation day (in timezone UTC-12).
+ """
+
+ @staticmethod
+ def from_path(url: str) -> Self:
+ """Create an `Info` of the proper subclass from an S3 URL.
+
+ Parameters
+ ----------
+ url: `str`
+ S3 URL (including bucket).
+
+ Returns
+ -------
+ info: `Info`
+ Either an `LfaInfo` or an `ExposureInfo` as appropriate.
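+
+        Examples
+        --------
+        An illustrative example (bucket and identifiers are hypothetical):
+
+        >>> info = Info.from_path(
+        ...     "s3://embargo/LSSTCam/20240312/MC_O_20240312_000123/raw.fits")
+        >>> type(info).__name__
+        'ExposureInfo'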
+ """
+
+ if url.startswith("s3://"):
+ url = url[len("s3://") :]
+ if "rubinobs-lfa-" in url:
+ return LfaInfo(url)
+ else:
+ return ExposureInfo(url)
+
+
+@dataclass
+class ExposureInfo(Info):
+ """Class used to extract exposure information from notification messages.
+
+ Parameters
+ ----------
+ path: `str`
+ Path portion of S3 URL (after bucket).
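+
+    Notes
+    -----
+    Expects paths of the form
+    ``{bucket}/{instrument}/{obs_day}/{exp_id}/{filename}``, where ``exp_id``
+    is ``{instrument_code}_{controller}_{obs_day}_{seq_num}``.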
+ """
+
+ exp_id: str
+ """Exposure ID.
+ """
+
+ instrument_code: str
+ """Instrument code (two characters).
+ """
+
+ controller: str
+ """Controller code (one character).
+ """
+
+ seq_num: str
+ """Sequence number within an observation day.
+ """
+
+ def __init__(self, path):
+ try:
+ self.path = path
+ (
+ self.bucket,
+ self.instrument,
+ self.obs_day,
+ self.exp_id,
+ self.filename,
+ ) = path.split("/")
+ (
+ self.instrument_code,
+ self.controller,
+ obs_day,
+ self.seq_num,
+ ) = self.exp_id.split("_")
+ if obs_day != self.obs_day:
+                logger.warning("Mismatched observation dates: %s", path)
+ except Exception:
+ logger.exception("Unable to parse: %s", path)
+ raise
+
+
+@dataclass
+class LfaInfo(Info):
+ """Class used to extract LFA information from notification messages.
+
+ Parameters
+ ----------
+ path: `str`
+ Path portion of S3 URL (after bucket).
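+
+    Notes
+    -----
+    Accepts paths with six, seven, or eight slash-separated components:
+    ``{bucket}/{instrument}/{year}/{month}/{day}/{filename}``, where the
+    seven-component form replaces ``{instrument}`` with ``{csc}/{generator}``
+    and the eight-component form adds a directory before the filename.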
+ """
+
+ def __init__(self, path):
+ try:
+ self.path = path
+ components = path.split("/")
+ if len(components) == 8:
+ self.bucket, csc, generator, year, month, day, directory, self.filename = components
+ self.instrument = f"{csc}/{generator}"
+ elif len(components) == 7:
+ self.bucket, csc, generator, year, month, day, self.filename = components
+ self.instrument = f"{csc}/{generator}"
+ elif len(components) == 6:
+ self.bucket, self.instrument, year, month, day, self.filename = components
+ else:
+ raise ValueError(f"Unrecognized number of components: {len(components)}")
+ self.obs_day = f"{year}{month}{day}"
+ except Exception:
+ logger.exception("Unable to parse: %s", path)
+ raise
diff --git a/src/ingest.py b/src/ingest.py
index 1e475d7..08a4896 100644
--- a/src/ingest.py
+++ b/src/ingest.py
@@ -20,7 +20,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
-Service to ingest images into per-bucket Butler repos.
+Service to ingest images or LFA objects into per-bucket Butler repos.
"""
import os
import socket
@@ -31,7 +31,7 @@
from lsst.obs.base import DefineVisitsTask, RawIngestTask
from lsst.resources import ResourcePath
-from exposure_info import ExposureInfo
+from info import Info
from rucio_interface import RucioInterface
from utils import setup_logging, setup_redis
@@ -44,19 +44,23 @@
logger = setup_logging(__name__)
r = setup_redis()
bucket = os.environ["BUCKET"]
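+# A profile-prefixed bucket name (e.g. "rubin:bucket") is not a valid S3
+# bucket name, so client-side validation in lsst.resources is disabled.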
+if bucket.startswith("rubin:"):
+ os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1"
redis_queue = f"QUEUE:{bucket}"
butler_repo = os.environ["BUTLER_REPO"]
webhook_uri = os.environ.get("WEBHOOK_URI", None)
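+# LFA (Large File Annex) objects are not exposures; visit definition and
+# Rucio registration are skipped for them below.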
+is_lfa = "rubinobs-lfa-" in bucket
worker_name = socket.gethostname()
worker_queue = f"WORKER:{bucket}:{worker_name}"
-rucio_rse = os.environ.get("RUCIO_RSE", None)
-if rucio_rse:
- dtn_url = os.environ["RUCIO_DTN"]
- if not dtn_url.endswith("/"):
- dtn_url += "/"
- rucio_interface = RucioInterface(rucio_rse, dtn_url, bucket, os.environ["RUCIO_SCOPE"])
+if not is_lfa:
+ rucio_rse = os.environ.get("RUCIO_RSE", None)
+ if rucio_rse:
+ dtn_url = os.environ["RUCIO_DTN"]
+ if not dtn_url.endswith("/"):
+ dtn_url += "/"
+ rucio_interface = RucioInterface(rucio_rse, dtn_url, bucket, os.environ["RUCIO_SCOPE"])
def on_success(datasets):
@@ -71,15 +75,15 @@ def on_success(datasets):
"""
for dataset in datasets:
logger.info("Ingested %s", dataset)
- e = ExposureInfo(dataset.path.geturl())
- logger.debug("Exposure %s", e)
+ info = Info.from_path(dataset.path.geturl())
+ logger.debug("%s", info)
with r.pipeline() as pipe:
- pipe.lrem(worker_queue, 0, e.path)
- pipe.hset(f"FILE:{e.path}", "ingest_time", str(time.time()))
- pipe.hincrby(f"INGEST:{e.bucket}:{e.instrument}", f"{e.obs_day}", 1)
+ pipe.lrem(worker_queue, 0, info.path)
+ pipe.hset(f"FILE:{info.path}", "ingest_time", str(time.time()))
+ pipe.hincrby(f"INGEST:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1)
pipe.execute()
if webhook_uri:
- resp = requests.post(webhook_uri, json=e.__dict__, timeout=0.5)
+ resp = requests.post(webhook_uri, json=info.__dict__, timeout=0.5)
logger.info("Webhook response: %s", resp)
@@ -96,15 +100,15 @@ def on_ingest_failure(dataset, exc):
Exception raised by the ingest failure.
"""
logger.error("Failed to ingest %s: %s", dataset, exc)
- e = ExposureInfo(dataset.files[0].filename.geturl())
- logger.debug("Exposure %s", e)
+ info = Info.from_path(dataset.files[0].filename.geturl())
+ logger.debug("%s", info)
with r.pipeline() as pipe:
- pipe.hincrby(f"FAIL:{e.bucket}:{e.instrument}", f"{e.obs_day}", 1)
- pipe.hset(f"FILE:{e.path}", "ing_fail_exc", str(exc))
- pipe.hincrby(f"FILE:{e.path}", "ing_fail_count", 1)
+ pipe.hincrby(f"FAIL:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1)
+ pipe.hset(f"FILE:{info.path}", "ing_fail_exc", str(exc))
+ pipe.hincrby(f"FILE:{info.path}", "ing_fail_count", 1)
pipe.execute()
- if int(r.hget(f"FILE:{e.path}", "ing_fail_count")) >= MAX_FAILURES:
- r.lrem(worker_queue, 0, e.path)
+ if int(r.hget(f"FILE:{info.path}", "ing_fail_count")) >= MAX_FAILURES:
+ r.lrem(worker_queue, 0, info.path)
def on_metadata_failure(dataset, exc):
@@ -121,12 +125,12 @@ def on_metadata_failure(dataset, exc):
"""
logger.error("Failed to translate metadata for %s: %s", dataset, exc)
- e = ExposureInfo(dataset.geturl())
- logger.debug("Exposure %s", e)
+ info = Info.from_path(dataset.geturl())
+ logger.debug("%s", info)
with r.pipeline() as pipe:
- pipe.hincrby(f"FAIL:{e.bucket}:{e.instrument}", f"{e.obs_day}", 1)
- pipe.hset(f"FILE:{e.path}", "md_fail_exc", str(exc))
- pipe.lrem(worker_queue, 0, e.path)
+ pipe.hincrby(f"FAIL:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1)
+ pipe.hset(f"FILE:{info.path}", "md_fail_exc", str(exc))
+ pipe.lrem(worker_queue, 0, info.path)
pipe.execute()
@@ -144,22 +148,17 @@ def main():
on_metadata_failure=on_metadata_failure,
)
- define_visits_config = DefineVisitsTask.ConfigClass()
- define_visits_config.groupExposures = "one-to-one"
- visit_definer = DefineVisitsTask(config=define_visits_config, butler=butler)
+ if not is_lfa:
+ define_visits_config = DefineVisitsTask.ConfigClass()
+ define_visits_config.groupExposures = "one-to-one"
+ visit_definer = DefineVisitsTask(config=define_visits_config, butler=butler)
logger.info("Waiting on %s", worker_queue)
while True:
# Process any entries on the worker queue.
if r.llen(worker_queue) > 0:
blobs = r.lrange(worker_queue, 0, -1)
- resources = []
- for b in blobs:
- if b.endswith(b".fits"):
- # Should always be the case
- resources.append(ResourcePath(f"s3://{b.decode()}"))
- else:
- r.lrem(worker_queue, 0, b)
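+            # Keys are already filtered by DATASET_REGEXP in the enqueue
+            # service, so everything on the queue is ingested.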
+ resources = [ResourcePath(f"s3://{b.decode()}") for b in blobs]
# Ingest if we have resources
if resources:
@@ -171,14 +170,14 @@ def main():
logger.exception("Error while ingesting %s", resources)
# Define visits if we ingested anything
- if refs:
+ if not is_lfa and refs:
try:
ids = [ref.dataId for ref in refs]
visit_definer.run(ids)
logger.info("Defined visits for %s", ids)
except Exception:
logger.exception("Error while defining visits for %s", refs)
- if rucio_rse:
+ if not is_lfa and rucio_rse:
# Register with Rucio if we ingested anything
try:
rucio_interface.register(resources)
diff --git a/src/rucio_interface.py b/src/rucio_interface.py
index fab27ab..a3e6b15 100644
--- a/src/rucio_interface.py
+++ b/src/rucio_interface.py
@@ -26,10 +26,10 @@
import time
import zlib
+import rucio.common.exception
from lsst.resources import ResourcePath
from rucio.client.didclient import DIDClient
from rucio.client.replicaclient import ReplicaClient
-import rucio.common.exception
__all__ = ["RucioInterface"]