Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Orbit-based fake data to a WorkUnit #423

Merged
merged 16 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[build-system]
requires = ["packaging>=21",
"setuptools>=42",
"setuptools_scm[toml]>=6.2",
"setuptools_scm[toml]>=6.2",
"pybind11>=2.10.0"]
build-backend = "setuptools.build_meta"

Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,7 @@ docs =
# pandoc
# pypandoc-binary
ipython
orbits =
pyoorb
tests =
black[jupyter]
2 changes: 2 additions & 0 deletions src/kbmod/fake_orbits/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Do not import modules here.
# Some of the modules have dependencies that are optional.
88 changes: 88 additions & 0 deletions src/kbmod/fake_orbits/insert_fake_orbit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from astropy.wcs import WCS

from kbmod.fake_orbits.pyoorb_helper import PyoorbOrbit
from kbmod.search import ImageStack, LayeredImage, PSF, RawImage
from kbmod.work_unit import WorkUnit


def safe_add_fake_detection(img, x, y, flux):
"""Add a fake detection to a LayeredImage.

Parameters
----------
img : `RawImage` or `LayeredImage`
The image to modify.
x : `float`
The x pixel location of the fake object.
y : `float`
The y pixel location of the fake object.
flux : `float`
The flux value.

Returns
-------
result : `bool`
A result indicating whether the detection was inserted within
the image and on an unmasked pixel.
"""
# Check the pixels are in bounds
if x < 0 or x >= img.get_width():
return False
if y < 0 or y >= img.get_height():
return False

# Check that no mask flags are set.
if img.get_mask().get_pixel(int(y), int(x)) != 0:
return False

sci = img.get_science()
psf = img.get_psf()
dim = psf.get_dim()

initial_x = x - psf.get_radius()
initial_y = y - psf.get_radius()
for i in range(dim):
for j in range(dim):
sci.interpolated_add(float(initial_x + i), float(initial_y + j), flux * psf.get_value(i, j))

return True


def insert_fake_orbit_into_work_unit(worku, orbit, flux, obscode):
"""Insert the predicted positions for an oorb orbit into the WorkUnit images.

Parameters
----------
worku : `WorkUnit`
The WorkUnit to modify.
orbit : `PyoorbOrbit`
The orbit to use.
flux : `float`
The object brightness in the image.
obscode : `str`
The observator code to use for predictions.

Returns
-------
results : `list`
A list of result tuples. If the detection was added to image j, the
results[j] is the (x, y) tuple of its central pixel position. Otherwise
results[j] is None.
"""
mjds = worku.get_all_obstimes()
predicted_pos = orbit.get_ephemerides(mjds, obscode)

results = []
for i in range(len(mjds)):
current_wcs = worku.get_wcs(i)
pixel_loc = current_wcs.world_to_pixel(predicted_pos[i])
x = pixel_loc[0].item()
y = pixel_loc[1].item()

img = worku.im_stack.get_single_image(i)
if safe_add_fake_detection(img, x, y, flux):
results.append((x, y))
else:
results.append(None)

return results
165 changes: 165 additions & 0 deletions src/kbmod/fake_orbits/pyoorb_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import numpy as np
import os
import pyoorb as oo

from astropy.coordinates import SkyCoord


class PyoorbOrbit(object):
"""A helper class for pyoorb that wraps functionality.

Uses an orbit in a fixed representation (Cartesian and UTC) at a fixed epoch (2000)
to simplify predictions and avoid conversions.
"""

_init_run = False

def __init__(self, cart_elements):
"""Create an orbit from cartesian elements.

Attributes
----------
cart_elements : `numpy.ndarray`
An array of orbit elements with the following values.
0 - The integer object id
1 - x
2 - y
3 - z
4 - dx
5 - dy
6 - dz
7 - orbital element type (must be 1 for Cartesian)
8 - Epoch in MJD (must be 51544.5 for MJD2000)
9 - time scale type (must be 1 for UTC)
10 - target absolute magnitude
12 - target slope parameter
"""
if len(cart_elements) != 12:
raise ValueError(f"Wrong number of elements: Expected 12. Found {len(self.cart_elements)})")
if cart_elements[7] != 1:
raise ValueError(f"Non-cartesian coordinates provided {self.cart_elements[7]}")
if cart_elements[8] != 51544.5:
raise ValueError(f"Invalid epoch used {self.cart_elements[8]}")
if cart_elements[9] != 1:
raise ValueError(f"Invalid time scale type {self.cart_elements[9]}")
self.cart_elements = cart_elements
self.orb_array = np.array([cart_elements], dtype=np.double, order="F")

@classmethod
def _safe_initialize(cls):
"""Initialize the pyoorb library. This needs to be done exactly
once for ALL objects.

For the initialization to work, pyoorb either needs to be installed with conda
or the data files need to downloaded manually (see
https://github.com/oorb/oorb/wiki/Installation). If the files were manually downloaded,
specify the path by calling PyoorbOrbit.specify_data_path() or setting the OORB_DATA
environmental variable.
"""
if not PyoorbOrbit._init_run:
if oo.pyoorb.oorb_init() != 0:
print(
"Error: Unable to initialize pyoorb. This may be because the data files\n"
"are missing. To run, pyoorb needs to be installed with conda or the data\n"
"files need to downloaded. See https://github.com/oorb/oorb/wiki/Installation\n"
"for instructions. If the files were manually downloaded, specify the path\n"
"by calling PyoorbOrbit.specify_data_path() or setting the OORB_DATA\n"
"environmental variable. prior to creating the first PyoorbOrbit object."
)
raise Exception("Unable to initialize pyoorb")
PyoorbOrbit._init_run = True

