Skip to content

Commit

Permalink
Merge pull request #242 from lsst/tickets/DM-46668
Browse files Browse the repository at this point in the history
tickets/DM-46668 Produce ssSource information in diaPipe
  • Loading branch information
Gerenjie authored Nov 2, 2024
2 parents 3504e18 + 442c68e commit 353e27d
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 36 deletions.
19 changes: 17 additions & 2 deletions python/lsst/ap/association/diaPipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ class DiaPipelineConnections(
storageClass="DataFrame",
dimensions=("instrument", "visit", "detector"),
)
associatedSsSources = connTypes.Output(
doc="Optional output storing ssSource data computed during association.",
name="{fakesType}{coaddName}Diff_associatedSsSources",
storageClass="DataFrame",
dimensions=("instrument", "visit", "detector"),
)
diaForcedSources = connTypes.Output(
doc="Optional output storing the forced sources computed at the diaObject positions.",
name="{fakesType}{coaddName}Diff_diaForcedSrc",
Expand All @@ -148,6 +154,8 @@ def __init__(self, *, config=None):
self.outputs.remove("diaForcedSources")
if not config.doSolarSystemAssociation:
self.inputs.remove("solarSystemObjectTable")
if (not config.doWriteAssociatedSources) or (not config.doSolarSystemAssociation):
self.outputs.remove("associatedSsSources")

def adjustQuantum(self, inputs, outputs, label, dataId):
"""Override to make adjustments to `lsst.daf.butler.DatasetRef` objects
Expand Down Expand Up @@ -444,6 +452,8 @@ def run(self,
- ``diaForcedSources`` : Catalog of new and previously detected
forced DiaSources. (`pandas.DataFrame`)
- ``diaObjects`` : Updated table of DiaObjects. (`pandas.DataFrame`)
- ``associatedSsSources`` : Catalog of ssSource records.
(`pandas.DataFrame`)
Raises
------
Expand All @@ -463,7 +473,7 @@ def run(self,
diaObjects = preloadedDiaObjects

# Associate DiaSources with DiaObjects
associatedDiaSources, newDiaObjects = self.associateDiaSources(
associatedDiaSources, newDiaObjects, associatedSsSources = self.associateDiaSources(
diaSourceTable, solarSystemObjectTable, diffIm, diaObjects
)

Expand Down Expand Up @@ -546,6 +556,7 @@ def run(self,
associatedDiaSources=associatedDiaSources,
diaForcedSources=diaForcedSources,
diaObjects=diaCalResult.diaObjectCat,
associatedSsSources=associatedSsSources
)

def createNewDiaObjects(self, unAssocDiaSources):
Expand Down Expand Up @@ -608,6 +619,8 @@ def associateDiaSources(self, diaSourceTable, solarSystemObjectTable, diffIm, di
Associated DiaSources with DiaObjects.
newDiaObjects : `pandas.DataFrame`
Table of new DiaObjects after association.
associatedSsSources : `pandas.DataFrame`
Table of new ssSources after association.
"""
# Associate new DiaSources with existing DiaObjects.
assocResults = self.associator.run(diaSourceTable, diaObjects)
Expand All @@ -624,11 +637,13 @@ def associateDiaSources(self, diaSourceTable, solarSystemObjectTable, diffIm, di
toAssociate.append(ssoAssocResult.ssoAssocDiaSources)
nTotalSsObjects = ssoAssocResult.nTotalSsObjects
nAssociatedSsObjects = ssoAssocResult.nAssociatedSsObjects
associatedSsSources = ssoAssocResult.ssSourceData
else:
# Create new DiaObjects from unassociated diaSources.
createResults = self.createNewDiaObjects(assocResults.unAssocDiaSources)
nTotalSsObjects = 0
nAssociatedSsObjects = 0
associatedSsSources = None
if len(assocResults.matchedDiaSources) > 0:
toAssociate.append(assocResults.matchedDiaSources)
toAssociate.append(createResults.diaSources)
Expand All @@ -644,7 +659,7 @@ def associateDiaSources(self, diaSourceTable, solarSystemObjectTable, diffIm, di
assocResults.nUnassociatedDiaObjects,
createResults.nNewDiaObjects,
)
return (associatedDiaSources, createResults.newDiaObjects)
return (associatedDiaSources, createResults.newDiaObjects, associatedSsSources)

@timeMethod
def mergeAssociatedCatalogs(self, preloadedDiaSources, associatedDiaSources, diaObjects, newDiaObjects,
Expand Down
46 changes: 33 additions & 13 deletions python/lsst/ap/association/mpSkyEphemerisQuery.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import pyarrow as pa
import requests

from lsst.ap.association.utils import getMidpointFromTimespan
from lsst.ap.association.utils import getMidpointFromTimespan, objID_to_ssObjectID
from lsst.geom import SpherePoint
import lsst.pex.config as pexConfig
from lsst.utils.timer import timeMethod
Expand Down Expand Up @@ -115,10 +115,14 @@ def run(self, predictedRegionTime):
RA in decimal degrees (`float`)
``dec``
DEC in decimal degrees (`float`)
``obj_poly``
DO NOT USE until DM-46069 is resolved
``obs_poly``
DO NOT USE until DM-46069 is resolved
``obj_X_poly``, ``obj_Y_poly``, ``obj_Z_poly``
Chebyshev coefficients for object path
``obs_X_poly``, ``obs_Y_poly``, ``obs_Z_poly``
Chebyshev coefficients for observer path
``t_min``
Lower time bound for polynomials
``t_max``
Upper time bound for polynomials
"""
# Get detector center and timespan midpoint from predictedRegionTime.
region = predictedRegionTime.region
Expand All @@ -131,7 +135,6 @@ def run(self, predictedRegionTime):
mpSkyURL = os.environ.get('MP_SKY_URL', '')
mpSkySsObjects = self._mpSkyConeSearch(expCenter, expMidPointEPOCH,
expRadius + self.config.queryBufferRadiusDegrees, mpSkyURL)

return Struct(
ssObjects=mpSkySsObjects,
)
Expand All @@ -156,17 +159,24 @@ def read_mp_sky_response(self, response):
Array of object cartesian position polynomials
observer_polynomial : `np.ndarray`, (N,M)
Array of observer cartesian position polynomials
t_min : `np.ndarray`
Lower time bound for polynomials
t_max : `np.ndarray`
Upper time bound for polynomials
"""
with pa.input_stream(memoryview(response.content)) as stream:
stream.seek(0)
object_polynomial = pa.ipc.read_tensor(stream)
observer_polynomial = pa.ipc.read_tensor(stream)
object_polynomial = pa.ipc.read_tensor(stream).to_numpy()
observer_polynomial = pa.ipc.read_tensor(stream).to_numpy()
with pa.ipc.open_stream(stream) as reader:
columns = next(reader)
objID = columns["name"].to_numpy(zero_copy_only=False)
ra = columns["ra"].to_numpy()
dec = columns["dec"].to_numpy()
return objID, ra, dec, object_polynomial, observer_polynomial
t_min = columns["tmin"].to_numpy()
t_max = columns["tmax"].to_numpy()
return objID, ra, dec, object_polynomial, observer_polynomial, t_min, t_max

def _mpSkyConeSearch(self, expCenter, epochMJD, queryRadius, mpSkyURL):
"""Query MPSky ephemeris service for objects near the expected detector position
Expand Down Expand Up @@ -201,16 +211,26 @@ def _mpSkyConeSearch(self, expCenter, epochMJD, queryRadius, mpSkyURL):
try:
response = requests.get(mpSkyURL, params=params, timeout=self.config.mpSkyRequestTimeoutSeconds)
response.raise_for_status()
objID, ra, dec, object_polynomial, observer_polynomial = self.read_mp_sky_response(response)
response = self.read_mp_sky_response(response)
objID, ra, dec, object_polynomial, observer_polynomial, tmin, tmax = response

mpSkySsObjects = pd.DataFrame()
mpSkySsObjects['ObjID'] = objID
mpSkySsObjects['ra'] = ra
mpSkySsObjects['obj_poly'] = object_polynomial
mpSkySsObjects['obs_poly'] = observer_polynomial
mpSkySsObjects['obj_x_poly'] = [poly[0] for poly in object_polynomial.T]
mpSkySsObjects['obj_y_poly'] = [poly[1] for poly in object_polynomial.T]
mpSkySsObjects['obj_z_poly'] = [poly[2] for poly in object_polynomial.T]
mpSkySsObjects['obs_x_poly'] = [observer_polynomial.T[0] for
i in range(len(mpSkySsObjects))]
mpSkySsObjects['obs_y_poly'] = [observer_polynomial.T[1] for
i in range(len(mpSkySsObjects))]
mpSkySsObjects['obs_z_poly'] = [observer_polynomial.T[2] for
i in range(len(mpSkySsObjects))]
mpSkySsObjects['tmin'] = tmin
mpSkySsObjects['tmax'] = tmax
mpSkySsObjects['dec'] = dec
mpSkySsObjects['Err(arcsec)'] = 2
mpSkySsObjects['ssObjectId'] = [abs(hash(v)) for v in mpSkySsObjects['ObjID'].values]
mpSkySsObjects['ssObjectId'] = [objID_to_ssObjectID(v) for v in mpSkySsObjects['ObjID'].values]
nFound = len(mpSkySsObjects)

if nFound == 0:
Expand Down
40 changes: 36 additions & 4 deletions python/lsst/ap/association/ssoAssociation.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,23 @@ def run(self, diaSourceCatalog, solarSystemObjects, exposure):
contained in the CCD footprint. (`int`)
- ``nAssociatedSsObjects`` : Number of SolarSystemObjects
that were associated with DiaSources.
- ``ssSourceData`` : ssSource table data. (`pandas.DataFrame)
"""
mjd_midpoint = exposure.visitInfo.date.toAstropy().tai.mjd
ref_time = mjd_midpoint - solarSystemObjects["tmin"].values[0]

solarSystemObjects['obs_position'] = solarSystemObjects.apply(lambda row: np.array([
np.polynomial.chebyshev.chebval(ref_time, row['obs_x_poly']),
np.polynomial.chebyshev.chebval(ref_time, row['obs_y_poly']),
np.polynomial.chebyshev.chebval(ref_time, row['obs_z_poly'])
]), axis=1)

solarSystemObjects['obj_position'] = solarSystemObjects.apply(lambda row: np.array([
np.polynomial.chebyshev.chebval(ref_time, row['obj_x_poly']),
np.polynomial.chebyshev.chebval(ref_time, row['obj_y_poly']),
np.polynomial.chebyshev.chebval(ref_time, row['obj_z_poly'])
]), axis=1)

maskedObjects = self._maskToCcdRegion(
solarSystemObjects,
exposure,
Expand All @@ -104,7 +120,10 @@ def run(self, diaSourceCatalog, solarSystemObjects, exposure):
ssoAssocDiaSources=pd.DataFrame(columns=diaSourceCatalog.columns),
unAssocDiaSources=diaSourceCatalog,
nTotalSsObjects=0,
nAssociatedSsObjects=0)
nAssociatedSsObjects=0,
ssSourceData=pd.DataFrame(columns=["ssObjectId", "obs_position", "obj_position",
"residual_ras", "residual_decs"])
)
else:
self.log.debug("Matching solar system objects:\n%s", maskedObjects)
self.log.info("Attempting to associate %d objects...", nSolarSystemObjects)
Expand All @@ -121,23 +140,36 @@ def run(self, diaSourceCatalog, solarSystemObjects, exposure):
# Query the KDtree for DIA nearest neighbors to SSOs. Currently only
# picks the DiaSource with the shortest distance. We can do something
# fancier later.
ssSourceData = []
ras, decs, residual_ras, residual_decs = [], [], [], []
diaSourceCatalog["ssObjectId"] = 0
for index, ssObject in maskedObjects.iterrows():

ssoVect = self._radec_to_xyz(ssObject["ra"], ssObject["dec"])
# Which DIA Sources fall within r?
dist, idx = tree.query(ssoVect, distance_upper_bound=maxRadius)
if np.isfinite(dist[0]):
if len(idx) == 1 and np.isfinite(dist[0]):
nFound += 1
diaSourceCatalog.loc[diaSourceCatalog.index[idx[0]], "ssObjectId"] = ssObject["ssObjectId"]
ssSourceData.append(ssObject[["ssObjectId", "obs_position", "obj_position"]].values)
dia_ra = diaSourceCatalog.loc[diaSourceCatalog.index[idx[0]], "ra"]
dia_dec = diaSourceCatalog.loc[diaSourceCatalog.index[idx[0]], "dec"]
ras.append(dia_ra)
decs.append(dia_dec)
residual_ras.append(dia_ra - ssObject["ra"])
residual_decs.append(dia_dec - ssObject["dec"])

self.log.info("Successfully associated %d SolarSystemObjects.", nFound)
assocMask = diaSourceCatalog["ssObjectId"] != 0
ssSourceData = pd.DataFrame(ssSourceData, columns=["ssObjectId", "obs_position", "obj_position"])
ssSourceData["residual_ras"] = residual_ras
ssSourceData["residual_decs"] = residual_decs

return pipeBase.Struct(
ssoAssocDiaSources=diaSourceCatalog[assocMask].reset_index(drop=True),
unAssocDiaSources=diaSourceCatalog[~assocMask].reset_index(drop=True),
nTotalSsObjects=nSolarSystemObjects,
nAssociatedSsObjects=nFound)
nAssociatedSsObjects=nFound,
ssSourceData=ssSourceData)

def _maskToCcdRegion(self, solarSystemObjects, exposure, marginArcsec):
"""Mask the input SolarSystemObjects to only those in the exposure
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ap/association/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"""
__all__ = ("convertTableToSdmSchema", "readSdmSchemaFile", "readSchemaFromApdb",
"dropEmptyColumns", "make_empty_catalog", "getMidpointFromTimespan",
"makeEmptyForcedSourceTable")
"makeEmptyForcedSourceTable", "objID_to_ssObjectID", "ssObjectID_to_objID")

