Skip to content

Commit

Permalink
add deepcopy of variable to avoid issue with concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodlz authored Jan 6, 2025
1 parent 440bc02 commit 5fb2a65
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions kowalski/api/handlers/filter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
from abc import ABC
from typing import List, Optional, Union
from copy import deepcopy

from aiohttp import web
from astropy.time import Time
Expand Down Expand Up @@ -1262,20 +1263,20 @@ async def post(self, request: web.Request) -> web.Response:
filter_pipeline_upstream = config["database"]["filters"][
filter_existing.catalog
]
filter_pipeline = filter_pipeline_upstream + loads(filter_pipeline)
pipeline = deepcopy(filter_pipeline_upstream + loads(filter_pipeline))

# match permissions for ZTF
if filter_existing.catalog.startswith("ZTF"):
if "candid" in filter_pipeline[0]["$match"]:
del filter_pipeline[0]["$match"]["candid"]
filter_pipeline[0]["$match"]["candidate.programid"][
if "candid" in pipeline[0]["$match"]:
del pipeline[0]["$match"]["candid"]
pipeline[0]["$match"]["candidate.programid"][
"$in"
] = filter_existing.permissions
filter_pipeline[3]["$project"]["prv_candidates"]["$filter"]["cond"][
pipeline[3]["$project"]["prv_candidates"]["$filter"]["cond"][
"$and"
][0]["$in"][1] = filter_existing.permissions
if "fp_hists" in filter_pipeline[3]["$project"]:
filter_pipeline[3]["$project"]["fp_hists"]["$filter"]["cond"][
if "fp_hists" in pipeline[3]["$project"]:
pipeline[3]["$project"]["fp_hists"]["$filter"]["cond"][
"$and"
][0]["$in"][1] = filter_existing.permissions

Expand All @@ -1285,25 +1286,26 @@ async def post(self, request: web.Request) -> web.Response:
# {
# $lt: ["$$item.jd", "$candidate.jd"]
# }
filter_pipeline[3]["$project"]["prv_candidates"]["$filter"]["cond"][
pipeline[3]["$project"]["prv_candidates"]["$filter"]["cond"][
"$and"
].append({"$lt": ["$$item.jd", "$candidate.jd"]})
if "fp_hists" in filter_pipeline[3]["$project"]:
filter_pipeline[3]["$project"]["fp_hists"]["$filter"]["cond"][
if "fp_hists" in pipeline[3]["$project"]:
pipeline[3]["$project"]["fp_hists"]["$filter"]["cond"][
"$and"
].append({"$lt": ["$$item.jd", "$candidate.jd"]})

if objects is not None:
# match objects
filter_pipeline[0]["$match"]["objectId"] = {"$in": objects}
pipeline[0]["$match"]["objectId"] = {"$in": objects}
if start_date_jd is not None and end_date_jd is not None:
filter_pipeline[0]["$match"]["candidate.jd"] = {
pipeline[0]["$match"]["candidate.jd"] = {
"$gte": start_date_jd,
"$lte": end_date_jd,
}

print()
print(pipeline)
cursor = request.app["mongo"][filter_existing.catalog].aggregate(
filter_pipeline, allowDiskUse=False, maxTimeMS=max_time_ms
pipeline, allowDiskUse=False, maxTimeMS=max_time_ms
)
alerts = await cursor.to_list(length=None)

Expand Down

0 comments on commit 5fb2a65

Please sign in to comment.