@classmethod
def specify_data_path(cls, data_path):
"""Override the default path for the data files.

Parameters
----------
data_path : `str`
The path to the data files.
"""
data_dir = data_path.rstrip("/")
os.environ["OORB_DATA"] = data_dir

@classmethod
def from_kepler_elements(cls, a, e, i, longnode, argper, mean_anomaly, abs_mag=10.0, slope_g=0.0):
"""Create an orbit from the Keplerian elements."""
cls._safe_initialize()

orbits_kepler = np.array(
[
[
0, # ID Number (unused)
a,
e,
i,
longnode,
argper,
mean_anomaly,
3, # Element types = Keplerian
51544.5, # epoch = MJD2000
1, # time scale = UTC
abs_mag,
slope_g,
]
],
dtype=np.double,
order="F",
)

# Convert the orbit to cartesian coordinates (Note in_element_type is the code for the
# output type).
orbits_cart, err = oo.pyoorb.oorb_element_transformation(in_orbits=orbits_kepler, in_element_type=1)
if err != 0:
raise Exception(f"Error in transformation {err}")

# Create a single
return PyoorbOrbit(orbits_cart[0])

@classmethod
def from_cartesian_elements(cls, x, y, z, dx, dy, dz, abs_mag=10.0, slope_g=0.0):
"""Create an orbit from the Cartesian elements."""
orbit_cart = np.array(
[
0, # ID Number (unused)
x,
y,
z,
dx,
dy,
dz,
1, # Element types = Cartesian
51544.5, # epoch = MJD2000
1, # time scale = UTC
abs_mag,
slope_g,
],
dtype=np.double,
order="F",
)
return PyoorbOrbit(orbit_cart)

def get_ephemerides(self, mjds, obscode):
"""Compute the object's position at given times.

Parameters
----------
mjds : `list` or `numpy.ndarray`
Observation times in MJD.
obscode : `str`
The observatory code.

Returns
-------
A list of SkyCoord (one for each time) of the predicted sky positions.
"""
timescales = [1] * len(mjds)
ephem_dates = np.array(list(zip(mjds, timescales)), dtype=np.double, order="F")

ephs, err = oo.pyoorb.oorb_ephemeris_basic(
in_orbits=self.orb_array, in_dynmodel="N", in_obscode=obscode, in_date_ephems=ephem_dates
)
if err != 0:
raise Exception(f"Error in ephem {err}")

return [SkyCoord(ephs[0, i, 1], ephs[0, i, 2], unit="deg") for i in range(len(mjds))]
4 changes: 4 additions & 0 deletions src/kbmod/work_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ def get_wcs(self, img_num):

return self.per_image_wcs[img_num]

def get_all_obstimes(self):
"""Return a list of the observation times."""
return [self.im_stack.get_obstime(i) for i in range(self.im_stack.img_count())]

@classmethod
def from_fits(cls, filename):
"""Create a WorkUnit from a single FITS file.
Expand Down
77 changes: 77 additions & 0 deletions tests/manual_test_fake_orbits.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""This file tests the functions in src/kbmod/fake_orbits. Since those files
include optional dependencies and data files, we do not include them as
part of the standard test suite.

TODO: Set up the test suite to pull in the optional data files.
"""

from astropy.wcs import WCS

import kbmod.search as kb

from kbmod.configuration import SearchConfiguration
from kbmod.fake_orbits.insert_fake_orbit import insert_fake_orbit_into_work_unit
from kbmod.fake_orbits.pyoorb_helper import PyoorbOrbit
from kbmod.work_unit import WorkUnit

# Set the image parameters.
width = 500
height = 700
num_images = 20
obs_cadence = 5

# If you have manually downloaded the data files, you need to add:
# PyoorbOrbit.specify_data_path(my_data_dir)
my_orb = PyoorbOrbit.from_kepler_elements(40.0, 0.05, 0.1, 0.0, 0.0, 0.0)

# Create time steps spaced 'obs_cadence' days apart.
times = [55144.5 + obs_cadence * i for i in range(num_images)]

# Predict the observations in (RA, dec) at those times.
# Using Gemini South as an observatory.
predicted_pos = my_orb.get_ephemerides(times, "I11")

# Create a fake WCS centered near where the object will be on 11th
# observation.
header_dict = {
"WCSAXES": 2,
"CTYPE1": "RA---TAN-SIP",
"CTYPE2": "DEC--TAN-SIP",
"CRVAL1": predicted_pos[10].ra.value,
"CRVAL2": predicted_pos[10].dec.value,
"CRPIX1": width / 2,
"CRPIX2": height / 2,
"CDELT1": 0.001,
"CDELT2": 0.001,
}
wcs = WCS(header_dict)

# Create the fake images. Set slightly different PSFs per image.
images = [None] * num_images
for i in range(num_images):
images[i] = kb.LayeredImage(
("layered_test_%i" % i),
width,
height,
2.0, # noise_level
4.0, # variance
times[i],
kb.PSF(1.0 + 0.01 * i),
)
im_stack = kb.ImageStack(images)

# Create the WorkUnit with a default config.
config = SearchConfiguration()
work = WorkUnit(im_stack, config, wcs)

# Compute the pixel positions
results = insert_fake_orbit_into_work_unit(work, my_orb, 100.0, "I11")
print("RESULTS:")
for i in range(len(results)):
# Do basic bounds checking.
if results[i] is not None:
assert results[i][0] >= 0
assert results[i][1] >= 0
assert results[i][0] < width
assert results[i][1] < height
print(f" {i}: {results[i]}")
Loading