Switch exposure pipeline to use ModelLibrary instead of a list of models (#1525)
braingram authored Dec 9, 2024
2 parents d68a9ca + b6f3799 commit b37aee9
Showing 7 changed files with 197 additions and 132 deletions.
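
At its core the change swaps the hand-built list of opened models for the borrow/shelve access pattern of ModelLibrary. A minimal sketch of that pattern, assuming only the constructor and the borrow/shelve calls that appear in the diff and new test below; the maker-utils ramp model is just a stand-in input:

import roman_datamodels.datamodels as rdm
import roman_datamodels.maker_utils as mk

from romancal.datamodels.library import ModelLibrary

# Build an in-memory library, the same way the new test fixture does.
library = ModelLibrary([mk.mk_datamodel(rdm.RampModel)])

# Models are borrowed inside the library's context manager and must be
# shelved back (with their index) once each one has been processed.
with library:
    for index, model in enumerate(library):
        result = model  # stand-in for running the calibration steps on `model`
        library.shelve(result, index)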
1 change: 1 addition & 0 deletions changes/1525.exposure_pipeline.0.rst
@@ -0,0 +1 @@
Update exposure pipeline to use ModelLibrary.
1 change: 1 addition & 0 deletions changes/1525.exposure_pipeline.1.rst
@@ -0,0 +1 @@
Fix exposure pipeline handling of all saturated inputs.
5 changes: 5 additions & 0 deletions pyproject.toml
@@ -264,6 +264,11 @@ directory = "scripts"
name = "Scripts"
showcontent = true

[[tool.towncrier.type]]
directory = "exposure_pipeline"
name = "``exposure_pipeline``"
showcontent = true

[[tool.towncrier.type]]
directory = "mosaic_pipeline"
name = "``mosaic_pipeline``"
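
For context, towncrier ties a fragment to a type through the middle component of the fragment's file name. Assuming the fragment directory is configured as changes elsewhere in this table (the paths in this commit suggest it is), the two files added above map to the new type like so:

changes/1525.exposure_pipeline.0.rst  ->  rendered under the exposure_pipeline section
changes/1525.exposure_pipeline.1.rst  ->  rendered under the exposure_pipeline section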
189 changes: 75 additions & 114 deletions romancal/pipeline/exposure_pipeline.py
@@ -1,18 +1,15 @@
#!/usr/bin/env python
import logging
from os.path import basename

import numpy as np
from roman_datamodels import datamodels as rdm
from roman_datamodels.dqflags import group

import romancal.datamodels.filetype as filetype

# step imports
from romancal.assign_wcs import AssignWcsStep
from romancal.associations.asn_from_list import asn_from_list
from romancal.associations.load_asn import load_asn
from romancal.dark_current import DarkCurrentStep
from romancal.datamodels.library import ModelLibrary
from romancal.dq_init import dq_init_step
from romancal.flatfield import FlatFieldStep
from romancal.lib.basic_utils import is_fully_saturated
@@ -36,10 +33,8 @@
class ExposurePipeline(RomanPipeline):
"""
ExposurePipeline: Apply all calibration steps to raw Roman WFI
ramps to produce a 2-D slope product. Included steps are:
dq_init, saturation, linearity, dark current, jump detection, ramp_fit,
assign_wcs, flatfield (only applied to WFI imaging data), photom,
and source_catalog.
ramps to produce a 2-D slope product. Included steps are documented
in the ``step_defs``.
"""

class_alias = "roman_elp"
@@ -73,123 +68,89 @@ def process(self, input):
self.source_catalog.return_updated_model = True
# make sure we update source catalog coordinates after running TweakRegStep
self.tweakreg.update_source_catalog_coordinates = True
# make output filenames based on input filenames
self.output_use_model = True

log.info("Starting Roman exposure calibration pipeline ...")
if isinstance(input, str):
input_filename = basename(input)
else:
input_filename = None

# determine the input type
file_type = filetype.check(input)
if file_type == "asn":
with open(input_filename) as f:
asn = load_asn(f)
elif file_type == "asdf":
try:
# set the product name based on the input filename
asn = asn_from_list([input], product_name=input_filename.split(".")[0])
file_type = "asn"
except TypeError:
log.debug("Error opening file:")
return

# Build a list of observations to process
expos_file = []
n_members = 0
# extract the members from the asn to run the files through the steps
results = []
tweakreg_input = []

# populate expos_file
if file_type == "asn":
for product in asn["products"]:
# extract the members from the asn to run the files through the steps
n_members = len(product["members"])
for member in product["members"]:
expos_file.append(member["expname"])
elif file_type == "DataModel":
expos_file.append(input)

for in_file in expos_file:
if isinstance(in_file, str):
input_filename = basename(in_file)
# Open the file
input = rdm.open(in_file)
elif isinstance(in_file, rdm.DataModel):
input_filename = in_file.meta.filename
else:
input_filename = None

log.info(f"Processing a WFI exposure {input_filename}")

self.dq_init.suffix = "dq_init"
result = self.dq_init.run(input)

if input_filename:
result.meta.filename = input_filename

result = self.saturation.run(result)

# Test for fully saturated data
if is_fully_saturated(result):
# Return fully saturated image file (stopping pipeline)
log.info("All pixels are saturated. Returning a zeroed-out image.")

# if is_fully_saturated(result):
# Set all subsequent steps to skipped
for step_str in [
"assign_wcs",
"flat_field",
"photom",
"source_catalog",
"dark",
"refpix",
"linearity",
"ramp_fit",
"jump",
"tweakreg",
]:
result.meta.cal_step[step_str] = "SKIPPED"

results.append(result)
return results

result = self.refpix.run(result)
result = self.linearity.run(result)
result = self.dark_current.run(result)
result = self.rampfit.run(result)
result = self.assign_wcs.run(result)

if result.meta.exposure.type == "WFI_IMAGE":
result = self.flatfield.run(result)
result = self.photom.run(result)
result = self.source_catalog.run(result)
tweakreg_input.append(result)
log.info(
f"Number of models to tweakreg: {len(tweakreg_input), n_members}"
)
else:
log.info("Flat Field step is being SKIPPED")
log.info("Photom step is being SKIPPED")
log.info("Source Detection step is being SKIPPED")
log.info("Tweakreg step is being SKIPPED")
result.meta.cal_step.flat_field = "SKIPPED"
result.meta.cal_step.photom = "SKIPPED"
result.meta.cal_step.source_catalog = "SKIPPED"
result.meta.cal_step.tweakreg = "SKIPPED"

self.output_use_model = True
results.append(result)
return_lib = True
if file_type == "ModelLibrary":
lib = input
elif file_type == "asn":
lib = ModelLibrary(input)
else:
# for a non-asn non-library input process it as a library
lib = ModelLibrary([input])
# but return it as a datamodel
return_lib = False

# Flag to track if any of the input models are fully saturated
any_saturated = False

with lib:
for model_index, model in enumerate(lib):
self.dq_init.suffix = "dq_init"
result = self.dq_init.run(model)

del model

result = self.saturation.run(result)

if is_fully_saturated(result):
log.info("All pixels are saturated. Returning a zeroed-out image.")
result = self.create_fully_saturated_zeroed_image(result)

# Track that we've seen a fully saturated input
any_saturated = True
log.warning(
"tweakreg will not be run due to a fully saturated input"
)
else:
result = self.refpix.run(result)
result = self.linearity.run(result)
result = self.dark_current.run(result)
result = self.rampfit.run(result)
result = self.assign_wcs.run(result)

if result.meta.exposure.type == "WFI_IMAGE":
result = self.flatfield.run(result)
result = self.photom.run(result)
result = self.source_catalog.run(result)
else:
log.info("Flat Field step is being SKIPPED")
log.info("Photom step is being SKIPPED")
log.info("Source Detection step is being SKIPPED")
log.info("Tweakreg step is being SKIPPED")
result.meta.cal_step.flat_field = "SKIPPED"
result.meta.cal_step.photom = "SKIPPED"
result.meta.cal_step.source_catalog = "SKIPPED"

if any_saturated:
# the input association contains a fully saturated model
# where source_catalog can't be run which means we
# also can't run tweakreg.
result.meta.cal_step.tweakreg = "SKIPPED"
lib.shelve(result, model_index)

# Now that all the exposures are collated, run tweakreg
# Note: this does not cover the case where the asn mixes imaging and spectral
# observations. This should not occur on-prem
result = self.tweakreg.run(results)
if not any_saturated:
self.tweakreg.run(lib)

log.info("Roman exposure calibration pipeline ending...")

return results
# return a ModelLibrary
if return_lib:
return lib

# or a DataModel (for non-asn non-lib inputs)
with lib:
model = lib.borrow(0)
lib.shelve(model)
return model

def create_fully_saturated_zeroed_image(self, input_model):
"""
@@ -209,14 +170,14 @@ def create_fully_saturated_zeroed_image(self, input_model):

# Set all subsequent steps to skipped
for step_str in [
"refpix",
"linearity",
"dark",
"ramp_fit",
"assign_wcs",
"flat_field",
"photom",
"source_catalog",
"tweakreg",
]:
fully_saturated_model.meta.cal_step[step_str] = "SKIPPED"

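
For callers, the practical effect of the return_lib logic above is that the output type now follows the input type: associations and libraries come back as a ModelLibrary, a single exposure comes back as a DataModel. A hedged sketch of that contract; the file names are hypothetical placeholders, and a real run would also need CRDS reference files:

from romancal.datamodels.library import ModelLibrary
from romancal.pipeline import ExposurePipeline

pipeline = ExposurePipeline()

# An association file (or an existing ModelLibrary) is processed and
# returned as a ModelLibrary.
library_result = pipeline("r0000101001001001001_asn.json")  # hypothetical asn
assert isinstance(library_result, ModelLibrary)

# A single uncalibrated exposure is processed and returned as a DataModel.
model_result = pipeline("r0000101001001001001_wfi01_uncal.asdf")  # hypothetical exposure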
Empty file.
57 changes: 57 additions & 0 deletions romancal/pipeline/tests/test_exposure_pipeline.py
@@ -0,0 +1,57 @@
import pytest
import roman_datamodels.datamodels as rdm
import roman_datamodels.maker_utils as mk

from romancal.associations.asn_from_list import asn_from_list
from romancal.datamodels.library import ModelLibrary
from romancal.pipeline import ExposurePipeline


@pytest.fixture(scope="function")
def input_value(request, tmp_path):
match request.param:
case "datamodel_fn":
model = mk.mk_datamodel(rdm.RampModel)
fn = tmp_path / "model.asdf"
model.save(fn)
return fn
case "datamodel":
return mk.mk_datamodel(rdm.RampModel)
case "asn_fn":
model = mk.mk_datamodel(rdm.RampModel)
model.meta.filename = "foo.asdf"
model.save(tmp_path / model.meta.filename)
asn = asn_from_list([model.meta.filename], product_name="foo_out")
base_fn, contents = asn.dump(format="json")
asn_filename = tmp_path / base_fn
with open(asn_filename, "w") as f:
f.write(contents)
return asn_filename
case "library":
return ModelLibrary([mk.mk_datamodel(rdm.RampModel)])
case value:
raise Exception(f"Invalid parametrization: {value}")


@pytest.mark.parametrize(
"input_value, expected_output_type",
[
("datamodel_fn", rdm.DataModel),
("datamodel", rdm.DataModel),
("asn_fn", ModelLibrary),
("library", ModelLibrary),
],
indirect=["input_value"],
)
def test_input_to_output(input_value, expected_output_type):
"""
Test that for a particular input_value (as parametrized indirectly
through the input_value fixture) the output is the expected type.
"""
pipeline = ExposurePipeline()
# don't fetch references
pipeline.prefetch_references = False
# skip all steps
[setattr(getattr(pipeline, k), "skip", True) for k in pipeline.step_defs]
output_value = pipeline(input_value)
assert isinstance(output_value, expected_output_type)
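
One style note on the test above: the side-effect list comprehension works, but the same setup reads more naturally as a plain loop. An equivalent sketch using only attributes the test already touches:

from romancal.pipeline import ExposurePipeline

pipeline = ExposurePipeline()
pipeline.prefetch_references = False  # as in the test: no reference-file fetches
# Equivalent to the list comprehension in the test: mark every step as skipped.
for step_name in pipeline.step_defs:
    getattr(pipeline, step_name).skip = True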