From 060181b3a47579fffe0736d4e4db2031cf74bc30 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Fri, 6 Sep 2024 12:34:06 -0700 Subject: [PATCH 1/2] Improve exception handling. Retry one by one if a batch fails to ingest or have visits defined. --- src/ingest.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/ingest.py b/src/ingest.py index 9fe2e29..ccb8909 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -222,17 +222,30 @@ def main(): try: refs = ingester.run(resources) except Exception: - logger.exception("Error while ingesting %s", resources) - continue + logger.exception("Error while ingesting %s, retrying one by one", resources) + refs = [] + for resource in resources: + try: + refs.append(ingester.run([resource])) + except Exception: + logger.exception("Error while ingesting %s", resource) + info = Info.from_path(resource.geturl()) + r.lrem(worker_queue, 0, info.path) # Define visits if we ingested anything if not is_lfa and refs: + ids = [ref.dataId for ref in refs] try: - ids = [ref.dataId for ref in refs] visit_definer.run(ids, incremental=True) logger.info("Defined visits for %s", ids) except Exception: - logger.exception("Error while defining visits for %s", refs) + logger.exception("Error while defining visits for %s, retrying one by one", refs) + for id in ids: + try: + visit_definer.run([id], incremental=True) + logger.info("Defined visit for %s", id) + except Exception: + logger.exception("Error while defining visits for %s", id) if not is_lfa and rucio_rse: # Register with Rucio if we ingested anything try: From 27b6e4b92a31096ce4dfb29510a5c1b8609f0dea Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Fri, 6 Sep 2024 16:45:54 -0700 Subject: [PATCH 2/2] Protect against missing GROUPID header. --- src/ingest.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/ingest.py b/src/ingest.py index ccb8909..feb5660 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -174,14 +174,15 @@ def record_groups(resources: list[ResourcePath]) -> None: instrument = "LSSTCam" case "latiss": instrument = "LATISS" - groupid = header["GROUPID"] - snap_number = int(header["CURINDEX"]) - 1 - detector = header["RAFTBAY"] + "_" + header["CCDSLOT"] - key = f"GROUP:{instrument}:{groupid}:{snap_number}:{detector}" - pipe.set(key, str(res)) - pipe.expire(key, group_lifetime) + if "GROUPID" in header: + groupid = header["GROUPID"] + snap_number = int(header["CURINDEX"]) - 1 + detector = header["RAFTBAY"] + "_" + header["CCDSLOT"] + key = f"GROUP:{instrument}:{groupid}:{snap_number}:{detector}" + pipe.set(key, str(res)) + pipe.expire(key, group_lifetime) except Exception: - logger.exception("Error reading group for %s", res) + logger.exception("Error reading snap/detector for %s", res) pipe.execute()