Skip to content

Commit

Permalink
Enlarge the region to load diaObjects from
Browse files Browse the repository at this point in the history
Before many diaObjects were being missed, now they are all correctly loaded
  • Loading branch information
isullivan committed Jan 13, 2025
1 parent 31a3ad6 commit 6638ecb
Showing 1 changed file with 36 additions and 14 deletions.
50 changes: 36 additions & 14 deletions python/lsst/ap/association/testApdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
dropEmptyColumns,
)
import lsst.sphgeom
from lsst.utils.timer import timeMethod


class TestApdbConnections(
Expand Down Expand Up @@ -252,10 +253,17 @@ def run(self, visit, detector,

diaSources = convertTableToSdmSchema(self.schema, diaSourcesRaw, tableName="DiaSource")

diaObjects = self.loadDiaObjects(region, self.schema)
diaObjects = self.loadDiaObjects(region.getBoundingCircle(), self.schema)

if diaObjects.empty:
self.log.info(f"diaObjects contain 0 diaSources (empty)")

Check failure on line 259 in python/lsst/ap/association/testApdb.py

View workflow job for this annotation

GitHub Actions / call-workflow / lint

F541

f-string is missing placeholders
else:
nDiaSources = diaObjects.nDiaSources
self.log.info(f"diaObjects contain {np.min(nDiaSources)} to {np.max(nDiaSources)} diaSources")

# Associate DiaSources with DiaObjects
associatedDiaSources, newDiaObjects = self.associateDiaSources(diaSources, diaObjects)
associatedDiaSources, newDiaObjects = self.associateDiaSources(diaSources, diaObjects,
diaSourcesReal, diaSourcesBogus)
# Merge new and preloaded diaObjects
mergedDiaObjects = self.mergeAssociatedCatalogs(diaObjects, newDiaObjects)

Expand All @@ -274,12 +282,14 @@ def run(self, visit, detector,
else:
diaSourcesChunk = associatedDiaSources.iloc[start:srcEnd]
finalDiaSources = convertTableToSdmSchema(self.schema, diaSourcesChunk, tableName="DiaSource")
self.log.info(f"Writing diaOSource chunk {ind} of length {len(diaSourcesChunk)} to the APDB")
self.log.info(f"Writing diaSource chunk {ind} of length {len(diaSourcesChunk)} to the APDB")
diaForcedSources = self.runForcedMeasurement(diaObjectsChunk, idGenForced, visit, detector)

finalDiaObjects = convertTableToSdmSchema(self.schema, diaObjectsChunk, tableName="DiaObject")
finalDiaForcedSources = convertTableToSdmSchema(self.schema, diaForcedSources,
tableName="DiaForcedSource")
self.log.info(f"Writing forced source chunk {ind} of length"
f" {len(finalDiaForcedSources)} to the APDB")
self.writeToApdb(finalDiaObjects, finalDiaSources, finalDiaForcedSources, dateTime)
ind += 1
marker = pexConfig.Config()
Expand Down Expand Up @@ -394,7 +404,7 @@ def simpleMatch(self, diaSourceTable, diaObjects):
nUnassociatedDiaObjects=len(unAssocDiaSources),
)

def associateDiaSources(self, diaSourceTable, diaObjects):
def associateDiaSources(self, diaSourceTable, diaObjects, diaSourcesReal, diaSourcesBogus):
"""Associate DiaSources with DiaObjects.
Parameters
Expand All @@ -413,6 +423,8 @@ def associateDiaSources(self, diaSourceTable, diaObjects):
"""
# Associate new DiaSources with existing DiaObjects.
assocResults = self.simpleMatch(diaSourceTable, diaObjects)
assocReal = self.simpleMatch(diaSourcesReal, diaObjects)
assocBogus = self.simpleMatch(diaSourcesBogus, diaObjects)

toAssociate = []

Expand All @@ -423,11 +435,15 @@ def associateDiaSources(self, diaSourceTable, diaObjects):
toAssociate.append(createResults.diaSources)
associatedDiaSources = pd.concat(toAssociate)

self.log.info("%i updated and %i unassociated diaObjects. Creating %i new diaObjects",
self.log.info("%i updated and %i unassociated diaSources. Creating %i new diaObjects",
assocResults.nUpdatedDiaObjects,
assocResults.nUnassociatedDiaObjects,
createResults.nNewDiaObjects,
)
self.log.info(f"{assocReal.nUpdatedDiaObjects} real sources associated"
f" and {assocReal.nUnassociatedDiaObjects} not associated")
self.log.info(f"{assocBogus.nUpdatedDiaObjects} fake sources associated"
f" and {assocBogus.nUnassociatedDiaObjects} not associated")

# Index the DiaSource catalog for this visit after all associations
# have been made.
Expand Down Expand Up @@ -554,6 +570,7 @@ def runForcedMeasurement(self, diaObjects, idGenerator, visit, detector):
)
return diaForcedSources

@timeMethod
def writeToApdb(self, updatedDiaObjects, associatedDiaSources, diaForcedSources, dateTime):
"""Write to the Alert Production Database (Apdb).
Expand All @@ -573,17 +590,22 @@ def writeToApdb(self, updatedDiaObjects, associatedDiaSources, diaForcedSources,
# Store DiaSources, updated DiaObjects, and DiaForcedSources in the
# Apdb.
# Drop empty columns that are nullable in the APDB.
diaObjectStore = dropEmptyColumns(self.schema, updatedDiaObjects, tableName="DiaObject")
if associatedDiaSources is None:
diaSourceStore = None
else:
diaSourceStore = dropEmptyColumns(self.schema, associatedDiaSources, tableName="DiaSource")
diaForcedSourceStore = dropEmptyColumns(self.schema, diaForcedSources, tableName="DiaForcedSource")
# diaObjectStore = dropEmptyColumns(self.schema, updatedDiaObjects, tableName="DiaObject")
# if associatedDiaSources is None:
# diaSourceStore = None
# else:
# diaSourceStore = dropEmptyColumns(self.schema, associatedDiaSources, tableName="DiaSource")
# diaForcedSourceStore = dropEmptyColumns(self.schema, diaForcedSources, tableName="DiaForcedSource")
# self.apdb.store(
# dateTime,
# diaObjectStore,
# diaSourceStore,
# diaForcedSourceStore)
self.apdb.store(
dateTime,
diaObjectStore,
diaSourceStore,
diaForcedSourceStore)
updatedDiaObjects,
associatedDiaSources,
diaForcedSources)
self.log.info("APDB updated.")


Expand Down

0 comments on commit 6638ecb

Please sign in to comment.