-
Notifications
You must be signed in to change notification settings - Fork 32
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
418 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
""" Test various utility functions """ | ||
import pytest | ||
|
||
from astropy import wcs as fitswcs | ||
import numpy as np | ||
|
||
from . helpers import make_gwcs | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def wcs_gwcs(): | ||
crval = (150.0, 2.0) | ||
crpix = (500.0, 500.0) | ||
shape = (1000, 1000) | ||
pscale = 0.06 / 3600 | ||
return make_gwcs(crpix, crval, pscale, shape) | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def wcs_fitswcs(wcs_gwcs): | ||
fits_wcs = fitswcs.WCS(wcs_gwcs.to_fits_sip()) | ||
fits_wcs.pixel_area = wcs_gwcs.pixel_area | ||
fits_wcs.pixel_scale = wcs_gwcs.pixel_scale | ||
return fits_wcs | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def wcs_slicedwcs(wcs_gwcs): | ||
xmin, xmax = 100, 500 | ||
slices = (slice(xmin, xmax), slice(xmin, xmax)) | ||
sliced_wcs = fitswcs.wcsapi.SlicedLowLevelWCS(wcs_gwcs, slices) | ||
sliced_wcs.pixel_area = wcs_gwcs.pixel_area | ||
sliced_wcs.pixel_scale = wcs_gwcs.pixel_scale | ||
return sliced_wcs | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
from astropy.nddata.bitmask import BitFlagNameMap | ||
from astropy import coordinates as coord | ||
from astropy.modeling import models as astmodels | ||
|
||
from gwcs import coordinate_frames as cf | ||
from gwcs.wcstools import wcs_from_fiducial | ||
import numpy as np | ||
|
||
from stcal.alignment import compute_s_region_imaging | ||
|
||
|
||
class JWST_DQ_FLAG_DEF(BitFlagNameMap): | ||
DO_NOT_USE = 1 | ||
SATURATED = 2 | ||
JUMP_DET = 4 | ||
|
||
|
||
def make_gwcs(crpix, crval, pscale, shape): | ||
""" Simulate a gwcs from FITS WCS parameters. | ||
crpix - tuple of floats | ||
crval - tuple of floats (RA, DEC) | ||
pscale - pixel scale in degrees | ||
shape - array shape (numpy's convention) | ||
""" | ||
prj = astmodels.Pix2Sky_TAN() | ||
fiducial = np.array(crval) | ||
|
||
pc = np.array([[-1., 0.], [0., 1.]]) | ||
pc_matrix = astmodels.AffineTransformation2D(pc, name='pc_rotation_matrix') | ||
scale = (astmodels.Scale(pscale, name='cdelt1') & | ||
astmodels.Scale(pscale, name='cdelt2')) | ||
transform = pc_matrix | scale | ||
|
||
out_frame = cf.CelestialFrame( | ||
name='world', | ||
axes_names=('lon', 'lat'), | ||
reference_frame=coord.ICRS() | ||
) | ||
input_frame = cf.Frame2D(name="detector") | ||
wnew = wcs_from_fiducial( | ||
fiducial, | ||
coordinate_frame=out_frame, | ||
projection=prj, | ||
transform=transform, | ||
input_frame=input_frame | ||
) | ||
|
||
output_bounding_box = ( | ||
(-0.5, float(shape[1]) - 0.5), | ||
(-0.5, float(shape[0]) - 0.5) | ||
) | ||
offset1, offset2 = crpix | ||
offsets = (astmodels.Shift(-offset1, name='crpix1') & | ||
astmodels.Shift(-offset2, name='crpix2')) | ||
|
||
wnew.insert_transform('detector', offsets, after=True) | ||
wnew.bounding_box = output_bounding_box | ||
|
||
tr = wnew.pipeline[0].transform | ||
pix_area = ( | ||
np.deg2rad(tr['cdelt1'].factor.value) * | ||
np.deg2rad(tr['cdelt2'].factor.value) | ||
) | ||
|
||
wnew.pixel_area = pix_area | ||
wnew.pixel_scale = pscale | ||
wnew.pixel_shape = shape[::-1] | ||
wnew.array_shape = shape | ||
|
||
return wnew | ||
|
||
|
||
def make_input_model(crpix, crval, pscale, shape, group_id=1, exptime=1): | ||
w = make_gwcs( | ||
crpix=crpix, | ||
crval=crval, | ||
pscale=pscale, | ||
shape=shape | ||
) | ||
|
||
model = { | ||
"data": np.zeros(shape, dtype=np.float32), | ||
"dq": np.zeros(shape, dtype=np.int32), | ||
|
||
# meta: | ||
"filename": "", | ||
"group_id": group_id, | ||
"s_region": compute_s_region_imaging(w), | ||
"wcs": w, | ||
"bunit_data": "MJy", | ||
|
||
"exposure_time": exptime, | ||
"start_time": 0.0, | ||
"end_time": exptime, | ||
"duration": exptime, | ||
"measurement_time": exptime, | ||
"effective_exposure_time": exptime, | ||
"elapsed_exposure_time": exptime, | ||
|
||
"pixelarea_steradians": w.pixel_area, | ||
"pixelarea_arcsecsq": w.pixel_area * (np.rad2deg(1) * 3600)**2, | ||
|
||
"level": 0.0, # sky level | ||
"subtracted": False, | ||
} | ||
|
||
for arr in ["var_flat", "var_rnoise", "var_poisson"]: | ||
model[arr] = np.ones(shape, dtype=np.float32) | ||
|
||
model["err"] = np.sqrt(3.0) * np.ones(shape, dtype=np.float32) | ||
|
||
return model |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
from stcal.resample import Resample | ||
|
||
import numpy as np | ||
|
||
from . helpers import make_gwcs, make_input_model, JWST_DQ_FLAG_DEF | ||
|
||
|
||
def test_resample_defaults(): | ||
crval = (150.0, 2.0) | ||
crpix = (500.0, 500.0) | ||
shape = (1000, 1000) | ||
pscale = 0.06 / 3600 | ||
|
||
output_wcs = make_gwcs( | ||
crpix=(600, 600), | ||
crval=crval, | ||
pscale=pscale, | ||
shape=(1200, 1200) | ||
) | ||
|
||
nmodels = 4 | ||
|
||
resample = Resample(n_input_models=nmodels, output_wcs=output_wcs) | ||
resample.dq_flag_name_map = JWST_DQ_FLAG_DEF | ||
|
||
influx = 0.0 | ||
ttime = 0.0 | ||
|
||
for k in range(nmodels): | ||
im = make_input_model( | ||
crpix=tuple(i - 6 * k for i in crpix), | ||
crval=crval, | ||
pscale=pscale, | ||
shape=shape, | ||
group_id=k + 1, | ||
exptime=k + 1 | ||
) | ||
im["data"][:, :] = k + 0.5 | ||
influx += k + 0.5 | ||
ttime += k + 1 | ||
|
||
resample.add_model(im) | ||
|
||
resample.finalize() | ||
|
||
odata = resample.output_model["data"] | ||
oweight = resample.output_model["wht"] | ||
|
||
assert resample.output_model["pointings"] == nmodels | ||
assert resample.output_model["exposure_time"] == ttime | ||
|
||
# next assert assumes constant IVM | ||
assert abs(np.sum(odata * oweight) - influx * np.prod(shape)) < 1.0e-6 | ||
|
||
assert np.nansum(resample.output_model["var_flat"]) > 0.0 | ||
assert np.nansum(resample.output_model["var_poisson"]) > 0.0 | ||
assert np.nansum(resample.output_model["var_rnoise"]) > 0.0 | ||
Oops, something went wrong.