from collections.abc import Mapping
import os
Expand Down
3 changes: 2 additions & 1 deletion tests/test_diaPipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ def solarSystemAssociator_run(unAssocDiaSources, solarSystemObjectTable, diffIm)
return lsst.pipe.base.Struct(nTotalSsObjects=42,
nAssociatedSsObjects=30,
ssoAssocDiaSources=_makeMockDataFrame(),
unAssocDiaSources=_makeMockDataFrame())
unAssocDiaSources=_makeMockDataFrame(),
ssSourceData=_makeMockDataFrame())

def associator_run(table, diaObjects):
return lsst.pipe.base.Struct(nUpdatedDiaObjects=2, nUnassociatedDiaObjects=3,
Expand Down
35 changes: 20 additions & 15 deletions tests/test_ssoAssociationTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,25 @@ def setUp(self):
inplace=True,)
self.testDiaSources.loc[:, "ra"] = np.rad2deg(self.testDiaSources["ra"])
self.testDiaSources.loc[:, "dec"] = np.rad2deg(self.testDiaSources["dec"])
self.testDiaSources = pd.concat([self.testDiaSources,
pd.DataFrame([[45, 45]], columns=["ra", "dec"])],
ignore_index=True)
self.testDiaSources["ssObjectId"] = 0

# Grab a subset to treat as solar system objects
self.testSsObjects = self.testDiaSources[2:8].reset_index()
# Assign them ids starting from 1.
self.testSsObjects.loc[:, "ssObjectId"] = np.arange(
1, len(self.testSsObjects) + 1, dtype=int,)
self.testSsObjects = pd.DataFrame()
self.testSsObjects["ObjID"] = ["test_ob"]
self.testSsObjects["ssObjectId"] = [1234]
self.testSsObjects["ra"] = [45]
self.testSsObjects["dec"] = [45]
self.testSsObjects["tmin"] = [-1]
self.testSsObjects["tmax"] = [1]
self.testSsObjects["obs_x_poly"] = [np.array([0])]
self.testSsObjects["obs_y_poly"] = [np.array([0])]
self.testSsObjects["obs_z_poly"] = [np.array([0])]
self.testSsObjects["obj_x_poly"] = [np.array([1])]
self.testSsObjects["obj_y_poly"] = [np.array([1])]
self.testSsObjects["obj_z_poly"] = [np.array([2 ** 0.5])]
self.testSsObjects["Err(arcsec)"] = np.ones(len(self.testSsObjects))

def test_run(self):
Expand All @@ -86,17 +98,10 @@ def test_run(self):
results = ssAssocTask.run(self.testDiaSources,
self.testSsObjects,
self.exposure)
self.assertEqual(self.nSources,
len(results.ssoAssocDiaSources)
+ len(results.unAssocDiaSources))
self.assertEqual(len(self.testSsObjects),
len(results.ssoAssocDiaSources))
self.assertEqual(self.nSources - len(self.testSsObjects),
len(results.unAssocDiaSources))
for idx, ssObject in self.testSsObjects.iterrows():
self.assertEqual(
ssObject["ssObjectId"],
results.ssoAssocDiaSources.iloc[idx]["ssObjectId"])
self.assertEqual(len(results.ssoAssocDiaSources), 1)
self.assertEqual(results.ssoAssocDiaSources['ra'].iloc[0], 45.0)
self.assertEqual(results.ssoAssocDiaSources['dec'].iloc[0], 45.0)
self.assertEqual(results.ssoAssocDiaSources['ssObjectId'].iloc[0], 1234)

def test_mask(self):
"""Test that masking against the CCD bounding box works as expected.
Expand Down

0 comments on commit 353e27d

Please sign in to comment.