diff --git a/src/ingest.py b/src/ingest.py index 9fe2e29..feb5660 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -174,14 +174,15 @@ def record_groups(resources: list[ResourcePath]) -> None: instrument = "LSSTCam" case "latiss": instrument = "LATISS" - groupid = header["GROUPID"] - snap_number = int(header["CURINDEX"]) - 1 - detector = header["RAFTBAY"] + "_" + header["CCDSLOT"] - key = f"GROUP:{instrument}:{groupid}:{snap_number}:{detector}" - pipe.set(key, str(res)) - pipe.expire(key, group_lifetime) + if "GROUPID" in header: + groupid = header["GROUPID"] + snap_number = int(header["CURINDEX"]) - 1 + detector = header["RAFTBAY"] + "_" + header["CCDSLOT"] + key = f"GROUP:{instrument}:{groupid}:{snap_number}:{detector}" + pipe.set(key, str(res)) + pipe.expire(key, group_lifetime) except Exception: - logger.exception("Error reading group for %s", res) + logger.exception("Error reading snap/detector for %s", res) pipe.execute() @@ -222,17 +223,30 @@ def main(): try: refs = ingester.run(resources) except Exception: - logger.exception("Error while ingesting %s", resources) - continue + logger.exception("Error while ingesting %s, retrying one by one", resources) + refs = [] + for resource in resources: + try: + refs.append(ingester.run([resource])) + except Exception: + logger.exception("Error while ingesting %s", resource) + info = Info.from_path(resource.geturl()) + r.lrem(worker_queue, 0, info.path) # Define visits if we ingested anything if not is_lfa and refs: + ids = [ref.dataId for ref in refs] try: - ids = [ref.dataId for ref in refs] visit_definer.run(ids, incremental=True) logger.info("Defined visits for %s", ids) except Exception: - logger.exception("Error while defining visits for %s", refs) + logger.exception("Error while defining visits for %s, retrying one by one", refs) + for id in ids: + try: + visit_definer.run([id], incremental=True) + logger.info("Defined visit for %s", id) + except Exception: + logger.exception("Error while defining visits for %s", id) if not is_lfa and rucio_rse: # Register with Rucio if we ingested anything try: