Auto triggering and forced photometry improvements #265

Merged: 2 commits, Dec 19, 2023
8 changes: 7 additions & 1 deletion kowalski/alert_brokers/alert_broker.py
@@ -1922,7 +1922,13 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
and compare_dicts(
passed_filter["auto_followup"]["data"]["payload"],
r["payload"],
ignore_keys=["priority", "start_date", "end_date", "advanced"],
ignore_keys=[
"priority",
"start_date",
"end_date",
"advanced",
"observation_choices",
],
)
is True
]
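For context, a hedged sketch of how the broadened ignore_keys call behaves. The payload contents below are made up, and compare_dicts is assumed importable from kowalski/utils.py (its list handling and observation_type branch appear later in this diff):

```python
from kowalski.utils import compare_dicts

# hypothetical auto_followup payloads: same observation type, but different
# scheduling details and a different set of chosen filters
new_request = {
    "observation_type": "IFU",
    "priority": 3,
    "start_date": "2023-12-18",
    "end_date": "2023-12-20",
    "observation_choices": ["IFU"],
}
existing_request = {
    "observation_type": "IFU",
    "priority": 5,
    "start_date": "2023-12-19",
    "end_date": "2023-12-21",
    "observation_choices": ["IFU", "g"],
}

# with "observation_choices" now in ignore_keys, the two payloads are treated as
# duplicates, so no second follow-up request would be triggered
assert compare_dicts(
    new_request,
    existing_request,
    ignore_keys=[
        "priority",
        "start_date",
        "end_date",
        "advanced",
        "observation_choices",
    ],
)
```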
122 changes: 102 additions & 20 deletions kowalski/alert_brokers/alert_broker_ztf.py
@@ -528,6 +528,15 @@ def format_fp_hists(self, alert, fp_hists):
# sort by jd
fp_hists = sorted(fp_hists, key=lambda x: x["jd"])

# deduplicate by jd. We noticed in production that there are sometimes
# multiple fp_hist entries with the same jd, which is not supposed to happen;
# duplicates can interfere with the concurrency-avoidance logic in update_fp_hists and take up extra space
fp_hists = [
fp_hist
for i, fp_hist in enumerate(fp_hists)
if i == 0 or fp_hist["jd"] != fp_hists[i - 1]["jd"]
]

# add the "alert_mag" field to the new fp_hist
# as well as alert_ra, alert_dec
for i, fp in enumerate(fp_hists):
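As a standalone illustration of the deduplication step above (toy values, not real forced-photometry points): sorting by jd first puts equal jds next to each other, so keeping only entries whose jd differs from the previous one drops the duplicates.

```python
fp_hists = [
    {"jd": 2460295.71, "mag": 18.9},
    {"jd": 2460295.71, "mag": 19.1},  # same jd as the entry above
    {"jd": 2460295.68, "mag": 19.4},
]

# sort by jd, then keep the first entry of every run of identical jds
fp_hists = sorted(fp_hists, key=lambda x: x["jd"])
fp_hists = [
    fp_hist
    for i, fp_hist in enumerate(fp_hists)
    if i == 0 or fp_hist["jd"] != fp_hists[i - 1]["jd"]
]

print([fp["jd"] for fp in fp_hists])  # [2460295.68, 2460295.71]
```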
@@ -550,14 +559,42 @@ def update_fp_hists(self, alert, formatted_fp_hists):
# instead of treating it as a set,
# if some entries have the same jd, keep the one with the highest alert_mag

# make sure this is an aggregate pipeline in mongodb
# if we have no fp_hists to add, we don't do anything
if len(formatted_fp_hists) == 0:
return

with timer(
f"Updating fp_hists of {alert['objectId']} {alert['candid']}",
self.verbose > 1,
):
# pipeline that returns the very last fp_hists entry from the DB
last_fp_hist_pipeline = [
# 0. match the document and check that the fp_hists field exists
{"$match": {"_id": alert["objectId"], "fp_hists": {"$exists": True}}},
# 1. add a field which is the size of the fp_hists array
{"$addFields": {"n_fp_hists": {"$size": "$fp_hists"}}},
# 2. only keep the last fp_hists entry and call it fp_hist
{
"$project": {
"fp_hist": {"$arrayElemAt": ["$fp_hists", -1]},
"n_fp_hists": 1,
}
},
# 3. project only the jd and alert_mag, alert_ra, alert_dec fields in the fp_hists, as well as the n_fp_hists
{
"$project": {
"fp_hist": {
"jd": "$fp_hist.jd",
"alert_mag": "$fp_hist.alert_mag",
"alert_ra": "$fp_hist.alert_ra",
"alert_dec": "$fp_hist.alert_dec",
},
"n_fp_hists": 1,
}
},
]

# pipeline that updates the fp_hists array if necessary
update_pipeline = [
# 0. match the document
{"$match": {"_id": alert["objectId"]}},
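To make last_fp_hist_pipeline above easier to follow, here is a plain-Python equivalent of what it is expected to return for a single aux document (the sample document and its field values are made up):

```python
# made-up aux-collection document with an existing fp_hists array
aux_doc = {
    "_id": "ZTF23abcdefg",
    "fp_hists": [
        {"jd": 2460294.66, "alert_mag": 19.2, "alert_ra": 150.1, "alert_dec": 2.3, "flux": 120.0},
        {"jd": 2460295.71, "alert_mag": 18.9, "alert_ra": 150.1, "alert_dec": 2.3, "flux": 310.0},
    ],
}

last = aux_doc["fp_hists"][-1]  # stage 2: {"$arrayElemAt": ["$fp_hists", -1]}
summary = {
    "_id": aux_doc["_id"],
    "n_fp_hists": len(aux_doc["fp_hists"]),  # stage 1: {"$size": "$fp_hists"}
    "fp_hist": {  # stage 3: keep only the fields used by the concurrency check
        "jd": last["jd"],
        "alert_mag": last["alert_mag"],
        "alert_ra": last["alert_ra"],
        "alert_dec": last["alert_dec"],
    },
}
print(summary["n_fp_hists"], summary["fp_hist"]["jd"])  # 2 2460295.71
```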
@@ -612,29 +649,73 @@ def update_fp_hists(self, alert, formatted_fp_hists):
# 8. project only the new fp_hists array
{"$project": {"fp_hists": 1, "_id": 0}},
]

n_retries = 0
while True:
# run the pipeline and then update the document
new_fp_hists = (
self.mongo.db[self.collection_alerts_aux]
.aggregate(
update_pipeline,
allowDiskUse=True,
try:
# get the very last fp_hists entry from the DB
last_fp_hist = (
self.mongo.db[self.collection_alerts_aux]
.aggregate(last_fp_hist_pipeline, allowDiskUse=True)
.next()
)
.next()
.get("fp_hists", [])
)

# update the document only if there are still fewer points in the DB than in the new fp_hists.
# Otherwise, rerun the pipeline. This helps a little with concurrency issues
result = self.mongo.db[self.collection_alerts_aux].find_one_and_update(
{
if last_fp_hist is None:
# the pipeline first checked that the fp_hists field exists, so if nothing is returned
# we can cancel the update, as we only update existing fp_hists (new objects come with their full history)
return

# run the update pipeline
new_fp_hists = (
self.mongo.db[self.collection_alerts_aux]
.aggregate(
update_pipeline,
allowDiskUse=True,
)
.next()
.get("fp_hists", [])
)

# we apply conditions to find_one_and_update to avoid concurrency issues
# where another process might have updated the fp_hists while we were
# computing our new fp_hists

update_conditions = {
"_id": alert["objectId"],
f"fp_hists.{len(new_fp_hists)}": {"$exists": False},
},
{"$set": {"fp_hists": new_fp_hists}},
)
if result is None:
}
if last_fp_hist["n_fp_hists"] == 0:
# existing BUT empty fp_hists field for that object, we verify that it is still empty
update_conditions["fp_hists.0"] = {"$exists": False}
else:
# verify that the very last fp_hist entry in the DB is still the same
last_entry_str = f"fp_hists.{last_fp_hist['n_fp_hists'] - 1}"
update_conditions[f"{last_entry_str}.jd"] = last_fp_hist[
"fp_hist"
]["jd"]
update_conditions[f"{last_entry_str}.alert_mag"] = last_fp_hist[
"fp_hist"
]["alert_mag"]
update_conditions[f"{last_entry_str}.alert_ra"] = last_fp_hist[
"fp_hist"
]["alert_ra"]
update_conditions[f"{last_entry_str}.alert_dec"] = last_fp_hist[
"fp_hist"
]["alert_dec"]

result = self.mongo.db[
self.collection_alerts_aux
].find_one_and_update(
update_conditions,
{"$set": {"fp_hists": new_fp_hists}},
)
except Exception as e:
log(
f"Error occurred while trying to update fp_hists of {alert['objectId']} {alert['candid']}: {str(e)}"
)
result = None
if (
result is None
): # conditions not met, likely to be a concurrency issue, retry
n_retries += 1
if n_retries > 10:
log(
Expand All @@ -645,7 +726,8 @@ def update_fp_hists(self, alert, formatted_fp_hists):
log(
f"Retrying to update fp_hists of {alert['objectId']} {alert['candid']}"
)
time.sleep(1)
# add a random sleep between 0 and 5 s; this should keep multiple processes from retrying at exactly the same time
time.sleep(np.random.uniform(0, 5))
else:
break
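Taken together, the loop above is an optimistic-concurrency pattern: read the current tail of fp_hists, compute the merged array, and write it back only if the tail has not moved in the meantime, otherwise retry with a jittered back-off. A simplified, self-contained sketch of that pattern (placeholder connection string, DB/collection names, and merge rule; not the broker's actual aggregation pipeline):

```python
import time

import numpy as np
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder connection string
aux = client["kowalski"]["ZTF_alerts_aux"]  # placeholder DB/collection names


def merge_fp_hists(existing, incoming):
    # placeholder for the aggregation pipeline: union by jd, incoming entries win
    merged = {fp["jd"]: fp for fp in existing}
    merged.update({fp["jd"]: fp for fp in incoming})
    return [merged[jd] for jd in sorted(merged)]


def update_fp_hists_optimistically(object_id, formatted_fp_hists, max_retries=10):
    n_retries = 0
    while True:
        doc = aux.find_one({"_id": object_id, "fp_hists": {"$exists": True}})
        if doc is None:
            return None  # only objects that already have fp_hists are updated here

        existing = doc["fp_hists"]
        new_fp_hists = merge_fp_hists(existing, formatted_fp_hists)

        # only write if no other worker touched the array in the meantime:
        # it must not have grown, and its last entry must be unchanged
        conditions = {
            "_id": object_id,
            f"fp_hists.{len(new_fp_hists)}": {"$exists": False},
        }
        if not existing:
            conditions["fp_hists.0"] = {"$exists": False}
        else:
            tail = f"fp_hists.{len(existing) - 1}"
            conditions[f"{tail}.jd"] = existing[-1]["jd"]
            conditions[f"{tail}.alert_mag"] = existing[-1]["alert_mag"]

        result = aux.find_one_and_update(
            conditions, {"$set": {"fp_hists": new_fp_hists}}
        )
        if result is not None:
            return result

        n_retries += 1
        if n_retries > max_retries:
            return None  # give up after too many conflicting writers
        # jittered sleep so concurrent workers do not retry in lock-step
        time.sleep(np.random.uniform(0, 5))
```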

1 change: 1 addition & 0 deletions kowalski/ingesters/ingest_catalog.py
@@ -409,6 +409,7 @@ def convert_nparray_to_list(value):
except Exception as exception:
total_bad_documents += 1
log(str(exception))
continue

# ingest in batches
try:
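The new continue is what keeps a row that failed to convert from falling through to the rest of the loop body. A tiny reconstructed sketch of the pattern (the conversion function, field names, and toy rows below are illustrative, not the actual ingester code):

```python
def convert_row(row):
    # stand-in for the per-row conversion done in ingest_catalog.py; raises on bad values
    return {"ra": float(row["ra"]), "dec": float(row["dec"])}


rows = [{"ra": "150.1", "dec": "2.3"}, {"ra": "not-a-number", "dec": "2.4"}]
batch, total_bad_documents = [], 0

for row in rows:
    try:
        document = convert_row(row)
    except Exception as exception:
        total_bad_documents += 1
        print(exception)
        continue  # skip the bad row instead of continuing with a stale or missing document
    batch.append(document)

print(len(batch), total_bad_documents)  # 1 1
```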
17 changes: 17 additions & 0 deletions kowalski/utils.py
@@ -1185,6 +1185,23 @@ def compare_dicts(a: dict, b: dict, ignore_keys=[], same_keys=False):
elif isinstance(v, list):
if not all([i in b[k] for i in v]):
return False
elif k == "observation_type":
# we make an exception for observation_type, as the logic is a bit more complicated.
# This has been designed with SEDM in mind, but can easily be extended to other instruments
obs_list = {"a": [], "b": []}
for dict_name, dict_content in [("a", a), ("b", b)]:
if dict_content[k] == "Mix 'n Match":
obs_list[dict_name] = dict_content.get("observation_choices", [])
else:
if "IFU" in dict_content[k]:
obs_list[dict_name].append("IFU")

if "3-shot" in dict_content[k]:
obs_list[dict_name].extend(["g", "r", "i"])
elif "4-shot" in dict_content[k]:
obs_list[dict_name].extend(["u", "g", "r", "i"])
if not set(obs_list["a"]).issubset(set(obs_list["b"])):
return False
elif b[k] != v:
return False
return True
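A hedged illustration of the observation_type branch above, using SEDM-style payloads (the mode strings and filter lists are examples; compare_dicts is assumed importable from kowalski/utils.py):

```python
from kowalski.utils import compare_dicts

three_shot = {"observation_type": "3-shot (gri)", "observation_choices": ["g", "r", "i"]}
four_shot = {"observation_type": "4-shot (ugri)", "observation_choices": ["u", "g", "r", "i"]}
mix_n_match = {"observation_type": "Mix 'n Match", "observation_choices": ["g", "r"]}

# "3-shot" expands to {g, r, i}, a subset of "4-shot"'s {u, g, r, i}: treated as a match
assert compare_dicts(three_shot, four_shot, ignore_keys=["observation_choices"])

# the reverse is not a subset (u is missing), so the requests are considered different
assert not compare_dicts(four_shot, three_shot, ignore_keys=["observation_choices"])

# "Mix 'n Match" falls back to observation_choices; {g, r} is a subset of {g, r, i}
assert compare_dicts(mix_n_match, three_shot, ignore_keys=["observation_choices"])
```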