From cba8ed52c8999b69605f6dbea349492b1c7215c3 Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 15 Nov 2024 13:44:24 -0500 Subject: [PATCH 01/13] switch elp pipeline to use ModelLibrary --- romancal/pipeline/exposure_pipeline.py | 207 ++++++++----------------- 1 file changed, 63 insertions(+), 144 deletions(-) diff --git a/romancal/pipeline/exposure_pipeline.py b/romancal/pipeline/exposure_pipeline.py index e4dd40833..63a6a152d 100644 --- a/romancal/pipeline/exposure_pipeline.py +++ b/romancal/pipeline/exposure_pipeline.py @@ -1,18 +1,12 @@ #!/usr/bin/env python import logging -from os.path import basename - -import numpy as np -from roman_datamodels import datamodels as rdm -from roman_datamodels.dqflags import group import romancal.datamodels.filetype as filetype # step imports from romancal.assign_wcs import AssignWcsStep -from romancal.associations.asn_from_list import asn_from_list -from romancal.associations.load_asn import load_asn from romancal.dark_current import DarkCurrentStep +from romancal.datamodels.library import ModelLibrary from romancal.dq_init import dq_init_step from romancal.flatfield import FlatFieldStep from romancal.lib.basic_utils import is_fully_saturated @@ -73,152 +67,77 @@ def process(self, input): self.source_catalog.return_updated_model = True # make sure we update source catalog coordinates afer running TweakRegStep self.tweakreg.update_source_catalog_coordinates = True + # make output filenames based on input filenames + self.output_use_model = True log.info("Starting Roman exposure calibration pipeline ...") - if isinstance(input, str): - input_filename = basename(input) - else: - input_filename = None # determine the input type file_type = filetype.check(input) - if file_type == "asn": - with open(input_filename) as f: - asn = load_asn(f) - elif file_type == "asdf": - try: - # set the product name based on the input filename - asn = asn_from_list([input], product_name=input_filename.split(".")[0]) - file_type = "asn" - except TypeError: - log.debug("Error opening file:") - return - - # Build a list of observations to process - expos_file = [] - n_members = 0 - # extract the members from the asn to run the files through the steps - results = [] - tweakreg_input = [] - - # populate expos_file - if file_type == "asn": - for product in asn["products"]: - # extract the members from the asn to run the files through the steps - n_members = len(product["members"]) - for member in product["members"]: - expos_file.append(member["expname"]) - elif file_type == "DataModel": - expos_file.append(input) - - for in_file in expos_file: - if isinstance(in_file, str): - input_filename = basename(in_file) - # Open the file - input = rdm.open(in_file) - elif isinstance(in_file, rdm.DataModel): - input_filename = in_file.meta.filename - else: - input_filename = None - - log.info(f"Processing a WFI exposure {input_filename}") - - self.dq_init.suffix = "dq_init" - result = self.dq_init.run(input) - - if input_filename: - result.meta.filename = input_filename - - result = self.saturation.run(result) - - # Test for fully saturated data - if is_fully_saturated(result): - # Return fully saturated image file (stopping pipeline) - log.info("All pixels are saturated. Returning a zeroed-out image.") - - # if is_fully_saturated(result): - # Set all subsequent steps to skipped - for step_str in [ - "assign_wcs", - "flat_field", - "photom", - "source_catalog", - "dark", - "refpix", - "linearity", - "ramp_fit", - "jump", - "tweakreg", - ]: - result.meta.cal_step[step_str] = "SKIPPED" - - results.append(result) - return results - - result = self.refpix.run(result) - result = self.linearity.run(result) - result = self.dark_current.run(result) - result = self.rampfit.run(result) - result = self.assign_wcs.run(result) - - if result.meta.exposure.type == "WFI_IMAGE": - result = self.flatfield.run(result) - result = self.photom.run(result) - result = self.source_catalog.run(result) - tweakreg_input.append(result) - log.info( - f"Number of models to tweakreg: {len(tweakreg_input), n_members}" - ) - else: - log.info("Flat Field step is being SKIPPED") - log.info("Photom step is being SKIPPED") - log.info("Source Detection step is being SKIPPED") - log.info("Tweakreg step is being SKIPPED") - result.meta.cal_step.flat_field = "SKIPPED" - result.meta.cal_step.photom = "SKIPPED" - result.meta.cal_step.source_catalog = "SKIPPED" - result.meta.cal_step.tweakreg = "SKIPPED" - - self.output_use_model = True - results.append(result) + if file_type == "ModelLibrary": + lib = input + elif file_type == "asn": + lib = ModelLibrary(input) + else: + lib = ModelLibrary([input]) + + with lib: + for model_index, model in enumerate(lib): + self.dq_init.suffix = "dq_init" + result = self.dq_init.run(model) + + del model + + result = self.saturation.run(result) + + # Test for fully saturated data + if is_fully_saturated(result): + # Return fully saturated image file (stopping pipeline) + log.info("All pixels are saturated. Returning a zeroed-out image.") + + # if is_fully_saturated(result): + # Set all subsequent steps to skipped + for step_str in [ + "assign_wcs", + "flat_field", + "photom", + "source_catalog", + "dark", + "refpix", + "linearity", + "ramp_fit", + "jump", + "tweakreg", + ]: + result.meta.cal_step[step_str] = "SKIPPED" + else: + result = self.refpix.run(result) + result = self.linearity.run(result) + result = self.dark_current.run(result) + result = self.rampfit.run(result) + result = self.assign_wcs.run(result) + + if result.meta.exposure.type == "WFI_IMAGE": + result = self.flatfield.run(result) + result = self.photom.run(result) + result = self.source_catalog.run(result) + else: + log.info("Flat Field step is being SKIPPED") + log.info("Photom step is being SKIPPED") + log.info("Source Detection step is being SKIPPED") + log.info("Tweakreg step is being SKIPPED") + result.meta.cal_step.flat_field = "SKIPPED" + result.meta.cal_step.photom = "SKIPPED" + result.meta.cal_step.source_catalog = "SKIPPED" + result.meta.cal_step.tweakreg = "SKIPPED" + + lib.shelve(result, model_index) # Now that all the exposures are collated, run tweakreg # Note: this does not cover the case where the asn mixes imaging and spectral # observations. This should not occur on-prem - result = self.tweakreg.run(results) + self.tweakreg.run(lib) log.info("Roman exposure calibration pipeline ending...") - return results - - def create_fully_saturated_zeroed_image(self, input_model): - """ - Create zeroed-out image file - """ - # The set order is: data, dq, var_poisson, var_rnoise, err - fully_saturated_model = ramp_fit_step.create_image_model( - input_model, - ( - np.zeros(input_model.data.shape[1:], dtype=input_model.data.dtype), - input_model.pixeldq | input_model.groupdq[0] | group.SATURATED, - np.zeros(input_model.err.shape[1:], dtype=input_model.err.dtype), - np.zeros(input_model.err.shape[1:], dtype=input_model.err.dtype), - np.zeros(input_model.err.shape[1:], dtype=input_model.err.dtype), - ), - ) - - # Set all subsequent steps to skipped - for step_str in [ - "linearity", - "dark", - "ramp_fit", - "assign_wcs", - "flat_field", - "photom", - "source_catalog", - "tweakreg", - ]: - fully_saturated_model.meta.cal_step[step_str] = "SKIPPED" - - # Return zeroed-out image file - return fully_saturated_model + return lib From 6868880cc5c7e70dbe1d1350ff496675debd9d61 Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 15 Nov 2024 15:55:55 -0500 Subject: [PATCH 02/13] stop pipeline early on a saturated exposure --- romancal/pipeline/exposure_pipeline.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/romancal/pipeline/exposure_pipeline.py b/romancal/pipeline/exposure_pipeline.py index 63a6a152d..5c8a4129c 100644 --- a/romancal/pipeline/exposure_pipeline.py +++ b/romancal/pipeline/exposure_pipeline.py @@ -95,7 +95,6 @@ def process(self, input): # Return fully saturated image file (stopping pipeline) log.info("All pixels are saturated. Returning a zeroed-out image.") - # if is_fully_saturated(result): # Set all subsequent steps to skipped for step_str in [ "assign_wcs", @@ -110,6 +109,11 @@ def process(self, input): "tweakreg", ]: result.meta.cal_step[step_str] = "SKIPPED" + + # Stop at this point and returned only the first model + # as a list to match prior behavior + lib.shelve(result, 0) + return [result] else: result = self.refpix.run(result) result = self.linearity.run(result) From 559ff28da2f9239376d274f28601a9f703547248 Mon Sep 17 00:00:00 2001 From: Brett Date: Mon, 18 Nov 2024 11:10:02 -0500 Subject: [PATCH 03/13] update regtest to reflect cal_step status --- romancal/regtest/test_wfi_image_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/romancal/regtest/test_wfi_image_pipeline.py b/romancal/regtest/test_wfi_image_pipeline.py index 09e7c2fdc..28dfdd89f 100644 --- a/romancal/regtest/test_wfi_image_pipeline.py +++ b/romancal/regtest/test_wfi_image_pipeline.py @@ -300,5 +300,5 @@ def test_pipeline_suffix(rtdata, ignore_asdf_paths): assert model.meta.cal_step.flat_field == "COMPLETE" assert model.meta.cal_step.photom == "COMPLETE" assert model.meta.cal_step.source_catalog == "COMPLETE" - assert model.meta.cal_step.tweakreg == "INCOMPLETE" + assert model.meta.cal_step.tweakreg == "SKIPPED" assert model.meta.filename == output From eccd38668b63f2e1d289059c74e76d915b9a6750 Mon Sep 17 00:00:00 2001 From: Brett Date: Mon, 18 Nov 2024 11:26:25 -0500 Subject: [PATCH 04/13] add changelog --- changes/1525.exposure_pipeline.rst | 1 + pyproject.toml | 5 +++++ 2 files changed, 6 insertions(+) create mode 100644 changes/1525.exposure_pipeline.rst diff --git a/changes/1525.exposure_pipeline.rst b/changes/1525.exposure_pipeline.rst new file mode 100644 index 000000000..8845393e4 --- /dev/null +++ b/changes/1525.exposure_pipeline.rst @@ -0,0 +1 @@ +Update exposure pipeline to use ModelLibrary. diff --git a/pyproject.toml b/pyproject.toml index 16fd4d7ae..d2d0e4613 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -264,6 +264,11 @@ directory = "scripts" name = "Scripts" showcontent = true +[[tool.towncrier.type]] +directory = "exposure_pipeline" +name = "``exposure_pipeline``" +showcontent = true + [[tool.towncrier.type]] directory = "mosaic_pipeline" name = "``mosaic_pipeline``" From e37c6bdd26da443ae96817426f53f0ee021608a9 Mon Sep 17 00:00:00 2001 From: Brett Date: Tue, 19 Nov 2024 14:55:12 -0500 Subject: [PATCH 05/13] restore unused create_fully_saturated_zeroed_image --- romancal/pipeline/exposure_pipeline.py | 35 ++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/romancal/pipeline/exposure_pipeline.py b/romancal/pipeline/exposure_pipeline.py index 5c8a4129c..a9da9f74d 100644 --- a/romancal/pipeline/exposure_pipeline.py +++ b/romancal/pipeline/exposure_pipeline.py @@ -1,6 +1,9 @@ #!/usr/bin/env python import logging +import numpy as np +from roman_datamodels.dqflags import group + import romancal.datamodels.filetype as filetype # step imports @@ -145,3 +148,35 @@ def process(self, input): log.info("Roman exposure calibration pipeline ending...") return lib + + def create_fully_saturated_zeroed_image(self, input_model): + """ + Create zeroed-out image file + """ + # The set order is: data, dq, var_poisson, var_rnoise, err + fully_saturated_model = ramp_fit_step.create_image_model( + input_model, + ( + np.zeros(input_model.data.shape[1:], dtype=input_model.data.dtype), + input_model.pixeldq | input_model.groupdq[0] | group.SATURATED, + np.zeros(input_model.err.shape[1:], dtype=input_model.err.dtype), + np.zeros(input_model.err.shape[1:], dtype=input_model.err.dtype), + np.zeros(input_model.err.shape[1:], dtype=input_model.err.dtype), + ), + ) + + # Set all subsequent steps to skipped + for step_str in [ + "linearity", + "dark", + "ramp_fit", + "assign_wcs", + "flat_field", + "photom", + "source_detection", + "tweakreg", + ]: + fully_saturated_model.meta.cal_step[step_str] = "SKIPPED" + + # Return zeroed-out image file + return fully_saturated_model From aacb457f2892f745b3ce65c18e2d00a1f203c996 Mon Sep 17 00:00:00 2001 From: Brett Date: Tue, 19 Nov 2024 15:12:28 -0500 Subject: [PATCH 06/13] cleanup elp --- romancal/pipeline/exposure_pipeline.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/romancal/pipeline/exposure_pipeline.py b/romancal/pipeline/exposure_pipeline.py index a9da9f74d..66f73c19d 100644 --- a/romancal/pipeline/exposure_pipeline.py +++ b/romancal/pipeline/exposure_pipeline.py @@ -33,10 +33,8 @@ class ExposurePipeline(RomanPipeline): """ ExposurePipeline: Apply all calibration steps to raw Roman WFI - ramps to produce a 2-D slope product. Included steps are: - dq_init, saturation, linearity, dark current, jump detection, ramp_fit, - assign_wcs, flatfield (only applied to WFI imaging data), photom, - and source_catalog. + ramps to produce a 2-D slope product. Included steps are documented + in the ``step_defs``. """ class_alias = "roman_elp" @@ -108,7 +106,6 @@ def process(self, input): "refpix", "linearity", "ramp_fit", - "jump", "tweakreg", ]: result.meta.cal_step[step_str] = "SKIPPED" @@ -136,7 +133,6 @@ def process(self, input): result.meta.cal_step.flat_field = "SKIPPED" result.meta.cal_step.photom = "SKIPPED" result.meta.cal_step.source_catalog = "SKIPPED" - result.meta.cal_step.tweakreg = "SKIPPED" lib.shelve(result, model_index) @@ -173,7 +169,6 @@ def create_fully_saturated_zeroed_image(self, input_model): "assign_wcs", "flat_field", "photom", - "source_detection", "tweakreg", ]: fully_saturated_model.meta.cal_step[step_str] = "SKIPPED" From f14c6ee41b9294d9639f4af5f662f56838376de3 Mon Sep 17 00:00:00 2001 From: Brett Date: Tue, 19 Nov 2024 15:20:35 -0500 Subject: [PATCH 07/13] use create zeroed model --- romancal/pipeline/exposure_pipeline.py | 26 +++----------------------- 1 file changed, 3 insertions(+), 23 deletions(-) diff --git a/romancal/pipeline/exposure_pipeline.py b/romancal/pipeline/exposure_pipeline.py index 66f73c19d..a52eb2dad 100644 --- a/romancal/pipeline/exposure_pipeline.py +++ b/romancal/pipeline/exposure_pipeline.py @@ -91,29 +91,9 @@ def process(self, input): result = self.saturation.run(result) - # Test for fully saturated data if is_fully_saturated(result): - # Return fully saturated image file (stopping pipeline) log.info("All pixels are saturated. Returning a zeroed-out image.") - - # Set all subsequent steps to skipped - for step_str in [ - "assign_wcs", - "flat_field", - "photom", - "source_catalog", - "dark", - "refpix", - "linearity", - "ramp_fit", - "tweakreg", - ]: - result.meta.cal_step[step_str] = "SKIPPED" - - # Stop at this point and returned only the first model - # as a list to match prior behavior - lib.shelve(result, 0) - return [result] + result = self.create_fully_saturated_zeroed_image(result) else: result = self.refpix.run(result) result = self.linearity.run(result) @@ -163,13 +143,13 @@ def create_fully_saturated_zeroed_image(self, input_model): # Set all subsequent steps to skipped for step_str in [ - "linearity", + "refpix" "linearity", "dark", "ramp_fit", "assign_wcs", "flat_field", "photom", - "tweakreg", + "source_catalog", ]: fully_saturated_model.meta.cal_step[step_str] = "SKIPPED" From 0a2164df7e9d2c0a4b29a101fcbb663607c0f8f6 Mon Sep 17 00:00:00 2001 From: Brett Date: Tue, 19 Nov 2024 16:37:52 -0500 Subject: [PATCH 08/13] try to untangle source catalog vs source detection --- romancal/pipeline/exposure_pipeline.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/romancal/pipeline/exposure_pipeline.py b/romancal/pipeline/exposure_pipeline.py index a52eb2dad..dc671c2ec 100644 --- a/romancal/pipeline/exposure_pipeline.py +++ b/romancal/pipeline/exposure_pipeline.py @@ -112,7 +112,8 @@ def process(self, input): log.info("Tweakreg step is being SKIPPED") result.meta.cal_step.flat_field = "SKIPPED" result.meta.cal_step.photom = "SKIPPED" - result.meta.cal_step.source_catalog = "SKIPPED" + # FIXME source_catalog ain't in the schema + # result.meta.cal_step.source_catalog = "SKIPPED" lib.shelve(result, model_index) @@ -149,7 +150,8 @@ def create_fully_saturated_zeroed_image(self, input_model): "assign_wcs", "flat_field", "photom", - "source_catalog", + # FIXME source_catalog ain't in the schema + # "source_catalog", ]: fully_saturated_model.meta.cal_step[step_str] = "SKIPPED" From b1ba9d0d342ee4b4fa96180a8e4603a69228e17b Mon Sep 17 00:00:00 2001 From: Brett Date: Wed, 20 Nov 2024 10:49:56 -0500 Subject: [PATCH 09/13] skip tweakreg when a fully saturated input is found --- romancal/pipeline/exposure_pipeline.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/romancal/pipeline/exposure_pipeline.py b/romancal/pipeline/exposure_pipeline.py index dc671c2ec..6b8f07b1b 100644 --- a/romancal/pipeline/exposure_pipeline.py +++ b/romancal/pipeline/exposure_pipeline.py @@ -82,6 +82,9 @@ def process(self, input): else: lib = ModelLibrary([input]) + # Flag to track if any of the input models are fully saturated + any_saturated = False + with lib: for model_index, model in enumerate(lib): self.dq_init.suffix = "dq_init" @@ -94,6 +97,12 @@ def process(self, input): if is_fully_saturated(result): log.info("All pixels are saturated. Returning a zeroed-out image.") result = self.create_fully_saturated_zeroed_image(result) + + # Track that we've seen a fully saturated input + any_saturated = True + log.warning( + "tweakreg will not be run due to a fully saturated input" + ) else: result = self.refpix.run(result) result = self.linearity.run(result) @@ -112,15 +121,20 @@ def process(self, input): log.info("Tweakreg step is being SKIPPED") result.meta.cal_step.flat_field = "SKIPPED" result.meta.cal_step.photom = "SKIPPED" - # FIXME source_catalog ain't in the schema - # result.meta.cal_step.source_catalog = "SKIPPED" + result.meta.cal_step.source_detection = "SKIPPED" + if any_saturated: + # the input association contains a fully saturated model + # where source_catalog can't be run which means we + # also can't run tweakreg. + result.meta.cal_step.tweakreg = "SKIPPED" lib.shelve(result, model_index) # Now that all the exposures are collated, run tweakreg # Note: this does not cover the case where the asn mixes imaging and spectral # observations. This should not occur on-prem - self.tweakreg.run(lib) + if not any_saturated: + self.tweakreg.run(lib) log.info("Roman exposure calibration pipeline ending...") @@ -144,14 +158,14 @@ def create_fully_saturated_zeroed_image(self, input_model): # Set all subsequent steps to skipped for step_str in [ - "refpix" "linearity", + "refpix", + "linearity", "dark", "ramp_fit", "assign_wcs", "flat_field", "photom", - # FIXME source_catalog ain't in the schema - # "source_catalog", + "source_detection", ]: fully_saturated_model.meta.cal_step[step_str] = "SKIPPED" From c7868e2337fdfa219cf1f2eda613e9195e5caa13 Mon Sep 17 00:00:00 2001 From: Brett Date: Wed, 20 Nov 2024 12:37:45 -0500 Subject: [PATCH 10/13] improve all_saturated tests --- romancal/regtest/test_wfi_image_pipeline.py | 74 ++++++++++++++++----- 1 file changed, 57 insertions(+), 17 deletions(-) diff --git a/romancal/regtest/test_wfi_image_pipeline.py b/romancal/regtest/test_wfi_image_pipeline.py index 28dfdd89f..bb6d85f2d 100644 --- a/romancal/regtest/test_wfi_image_pipeline.py +++ b/romancal/regtest/test_wfi_image_pipeline.py @@ -5,6 +5,7 @@ import roman_datamodels as rdm from gwcs.wcstools import grid_from_bounding_box from numpy.testing import assert_allclose +from roman_datamodels.datamodels import ImageModel from roman_datamodels.dqflags import pixel from romancal.assign_wcs.assign_wcs_step import AssignWcsStep @@ -227,12 +228,13 @@ def test_elp_input_dm(rtdata, ignore_asdf_paths): assert model.meta.cal_step.photom == "COMPLETE" -def test_processing_pipeline_all_saturated(rtdata, ignore_asdf_paths): - """Tests for fully saturated data skipping steps in the pipeline - - Note that this test mimics how the pipeline is run in OPS. - Any changes to this test should be coordinated with OPS. +@pytest.fixture(scope="module") +def run_all_saturated(rtdata_module): + """ + Test ELP handling of an all saturated input model """ + rtdata = rtdata_module + input_data = "r0000101001001001001_0001_wfi01_ALL_SATURATED_uncal.asdf" rtdata.get_data(f"WFI/image/{input_data}") rtdata.input = input_data @@ -245,22 +247,60 @@ def test_processing_pipeline_all_saturated(rtdata, ignore_asdf_paths): rtdata.input, ] ExposurePipeline.from_cmdline(args) + + # get truth file rtdata.get_truth(f"truth/WFI/image/{output}") + return rtdata - diff = compare_asdf(rtdata.output, rtdata.truth, **ignore_asdf_paths) + +@pytest.fixture(scope="module") +def all_saturated_model(run_all_saturated): + with rdm.open(run_all_saturated.output) as model: + yield model + + +def test_all_saturated_against_truth(run_all_saturated, ignore_asdf_paths): + diff = compare_asdf( + run_all_saturated.output, run_all_saturated.truth, **ignore_asdf_paths + ) assert diff.identical, diff.report() - # Ensure step completion is as expected - with rdm.open(rtdata.output) as model: - assert model.meta.cal_step.dq_init == "COMPLETE" - assert model.meta.cal_step.saturation == "COMPLETE" - assert model.meta.cal_step.linearity == "SKIPPED" - assert model.meta.cal_step.dark == "SKIPPED" - assert model.meta.cal_step.jump == "SKIPPED" - assert model.meta.cal_step.ramp_fit == "SKIPPED" - assert model.meta.cal_step.assign_wcs == "SKIPPED" - assert model.meta.cal_step.flat_field == "SKIPPED" - assert model.meta.cal_step.photom == "SKIPPED" + +@pytest.mark.parametrize( + "step_name, status", + [ + ("dq_init", "COMPLETE"), + ("saturation", "COMPLETE"), + ("linearity", "SKIPPED"), + ("dark", "SKIPPED"), + ("ramp_fit", "SKIPPED"), + ("assign_wcs", "SKIPPED"), + ("flat_field", "SKIPPED"), + ("photom", "SKIPPED"), + ("source_detection", "SKIPPED"), + ("tweakreg", "SKIPPED"), + ], +) +def test_all_saturated_status(all_saturated_model, step_name, status): + """ + For an all saturated input the pipeline should skip all steps after saturation. + """ + assert getattr(all_saturated_model.meta.cal_step, step_name) == status + + +def test_all_saturated_model_type(all_saturated_model): + """ + For an all saturated input the output model should be an ImageModel. + """ + assert isinstance(all_saturated_model, ImageModel) + + +@pytest.mark.parametrize("array_name", ["data", "err", "var_poisson", "var_rnoise"]) +def test_all_saturated_zeroed(all_saturated_model, array_name): + """ + For an all saturated input the output model should contain 0s for data and err arrays. + """ + np.testing.assert_array_equal(getattr(all_saturated_model, array_name), 0) def test_pipeline_suffix(rtdata, ignore_asdf_paths): From 7feff8ac13c006b34a521ddcb7b86d7e7d6c0224 Mon Sep 17 00:00:00 2001 From: Brett Date: Wed, 20 Nov 2024 14:05:51 -0500 Subject: [PATCH 11/13] add tests for exposure pipeline input/output --- romancal/pipeline/exposure_pipeline.py | 14 ++++- romancal/pipeline/tests/__init__.py | 0 .../pipeline/tests/test_exposure_pipeline.py | 57 +++++++++++++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 romancal/pipeline/tests/__init__.py create mode 100644 romancal/pipeline/tests/test_exposure_pipeline.py diff --git a/romancal/pipeline/exposure_pipeline.py b/romancal/pipeline/exposure_pipeline.py index 6b8f07b1b..f4505522e 100644 --- a/romancal/pipeline/exposure_pipeline.py +++ b/romancal/pipeline/exposure_pipeline.py @@ -75,12 +75,16 @@ def process(self, input): # determine the input type file_type = filetype.check(input) + return_lib = True if file_type == "ModelLibrary": lib = input elif file_type == "asn": lib = ModelLibrary(input) else: + # for a non-asn non-library input process it as a library lib = ModelLibrary([input]) + # but return it as a datamodel + return_lib = False # Flag to track if any of the input models are fully saturated any_saturated = False @@ -138,7 +142,15 @@ def process(self, input): log.info("Roman exposure calibration pipeline ending...") - return lib + # return a ModelLibrary + if return_lib: + return lib + + # or a DataModel (for non-asn non-lib inputs) + with lib: + model = lib.borrow(0) + lib.shelve(model) + return model def create_fully_saturated_zeroed_image(self, input_model): """ diff --git a/romancal/pipeline/tests/__init__.py b/romancal/pipeline/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/romancal/pipeline/tests/test_exposure_pipeline.py b/romancal/pipeline/tests/test_exposure_pipeline.py new file mode 100644 index 000000000..5c75fb3ea --- /dev/null +++ b/romancal/pipeline/tests/test_exposure_pipeline.py @@ -0,0 +1,57 @@ +import pytest +import roman_datamodels.datamodels as rdm +import roman_datamodels.maker_utils as mk + +from romancal.associations.asn_from_list import asn_from_list +from romancal.datamodels.library import ModelLibrary +from romancal.pipeline import ExposurePipeline + + +@pytest.fixture(scope="function") +def input_value(request, tmp_path): + match request.param: + case "datamodel_fn": + model = mk.mk_datamodel(rdm.RampModel) + fn = tmp_path / "model.asdf" + model.save(fn) + return fn + case "datamodel": + return mk.mk_datamodel(rdm.RampModel) + case "asn_fn": + model = mk.mk_datamodel(rdm.RampModel) + model.meta.filename = "foo.asdf" + model.save(tmp_path / model.meta.filename) + asn = asn_from_list([model.meta.filename], product_name="foo_out") + base_fn, contents = asn.dump(format="json") + asn_filename = tmp_path / base_fn + with open(asn_filename, "w") as f: + f.write(contents) + return asn_filename + case "library": + return ModelLibrary([mk.mk_datamodel(rdm.RampModel)]) + case value: + raise Exception(f"Invalid parametrization: {value}") + + +@pytest.mark.parametrize( + "input_value, expected_output_type", + [ + ("datamodel_fn", rdm.DataModel), + ("datamodel", rdm.DataModel), + ("asn_fn", ModelLibrary), + ("library", ModelLibrary), + ], + indirect=["input_value"], +) +def test_input_to_output(input_value, expected_output_type): + """ + Test that for a particular input_value (as parametrized indirectly + through the input_value fixtrue) the output is the expected type. + """ + pipeline = ExposurePipeline() + # don't fetch references + pipeline.prefetch_references = False + # skip all steps + [setattr(getattr(pipeline, k), "skip", True) for k in pipeline.step_defs] + output_value = pipeline(input_value) + assert isinstance(output_value, expected_output_type) From 04c2fa0f9090b1499133f3f5e511e2e178341dfd Mon Sep 17 00:00:00 2001 From: Brett Date: Wed, 20 Nov 2024 16:41:42 -0500 Subject: [PATCH 12/13] update changelog --- .../{1525.exposure_pipeline.rst => 1525.exposure_pipeline.0.rst} | 0 changes/1525.exposure_pipeline.1.rst | 1 + 2 files changed, 1 insertion(+) rename changes/{1525.exposure_pipeline.rst => 1525.exposure_pipeline.0.rst} (100%) create mode 100644 changes/1525.exposure_pipeline.1.rst diff --git a/changes/1525.exposure_pipeline.rst b/changes/1525.exposure_pipeline.0.rst similarity index 100% rename from changes/1525.exposure_pipeline.rst rename to changes/1525.exposure_pipeline.0.rst diff --git a/changes/1525.exposure_pipeline.1.rst b/changes/1525.exposure_pipeline.1.rst new file mode 100644 index 000000000..7993dd97a --- /dev/null +++ b/changes/1525.exposure_pipeline.1.rst @@ -0,0 +1 @@ +Fix exposure pipeline handling of all saturated inputs. From b6f3799f9b80e6b1d22bc4e7d6e02205cfd0686c Mon Sep 17 00:00:00 2001 From: Brett Date: Tue, 3 Dec 2024 17:23:29 -0500 Subject: [PATCH 13/13] rename source_detection to source_catalog --- romancal/pipeline/exposure_pipeline.py | 4 ++-- romancal/regtest/test_wfi_image_pipeline.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/romancal/pipeline/exposure_pipeline.py b/romancal/pipeline/exposure_pipeline.py index f4505522e..55d5acac2 100644 --- a/romancal/pipeline/exposure_pipeline.py +++ b/romancal/pipeline/exposure_pipeline.py @@ -125,7 +125,7 @@ def process(self, input): log.info("Tweakreg step is being SKIPPED") result.meta.cal_step.flat_field = "SKIPPED" result.meta.cal_step.photom = "SKIPPED" - result.meta.cal_step.source_detection = "SKIPPED" + result.meta.cal_step.source_catalog = "SKIPPED" if any_saturated: # the input association contains a fully saturated model @@ -177,7 +177,7 @@ def create_fully_saturated_zeroed_image(self, input_model): "assign_wcs", "flat_field", "photom", - "source_detection", + "source_catalog", ]: fully_saturated_model.meta.cal_step[step_str] = "SKIPPED" diff --git a/romancal/regtest/test_wfi_image_pipeline.py b/romancal/regtest/test_wfi_image_pipeline.py index bb6d85f2d..0fd85a297 100644 --- a/romancal/regtest/test_wfi_image_pipeline.py +++ b/romancal/regtest/test_wfi_image_pipeline.py @@ -277,7 +277,7 @@ def test_all_saturated_against_truth(run_all_saturated, ignore_asdf_paths): ("assign_wcs", "SKIPPED"), ("flat_field", "SKIPPED"), ("photom", "SKIPPED"), - ("source_detection", "SKIPPED"), + ("source_catalog", "SKIPPED"), ("tweakreg", "SKIPPED"), ], )