diff --git a/docs/api-reference/irf/benchmarks.rst b/docs/api-reference/irf/benchmarks.rst
new file mode 100644
index 00000000000..c63701ef4dd
--- /dev/null
+++ b/docs/api-reference/irf/benchmarks.rst
@@ -0,0 +1,12 @@
+.. _benchmarks:
+
+**********
+Benchmarks
+**********
+
+
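+The benchmark makers are configurable components that create fits binary table
+HDUs. A minimal sketch, assuming ``events`` is a table of reconstructed events
+in the format produced by `~ctapipe.irf.EventLoader`::
+
+    from ctapipe.irf import AngularResolution2dMaker
+
+    ang_res_maker = AngularResolution2dMaker(use_true_energy=False)
+    ang_res_hdu = ang_res_maker.make_angular_resolution_hdu(events=events)
+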
+Reference/API
+=============
+
+.. automodapi:: ctapipe.irf.benchmarks
+ :no-inheritance-diagram:
diff --git a/docs/api-reference/irf/binning.rst b/docs/api-reference/irf/binning.rst
new file mode 100644
index 00000000000..cff1d01ebf3
--- /dev/null
+++ b/docs/api-reference/irf/binning.rst
@@ -0,0 +1,12 @@
+.. _binning:
+
+*******
+Binning
+*******
+
+
+Reference/API
+=============
+
+.. automodapi:: ctapipe.irf.binning
+ :no-inheritance-diagram:
diff --git a/docs/api-reference/irf/index.rst b/docs/api-reference/irf/index.rst
new file mode 100644
index 00000000000..a8f2cc41f3b
--- /dev/null
+++ b/docs/api-reference/irf/index.rst
@@ -0,0 +1,44 @@
+.. _irf:
+
+**********************************************
+Instrument Response Functions (`~ctapipe.irf`)
+**********************************************
+
+.. currentmodule:: ctapipe.irf
+
+This module contains functionality for generating instrument response functions (IRFs).
+The simulated events used for this have to be selected based on their associated "gammaness"
+value and (optionally) their reconstructed angular offset from their point of origin.
+The code for doing this can be found in :ref:`cut_optimization` and is intended for use via the
+`~ctapipe.tools.optimize_event_selection.EventSelectionOptimizer` tool.
+
+The generation of the IRF components themselves is implemented in :ref:`irfs` and is intended for
+use via the `~ctapipe.tools.compute_irf.IrfTool` tool.
+This tool can optionally also compute some common benchmarks, which are implemented in :ref:`benchmarks`.
+
+The cut optimization as well as the calculation of the IRF components and the benchmarks
+are done using the `pyirf <https://pyirf.readthedocs.io/en/stable/>`_ package.
+
+:ref:`binning`, :ref:`preprocessing`, and :ref:`spectra` contain helper functions and classes used by many of the
+other components in this module.
+
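+The ``ctapipe-optimize-event-selection`` and ``ctapipe-compute-irf`` command
+line tools follow the standard ctapipe configuration mechanism, e.g.:
+
+.. code-block:: console
+
+   $ ctapipe-optimize-event-selection --config cut_optimization.json
+   $ ctapipe-compute-irf --config compute_irf.json
+
+The config file names above are placeholders; run each tool with ``--help-all``
+to see the available options.
+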
+
+Submodules
+==========
+
+.. toctree::
+ :maxdepth: 1
+
+ optimize
+ irfs
+ benchmarks
+ binning
+ preprocessing
+ spectra
+
+
+Reference/API
+=============
+
+.. automodapi:: ctapipe.irf
+ :no-inheritance-diagram:
diff --git a/docs/api-reference/irf/irfs.rst b/docs/api-reference/irf/irfs.rst
new file mode 100644
index 00000000000..9755f91ec70
--- /dev/null
+++ b/docs/api-reference/irf/irfs.rst
@@ -0,0 +1,12 @@
+.. _irfs:
+
+**************
+IRF components
+**************
+
+
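+The IRF makers are configurable components that create fits binary table HDUs
+in GADF format. A minimal sketch, assuming ``events`` is a table of
+reconstructed events in the format produced by `~ctapipe.irf.EventLoader` and
+``sim_info`` the matching `~pyirf.simulations.SimulatedEventsInfo`::
+
+    from ctapipe.irf import EffectiveArea2dMaker
+
+    aeff_maker = EffectiveArea2dMaker(fov_offset_n_bins=3)
+    aeff_hdu = aeff_maker.make_aeff_hdu(
+        events=events,
+        point_like=False,
+        signal_is_point_like=False,
+        sim_info=sim_info,
+    )
+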
+Reference/API
+=============
+
+.. automodapi:: ctapipe.irf.irfs
+ :no-inheritance-diagram:
diff --git a/docs/api-reference/irf/optimize.rst b/docs/api-reference/irf/optimize.rst
new file mode 100644
index 00000000000..ad47192f8eb
--- /dev/null
+++ b/docs/api-reference/irf/optimize.rst
@@ -0,0 +1,12 @@
+.. _cut_optimization:
+
+********************************
+G/H (and Theta) Cut Optimization
+********************************
+
+
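+The cut calculators and optimizers are configurable components; an optimization
+produces an `~ctapipe.irf.OptimizationResult`, which can be written to and read
+back from a FITS file. A minimal sketch, assuming ``signal`` and ``background``
+are event tables in the format produced by `~ctapipe.irf.EventLoader`::
+
+    from ctapipe.irf import EventPreProcessor, OptimizationResult, PercentileCuts
+
+    optimizer = PercentileCuts()
+    result = optimizer.optimize_cuts(
+        signal=signal,
+        background=background,
+        precuts=EventPreProcessor(),
+        clf_prefix="RandomForestClassifier",
+    )
+    result.write("cuts.fits")
+    result = OptimizationResult.read("cuts.fits")
+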
+Reference/API
+=============
+
+.. automodapi:: ctapipe.irf.optimize
+ :no-inheritance-diagram:
diff --git a/docs/api-reference/irf/preprocessing.rst b/docs/api-reference/irf/preprocessing.rst
new file mode 100644
index 00000000000..9d57445cbe3
--- /dev/null
+++ b/docs/api-reference/irf/preprocessing.rst
@@ -0,0 +1,12 @@
+.. _preprocessing:
+
+*******************************
+Event Loading and Preprocessing
+*******************************
+
+
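+`~ctapipe.irf.EventLoader` reads DL2 events through `~ctapipe.io.TableLoader`,
+applies the preselection and column renaming defined by
+`~ctapipe.irf.EventPreProcessor`, and derives the additional columns needed for
+IRF calculation. A minimal sketch (the file name, ``kind``, and spectrum member
+are illustrative)::
+
+    import astropy.units as u
+
+    from ctapipe.irf import EventLoader, Spectra
+
+    loader = EventLoader(
+        kind="gammas",
+        file="gamma.dl2.h5",
+        target_spectrum=Spectra.CRAB_HEGRA,
+    )
+    events, n_raw_events, meta = loader.load_preselected_events(
+        chunk_size=100_000, obs_time=50 * u.hour
+    )
+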
+Reference/API
+=============
+
+.. automodapi:: ctapipe.irf.preprocessing
+ :no-inheritance-diagram:
diff --git a/docs/api-reference/irf/spectra.rst b/docs/api-reference/irf/spectra.rst
new file mode 100644
index 00000000000..08165e9acd5
--- /dev/null
+++ b/docs/api-reference/irf/spectra.rst
@@ -0,0 +1,12 @@
+.. _spectra:
+
+*************************************
+Spectra definitions for event weights
+*************************************
+
+
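+``SPECTRA`` maps each `~ctapipe.irf.Spectra` member to a source spectrum that
+can be evaluated at a given energy and used to weight events. A minimal sketch
+(the member name is illustrative)::
+
+    from ctapipe.irf import SPECTRA, Spectra
+
+    source_spectrum = SPECTRA[Spectra.CRAB_HEGRA]
+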
+Reference/API
+=============
+
+.. automodapi:: ctapipe.irf.spectra
+ :no-inheritance-diagram:
diff --git a/docs/api-reference/tools/index.rst b/docs/api-reference/tools/index.rst
index 25efbb905e8..ad122d76dc1 100644
--- a/docs/api-reference/tools/index.rst
+++ b/docs/api-reference/tools/index.rst
@@ -94,7 +94,13 @@ Reference/API
:no-inheritance-diagram:
.. automodapi:: ctapipe.tools.train_disp_reconstructor
- :no-inheritance-diagram:
+ :no-inheritance-diagram:
.. automodapi:: ctapipe.tools.apply_models
:no-inheritance-diagram:
+
+.. automodapi:: ctapipe.tools.optimize_event_selection
+ :no-inheritance-diagram:
+
+.. automodapi:: ctapipe.tools.compute_irf
+ :no-inheritance-diagram:
diff --git a/docs/changes/2473.feature.rst b/docs/changes/2473.feature.rst
new file mode 100644
index 00000000000..cfcfd923862
--- /dev/null
+++ b/docs/changes/2473.feature.rst
@@ -0,0 +1,17 @@
+Add a ``ctapipe-optimize-event-selection`` tool to produce cut-selection files
+based on a gamma DL2 file and optionally a proton and an electron DL2 file.
+Two components for calculating G/H and optionally theta cuts are added:
+``PercentileCuts`` keeps a certain percentage of gamma events in each bin and
+``PointSourceSensitivityOptimizer`` optimizes G/H cuts for maximum point source sensitivity and
+optionally calculates percentile theta cuts.
+
+Add a ``ctapipe-compute-irf`` tool to produce IRFs given a cut-selection file, a gamma DL2 file,
+and optionally a proton and an electron DL2 file.
+Given only a gamma file, the energy dispersion, effective area, and point spread function are calculated.
+Optionally, the bias and resolution of the energy reconstruction and the angular resolution can be calculated
+and saved in a separate output file.
+If a proton file, or a proton and an electron file, is also given, a background model
+can be calculated, as well as the point source sensitivity.
+
+Both full-enclosure and point-like IRFs can be calculated.
+Only radially symmetric parameterizations of the IRF components are implemented so far.
diff --git a/docs/conf.py b/docs/conf.py
index 378c02c3342..d7271896e7c 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -118,6 +118,7 @@ def setup(app):
("py:class", "t.Type"),
("py:class", "t.List"),
("py:class", "t.Tuple"),
+ ("py:class", "t.Sequence"),
("py:class", "Config"),
("py:class", "traitlets.config.configurable.Configurable"),
("py:class", "traitlets.traitlets.HasTraits"),
@@ -132,35 +133,34 @@ def setup(app):
("py:class", "traitlets.config.application.Application"),
("py:class", "traitlets.utils.sentinel.Sentinel"),
("py:class", "traitlets.traitlets.ObserveHandler"),
+ ("py:class", "traitlets.traitlets.T"),
+ ("py:class", "traitlets.traitlets.G"),
+ ("py:class", "Sentinel"),
+ ("py:class", "ObserveHandler"),
("py:class", "dict[K, V]"),
("py:class", "G"),
("py:class", "K"),
("py:class", "V"),
- ("py:class", "t.Sequence"),
("py:class", "StrDict"),
("py:class", "ClassesType"),
- ("py:class", "traitlets.traitlets.G"),
+ ("py:class", "re.Pattern"),
+ ("py:class", "re.Pattern[t.Any]"),
+ ("py:class", "astropy.coordinates.baseframe.BaseCoordinateFrame"),
+ ("py:class", "astropy.table.table.Table"),
+ ("py:class", "eventio.simtel.simtelfile.SimTelFile"),
+ ("py:class", "ctapipe.compat.StrEnum"),
+ ("py:class", "ctapipe.compat.StrEnum"),
+ ("py:obj", "traitlets.traitlets.T"),
("py:obj", "traitlets.traitlets.G"),
("py:obj", "traitlets.traitlets.S"),
- ("py:obj", "traitlets.traitlets.T"),
- ("py:class", "traitlets.traitlets.T"),
- ("py:class", "re.Pattern[t.Any]"),
- ("py:class", "re.Pattern"),
- ("py:class", "Sentinel"),
- ("py:class", "ObserveHandler"),
("py:obj", "traitlets.config.boolean_flag"),
("py:obj", "traitlets.TraitError"),
("py:obj", "-v"), # fix for wrong syntax in a traitlets docstring
+ ("py:obj", "cls"),
+ ("py:obj", "name"),
("py:meth", "MetaHasDescriptors.__init__"),
("py:meth", "HasTraits.__new__"),
("py:meth", "BaseDescriptor.instance_init"),
- ("py:obj", "cls"),
- ("py:obj", "name"),
- ("py:class", "astropy.coordinates.baseframe.BaseCoordinateFrame"),
- ("py:class", "astropy.table.table.Table"),
- ("py:class", "eventio.simtel.simtelfile.SimTelFile"),
- ("py:class", "ctapipe.compat.StrEnum"),
- ("py:class", "ctapipe.compat.StrEnum"),
]
# Sphinx gallery config
@@ -404,6 +404,7 @@ def setup(app):
"numpy": ("https://numpy.org/doc/stable", None),
"pandas": ("https://pandas.pydata.org/pandas-docs/stable", None),
"psutil": ("https://psutil.readthedocs.io/en/stable", None),
+ "pyirf": ("https://pyirf.readthedocs.io/en/stable/", None),
"pytables": ("https://www.pytables.org", None),
"pytest": ("https://docs.pytest.org/en/stable", None),
"python": ("https://docs.python.org/3", None),
diff --git a/environment.yml b/environment.yml
index e640533a52f..304a115cb6c 100644
--- a/environment.yml
+++ b/environment.yml
@@ -25,6 +25,7 @@ dependencies:
- pypandoc
- pre-commit
- psutil
+ - pyirf
- pytables
- pytest
- pytest-cov
diff --git a/pyproject.toml b/pyproject.toml
index 83fb3ab6a25..9739ff3f757 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -55,6 +55,7 @@ all = [
"eventio >=1.9.1,<2.0.0a0",
"iminuit >=2",
"matplotlib ~=3.0",
+ "pyirf ~=0.12.0"
]
tests = [
@@ -104,6 +105,8 @@ ctapipe-dump-instrument = "ctapipe.tools.dump_instrument:main"
ctapipe-display-dl1 = "ctapipe.tools.display_dl1:main"
ctapipe-process = "ctapipe.tools.process:main"
ctapipe-merge = "ctapipe.tools.merge:main"
+ctapipe-optimize-event-selection = "ctapipe.tools.optimize_event_selection:main"
+ctapipe-compute-irf = "ctapipe.tools.compute_irf:main"
ctapipe-fileinfo = "ctapipe.tools.fileinfo:main"
ctapipe-quickstart = "ctapipe.tools.quickstart:main"
ctapipe-train-energy-regressor = "ctapipe.tools.train_energy_regressor:main"
diff --git a/src/ctapipe/conftest.py b/src/ctapipe/conftest.py
index d2a01331aad..ef133d771ea 100644
--- a/src/ctapipe/conftest.py
+++ b/src/ctapipe/conftest.py
@@ -1,6 +1,9 @@
"""
common pytest fixtures for tests in ctapipe
"""
+
+import importlib.util
+import json
import shutil
from copy import deepcopy
@@ -9,7 +12,7 @@
import pytest
import tables
from astropy.coordinates import EarthLocation
-from astropy.table import Table
+from astropy.table import Column, QTable, Table, hstack, vstack
from pytest_astropy_header.display import PYTEST_HEADER_MODULES
from ctapipe.core import run_tool
@@ -42,6 +45,10 @@
"VERITAS",
"Whipple490",
]
+collect_ignore = []
+
+if importlib.util.find_spec("pyirf") is None:
+ collect_ignore.append("irf")
@pytest.fixture(scope="function", params=camera_names)
@@ -707,3 +714,136 @@ def provenance(monkeypatch):
monkeypatch.setattr(prov, "_activities", [])
monkeypatch.setattr(prov, "_finished_activities", [])
return prov
+
+
+@pytest.fixture(scope="session")
+def irf_tmp_path(tmp_path_factory):
+ return tmp_path_factory.mktemp("irf")
+
+
+@pytest.fixture(scope="session")
+def gamma_diffuse_full_reco_file(
+ gamma_train_clf,
+ particle_classifier_path,
+ irf_tmp_path,
+):
+ """
+ Energy reconstruction and geometric origin reconstruction have already been done.
+ """
+ from ctapipe.tools.apply_models import ApplyModels
+
+ output_path = irf_tmp_path / "gamma_diffuse_full_reco.dl2.h5"
+ run_tool(
+ ApplyModels(),
+ argv=[
+ f"--input={gamma_train_clf}",
+ f"--output={output_path}",
+ f"--reconstructor={particle_classifier_path}",
+ "--no-dl1-parameters",
+ "--StereoMeanCombiner.weights=konrad",
+ ],
+ raises=True,
+ )
+ return output_path
+
+
+@pytest.fixture(scope="session")
+def proton_full_reco_file(
+ proton_train_clf,
+ particle_classifier_path,
+ irf_tmp_path,
+):
+ """
+ Energy reconstruction and geometric origin reconstruction have already been done.
+ """
+ from ctapipe.tools.apply_models import ApplyModels
+
+ output_path = irf_tmp_path / "proton_full_reco.dl2.h5"
+ run_tool(
+ ApplyModels(),
+ argv=[
+ f"--input={proton_train_clf}",
+ f"--output={output_path}",
+ f"--reconstructor={particle_classifier_path}",
+ "--no-dl1-parameters",
+ "--StereoMeanCombiner.weights=konrad",
+ ],
+ raises=True,
+ )
+ return output_path
+
+
+@pytest.fixture(scope="session")
+def irf_event_loader_test_config():
+ from traitlets.config import Config
+
+ return Config(
+ {
+ "EventPreProcessor": {
+ "energy_reconstructor": "ExtraTreesRegressor",
+ "geometry_reconstructor": "HillasReconstructor",
+ "gammaness_classifier": "ExtraTreesClassifier",
+ "quality_criteria": [
+ (
+ "multiplicity 4",
+ "np.count_nonzero(HillasReconstructor_telescopes,axis=1) >= 4",
+ ),
+ ("valid classifier", "ExtraTreesClassifier_is_valid"),
+ ("valid geom reco", "HillasReconstructor_is_valid"),
+ ("valid energy reco", "ExtraTreesRegressor_is_valid"),
+ ],
+ }
+ }
+ )
+
+
+@pytest.fixture(scope="session")
+def event_loader_config_path(irf_event_loader_test_config, irf_tmp_path):
+ config_path = irf_tmp_path / "event_loader_config.json"
+ with config_path.open("w") as f:
+ json.dump(irf_event_loader_test_config, f)
+
+ return config_path
+
+
+@pytest.fixture(scope="session")
+def irf_events_table():
+ from ctapipe.irf import EventPreProcessor
+
+ N1 = 1000
+ N2 = 100
+ N = N1 + N2
+ epp = EventPreProcessor()
+ tab = epp.make_empty_table()
+
+ ids, bulk, unitless = tab.colnames[:2], tab.colnames[2:-2], tab.colnames[-2:]
+
+ id_tab = QTable(
+ data=np.zeros((N, len(ids)), dtype=np.uint64),
+ names=ids,
+ units={c: tab[c].unit for c in ids},
+ )
+ bulk_tab = QTable(
+ data=np.zeros((N, len(bulk))) * np.nan,
+ names=bulk,
+ units={c: tab[c].unit for c in bulk},
+ )
+
+ # Setting values following pyirf test in pyirf/irf/tests/test_background.py
+ bulk_tab["reco_energy"] = np.append(np.full(N1, 1), np.full(N2, 2)) * u.TeV
+ bulk_tab["true_energy"] = np.append(np.full(N1, 0.9), np.full(N2, 2.1)) * u.TeV
+ bulk_tab["reco_source_fov_offset"] = (
+ np.append(np.full(N1, 0.1), np.full(N2, 0.05)) * u.deg
+ )
+ bulk_tab["true_source_fov_offset"] = (
+ np.append(np.full(N1, 0.11), np.full(N2, 0.04)) * u.deg
+ )
+ for name in unitless:
+ bulk_tab.add_column(
+ Column(name=name, unit=tab[name].unit, data=np.zeros(N) * np.nan)
+ )
+
+ e_tab = hstack([id_tab, bulk_tab])
+
+ ev = vstack([e_tab, tab], join_type="exact", metadata_conflicts="silent")
+ return ev
diff --git a/src/ctapipe/irf/__init__.py b/src/ctapipe/irf/__init__.py
new file mode 100644
index 00000000000..ad2a7d74ea5
--- /dev/null
+++ b/src/ctapipe/irf/__init__.py
@@ -0,0 +1,59 @@
+"""Top level module for the irf functionality"""
+
+from importlib.util import find_spec
+
+if find_spec("pyirf") is None:
+ from ..exceptions import OptionalDependencyMissing
+
+ raise OptionalDependencyMissing("pyirf") from None
+
+
+from .benchmarks import (
+ AngularResolution2dMaker,
+ EnergyBiasResolution2dMaker,
+ Sensitivity2dMaker,
+)
+from .binning import (
+ ResultValidRange,
+ check_bins_in_range,
+ make_bins_per_decade,
+)
+from .irfs import (
+ BackgroundRate2dMaker,
+ EffectiveArea2dMaker,
+ EnergyDispersion2dMaker,
+ Psf3dMaker,
+)
+from .optimize import (
+ GhPercentileCutCalculator,
+ OptimizationResult,
+ PercentileCuts,
+ PointSourceSensitivityOptimizer,
+ ThetaPercentileCutCalculator,
+)
+from .preprocessing import EventLoader, EventPreProcessor
+from .spectra import ENERGY_FLUX_UNIT, FLUX_UNIT, SPECTRA, Spectra
+
+__all__ = [
+ "AngularResolution2dMaker",
+ "EnergyBiasResolution2dMaker",
+ "Sensitivity2dMaker",
+ "Psf3dMaker",
+ "BackgroundRate2dMaker",
+ "EnergyDispersion2dMaker",
+ "EffectiveArea2dMaker",
+ "ResultValidRange",
+ "OptimizationResult",
+ "PointSourceSensitivityOptimizer",
+ "PercentileCuts",
+ "EventLoader",
+ "EventPreProcessor",
+ "Spectra",
+ "GhPercentileCutCalculator",
+ "ThetaPercentileCutCalculator",
+ "SPECTRA",
+ "ENERGY_FLUX_UNIT",
+ "FLUX_UNIT",
+ "check_bins_in_range",
+ "make_bins_per_decade",
+]
diff --git a/src/ctapipe/irf/benchmarks.py b/src/ctapipe/irf/benchmarks.py
new file mode 100644
index 00000000000..d336fe8d508
--- /dev/null
+++ b/src/ctapipe/irf/benchmarks.py
@@ -0,0 +1,293 @@
+"""Components to generate benchmarks"""
+
+from abc import abstractmethod
+
+import astropy.units as u
+import numpy as np
+from astropy.io.fits import BinTableHDU, Header
+from astropy.table import QTable
+from pyirf.benchmarks import angular_resolution, energy_bias_resolution
+from pyirf.binning import calculate_bin_indices, create_histogram_table, split_bin_lo_hi
+from pyirf.sensitivity import calculate_sensitivity, estimate_background
+
+from ..core.traits import Bool, Float
+from .binning import DefaultFoVOffsetBins, DefaultRecoEnergyBins, DefaultTrueEnergyBins
+from .spectra import ENERGY_FLUX_UNIT, FLUX_UNIT, SPECTRA, Spectra
+
+__all__ = [
+ "EnergyBiasResolutionMakerBase",
+ "EnergyBiasResolution2dMaker",
+ "AngularResolutionMakerBase",
+ "AngularResolution2dMaker",
+ "SensitivityMakerBase",
+ "Sensitivity2dMaker",
+]
+
+
+def _get_2d_result_table(
+ events: QTable, e_bins: u.Quantity, fov_bins: u.Quantity
+) -> tuple[QTable, np.ndarray, tuple[int, int]]:
+    """Create the result table and fov bin indices shared by the 2d makers."""
+    result = QTable()
+ result["ENERG_LO"], result["ENERG_HI"] = split_bin_lo_hi(
+ e_bins[np.newaxis, :].to(u.TeV)
+ )
+ result["THETA_LO"], result["THETA_HI"] = split_bin_lo_hi(
+ fov_bins[np.newaxis, :].to(u.deg)
+ )
+ fov_bin_index, _ = calculate_bin_indices(events["true_source_fov_offset"], fov_bins)
+ mat_shape = (len(fov_bins) - 1, len(e_bins) - 1)
+ return result, fov_bin_index, mat_shape
+
+
+class EnergyBiasResolutionMakerBase(DefaultTrueEnergyBins):
+ """
+    Base class for calculating the bias and resolution of the energy prediction.
+ """
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+
+ @abstractmethod
+ def make_bias_resolution_hdu(
+ self, events: QTable, extname: str = "ENERGY BIAS RESOLUTION"
+ ) -> BinTableHDU:
+ """
+ Calculate the bias and resolution of the energy prediction.
+
+ Parameters
+ ----------
+ events: astropy.table.QTable
+ Reconstructed events to be used.
+ extname: str
+ Name of the BinTableHDU.
+
+ Returns
+ -------
+ BinTableHDU
+ """
+
+
+class EnergyBiasResolution2dMaker(EnergyBiasResolutionMakerBase, DefaultFoVOffsetBins):
+ """
+ Calculates the bias and the resolution of the energy prediction in bins of
+ true energy and fov offset.
+ """
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+
+ def make_bias_resolution_hdu(
+ self, events: QTable, extname: str = "ENERGY BIAS RESOLUTION"
+ ) -> BinTableHDU:
+ result, fov_bin_idx, mat_shape = _get_2d_result_table(
+ events=events,
+ e_bins=self.true_energy_bins,
+ fov_bins=self.fov_offset_bins,
+ )
+ result["N_EVENTS"] = np.zeros(mat_shape)[np.newaxis, ...]
+ result["BIAS"] = np.full(mat_shape, np.nan)[np.newaxis, ...]
+ result["RESOLUTION"] = np.full(mat_shape, np.nan)[np.newaxis, ...]
+
+ for i in range(len(self.fov_offset_bins) - 1):
+ bias_resolution = energy_bias_resolution(
+ events=events[fov_bin_idx == i],
+ energy_bins=self.true_energy_bins,
+ bias_function=np.mean,
+ energy_type="true",
+ )
+ result["N_EVENTS"][:, i, :] = bias_resolution["n_events"]
+ result["BIAS"][:, i, :] = bias_resolution["bias"]
+ result["RESOLUTION"][:, i, :] = bias_resolution["resolution"]
+
+ return BinTableHDU(result, name=extname)
+
+
+class AngularResolutionMakerBase(DefaultTrueEnergyBins, DefaultRecoEnergyBins):
+ """
+ Base class for calculating the angular resolution.
+ """
+
+ # Use reconstructed energy by default for the sake of current pipeline comparisons
+ use_true_energy = Bool(
+ False,
+ help="Use true energy instead of reconstructed energy for energy binning.",
+ ).tag(config=True)
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+
+ @abstractmethod
+ def make_angular_resolution_hdu(
+ self, events: QTable, extname: str = "ANGULAR RESOLUTION"
+ ) -> BinTableHDU:
+ """
+ Calculate the angular resolution.
+
+ Parameters
+ ----------
+ events: astropy.table.QTable
+ Reconstructed events to be used.
+ extname: str
+ Name of the BinTableHDU.
+
+ Returns
+ -------
+ BinTableHDU
+ """
+
+
+class AngularResolution2dMaker(AngularResolutionMakerBase, DefaultFoVOffsetBins):
+ """
+ Calculates the angular resolution in bins of either true or reconstructed energy
+ and fov offset.
+ """
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+
+ def make_angular_resolution_hdu(
+ self, events: QTable, extname: str = "ANGULAR RESOLUTION"
+ ) -> BinTableHDU:
+ if self.use_true_energy:
+ e_bins = self.true_energy_bins
+ energy_type = "true"
+ else:
+ e_bins = self.reco_energy_bins
+ energy_type = "reco"
+
+ result, fov_bin_idx, mat_shape = _get_2d_result_table(
+ events=events,
+ e_bins=e_bins,
+ fov_bins=self.fov_offset_bins,
+ )
+ result["N_EVENTS"] = np.zeros(mat_shape)[np.newaxis, ...]
+ result["ANGULAR_RESOLUTION"] = u.Quantity(
+ np.full(mat_shape, np.nan)[np.newaxis, ...], events["theta"].unit
+ )
+
+ for i in range(len(self.fov_offset_bins) - 1):
+ ang_res = angular_resolution(
+ events=events[fov_bin_idx == i],
+ energy_bins=e_bins,
+ energy_type=energy_type,
+ )
+ result["N_EVENTS"][:, i, :] = ang_res["n_events"]
+ result["ANGULAR_RESOLUTION"][:, i, :] = ang_res["angular_resolution_68"]
+
+ header = Header()
+ header["E_TYPE"] = energy_type.upper()
+ return BinTableHDU(result, header=header, name=extname)
+
+
+class SensitivityMakerBase(DefaultRecoEnergyBins):
+ """Base class for calculating the point source sensitivity."""
+
+ alpha = Float(
+ default_value=0.2,
+ help="Size ratio of on region / off region.",
+ ).tag(config=True)
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+
+ @abstractmethod
+ def make_sensitivity_hdu(
+ self,
+ signal_events: QTable,
+ background_events: QTable,
+ theta_cut: QTable,
+ gamma_spectrum: Spectra,
+ extname: str = "SENSITIVITY",
+ ) -> BinTableHDU:
+ """
+ Calculate the point source sensitivity
+ based on ``pyirf.sensitivity.calculate_sensitivity``.
+
+ Parameters
+ ----------
+ signal_events: astropy.table.QTable
+ Reconstructed signal events to be used.
+ background_events: astropy.table.QTable
+ Reconstructed background events to be used.
+ theta_cut: QTable
+ Direction cut that was applied on ``signal_events``.
+ gamma_spectrum: ctapipe.irf.Spectra
+ Spectra by which to scale the relative sensitivity to get the flux sensitivity.
+ extname: str
+ Name of the BinTableHDU.
+
+ Returns
+ -------
+ BinTableHDU
+ """
+
+
+class Sensitivity2dMaker(SensitivityMakerBase, DefaultFoVOffsetBins):
+ """
+ Calculates the point source sensitivity in bins of reconstructed energy
+ and fov offset.
+ """
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+
+ def make_sensitivity_hdu(
+ self,
+ signal_events: QTable,
+ background_events: QTable,
+ theta_cut: QTable,
+ gamma_spectrum: Spectra,
+ extname: str = "SENSITIVITY",
+ ) -> BinTableHDU:
+ source_spectrum = SPECTRA[gamma_spectrum]
+ result, fov_bin_idx, mat_shape = _get_2d_result_table(
+ events=signal_events,
+ e_bins=self.reco_energy_bins,
+ fov_bins=self.fov_offset_bins,
+ )
+ result["N_SIGNAL"] = np.zeros(mat_shape)[np.newaxis, ...]
+ result["N_SIGNAL_WEIGHTED"] = np.zeros(mat_shape)[np.newaxis, ...]
+ result["N_BACKGROUND"] = np.zeros(mat_shape)[np.newaxis, ...]
+ result["N_BACKGROUND_WEIGHTED"] = np.zeros(mat_shape)[np.newaxis, ...]
+ result["SIGNIFICANCE"] = np.full(mat_shape, np.nan)[np.newaxis, ...]
+ result["RELATIVE_SENSITIVITY"] = np.full(mat_shape, np.nan)[np.newaxis, ...]
+ result["FLUX_SENSITIVITY"] = u.Quantity(
+ np.full(mat_shape, np.nan)[np.newaxis, ...], FLUX_UNIT
+ )
+ result["ENERGY_FLUX_SENSITIVITY"] = u.Quantity(
+ np.full(mat_shape, np.nan)[np.newaxis, ...], ENERGY_FLUX_UNIT
+ )
+ for i in range(len(self.fov_offset_bins) - 1):
+ signal_hist = create_histogram_table(
+ events=signal_events[fov_bin_idx == i], bins=self.reco_energy_bins
+ )
+ bkg_hist = estimate_background(
+ events=background_events,
+ reco_energy_bins=self.reco_energy_bins,
+ theta_cuts=theta_cut,
+ alpha=self.alpha,
+ fov_offset_min=self.fov_offset_bins[i],
+ fov_offset_max=self.fov_offset_bins[i + 1],
+ )
+ sens = calculate_sensitivity(
+ signal_hist=signal_hist, background_hist=bkg_hist, alpha=self.alpha
+ )
+ result["N_SIGNAL"][:, i, :] = sens["n_signal"]
+ result["N_SIGNAL_WEIGHTED"][:, i, :] = sens["n_signal_weighted"]
+ result["N_BACKGROUND"][:, i, :] = sens["n_background"]
+ result["N_BACKGROUND_WEIGHTED"][:, i, :] = sens["n_background_weighted"]
+ result["SIGNIFICANCE"][:, i, :] = sens["significance"]
+ result["RELATIVE_SENSITIVITY"][:, i, :] = sens["relative_sensitivity"]
+ result["FLUX_SENSITIVITY"][:, i, :] = (
+ sens["relative_sensitivity"]
+ * source_spectrum(sens["reco_energy_center"])
+ ).to(FLUX_UNIT)
+ result["ENERGY_FLUX_SENSITIVITY"][:, i, :] = (
+ sens["relative_sensitivity"]
+ * source_spectrum(sens["reco_energy_center"])
+ * sens["reco_energy_center"] ** 2
+ ).to(ENERGY_FLUX_UNIT)
+
+ header = Header()
+ header["ALPHA"] = self.alpha
+ return BinTableHDU(result, header=header, name=extname)
diff --git a/src/ctapipe/irf/binning.py b/src/ctapipe/irf/binning.py
new file mode 100644
index 00000000000..b0c28cfedb0
--- /dev/null
+++ b/src/ctapipe/irf/binning.py
@@ -0,0 +1,187 @@
+"""Collection of binning related functionality for the irf tools"""
+
+import logging
+from dataclasses import dataclass
+
+import astropy.units as u
+import numpy as np
+
+from ..compat import COPY_IF_NEEDED
+from ..core import Component
+from ..core.traits import AstroQuantity, Integer
+
+__all__ = [
+ "ResultValidRange",
+ "check_bins_in_range",
+ "make_bins_per_decade",
+ "DefaultTrueEnergyBins",
+ "DefaultRecoEnergyBins",
+ "DefaultFoVOffsetBins",
+]
+
+logger = logging.getLogger(__name__)
+
+
+def check_bins_in_range(bins, valid_range, source="result", raise_error=True):
+ """
+ Check whether ``bins`` are within a ``valid_range`` and either warn
+ or raise an error if not.
+
+ Parameters
+ ----------
+ bins: u.Quantity
+ The bins to be checked.
+ valid_range: ctapipe.irf.ResultValidRange
+ Range for which bins are valid.
+ E.g. the range in which G/H cuts are calculated.
+    source: str
+        Description of which bins are being checked, used in
+        warning/error messages.
+ raise_error: bool
+ Whether to raise an error (True) or give a warning (False) if
+ ``bins`` exceed ``valid_range``.
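+
+    Examples
+    --------
+    >>> import astropy.units as u
+    >>> valid_range = ResultValidRange(min=0.1 * u.TeV, max=10 * u.TeV)
+    >>> check_bins_in_range([0.2, 5] * u.TeV, valid_range)  # in range, no output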
+ """
+    low = bins >= valid_range.min
+    high = bins <= valid_range.max
+
+    if not np.all(low & high):
+ with np.printoptions(edgeitems=2, threshold=6, precision=4):
+ bins = np.array2string(bins)
+ min_val = np.array2string(valid_range.min)
+ max_val = np.array2string(valid_range.max)
+ if raise_error:
+ raise ValueError(
+ f"Valid range for {source} is {min_val} to {max_val}, got {bins}"
+ )
+ else:
+ logger.warning(
+ f"Valid range for {source} is {min_val} to {max_val}, got {bins}",
+ )
+
+
+@u.quantity_input(e_min=u.TeV, e_max=u.TeV)
+def make_bins_per_decade(e_min, e_max, n_bins_per_decade=5):
+ """
+ Create energy bins with at least ``n_bins_per_decade`` bins per decade.
+ The number of bins is calculated as
+ ``n_bins = ceil((log10(e_max) - log10(e_min)) * n_bins_per_decade)``.
+
+ Parameters
+ ----------
+ e_min: u.Quantity[energy]
+ Minimum energy, inclusive
+ e_max: u.Quantity[energy]
+ Maximum energy, inclusive
+ n_bins_per_decade: int
+ Minimum number of bins per decade
+
+ Returns
+ -------
+ bins: u.Quantity[energy]
+ The created bin array, will have units of ``e_min``
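+
+    Examples
+    --------
+    >>> import astropy.units as u
+    >>> make_bins_per_decade(1 * u.TeV, 100 * u.TeV, n_bins_per_decade=1)
+    <Quantity [  1.,  10., 100.] TeV>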
+ """
+ unit = e_min.unit
+ log_lower = np.log10(e_min.to_value(unit))
+ log_upper = np.log10(e_max.to_value(unit))
+
+ n_bins = int(np.ceil((log_upper - log_lower) * n_bins_per_decade))
+
+ return u.Quantity(
+ np.logspace(log_lower, log_upper, n_bins + 1), unit, copy=COPY_IF_NEEDED
+ )
+
+
+@dataclass
+class ResultValidRange:
+ min: u.Quantity
+ max: u.Quantity
+
+
+class DefaultTrueEnergyBins(Component):
+ """Base class for creating irfs or benchmarks binned in true energy."""
+
+ true_energy_min = AstroQuantity(
+ help="Minimum value for True Energy bins",
+ default_value=u.Quantity(0.015, u.TeV),
+ physical_type=u.physical.energy,
+ ).tag(config=True)
+
+ true_energy_max = AstroQuantity(
+ help="Maximum value for True Energy bins",
+ default_value=u.Quantity(150, u.TeV),
+ physical_type=u.physical.energy,
+ ).tag(config=True)
+
+ true_energy_n_bins_per_decade = Integer(
+ help="Number of bins per decade for True Energy bins",
+ default_value=10,
+ ).tag(config=True)
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+ self.true_energy_bins = make_bins_per_decade(
+ self.true_energy_min.to(u.TeV),
+ self.true_energy_max.to(u.TeV),
+ self.true_energy_n_bins_per_decade,
+ )
+
+
+class DefaultRecoEnergyBins(Component):
+ """Base class for creating irfs or benchmarks binned in reconstructed energy."""
+
+ reco_energy_min = AstroQuantity(
+ help="Minimum value for Reco Energy bins",
+ default_value=u.Quantity(0.015, u.TeV),
+ physical_type=u.physical.energy,
+ ).tag(config=True)
+
+ reco_energy_max = AstroQuantity(
+ help="Maximum value for Reco Energy bins",
+ default_value=u.Quantity(150, u.TeV),
+ physical_type=u.physical.energy,
+ ).tag(config=True)
+
+ reco_energy_n_bins_per_decade = Integer(
+ help="Number of bins per decade for Reco Energy bins",
+ default_value=5,
+ ).tag(config=True)
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+ self.reco_energy_bins = make_bins_per_decade(
+ self.reco_energy_min.to(u.TeV),
+ self.reco_energy_max.to(u.TeV),
+ self.reco_energy_n_bins_per_decade,
+ )
+
+
+class DefaultFoVOffsetBins(Component):
+ """Base class for creating radially symmetric irfs or benchmarks."""
+
+ fov_offset_min = AstroQuantity(
+ help="Minimum value for FoV Offset bins",
+ default_value=u.Quantity(0, u.deg),
+ physical_type=u.physical.angle,
+ ).tag(config=True)
+
+ fov_offset_max = AstroQuantity(
+ help="Maximum value for FoV offset bins",
+ default_value=u.Quantity(5, u.deg),
+ physical_type=u.physical.angle,
+ ).tag(config=True)
+
+ fov_offset_n_bins = Integer(
+ help="Number of FoV offset bins",
+ default_value=1,
+ ).tag(config=True)
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+ self.fov_offset_bins = u.Quantity(
+ np.linspace(
+ self.fov_offset_min.to_value(u.deg),
+ self.fov_offset_max.to_value(u.deg),
+ self.fov_offset_n_bins + 1,
+ ),
+ u.deg,
+ )
diff --git a/src/ctapipe/irf/irfs.py b/src/ctapipe/irf/irfs.py
new file mode 100644
index 00000000000..23def128aca
--- /dev/null
+++ b/src/ctapipe/irf/irfs.py
@@ -0,0 +1,336 @@
+"""Components to generate IRFs"""
+
+from abc import abstractmethod
+
+import astropy.units as u
+import numpy as np
+from astropy.io.fits import BinTableHDU
+from astropy.table import QTable
+from pyirf.io import (
+ create_aeff2d_hdu,
+ create_background_2d_hdu,
+ create_energy_dispersion_hdu,
+ create_psf_table_hdu,
+)
+from pyirf.irf import (
+ background_2d,
+ effective_area_per_energy,
+ effective_area_per_energy_and_fov,
+ energy_dispersion,
+ psf_table,
+)
+from pyirf.simulations import SimulatedEventsInfo
+
+from ..core.traits import AstroQuantity, Bool, Float, Integer
+from .binning import DefaultFoVOffsetBins, DefaultRecoEnergyBins, DefaultTrueEnergyBins
+
+__all__ = [
+ "BackgroundRateMakerBase",
+ "BackgroundRate2dMaker",
+ "EffectiveAreaMakerBase",
+ "EffectiveArea2dMaker",
+ "EnergyDispersionMakerBase",
+ "EnergyDispersion2dMaker",
+ "PsfMakerBase",
+ "Psf3dMaker",
+]
+
+
+class PsfMakerBase(DefaultTrueEnergyBins):
+ """Base class for calculating the point spread function."""
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+
+ @abstractmethod
+ def make_psf_hdu(self, events: QTable, extname: str = "PSF") -> BinTableHDU:
+ """
+ Calculate the psf and create a fits binary table HDU in GADF format.
+
+ Parameters
+ ----------
+ events: astropy.table.QTable
+ Reconstructed events to be used.
+ extname: str
+ Name for the BinTableHDU.
+
+ Returns
+ -------
+ BinTableHDU
+ """
+
+
+class BackgroundRateMakerBase(DefaultRecoEnergyBins):
+ """Base class for calculating the background rate."""
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+
+ @abstractmethod
+ def make_bkg_hdu(
+ self, events: QTable, obs_time: u.Quantity, extname: str = "BACKGROUND"
+ ) -> BinTableHDU:
+ """
+ Calculate the background rate and create a fits binary table HDU
+ in GADF format.
+
+ Parameters
+ ----------
+ events: astropy.table.QTable
+ Reconstructed events to be used.
+        obs_time: astropy.units.Quantity[time]
+            Observation time. This must match the observation time used to
+            calculate the individual event weights.
+ extname: str
+ Name for the BinTableHDU.
+
+ Returns
+ -------
+ BinTableHDU
+ """
+
+
+class EnergyDispersionMakerBase(DefaultTrueEnergyBins):
+ """Base class for calculating the energy dispersion."""
+
+ energy_migration_min = Float(
+ help="Minimum value of energy migration ratio",
+ default_value=0.2,
+ ).tag(config=True)
+
+ energy_migration_max = Float(
+ help="Maximum value of energy migration ratio",
+ default_value=5,
+ ).tag(config=True)
+
+ energy_migration_n_bins = Integer(
+ help="Number of bins in log scale for energy migration ratio",
+ default_value=30,
+ ).tag(config=True)
+
+ energy_migration_linear_bins = Bool(
+ help="Bin energy migration ratio using linear bins",
+ default_value=False,
+ ).tag(config=True)
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+ bin_func = np.geomspace
+ if self.energy_migration_linear_bins:
+ bin_func = np.linspace
+ self.migration_bins = bin_func(
+ self.energy_migration_min,
+ self.energy_migration_max,
+ self.energy_migration_n_bins + 1,
+ )
+
+ @abstractmethod
+ def make_edisp_hdu(
+ self, events: QTable, point_like: bool, extname: str = "ENERGY MIGRATION"
+ ) -> BinTableHDU:
+ """
+ Calculate the energy dispersion and create a fits binary table HDU
+ in GADF format.
+
+ Parameters
+ ----------
+ events: astropy.table.QTable
+ Reconstructed events to be used.
+ point_like: bool
+ If a direction cut was applied on ``events``, pass ``True``, else ``False``
+ for a full-enclosure energy dispersion.
+ extname: str
+ Name for the BinTableHDU.
+
+ Returns
+ -------
+ BinTableHDU
+ """
+
+
+class EffectiveAreaMakerBase(DefaultTrueEnergyBins):
+ """Base class for calculating the effective area."""
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+
+ @abstractmethod
+ def make_aeff_hdu(
+ self,
+ events: QTable,
+ point_like: bool,
+ signal_is_point_like: bool,
+ sim_info: SimulatedEventsInfo,
+ extname: str = "EFFECTIVE AREA",
+ ) -> BinTableHDU:
+ """
+ Calculate the effective area and create a fits binary table HDU
+ in GADF format.
+
+ Parameters
+ ----------
+ events: astropy.table.QTable
+ Reconstructed events to be used.
+ point_like: bool
+ If a direction cut was applied on ``events``, pass ``True``, else ``False``
+ for a full-enclosure effective area.
+ signal_is_point_like: bool
+ If ``events`` were simulated only at a single point in the field of view,
+ pass ``True``, else ``False``.
+        sim_info: pyirf.simulations.SimulatedEventsInfo
+ The overall statistics of the simulated events.
+ extname: str
+ Name of the BinTableHDU.
+
+ Returns
+ -------
+ BinTableHDU
+ """
+
+
+class EffectiveArea2dMaker(EffectiveAreaMakerBase, DefaultFoVOffsetBins):
+ """
+ Creates a radially symmetric parameterization of the effective area in equidistant
+ bins of logarithmic true energy and field of view offset.
+ """
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+
+ def make_aeff_hdu(
+ self,
+ events: QTable,
+ point_like: bool,
+ signal_is_point_like: bool,
+ sim_info: SimulatedEventsInfo,
+ extname: str = "EFFECTIVE AREA",
+ ) -> BinTableHDU:
+ # For point-like gammas the effective area can only be calculated
+ # at one point in the FoV.
+ if signal_is_point_like:
+ aeff = effective_area_per_energy(
+ selected_events=events,
+ simulation_info=sim_info,
+ true_energy_bins=self.true_energy_bins,
+ )
+ # +1 dimension for FOV offset
+ aeff = aeff[..., np.newaxis]
+ else:
+ aeff = effective_area_per_energy_and_fov(
+ selected_events=events,
+ simulation_info=sim_info,
+ true_energy_bins=self.true_energy_bins,
+ fov_offset_bins=self.fov_offset_bins,
+ )
+
+ return create_aeff2d_hdu(
+ effective_area=aeff,
+ true_energy_bins=self.true_energy_bins,
+ fov_offset_bins=self.fov_offset_bins,
+ point_like=point_like,
+ extname=extname,
+ )
+
+
+class EnergyDispersion2dMaker(EnergyDispersionMakerBase, DefaultFoVOffsetBins):
+ """
+ Creates a radially symmetric parameterization of the energy dispersion in
+ equidistant bins of logarithmic true energy and field of view offset.
+ """
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+
+ def make_edisp_hdu(
+ self, events: QTable, point_like: bool, extname: str = "ENERGY DISPERSION"
+ ) -> BinTableHDU:
+ edisp = energy_dispersion(
+ selected_events=events,
+ true_energy_bins=self.true_energy_bins,
+ fov_offset_bins=self.fov_offset_bins,
+ migration_bins=self.migration_bins,
+ )
+ return create_energy_dispersion_hdu(
+ energy_dispersion=edisp,
+ true_energy_bins=self.true_energy_bins,
+ migration_bins=self.migration_bins,
+ fov_offset_bins=self.fov_offset_bins,
+ point_like=point_like,
+ extname=extname,
+ )
+
+
+class BackgroundRate2dMaker(BackgroundRateMakerBase, DefaultFoVOffsetBins):
+ """
+ Creates a radially symmetric parameterization of the background rate in equidistant
+ bins of logarithmic reconstructed energy and field of view offset.
+ """
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+
+ def make_bkg_hdu(
+ self, events: QTable, obs_time: u.Quantity, extname: str = "BACKGROUND"
+ ) -> BinTableHDU:
+ background_rate = background_2d(
+ events=events,
+ reco_energy_bins=self.reco_energy_bins,
+ fov_offset_bins=self.fov_offset_bins,
+ t_obs=obs_time,
+ )
+ return create_background_2d_hdu(
+ background_2d=background_rate,
+ reco_energy_bins=self.reco_energy_bins,
+ fov_offset_bins=self.fov_offset_bins,
+ extname=extname,
+ )
+
+
+class Psf3dMaker(PsfMakerBase, DefaultFoVOffsetBins):
+ """
+ Creates a radially symmetric point spread function calculated in equidistant bins
+ of source offset, logarithmic true energy, and field of view offset.
+ """
+
+ source_offset_min = AstroQuantity(
+ help="Minimum value for Source offset",
+ default_value=u.Quantity(0, u.deg),
+ physical_type=u.physical.angle,
+ ).tag(config=True)
+
+ source_offset_max = AstroQuantity(
+ help="Maximum value for Source offset",
+ default_value=u.Quantity(1, u.deg),
+ physical_type=u.physical.angle,
+ ).tag(config=True)
+
+ source_offset_n_bins = Integer(
+ help="Number of bins for Source offset",
+ default_value=100,
+ ).tag(config=True)
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+ self.source_offset_bins = u.Quantity(
+ np.linspace(
+ self.source_offset_min.to_value(u.deg),
+ self.source_offset_max.to_value(u.deg),
+ self.source_offset_n_bins + 1,
+ ),
+ u.deg,
+ )
+
+ def make_psf_hdu(self, events: QTable, extname: str = "PSF") -> BinTableHDU:
+ psf = psf_table(
+ events=events,
+ true_energy_bins=self.true_energy_bins,
+ fov_offset_bins=self.fov_offset_bins,
+ source_offset_bins=self.source_offset_bins,
+ )
+ return create_psf_table_hdu(
+ psf=psf,
+ true_energy_bins=self.true_energy_bins,
+ fov_offset_bins=self.fov_offset_bins,
+ source_offset_bins=self.source_offset_bins,
+ extname=extname,
+ )
diff --git a/src/ctapipe/irf/optimize.py b/src/ctapipe/irf/optimize.py
new file mode 100644
index 00000000000..218a2d159d4
--- /dev/null
+++ b/src/ctapipe/irf/optimize.py
@@ -0,0 +1,487 @@
+"""module containing optimization related functions and classes"""
+
+import operator
+from abc import abstractmethod
+from collections.abc import Sequence
+
+import astropy.units as u
+import numpy as np
+from astropy.io import fits
+from astropy.table import QTable, Table
+from pyirf.cut_optimization import optimize_gh_cut
+from pyirf.cuts import calculate_percentile_cut, evaluate_binned_cut
+
+from ..core import Component, QualityQuery
+from ..core.traits import AstroQuantity, Float, Integer, Path
+from .binning import ResultValidRange, make_bins_per_decade
+from .preprocessing import EventPreProcessor
+
+__all__ = [
+ "CutOptimizerBase",
+ "GhPercentileCutCalculator",
+ "OptimizationResult",
+ "PercentileCuts",
+ "PointSourceSensitivityOptimizer",
+ "ThetaPercentileCutCalculator",
+]
+
+
+class OptimizationResult:
+ """Result of an optimization of G/H and theta cuts or only G/H cuts."""
+
+ def __init__(
+ self,
+ valid_energy_min: u.Quantity,
+ valid_energy_max: u.Quantity,
+ valid_offset_min: u.Quantity,
+ valid_offset_max: u.Quantity,
+ gh_cuts: QTable,
+ clf_prefix: str,
+ theta_cuts: QTable | None = None,
+ precuts: QualityQuery | Sequence | None = None,
+ ) -> None:
+ if precuts:
+ if isinstance(precuts, QualityQuery):
+ if len(precuts.quality_criteria) == 0:
+ precuts.quality_criteria = [
+ (" ", " ")
+ ] # Ensures table serialises properly
+
+ self.precuts = precuts
+ elif isinstance(precuts, list):
+ self.precuts = QualityQuery(quality_criteria=precuts)
+ else:
+ self.precuts = QualityQuery(quality_criteria=list(precuts))
+ else:
+ self.precuts = QualityQuery(quality_criteria=[(" ", " ")])
+
+ self.valid_energy = ResultValidRange(min=valid_energy_min, max=valid_energy_max)
+ self.valid_offset = ResultValidRange(min=valid_offset_min, max=valid_offset_max)
+ self.gh_cuts = gh_cuts
+ self.clf_prefix = clf_prefix
+ self.theta_cuts = theta_cuts
+
+ def __repr__(self):
+ if self.theta_cuts is not None:
+ return (
+ f""
+ )
+ else:
+ return (
+ f""
+ )
+
+ def write(self, output_name: Path | str, overwrite: bool = False) -> None:
+ """Write an ``OptimizationResult`` to a file in FITS format."""
+
+ cut_expr_tab = Table(
+ rows=self.precuts.quality_criteria,
+ names=["name", "cut_expr"],
+ dtype=[np.str_, np.str_],
+ )
+ cut_expr_tab.meta["EXTNAME"] = "QUALITY_CUTS_EXPR"
+
+ self.gh_cuts.meta["EXTNAME"] = "GH_CUTS"
+ self.gh_cuts.meta["CLFNAME"] = self.clf_prefix
+
+ energy_lim_tab = QTable(
+ rows=[[self.valid_energy.min, self.valid_energy.max]],
+ names=["energy_min", "energy_max"],
+ )
+ energy_lim_tab.meta["EXTNAME"] = "VALID_ENERGY"
+
+ offset_lim_tab = QTable(
+ rows=[[self.valid_offset.min, self.valid_offset.max]],
+ names=["offset_min", "offset_max"],
+ )
+ offset_lim_tab.meta["EXTNAME"] = "VALID_OFFSET"
+
+ results = [cut_expr_tab, self.gh_cuts, energy_lim_tab, offset_lim_tab]
+
+ if self.theta_cuts is not None:
+ self.theta_cuts.meta["EXTNAME"] = "RAD_MAX"
+ results.append(self.theta_cuts)
+
+ # Overwrite if needed and allowed
+ results[0].write(output_name, format="fits", overwrite=overwrite)
+
+ for table in results[1:]:
+ table.write(output_name, format="fits", append=True)
+
+ @classmethod
+ def read(cls, file_name):
+ """Read an ``OptimizationResult`` from a file in FITS format."""
+
+ with fits.open(file_name) as hdul:
+ cut_expr_tab = Table.read(hdul[1])
+ cut_expr_lst = [(name, expr) for name, expr in cut_expr_tab.iterrows()]
+ if (" ", " ") in cut_expr_lst:
+ cut_expr_lst.remove((" ", " "))
+
+ precuts = QualityQuery(quality_criteria=cut_expr_lst)
+ gh_cuts = QTable.read(hdul[2])
+ valid_energy = QTable.read(hdul[3])
+ valid_offset = QTable.read(hdul[4])
+ theta_cuts = QTable.read(hdul[5]) if len(hdul) > 5 else None
+
+ return cls(
+ precuts=precuts,
+ valid_energy_min=valid_energy["energy_min"],
+ valid_energy_max=valid_energy["energy_max"],
+ valid_offset_min=valid_offset["offset_min"],
+ valid_offset_max=valid_offset["offset_max"],
+ gh_cuts=gh_cuts,
+ clf_prefix=gh_cuts.meta["CLFNAME"],
+ theta_cuts=theta_cuts,
+ )
+
+
+class CutOptimizerBase(Component):
+ """Base class for cut optimization algorithms."""
+
+ reco_energy_min = AstroQuantity(
+ help="Minimum value for Reco Energy bins",
+ default_value=u.Quantity(0.015, u.TeV),
+ physical_type=u.physical.energy,
+ ).tag(config=True)
+
+ reco_energy_max = AstroQuantity(
+ help="Maximum value for Reco Energy bins",
+ default_value=u.Quantity(150, u.TeV),
+ physical_type=u.physical.energy,
+ ).tag(config=True)
+
+ reco_energy_n_bins_per_decade = Integer(
+ help="Number of bins per decade for Reco Energy bins",
+ default_value=5,
+ ).tag(config=True)
+
+ min_bkg_fov_offset = AstroQuantity(
+ help=(
+ "Minimum distance from the fov center for background events "
+ "to be taken into account"
+ ),
+ default_value=u.Quantity(0, u.deg),
+ physical_type=u.physical.angle,
+ ).tag(config=True)
+
+ max_bkg_fov_offset = AstroQuantity(
+ help=(
+ "Maximum distance from the fov center for background events "
+ "to be taken into account"
+ ),
+ default_value=u.Quantity(5, u.deg),
+ physical_type=u.physical.angle,
+ ).tag(config=True)
+
+ @abstractmethod
+ def optimize_cuts(
+ self,
+ signal: QTable,
+ background: QTable,
+ precuts: EventPreProcessor,
+ clf_prefix: str,
+ ) -> OptimizationResult:
+ """
+ Optimize G/H (and optionally theta) cuts
+ and fill them in an ``OptimizationResult``.
+
+ Parameters
+ ----------
+ signal: astropy.table.QTable
+ Table containing signal events
+ background: astropy.table.QTable
+ Table containing background events
+ precuts: ctapipe.irf.EventPreProcessor
+ ``ctapipe.core.QualityQuery`` subclass containing preselection
+ criteria for events
+ clf_prefix: str
+ Prefix of the output from the G/H classifier for which the
+ cut will be optimized
+ """
+
+
+class GhPercentileCutCalculator(Component):
+ """Computes a percentile cut on gammaness."""
+
+ min_counts = Integer(
+ default_value=10,
+ help="Minimum number of events in a bin to attempt to find a cut value",
+ ).tag(config=True)
+
+ smoothing = Float(
+ default_value=None,
+ allow_none=True,
+ help="When given, the width (in units of bins) of gaussian smoothing applied",
+ ).tag(config=True)
+
+ target_percentile = Integer(
+ default_value=68,
+ help="Percent of events in each energy bin to keep after the G/H cut",
+ ).tag(config=True)
+
+ def calculate_gh_cut(self, gammaness, reco_energy, reco_energy_bins):
+ if self.smoothing and self.smoothing < 0:
+ self.smoothing = None
+
+ return calculate_percentile_cut(
+ gammaness,
+ reco_energy,
+ reco_energy_bins,
+ smoothing=self.smoothing,
+ percentile=100 - self.target_percentile,
+ fill_value=gammaness.max(),
+ min_events=self.min_counts,
+ )
+
+
+class ThetaPercentileCutCalculator(Component):
+ """Computes a percentile cut on theta."""
+
+ theta_min_angle = AstroQuantity(
+ default_value=u.Quantity(-1, u.deg),
+ physical_type=u.physical.angle,
+ help="Smallest angular cut value allowed (-1 means no cut)",
+ ).tag(config=True)
+
+ theta_max_angle = AstroQuantity(
+ default_value=u.Quantity(0.32, u.deg),
+ physical_type=u.physical.angle,
+ help="Largest angular cut value allowed",
+ ).tag(config=True)
+
+ min_counts = Integer(
+ default_value=10,
+ help="Minimum number of events in a bin to attempt to find a cut value",
+ ).tag(config=True)
+
+ theta_fill_value = AstroQuantity(
+ default_value=u.Quantity(0.32, u.deg),
+ physical_type=u.physical.angle,
+ help="Angular cut value used for bins with too few events",
+ ).tag(config=True)
+
+ smoothing = Float(
+ default_value=None,
+ allow_none=True,
+ help="When given, the width (in units of bins) of gaussian smoothing applied",
+ ).tag(config=True)
+
+ target_percentile = Integer(
+ default_value=68,
+ help="Percent of events in each energy bin to keep after the theta cut",
+ ).tag(config=True)
+
+ def calculate_theta_cut(self, theta, reco_energy, reco_energy_bins):
+ if self.theta_min_angle < 0 * u.deg:
+ theta_min_angle = None
+ else:
+ theta_min_angle = self.theta_min_angle
+
+ if self.theta_max_angle < 0 * u.deg:
+ theta_max_angle = None
+ else:
+ theta_max_angle = self.theta_max_angle
+
+ if self.smoothing and self.smoothing < 0:
+ self.smoothing = None
+
+ return calculate_percentile_cut(
+ theta,
+ reco_energy,
+ reco_energy_bins,
+ min_value=theta_min_angle,
+ max_value=theta_max_angle,
+ smoothing=self.smoothing,
+ percentile=self.target_percentile,
+ fill_value=self.theta_fill_value,
+ min_events=self.min_counts,
+ )
+
+
+class PercentileCuts(CutOptimizerBase):
+ """
+ Calculates G/H separation cut based on the percentile of signal events
+ to keep in each bin.
+ Optionally also calculates a percentile cut on theta based on the signal
+ events surviving this G/H cut.
+ """
+
+ classes = [GhPercentileCutCalculator, ThetaPercentileCutCalculator]
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+ self.gh = GhPercentileCutCalculator(parent=self)
+ self.theta = ThetaPercentileCutCalculator(parent=self)
+
+ def optimize_cuts(
+ self,
+ signal: QTable,
+ background: QTable,
+ precuts: EventPreProcessor,
+ clf_prefix: str,
+ ) -> OptimizationResult:
+ reco_energy_bins = make_bins_per_decade(
+ self.reco_energy_min.to(u.TeV),
+ self.reco_energy_max.to(u.TeV),
+ self.reco_energy_n_bins_per_decade,
+ )
+ gh_cuts = self.gh.calculate_gh_cut(
+ signal["gh_score"],
+ signal["reco_energy"],
+ reco_energy_bins,
+ )
+ gh_mask = evaluate_binned_cut(
+ signal["gh_score"],
+ signal["reco_energy"],
+ gh_cuts,
+ op=operator.ge,
+ )
+ theta_cuts = self.theta.calculate_theta_cut(
+ signal["theta"][gh_mask],
+ signal["reco_energy"][gh_mask],
+ reco_energy_bins,
+ )
+
+ result = OptimizationResult(
+ precuts=precuts,
+ gh_cuts=gh_cuts,
+ clf_prefix=clf_prefix,
+ valid_energy_min=self.reco_energy_min,
+ valid_energy_max=self.reco_energy_max,
+            # A single set of cuts is calculated for the whole FoV at the moment
+ valid_offset_min=0 * u.deg,
+ valid_offset_max=np.inf * u.deg,
+ theta_cuts=theta_cuts,
+ )
+ return result
+
+
+class PointSourceSensitivityOptimizer(CutOptimizerBase):
+ """
+ Optimizes a G/H cut for maximum point source sensitivity and
+ calculates a percentile cut on theta.
+ """
+
+ classes = [ThetaPercentileCutCalculator]
+
+    initial_gh_cut_efficiency = Float(
+        default_value=0.4, help="Start value of gamma efficiency before optimization"
+    ).tag(config=True)
+
+ max_gh_cut_efficiency = Float(
+ default_value=0.8, help="Maximum gamma efficiency requested"
+ ).tag(config=True)
+
+ gh_cut_efficiency_step = Float(
+ default_value=0.1,
+ help="Stepsize used for scanning after optimal gammaness cut",
+ ).tag(config=True)
+
+ alpha = Float(
+ default_value=0.2,
+ help="Size ratio of on region / off region.",
+ ).tag(config=True)
+
+ def __init__(self, parent=None, **kwargs):
+ super().__init__(parent=parent, **kwargs)
+ self.theta = ThetaPercentileCutCalculator(parent=self)
+
+ def optimize_cuts(
+ self,
+ signal: QTable,
+ background: QTable,
+ precuts: EventPreProcessor,
+ clf_prefix: str,
+ ) -> OptimizationResult:
+ reco_energy_bins = make_bins_per_decade(
+ self.reco_energy_min.to(u.TeV),
+ self.reco_energy_max.to(u.TeV),
+ self.reco_energy_n_bins_per_decade,
+ )
+
+ initial_gh_cuts = calculate_percentile_cut(
+ signal["gh_score"],
+ signal["reco_energy"],
+ bins=reco_energy_bins,
+ fill_value=0.0,
+        percentile=100 * (1 - self.initial_gh_cut_efficiency),
+ min_events=10,
+ smoothing=1,
+ )
+ initial_gh_mask = evaluate_binned_cut(
+ signal["gh_score"],
+ signal["reco_energy"],
+ initial_gh_cuts,
+ op=operator.gt,
+ )
+
+ theta_cuts = self.theta.calculate_theta_cut(
+ signal["theta"][initial_gh_mask],
+ signal["reco_energy"][initial_gh_mask],
+ reco_energy_bins,
+ )
+ self.log.info("Optimizing G/H separation cut for best sensitivity")
+
+ gh_cut_efficiencies = np.arange(
+ self.gh_cut_efficiency_step,
+ self.max_gh_cut_efficiency + self.gh_cut_efficiency_step / 2,
+ self.gh_cut_efficiency_step,
+ )
+ opt_sens, gh_cuts = optimize_gh_cut(
+ signal,
+ background,
+ reco_energy_bins=reco_energy_bins,
+ gh_cut_efficiencies=gh_cut_efficiencies,
+ op=operator.ge,
+ theta_cuts=theta_cuts,
+ alpha=self.alpha,
+ fov_offset_max=self.max_bkg_fov_offset,
+ fov_offset_min=self.min_bkg_fov_offset,
+ )
+ valid_energy = self._get_valid_energy_range(opt_sens)
+
+ # Re-calculate theta cut with optimized g/h cut
+ signal["selected_gh"] = evaluate_binned_cut(
+ signal["gh_score"],
+ signal["reco_energy"],
+ gh_cuts,
+ operator.ge,
+ )
+ theta_cuts_opt = self.theta.calculate_theta_cut(
+ signal[signal["selected_gh"]]["theta"],
+ signal[signal["selected_gh"]]["reco_energy"],
+ reco_energy_bins,
+ )
+
+ result = OptimizationResult(
+ precuts=precuts,
+ gh_cuts=gh_cuts,
+ clf_prefix=clf_prefix,
+ valid_energy_min=valid_energy[0],
+ valid_energy_max=valid_energy[1],
+            # A single set of cuts is calculated for the whole FoV at the moment
+ valid_offset_min=self.min_bkg_fov_offset,
+ valid_offset_max=self.max_bkg_fov_offset,
+ theta_cuts=theta_cuts_opt,
+ )
+ return result
+
+ def _get_valid_energy_range(self, opt_sens):
+ keep_mask = np.isfinite(opt_sens["significance"])
+
+        count = np.arange(len(keep_mask))
+        if np.all(np.diff(count[keep_mask]) == 1):
+ return [
+ opt_sens["reco_energy_low"][keep_mask][0],
+ opt_sens["reco_energy_high"][keep_mask][-1],
+ ]
+ else:
+ raise ValueError("Optimal significance curve has internal NaN bins")
diff --git a/src/ctapipe/irf/preprocessing.py b/src/ctapipe/irf/preprocessing.py
new file mode 100644
index 00000000000..7c59e2cc27d
--- /dev/null
+++ b/src/ctapipe/irf/preprocessing.py
@@ -0,0 +1,321 @@
+"""Module containing classes related to event loading and preprocessing"""
+
+from pathlib import Path
+
+import astropy.units as u
+import numpy as np
+from astropy.coordinates import AltAz, SkyCoord
+from astropy.table import Column, QTable, Table, vstack
+from pyirf.simulations import SimulatedEventsInfo
+from pyirf.spectral import (
+ DIFFUSE_FLUX_UNIT,
+ POINT_SOURCE_FLUX_UNIT,
+ PowerLaw,
+ calculate_event_weights,
+)
+from pyirf.utils import calculate_source_fov_offset, calculate_theta
+from tables import NoSuchNodeError
+
+from ..compat import COPY_IF_NEEDED
+from ..containers import CoordinateFrameType
+from ..coordinates import NominalFrame
+from ..core import Component, QualityQuery
+from ..core.traits import List, Tuple, Unicode
+from ..io import TableLoader
+from .spectra import SPECTRA, Spectra
+
+__all__ = ["EventLoader", "EventPreProcessor"]
+
+
+class EventPreProcessor(QualityQuery):
+ """Defines preselection cuts and the necessary renaming of columns."""
+
+ energy_reconstructor = Unicode(
+ default_value="RandomForestRegressor",
+ help="Prefix of the reco `_energy` column",
+ ).tag(config=True)
+
+ geometry_reconstructor = Unicode(
+ default_value="HillasReconstructor",
+ help="Prefix of the `_alt` and `_az` reco geometry columns",
+ ).tag(config=True)
+
+ gammaness_classifier = Unicode(
+ default_value="RandomForestClassifier",
+ help="Prefix of the classifier `_prediction` column",
+ ).tag(config=True)
+
+ quality_criteria = List(
+ Tuple(Unicode(), Unicode()),
+ default_value=[
+ (
+ "multiplicity 4",
+ "np.count_nonzero(HillasReconstructor_telescopes,axis=1) >= 4",
+ ),
+ ("valid classifier", "RandomForestClassifier_is_valid"),
+ ("valid geom reco", "HillasReconstructor_is_valid"),
+ ("valid energy reco", "RandomForestRegressor_is_valid"),
+ ],
+ help=QualityQuery.quality_criteria.help,
+ ).tag(config=True)
+
+    def normalise_column_names(self, events: Table) -> QTable:
+        """Keep only the needed columns and rename them to the expected names."""
+ if events["subarray_pointing_lat"].std() > 1e-3:
+ raise NotImplementedError(
+ "No support for making irfs from varying pointings yet"
+ )
+ if any(events["subarray_pointing_frame"] != CoordinateFrameType.ALTAZ.value):
+ raise NotImplementedError(
+ "At the moment only pointing in altaz is supported."
+ )
+
+ keep_columns = [
+ "obs_id",
+ "event_id",
+ "true_energy",
+ "true_az",
+ "true_alt",
+ ]
+ rename_from = [
+ f"{self.energy_reconstructor}_energy",
+ f"{self.geometry_reconstructor}_az",
+ f"{self.geometry_reconstructor}_alt",
+ f"{self.gammaness_classifier}_prediction",
+ "subarray_pointing_lat",
+ "subarray_pointing_lon",
+ ]
+ rename_to = [
+ "reco_energy",
+ "reco_az",
+ "reco_alt",
+ "gh_score",
+ "pointing_alt",
+ "pointing_az",
+ ]
+ keep_columns.extend(rename_from)
+ for c in keep_columns:
+ if c not in events.colnames:
+ raise ValueError(
+ "Input files must conform to the ctapipe DL2 data model. "
+ f"Required column {c} is missing."
+ )
+
+ events = QTable(events[keep_columns], copy=COPY_IF_NEEDED)
+ events.rename_columns(rename_from, rename_to)
+ return events
+
+    def make_empty_table(self) -> QTable:
+        """
+        Create an empty table with the columns that later processing steps
+        expect to be present in the event table.
+        """
+ columns = [
+ Column(name="obs_id", dtype=np.uint64, description="Observation block ID"),
+ Column(name="event_id", dtype=np.uint64, description="Array event ID"),
+ Column(
+ name="true_energy",
+ unit=u.TeV,
+ description="Simulated energy",
+ ),
+ Column(
+ name="true_az",
+ unit=u.deg,
+ description="Simulated azimuth",
+ ),
+ Column(
+ name="true_alt",
+ unit=u.deg,
+ description="Simulated altitude",
+ ),
+ Column(
+ name="reco_energy",
+ unit=u.TeV,
+ description="Reconstructed energy",
+ ),
+ Column(
+ name="reco_az",
+ unit=u.deg,
+ description="Reconstructed azimuth",
+ ),
+ Column(
+ name="reco_alt",
+ unit=u.deg,
+ description="Reconstructed altitude",
+ ),
+ Column(
+ name="reco_fov_lat",
+ unit=u.deg,
+ description="Reconstructed field of view lat",
+ ),
+ Column(
+ name="reco_fov_lon",
+ unit=u.deg,
+ description="Reconstructed field of view lon",
+ ),
+ Column(name="pointing_az", unit=u.deg, description="Pointing azimuth"),
+ Column(name="pointing_alt", unit=u.deg, description="Pointing altitude"),
+ Column(
+ name="theta",
+ unit=u.deg,
+ description="Reconstructed angular offset from source position",
+ ),
+ Column(
+ name="true_source_fov_offset",
+ unit=u.deg,
+ description="Simulated angular offset from pointing direction",
+ ),
+ Column(
+ name="reco_source_fov_offset",
+ unit=u.deg,
+ description="Reconstructed angular offset from pointing direction",
+ ),
+ Column(
+ name="gh_score",
+ unit=u.dimensionless_unscaled,
+ description="prediction of the classifier, defined between [0,1],"
+ " where values close to 1 mean that the positive class"
+ " (e.g. gamma in gamma-ray analysis) is more likely",
+ ),
+ Column(
+ name="weight",
+ unit=u.dimensionless_unscaled,
+ description="Event weight",
+ ),
+ ]
+
+ return QTable(columns)
+
+
+class EventLoader(Component):
+ """
+ Contains functions to load events and simulation information from a file
+ and derive some additional columns needed for irf calculation.
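+
+ A minimal usage sketch (file name and spectrum choice are placeholders)::
+
+ loader = EventLoader(kind="gammas", file=Path("gamma.dl2.h5"),
+ target_spectrum=Spectra.CRAB_HEGRA)
+ events, n_raw, meta = loader.load_preselected_events(
+ chunk_size=100000, obs_time=50 * u.h)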
+ """
+
+ classes = [EventPreProcessor]
+
+ def __init__(self, kind: str, file: Path, target_spectrum: Spectra, **kwargs):
+ super().__init__(**kwargs)
+
+ self.epp = EventPreProcessor(parent=self)
+ self.target_spectrum = SPECTRA[target_spectrum]
+ self.kind = kind
+ self.file = file
+
+ def load_preselected_events(
+ self, chunk_size: int, obs_time: u.Quantity
+ ) -> tuple[QTable, int, dict]:
+ opts = dict(dl2=True, simulated=True, observation_info=True)
+ with TableLoader(self.file, parent=self, **opts) as load:
+ header = self.epp.make_empty_table()
+ sim_info, spectrum = self.get_simulation_information(load, obs_time)
+ meta = {"sim_info": sim_info, "spectrum": spectrum}
+ bits = [header]
+ n_raw_events = 0
+ for _, _, events in load.read_subarray_events_chunked(chunk_size, **opts):
+ selected = events[self.epp.get_table_mask(events)]
+ selected = self.epp.normalise_column_names(selected)
+ selected = self.make_derived_columns(selected)
+ bits.append(selected)
+ n_raw_events += len(events)
+
+ bits.append(header) # Putting it last ensures the correct metadata is used
+ table = vstack(bits, join_type="exact", metadata_conflicts="silent")
+ return table, n_raw_events, meta
+
+ def get_simulation_information(
+ self, loader: TableLoader, obs_time: u.Quantity
+ ) -> tuple[SimulatedEventsInfo, PowerLaw]:
+ sim = loader.read_simulation_configuration()
+ try:
+ show = loader.read_shower_distribution()
+ except NoSuchNodeError:
+ # Fall back to using the run header
+ show = Table([sim["n_showers"]], names=["n_entries"], dtype=[np.int64])
+
+ for itm in ["spectral_index", "energy_range_min", "energy_range_max"]:
+ if len(np.unique(sim[itm])) > 1:
+ raise NotImplementedError(
+ f"Unsupported: '{itm}' differs across simulation runs"
+ )
+
+ sim_info = SimulatedEventsInfo(
+ n_showers=show["n_entries"].sum(),
+ energy_min=sim["energy_range_min"].quantity[0],
+ energy_max=sim["energy_range_max"].quantity[0],
+ max_impact=sim["max_scatter_range"].quantity[0],
+ spectral_index=sim["spectral_index"][0],
+ viewcone_max=sim["max_viewcone_radius"].quantity[0],
+ viewcone_min=sim["min_viewcone_radius"].quantity[0],
+ )
+
+ return sim_info, PowerLaw.from_simulation(sim_info, obstime=obs_time)
+
+ def make_derived_columns(self, events: QTable) -> QTable:
+ events["weight"] = (
+ 1.0 * u.dimensionless_unscaled
+ ) # defer calculation of proper weights to later
+ events["gh_score"].unit = u.dimensionless_unscaled
+ events["theta"] = calculate_theta(
+ events,
+ assumed_source_az=events["true_az"],
+ assumed_source_alt=events["true_alt"],
+ )
+ events["true_source_fov_offset"] = calculate_source_fov_offset(
+ events, prefix="true"
+ )
+ events["reco_source_fov_offset"] = calculate_source_fov_offset(
+ events, prefix="reco"
+ )
+
+ altaz = AltAz()
+ pointing = SkyCoord(
+ alt=events["pointing_alt"], az=events["pointing_az"], frame=altaz
+ )
+ reco = SkyCoord(
+ alt=events["reco_alt"],
+ az=events["reco_az"],
+ frame=altaz,
+ )
+ nominal = NominalFrame(origin=pointing)
+ reco_nominal = reco.transform_to(nominal)
+ events["reco_fov_lon"] = u.Quantity(-reco_nominal.fov_lon) # minus for GADF
+ events["reco_fov_lat"] = u.Quantity(reco_nominal.fov_lat)
+ return events
+
+ def make_event_weights(
+ self,
+ events: QTable,
+ spectrum: PowerLaw,
+ fov_offset_bins: u.Quantity | None = None,
+ ) -> QTable:
+ if (
+ self.kind == "gammas"
+ and self.target_spectrum.normalization.unit.is_equivalent(
+ POINT_SOURCE_FLUX_UNIT
+ )
+ and spectrum.normalization.unit.is_equivalent(DIFFUSE_FLUX_UNIT)
+ ):
+ if fov_offset_bins is None:
+ raise ValueError(
+ "gamma_target_spectrum is point-like, but no fov offset bins "
+ "for the integration of the simulated diffuse spectrum were given."
+ )
+
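+ # Integrate the simulated diffuse spectrum over the solid angle of each
+ # FoV offset ring, so the weights reproduce the point-like target
+ # spectrum separately in every ring.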
+ for low, high in zip(fov_offset_bins[:-1], fov_offset_bins[1:]):
+ fov_mask = events["true_source_fov_offset"] >= low
+ fov_mask &= events["true_source_fov_offset"] < high
+
+ events["weight"][fov_mask] = calculate_event_weights(
+ events[fov_mask]["true_energy"],
+ target_spectrum=self.target_spectrum,
+ simulated_spectrum=spectrum.integrate_cone(low, high),
+ )
+ else:
+ events["weight"] = calculate_event_weights(
+ events["true_energy"],
+ target_spectrum=self.target_spectrum,
+ simulated_spectrum=spectrum,
+ )
+
+ return events
diff --git a/src/ctapipe/irf/spectra.py b/src/ctapipe/irf/spectra.py
new file mode 100644
index 00000000000..75106112b97
--- /dev/null
+++ b/src/ctapipe/irf/spectra.py
@@ -0,0 +1,26 @@
+"""Definition of spectra to be used to calculate event weights for irf computation"""
+
+from enum import Enum
+
+import astropy.units as u
+from pyirf.spectral import CRAB_HEGRA, IRFDOC_ELECTRON_SPECTRUM, IRFDOC_PROTON_SPECTRUM
+
+__all__ = ["ENERGY_FLUX_UNIT", "FLUX_UNIT", "SPECTRA", "Spectra"]
+
+ENERGY_FLUX_UNIT = (1 * u.erg / u.s / u.cm**2).unit
+FLUX_UNIT = (1 / u.erg / u.s / u.cm**2).unit
+
+
+class Spectra(Enum):
+ """Spectra for calculating event weights"""
+
+ CRAB_HEGRA = 1
+ IRFDOC_ELECTRON_SPECTRUM = 2
+ IRFDOC_PROTON_SPECTRUM = 3
+
+
+SPECTRA = {
+ Spectra.CRAB_HEGRA: CRAB_HEGRA,
+ Spectra.IRFDOC_ELECTRON_SPECTRUM: IRFDOC_ELECTRON_SPECTRUM,
+ Spectra.IRFDOC_PROTON_SPECTRUM: IRFDOC_PROTON_SPECTRUM,
+}
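+
+# Illustrative lookup: a tool configured with ``Spectra.CRAB_HEGRA`` resolves
+# the actual pyirf spectrum object via ``SPECTRA[Spectra.CRAB_HEGRA]`` before
+# passing it to the event weighting.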
diff --git a/src/ctapipe/irf/tests/test_benchmarks.py b/src/ctapipe/irf/tests/test_benchmarks.py
new file mode 100644
index 00000000000..19d731a9fc0
--- /dev/null
+++ b/src/ctapipe/irf/tests/test_benchmarks.py
@@ -0,0 +1,131 @@
+import astropy.units as u
+from astropy.table import QTable
+
+
+def test_make_2d_energy_bias_res(irf_events_table):
+ from ctapipe.irf import EnergyBiasResolution2dMaker
+ from ctapipe.irf.tests.test_irfs import _check_boundaries_in_hdu
+
+ bias_res_maker = EnergyBiasResolution2dMaker(
+ fov_offset_n_bins=3,
+ fov_offset_max=3 * u.deg,
+ true_energy_n_bins_per_decade=7,
+ true_energy_max=155 * u.TeV,
+ )
+
+ bias_res_hdu = bias_res_maker.make_bias_resolution_hdu(events=irf_events_table)
+ # min 7 bins per decade between 0.015 TeV and 155 TeV -> 7 * 4 + 1 = 29 bins
+ assert (
+ bias_res_hdu.data["N_EVENTS"].shape
+ == bias_res_hdu.data["BIAS"].shape
+ == bias_res_hdu.data["RESOLUTION"].shape
+ == (1, 3, 29)
+ )
+ _check_boundaries_in_hdu(
+ bias_res_hdu,
+ lo_vals=[0 * u.deg, 0.015 * u.TeV],
+ hi_vals=[3 * u.deg, 155 * u.TeV],
+ )
+
+
+def test_make_2d_ang_res(irf_events_table):
+ from ctapipe.irf import AngularResolution2dMaker
+ from ctapipe.irf.tests.test_irfs import _check_boundaries_in_hdu
+
+ ang_res_maker = AngularResolution2dMaker(
+ fov_offset_n_bins=3,
+ fov_offset_max=3 * u.deg,
+ true_energy_n_bins_per_decade=7,
+ true_energy_max=155 * u.TeV,
+ reco_energy_n_bins_per_decade=6,
+ reco_energy_min=0.03 * u.TeV,
+ )
+
+ ang_res_hdu = ang_res_maker.make_angular_resolution_hdu(events=irf_events_table)
+ assert (
+ ang_res_hdu.data["N_EVENTS"].shape
+ == ang_res_hdu.data["ANGULAR_RESOLUTION"].shape
+ == (1, 3, 23)
+ )
+ _check_boundaries_in_hdu(
+ ang_res_hdu,
+ lo_vals=[0 * u.deg, 0.03 * u.TeV],
+ hi_vals=[3 * u.deg, 150 * u.TeV],
+ )
+
+ ang_res_maker.use_true_energy = True
+ ang_res_hdu = ang_res_maker.make_angular_resolution_hdu(events=irf_events_table)
+ assert (
+ ang_res_hdu.data["N_EVENTS"].shape
+ == ang_res_hdu.data["ANGULAR_RESOLUTION"].shape
+ == (1, 3, 29)
+ )
+ _check_boundaries_in_hdu(
+ ang_res_hdu,
+ lo_vals=[0 * u.deg, 0.015 * u.TeV],
+ hi_vals=[3 * u.deg, 155 * u.TeV],
+ )
+
+
+def test_make_2d_sensitivity(
+ gamma_diffuse_full_reco_file, proton_full_reco_file, irf_event_loader_test_config
+):
+ from ctapipe.irf import EventLoader, Sensitivity2dMaker, Spectra
+ from ctapipe.irf.tests.test_irfs import _check_boundaries_in_hdu
+
+ gamma_loader = EventLoader(
+ config=irf_event_loader_test_config,
+ kind="gammas",
+ file=gamma_diffuse_full_reco_file,
+ target_spectrum=Spectra.CRAB_HEGRA,
+ )
+ gamma_events, _, _ = gamma_loader.load_preselected_events(
+ chunk_size=10000,
+ obs_time=u.Quantity(50, u.h),
+ )
+ proton_loader = EventLoader(
+ config=irf_event_loader_test_config,
+ kind="protons",
+ file=proton_full_reco_file,
+ target_spectrum=Spectra.IRFDOC_PROTON_SPECTRUM,
+ )
+ proton_events, _, _ = proton_loader.load_preselected_events(
+ chunk_size=10000,
+ obs_time=u.Quantity(50, u.h),
+ )
+
+ sens_maker = Sensitivity2dMaker(
+ fov_offset_n_bins=3,
+ fov_offset_max=3 * u.deg,
+ reco_energy_n_bins_per_decade=7,
+ reco_energy_max=155 * u.TeV,
+ )
+ # Create a dummy theta cut since `pyirf.sensitivity.estimate_background`
+ # currently requires a theta cut.
+ theta_cuts = QTable()
+ theta_cuts["center"] = 0.5 * (
+ sens_maker.reco_energy_bins[:-1] + sens_maker.reco_energy_bins[1:]
+ )
+ theta_cuts["cut"] = sens_maker.fov_offset_max
+
+ sens_hdu = sens_maker.make_sensitivity_hdu(
+ signal_events=gamma_events,
+ background_events=proton_events,
+ theta_cut=theta_cuts,
+ gamma_spectrum=Spectra.CRAB_HEGRA,
+ )
+ assert (
+ sens_hdu.data["N_SIGNAL"].shape
+ == sens_hdu.data["N_SIGNAL_WEIGHTED"].shape
+ == sens_hdu.data["N_BACKGROUND"].shape
+ == sens_hdu.data["N_BACKGROUND_WEIGHTED"].shape
+ == sens_hdu.data["SIGNIFICANCE"].shape
+ == sens_hdu.data["RELATIVE_SENSITIVITY"].shape
+ == sens_hdu.data["FLUX_SENSITIVITY"].shape
+ == (1, 3, 29)
+ )
+ _check_boundaries_in_hdu(
+ sens_hdu,
+ lo_vals=[0 * u.deg, 0.015 * u.TeV],
+ hi_vals=[3 * u.deg, 155 * u.TeV],
+ )
diff --git a/src/ctapipe/irf/tests/test_binning.py b/src/ctapipe/irf/tests/test_binning.py
new file mode 100644
index 00000000000..4925f8855ca
--- /dev/null
+++ b/src/ctapipe/irf/tests/test_binning.py
@@ -0,0 +1,107 @@
+import logging
+
+import astropy.units as u
+import numpy as np
+import pytest
+
+
+def test_check_bins_in_range(tmp_path):
+ from ctapipe.irf import ResultValidRange, check_bins_in_range
+
+ valid_range = ResultValidRange(min=0.03 * u.TeV, max=200 * u.TeV)
+ errormessage = "Valid range for result is 0.03 to 200., got"
+
+ # bins are in range
+ bins = u.Quantity(np.logspace(-1, 2, 10), u.TeV)
+ check_bins_in_range(bins, valid_range)
+
+ # bins are too small
+ bins = u.Quantity(np.logspace(-2, 2, 10), u.TeV)
+ with pytest.raises(ValueError, match=errormessage):
+ check_bins_in_range(bins, valid_range)
+
+ # bins are too big
+ bins = u.Quantity(np.logspace(-1, 3, 10), u.TeV)
+ with pytest.raises(ValueError, match=errormessage):
+ check_bins_in_range(bins, valid_range)
+
+ # bins are too big and too small
+ bins = u.Quantity(np.logspace(-2, 3, 10), u.TeV)
+ with pytest.raises(ValueError, match=errormessage):
+ check_bins_in_range(bins, valid_range)
+
+ logger = logging.getLogger("ctapipe.irf.binning")
+ logpath = tmp_path / "test_check_bins_in_range.log"
+ handler = logging.FileHandler(logpath)
+ logger.addHandler(handler)
+
+ check_bins_in_range(bins, valid_range, raise_error=False)
+ assert "Valid range for result is" in logpath.read_text()
+
+
+def test_make_bins_per_decade():
+ from ctapipe.irf import make_bins_per_decade
+
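+ # default is 5 bins per decade: 3 decades from 100 GeV to 100 TeV give
+ # 15 bins, i.e. 16 edges spaced 0.2 dex apart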
+ bins = make_bins_per_decade(100 * u.GeV, 100 * u.TeV)
+ assert bins.unit == u.GeV
+ assert len(bins) == 16
+ assert bins[0] == 100 * u.GeV
+ assert np.allclose(np.diff(np.log10(bins.to_value(u.GeV))), 0.2)
+
+ bins = make_bins_per_decade(100 * u.GeV, 100 * u.TeV, 10)
+ assert len(bins) == 31
+ assert np.allclose(np.diff(np.log10(bins.to_value(u.GeV))), 0.1)
+
+ # respect boundaries over n_bins_per_decade
+ bins = make_bins_per_decade(100 * u.GeV, 105 * u.TeV)
+ assert len(bins) == 17
+ assert np.isclose(bins[-1], 105 * u.TeV, rtol=1e-9)
+
+
+def test_true_energy_bins_base():
+ from ctapipe.irf.binning import DefaultTrueEnergyBins
+
+ binning = DefaultTrueEnergyBins(
+ true_energy_min=0.02 * u.TeV,
+ true_energy_max=200 * u.TeV,
+ true_energy_n_bins_per_decade=7,
+ )
+ assert len(binning.true_energy_bins) == 29
+ assert binning.true_energy_bins.unit == u.TeV
+ assert np.isclose(binning.true_energy_bins[0], binning.true_energy_min, rtol=1e-9)
+ assert np.isclose(binning.true_energy_bins[-1], binning.true_energy_max, rtol=1e-9)
+ assert np.allclose(
+ np.diff(np.log10(binning.true_energy_bins.to_value(u.TeV))), 1 / 7
+ )
+
+
+def test_reco_energy_bins_base():
+ from ctapipe.irf.binning import DefaultRecoEnergyBins
+
+ binning = DefaultRecoEnergyBins(
+ reco_energy_min=0.02 * u.TeV,
+ reco_energy_max=200 * u.TeV,
+ reco_energy_n_bins_per_decade=4,
+ )
+ assert len(binning.reco_energy_bins) == 17
+ assert binning.reco_energy_bins.unit == u.TeV
+ assert np.isclose(binning.reco_energy_bins[0], binning.reco_energy_min, rtol=1e-9)
+ assert np.isclose(binning.reco_energy_bins[-1], binning.reco_energy_max, rtol=1e-9)
+ assert np.allclose(
+ np.diff(np.log10(binning.reco_energy_bins.to_value(u.TeV))), 0.25
+ )
+
+
+def test_fov_offset_bins_base():
+ from ctapipe.irf.binning import DefaultFoVOffsetBins
+
+ binning = DefaultFoVOffsetBins(
+ # use default for fov_offset_min
+ fov_offset_max=3 * u.deg,
+ fov_offset_n_bins=3,
+ )
+ assert len(binning.fov_offset_bins) == 4
+ assert binning.fov_offset_bins.unit == u.deg
+ assert np.isclose(binning.fov_offset_bins[0], binning.fov_offset_min, rtol=1e-9)
+ assert np.isclose(binning.fov_offset_bins[-1], binning.fov_offset_max, rtol=1e-9)
+ assert np.allclose(np.diff(binning.fov_offset_bins.to_value(u.deg)), 1)
diff --git a/src/ctapipe/irf/tests/test_irfs.py b/src/ctapipe/irf/tests/test_irfs.py
new file mode 100644
index 00000000000..8da1f2dac2e
--- /dev/null
+++ b/src/ctapipe/irf/tests/test_irfs.py
@@ -0,0 +1,128 @@
+import astropy.units as u
+from astropy.io.fits import BinTableHDU
+from pyirf.simulations import SimulatedEventsInfo
+
+
+def _check_boundaries_in_hdu(
+ hdu: BinTableHDU,
+ lo_vals: list,
+ hi_vals: list,
+ colnames: tuple[str, ...] = ("THETA", "ENERG"),
+):
+ """Check that the lowest/highest bin edges stored in ``hdu`` match the expected values."""
+ for col, val in zip(colnames, lo_vals):
+ assert u.isclose(
+ u.Quantity(hdu.data[f"{col}_LO"][0][0], hdu.columns[f"{col}_LO"].unit), val
+ )
+ for col, val in zip(colnames, hi_vals):
+ assert u.isclose(
+ u.Quantity(hdu.data[f"{col}_HI"][0][-1], hdu.columns[f"{col}_HI"].unit), val
+ )
+
+
+def test_make_2d_bkg(irf_events_table):
+ from ctapipe.irf import BackgroundRate2dMaker
+
+ bkg_maker = BackgroundRate2dMaker(
+ fov_offset_n_bins=3,
+ fov_offset_max=3 * u.deg,
+ reco_energy_n_bins_per_decade=7,
+ reco_energy_max=155 * u.TeV,
+ )
+
+ bkg_hdu = bkg_maker.make_bkg_hdu(events=irf_events_table, obs_time=1 * u.s)
+ # min 7 bins per decade between 0.015 TeV and 155 TeV -> 7 * 4 + 1 = 29 bins
+ assert bkg_hdu.data["BKG"].shape == (1, 3, 29)
+
+ _check_boundaries_in_hdu(
+ bkg_hdu, lo_vals=[0 * u.deg, 0.015 * u.TeV], hi_vals=[3 * u.deg, 155 * u.TeV]
+ )
+
+
+def test_make_2d_energy_migration(irf_events_table):
+ from ctapipe.irf import EnergyDispersion2dMaker
+
+ edisp_maker = EnergyDispersion2dMaker(
+ fov_offset_n_bins=3,
+ fov_offset_max=3 * u.deg,
+ true_energy_n_bins_per_decade=7,
+ true_energy_max=155 * u.TeV,
+ energy_migration_n_bins=20,
+ energy_migration_min=0.1,
+ energy_migration_max=10,
+ )
+ edisp_hdu = edisp_maker.make_edisp_hdu(events=irf_events_table, point_like=False)
+ # min 7 bins per decade between 0.015 TeV and 155 TeV -> 7 * 4 + 1 = 29 bins
+ assert edisp_hdu.data["MATRIX"].shape == (1, 3, 20, 29)
+
+ _check_boundaries_in_hdu(
+ edisp_hdu,
+ lo_vals=[0 * u.deg, 0.015 * u.TeV, 0.1],
+ hi_vals=[3 * u.deg, 155 * u.TeV, 10],
+ colnames=["THETA", "ENERG", "MIGRA"],
+ )
+
+
+def test_make_2d_eff_area(irf_events_table):
+ from ctapipe.irf import EffectiveArea2dMaker
+
+ eff_area_maker = EffectiveArea2dMaker(
+ fov_offset_n_bins=3,
+ fov_offset_max=3 * u.deg,
+ true_energy_n_bins_per_decade=7,
+ true_energy_max=155 * u.TeV,
+ )
+ sim_info = SimulatedEventsInfo(
+ n_showers=3000,
+ energy_min=0.01 * u.TeV,
+ energy_max=10 * u.TeV,
+ max_impact=1000 * u.m,
+ spectral_index=-1.9,
+ viewcone_min=0 * u.deg,
+ viewcone_max=10 * u.deg,
+ )
+ eff_area_hdu = eff_area_maker.make_aeff_hdu(
+ events=irf_events_table,
+ point_like=False,
+ signal_is_point_like=False,
+ sim_info=sim_info,
+ )
+ # min 7 bins per decade between 0.015 TeV and 155 TeV -> 7 * 4 + 1 = 29 bins
+ assert eff_area_hdu.data["EFFAREA"].shape == (1, 3, 29)
+
+ _check_boundaries_in_hdu(
+ eff_area_hdu,
+ lo_vals=[0 * u.deg, 0.015 * u.TeV],
+ hi_vals=[3 * u.deg, 155 * u.TeV],
+ )
+
+ # point like data -> only 1 fov offset bin
+ eff_area_hdu = eff_area_maker.make_aeff_hdu(
+ events=irf_events_table,
+ point_like=False,
+ signal_is_point_like=True,
+ sim_info=sim_info,
+ )
+ assert eff_area_hdu.data["EFFAREA"].shape == (1, 1, 29)
+
+
+def test_make_3d_psf(irf_events_table):
+ from ctapipe.irf import Psf3dMaker
+
+ psf_maker = Psf3dMaker(
+ fov_offset_n_bins=3,
+ fov_offset_max=3 * u.deg,
+ true_energy_n_bins_per_decade=7,
+ true_energy_max=155 * u.TeV,
+ source_offset_n_bins=110,
+ source_offset_max=2 * u.deg,
+ )
+ psf_hdu = psf_maker.make_psf_hdu(events=irf_events_table)
+ # min 7 bins per decade between 0.015 TeV and 155 TeV -> 7 * 4 + 1 = 29 bins
+ assert psf_hdu.data["RPSF"].shape == (1, 110, 3, 29)
+
+ _check_boundaries_in_hdu(
+ psf_hdu,
+ lo_vals=[0 * u.deg, 0.015 * u.TeV, 0 * u.deg],
+ hi_vals=[3 * u.deg, 155 * u.TeV, 2 * u.deg],
+ colnames=["THETA", "ENERG", "RAD"],
+ )
diff --git a/src/ctapipe/irf/tests/test_optimize.py b/src/ctapipe/irf/tests/test_optimize.py
new file mode 100644
index 00000000000..5299d57eb4c
--- /dev/null
+++ b/src/ctapipe/irf/tests/test_optimize.py
@@ -0,0 +1,124 @@
+import astropy.units as u
+import numpy as np
+import pytest
+from astropy.table import QTable
+
+from ctapipe.core import QualityQuery, non_abstract_children
+from ctapipe.irf.optimize import CutOptimizerBase
+
+
+def test_optimization_result(tmp_path, irf_event_loader_test_config):
+ from ctapipe.irf import (
+ EventPreProcessor,
+ OptimizationResult,
+ ResultValidRange,
+ )
+
+ result_path = tmp_path / "result.h5"
+ epp = EventPreProcessor(irf_event_loader_test_config)
+ gh_cuts = QTable(
+ data=[[0.2, 0.8, 1.5] * u.TeV, [0.8, 1.5, 10] * u.TeV, [0.82, 0.91, 0.88]],
+ names=["low", "high", "cut"],
+ )
+ result = OptimizationResult(
+ precuts=epp,
+ gh_cuts=gh_cuts,
+ clf_prefix="ExtraTreesClassifier",
+ valid_energy_min=0.2 * u.TeV,
+ valid_energy_max=10 * u.TeV,
+ valid_offset_min=0 * u.deg,
+ valid_offset_max=np.inf * u.deg,
+ theta_cuts=None,
+ )
+ result.write(result_path)
+ assert result_path.exists()
+
+ loaded = OptimizationResult.read(result_path)
+ assert isinstance(loaded, OptimizationResult)
+ assert isinstance(loaded.precuts, QualityQuery)
+ assert isinstance(loaded.valid_energy, ResultValidRange)
+ assert isinstance(loaded.valid_offset, ResultValidRange)
+ assert isinstance(loaded.gh_cuts, QTable)
+ assert loaded.clf_prefix == "ExtraTreesClassifier"
+
+
+def test_gh_percentile_cut_calculator():
+ from ctapipe.irf import GhPercentileCutCalculator
+
+ calc = GhPercentileCutCalculator(
+ target_percentile=75,
+ min_counts=1,
+ smoothing=-1,
+ )
+ cuts = calc.calculate_gh_cut(
+ gammaness=np.array([0.1, 0.6, 0.45, 0.98, 0.32, 0.95, 0.25, 0.87]),
+ reco_energy=[0.17, 0.36, 0.47, 0.22, 1.2, 5, 4.2, 9.1] * u.TeV,
+ reco_energy_bins=[0, 1, 10] * u.TeV,
+ )
+ assert len(cuts) == 2
+ assert np.isclose(cuts["cut"][0], 0.3625)
+ assert np.isclose(cuts["cut"][1], 0.3025)
+ assert calc.smoothing is None
+
+
+def test_theta_percentile_cut_calculator():
+ from ctapipe.irf import ThetaPercentileCutCalculator
+
+ calc = ThetaPercentileCutCalculator(
+ target_percentile=75,
+ min_counts=1,
+ smoothing=-1,
+ )
+ cuts = calc.calculate_theta_cut(
+ theta=[0.1, 0.07, 0.21, 0.4, 0.03, 0.08, 0.11, 0.18] * u.deg,
+ reco_energy=[0.17, 0.36, 0.47, 0.22, 1.2, 5, 4.2, 9.1] * u.TeV,
+ reco_energy_bins=[0, 1, 10] * u.TeV,
+ )
+ assert len(cuts) == 2
+ assert np.isclose(cuts["cut"][0], 0.2575 * u.deg)
+ assert np.isclose(cuts["cut"][1], 0.1275 * u.deg)
+ assert calc.smoothing is None
+
+
+@pytest.mark.parametrize("Optimizer", non_abstract_children(CutOptimizerBase))
+def test_cut_optimizer(
+ Optimizer,
+ gamma_diffuse_full_reco_file,
+ proton_full_reco_file,
+ irf_event_loader_test_config,
+):
+ from ctapipe.irf import EventLoader, OptimizationResult, Spectra
+
+ gamma_loader = EventLoader(
+ config=irf_event_loader_test_config,
+ kind="gammas",
+ file=gamma_diffuse_full_reco_file,
+ target_spectrum=Spectra.CRAB_HEGRA,
+ )
+ gamma_events, _, _ = gamma_loader.load_preselected_events(
+ chunk_size=10000,
+ obs_time=u.Quantity(50, u.h),
+ )
+ proton_loader = EventLoader(
+ config=irf_event_loader_test_config,
+ kind="protons",
+ file=proton_full_reco_file,
+ target_spectrum=Spectra.IRFDOC_PROTON_SPECTRUM,
+ )
+ proton_events, _, _ = proton_loader.load_preselected_events(
+ chunk_size=10000,
+ obs_time=u.Quantity(50, u.h),
+ )
+
+ optimizer = Optimizer()
+ result = optimizer.optimize_cuts(
+ signal=gamma_events,
+ background=proton_events,
+ precuts=gamma_loader.epp, # identical precuts for all particle types
+ clf_prefix="ExtraTreesClassifier",
+ )
+ assert isinstance(result, OptimizationResult)
+ assert result.clf_prefix == "ExtraTreesClassifier"
+ assert result.valid_energy.min >= result.gh_cuts["low"][0]
+ assert result.valid_energy.max <= result.gh_cuts["high"][-1]
+ assert result.theta_cuts["cut"].unit == u.deg
diff --git a/src/ctapipe/irf/tests/test_preprocessing.py b/src/ctapipe/irf/tests/test_preprocessing.py
new file mode 100644
index 00000000000..53e964a86d5
--- /dev/null
+++ b/src/ctapipe/irf/tests/test_preprocessing.py
@@ -0,0 +1,105 @@
+import astropy.units as u
+import numpy as np
+import pytest
+from astropy.table import Table
+from pyirf.simulations import SimulatedEventsInfo
+from pyirf.spectral import PowerLaw
+
+
+@pytest.fixture(scope="module")
+def dummy_table():
+ """Dummy table to test column renaming."""
+ return Table(
+ {
+ "obs_id": [1, 1, 1, 2, 3, 3],
+ "event_id": [1, 2, 3, 1, 1, 2],
+ "true_energy": [0.99, 10, 0.37, 2.1, 73.4, 1] * u.TeV,
+ "dummy_energy": [1, 10, 0.4, 2.5, 73, 1] * u.TeV,
+ "classifier_prediction": [1, 0.3, 0.87, 0.93, 0, 0.1],
+ "true_alt": [60, 60, 60, 60, 60, 60] * u.deg,
+ "geom_alt": [58.5, 61.2, 59, 71.6, 60, 62] * u.deg,
+ "true_az": [13, 13, 13, 13, 13, 13] * u.deg,
+ "geom_az": [12.5, 13, 11.8, 15.1, 14.7, 12.8] * u.deg,
+ "subarray_pointing_frame": np.zeros(6),
+ "subarray_pointing_lat": np.full(6, 20) * u.deg,
+ "subarray_pointing_lon": np.full(6, 0) * u.deg,
+ }
+ )
+
+
+def test_normalise_column_names(dummy_table):
+ from ctapipe.irf import EventPreProcessor
+
+ epp = EventPreProcessor(
+ energy_reconstructor="dummy",
+ geometry_reconstructor="geom",
+ gammaness_classifier="classifier",
+ )
+ norm_table = epp.normalise_column_names(dummy_table)
+
+ needed_cols = [
+ "obs_id",
+ "event_id",
+ "true_energy",
+ "true_alt",
+ "true_az",
+ "reco_energy",
+ "reco_alt",
+ "reco_az",
+ "gh_score",
+ "pointing_alt",
+ "pointing_az",
+ ]
+ for c in needed_cols:
+ assert c in norm_table.colnames
+
+ with pytest.raises(ValueError, match="Required column geom_alt is missing."):
+ dummy_table.rename_column("geom_alt", "alt_geom")
+ epp = EventPreProcessor(
+ energy_reconstructor="dummy",
+ geometry_reconstructor="geom",
+ gammaness_classifier="classifier",
+ )
+ _ = epp.normalise_column_names(dummy_table)
+
+
+def test_event_loader(gamma_diffuse_full_reco_file, irf_event_loader_test_config):
+ from ctapipe.irf import EventLoader, Spectra
+
+ loader = EventLoader(
+ config=irf_event_loader_test_config,
+ kind="gammas",
+ file=gamma_diffuse_full_reco_file,
+ target_spectrum=Spectra.CRAB_HEGRA,
+ )
+ events, count, meta = loader.load_preselected_events(
+ chunk_size=10000,
+ obs_time=u.Quantity(50, u.h),
+ )
+
+ columns = [
+ "obs_id",
+ "event_id",
+ "true_energy",
+ "true_az",
+ "true_alt",
+ "reco_energy",
+ "reco_az",
+ "reco_alt",
+ "reco_fov_lat",
+ "reco_fov_lon",
+ "gh_score",
+ "pointing_az",
+ "pointing_alt",
+ "theta",
+ "true_source_fov_offset",
+ "reco_source_fov_offset",
+ ]
+ assert sorted(columns) == sorted(events.colnames)
+
+ assert isinstance(count, int)
+ assert isinstance(meta["sim_info"], SimulatedEventsInfo)
+ assert isinstance(meta["spectrum"], PowerLaw)
+
+ events = loader.make_event_weights(events, meta["spectrum"], (0 * u.deg, 1 * u.deg))
+ assert "weight" in events.colnames
diff --git a/src/ctapipe/tools/compute_irf.py b/src/ctapipe/tools/compute_irf.py
new file mode 100644
index 00000000000..41ec291e642
--- /dev/null
+++ b/src/ctapipe/tools/compute_irf.py
@@ -0,0 +1,614 @@
+"""Tool to generate IRFs"""
+
+from importlib.util import find_spec
+
+if find_spec("pyirf") is None:
+ from ..exceptions import OptionalDependencyMissing
+
+ raise OptionalDependencyMissing("pyirf") from None
+
+import operator
+from functools import partial
+
+import astropy.units as u
+import numpy as np
+from astropy.io import fits
+from astropy.table import vstack
+from pyirf.cuts import evaluate_binned_cut
+from pyirf.io import create_rad_max_hdu
+
+from ..core import Provenance, Tool, ToolConfigurationError, traits
+from ..core.traits import AstroQuantity, Bool, Integer, classes_with_traits, flag
+from ..irf import (
+ EventLoader,
+ EventPreProcessor,
+ OptimizationResult,
+ Spectra,
+ check_bins_in_range,
+)
+from ..irf.benchmarks import (
+ AngularResolutionMakerBase,
+ EnergyBiasResolutionMakerBase,
+ SensitivityMakerBase,
+)
+from ..irf.irfs import (
+ BackgroundRateMakerBase,
+ EffectiveAreaMakerBase,
+ EnergyDispersionMakerBase,
+ PsfMakerBase,
+)
+
+__all__ = ["IrfTool"]
+
+
+class IrfTool(Tool):
+ "Tool to create IRF files in GADF format"
+
+ name = "ctapipe-compute-irf"
+ description = __doc__
+ examples = """
+ ctapipe-compute-irf \\
+ --cuts cuts.fits \\
+ --gamma-file gamma.dl2.h5 \\
+ --proton-file proton.dl2.h5 \\
+ --electron-file electron.dl2.h5 \\
+ --output irf.fits.gz \\
+ --benchmark-output benchmarks.fits.gz
+ """
+
+ do_background = Bool(
+ True,
+ help="Compute background rate using supplied files.",
+ ).tag(config=True)
+
+ range_check_error = Bool(
+ False,
+ help="Raise an error if IRFs are requested outside the range where the cut optimization is valid.",
+ ).tag(config=True)
+
+ cuts_file = traits.Path(
+ default_value=None,
+ directory_ok=False,
+ help="Path to optimized cuts input file.",
+ ).tag(config=True)
+
+ gamma_file = traits.Path(
+ default_value=None, directory_ok=False, help="Gamma input filename and path."
+ ).tag(config=True)
+
+ gamma_target_spectrum = traits.UseEnum(
+ Spectra,
+ default_value=Spectra.CRAB_HEGRA,
+ help="Name of the spectrum used for weights of gamma events.",
+ ).tag(config=True)
+
+ proton_file = traits.Path(
+ default_value=None,
+ allow_none=True,
+ directory_ok=False,
+ help="Proton input filename and path.",
+ ).tag(config=True)
+
+ proton_target_spectrum = traits.UseEnum(
+ Spectra,
+ default_value=Spectra.IRFDOC_PROTON_SPECTRUM,
+ help="Name of the spectrum used for weights of proton events.",
+ ).tag(config=True)
+
+ electron_file = traits.Path(
+ default_value=None,
+ allow_none=True,
+ directory_ok=False,
+ help="Electron input filename and path.",
+ ).tag(config=True)
+
+ electron_target_spectrum = traits.UseEnum(
+ Spectra,
+ default_value=Spectra.IRFDOC_ELECTRON_SPECTRUM,
+ help="Name of the spectrum used for weights of electron events.",
+ ).tag(config=True)
+
+ chunk_size = Integer(
+ default_value=100000,
+ allow_none=True,
+ help="How many subarray events to load at once while selecting.",
+ ).tag(config=True)
+
+ output_path = traits.Path(
+ default_value=None,
+ allow_none=False,
+ directory_ok=False,
+ help="Output file",
+ ).tag(config=True)
+
+ benchmarks_output_path = traits.Path(
+ default_value=None,
+ allow_none=True,
+ directory_ok=False,
+ help="Optional second output file for benchmarks.",
+ ).tag(config=True)
+
+ obs_time = AstroQuantity(
+ default_value=u.Quantity(50, u.hour),
+ physical_type=u.physical.time,
+ help=(
+ "Observation time in the form `` ``."
+ " This is used for flux normalization and estimating a background rate."
+ ),
+ ).tag(config=True)
+
+ edisp_maker = traits.ComponentName(
+ EnergyDispersionMakerBase,
+ default_value="EnergyDispersion2dMaker",
+ help="The parameterization of the energy dispersion to be used.",
+ ).tag(config=True)
+
+ aeff_maker = traits.ComponentName(
+ EffectiveAreaMakerBase,
+ default_value="EffectiveArea2dMaker",
+ help="The parameterization of the effective area to be used.",
+ ).tag(config=True)
+
+ psf_maker = traits.ComponentName(
+ PsfMakerBase,
+ default_value="Psf3dMaker",
+ help="The parameterization of the point spread function to be used.",
+ ).tag(config=True)
+
+ bkg_maker = traits.ComponentName(
+ BackgroundRateMakerBase,
+ default_value="BackgroundRate2dMaker",
+ help="The parameterization of the background rate to be used.",
+ ).tag(config=True)
+
+ energy_bias_resolution_maker = traits.ComponentName(
+ EnergyBiasResolutionMakerBase,
+ default_value="EnergyBiasResolution2dMaker",
+ help=(
+ "The parameterization of the bias and resolution benchmark "
+ "for the energy prediction."
+ ),
+ ).tag(config=True)
+
+ angular_resolution_maker = traits.ComponentName(
+ AngularResolutionMakerBase,
+ default_value="AngularResolution2dMaker",
+ help="The parameterization of the angular resolution benchmark.",
+ ).tag(config=True)
+
+ sensitivity_maker = traits.ComponentName(
+ SensitivityMakerBase,
+ default_value="Sensitivity2dMaker",
+ help="The parameterization of the point source sensitivity benchmark.",
+ ).tag(config=True)
+
+ point_like = Bool(
+ False,
+ help=(
+ "Compute a point-like IRF by applying a theta cut (``RAD_MAX``) "
+ "which makes calculating a point spread function unnecessary."
+ ),
+ ).tag(config=True)
+
+ aliases = {
+ "cuts": "IrfTool.cuts_file",
+ "gamma-file": "IrfTool.gamma_file",
+ "proton-file": "IrfTool.proton_file",
+ "electron-file": "IrfTool.electron_file",
+ "output": "IrfTool.output_path",
+ "benchmark-output": "IrfTool.benchmarks_output_path",
+ "chunk_size": "IrfTool.chunk_size",
+ }
+
+ flags = {
+ **flag(
+ "do-background",
+ "IrfTool.do_background",
+ "Compute background rate.",
+ "Do not compute background rate.",
+ ),
+ **flag(
+ "point-like",
+ "IrfTool.point_like",
+ "Compute a point-like IRF.",
+ "Compute a full-enclosure IRF.",
+ ),
+ }
+
+ classes = (
+ [
+ EventLoader,
+ ]
+ + classes_with_traits(BackgroundRateMakerBase)
+ + classes_with_traits(EffectiveAreaMakerBase)
+ + classes_with_traits(EnergyDispersionMakerBase)
+ + classes_with_traits(PsfMakerBase)
+ + classes_with_traits(AngularResolutionMakerBase)
+ + classes_with_traits(EnergyBiasResolutionMakerBase)
+ + classes_with_traits(SensitivityMakerBase)
+ )
+
+ def setup(self):
+ """
+ Initialize components from config and load g/h (and theta) cuts.
+ """
+ self.opt_result = OptimizationResult.read(self.cuts_file)
+ if self.point_like and self.opt_result.theta_cuts is None:
+ raise ToolConfigurationError(
+ "Computing a point-like IRF requires an (optimized) theta cut."
+ )
+
+ check_e_bins = partial(
+ check_bins_in_range,
+ valid_range=self.opt_result.valid_energy,
+ raise_error=self.range_check_error,
+ )
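+ # Reused below to check that any requested energy binning lies inside the
+ # energy range in which the loaded cut-optimization result is valid.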
+ self.particles = [
+ EventLoader(
+ parent=self,
+ kind="gammas",
+ file=self.gamma_file,
+ target_spectrum=self.gamma_target_spectrum,
+ ),
+ ]
+ if self.do_background:
+ if not self.proton_file or (
+ self.proton_file and not self.proton_file.exists()
+ ):
+ raise ValueError(
+ "At least a proton file required when specifying `do_background`."
+ )
+
+ self.particles.append(
+ EventLoader(
+ parent=self,
+ kind="protons",
+ file=self.proton_file,
+ target_spectrum=self.proton_target_spectrum,
+ )
+ )
+ if self.electron_file and self.electron_file.exists():
+ self.particles.append(
+ EventLoader(
+ parent=self,
+ kind="electrons",
+ file=self.electron_file,
+ target_spectrum=self.electron_target_spectrum,
+ )
+ )
+ else:
+ self.log.warning("Estimating background without electron file.")
+
+ self.bkg = BackgroundRateMakerBase.from_name(self.bkg_maker, parent=self)
+ check_e_bins(
+ bins=self.bkg.reco_energy_bins, source="background reco energy"
+ )
+
+ self.edisp = EnergyDispersionMakerBase.from_name(self.edisp_maker, parent=self)
+ self.aeff = EffectiveAreaMakerBase.from_name(self.aeff_maker, parent=self)
+
+ if not self.point_like:
+ self.psf = PsfMakerBase.from_name(self.psf_maker, parent=self)
+
+ if self.benchmarks_output_path is not None:
+ self.angular_resolution = AngularResolutionMakerBase.from_name(
+ self.angular_resolution_maker, parent=self
+ )
+ if not self.angular_resolution.use_true_energy:
+ check_e_bins(
+ bins=self.angular_resolution.reco_energy_bins,
+ source="Angular resolution energy",
+ )
+
+ self.bias_resolution = EnergyBiasResolutionMakerBase.from_name(
+ self.energy_bias_resolution_maker, parent=self
+ )
+ self.sensitivity = SensitivityMakerBase.from_name(
+ self.sensitivity_maker, parent=self
+ )
+ check_e_bins(
+ bins=self.sensitivity.reco_energy_bins, source="Sensitivity reco energy"
+ )
+
+ def calculate_selections(self, reduced_events: dict) -> dict:
+ """
+ Add the selection columns to the signal and optionally background tables.
+
+ Parameters
+ ----------
+ reduced_events: dict
+ dict containing the signal (``"gammas"``) and optionally background
+ tables (``"protons"``, ``"electrons"``)
+
+ Returns
+ -------
+ dict
+ ``reduced_events`` with selection columns added.
+ """
+ reduced_events["gammas"]["selected_gh"] = evaluate_binned_cut(
+ reduced_events["gammas"]["gh_score"],
+ reduced_events["gammas"]["reco_energy"],
+ self.opt_result.gh_cuts,
+ operator.ge,
+ )
+ if self.point_like:
+ reduced_events["gammas"]["selected_theta"] = evaluate_binned_cut(
+ reduced_events["gammas"]["theta"],
+ reduced_events["gammas"]["reco_energy"],
+ self.opt_result.theta_cuts,
+ operator.le,
+ )
+ reduced_events["gammas"]["selected"] = (
+ reduced_events["gammas"]["selected_theta"]
+ & reduced_events["gammas"]["selected_gh"]
+ )
+ else:
+ reduced_events["gammas"]["selected"] = reduced_events["gammas"][
+ "selected_gh"
+ ]
+
+ if self.do_background:
+ bkgs = ["protons", "electrons"] if self.electron_file else ["protons"]
+ n_sel = {"protons": 0, "electrons": 0}
+ for bg_type in bkgs:
+ reduced_events[bg_type]["selected_gh"] = evaluate_binned_cut(
+ reduced_events[bg_type]["gh_score"],
+ reduced_events[bg_type]["reco_energy"],
+ self.opt_result.gh_cuts,
+ operator.ge,
+ )
+ n_sel[bg_type] = np.count_nonzero(
+ reduced_events[bg_type]["selected_gh"]
+ )
+
+ self.log.info(
+ "Keeping %d signal, %d proton events, and %d electron events"
+ % (
+ np.count_nonzero(reduced_events["gammas"]["selected"]),
+ n_sel["protons"],
+ n_sel["electrons"],
+ )
+ )
+ else:
+ self.log.info(
+ "Keeping %d signal events"
+ % (np.count_nonzero(reduced_events["gammas"]["selected"]))
+ )
+ return reduced_events
+
+ def _make_signal_irf_hdus(self, hdus, sim_info):
+ hdus.append(
+ self.aeff.make_aeff_hdu(
+ events=self.signal_events[self.signal_events["selected"]],
+ point_like=self.point_like,
+ signal_is_point_like=self.signal_is_point_like,
+ sim_info=sim_info,
+ )
+ )
+ hdus.append(
+ self.edisp.make_edisp_hdu(
+ events=self.signal_events[self.signal_events["selected"]],
+ point_like=self.point_like,
+ )
+ )
+ if not self.point_like:
+ hdus.append(
+ self.psf.make_psf_hdu(
+ events=self.signal_events[self.signal_events["selected"]]
+ )
+ )
+ else:
+ # TODO: Support fov binning
+ self.log.debug(
+ "Multiple fov bins are currently not supported for RAD_MAX. "
+ "Using `fov_offset_bins = [valid_offset.min, valid_offset.max]`."
+ )
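+ # create_rad_max_hdu expects rad_max with shape (n_energy_bins, n_fov_bins);
+ # with a single fov bin this is (n_energy_bins, 1), hence the reshape below.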
+ hdus.append(
+ create_rad_max_hdu(
+ rad_max=self.opt_result.theta_cuts["cut"].reshape(-1, 1),
+ reco_energy_bins=np.append(
+ self.opt_result.theta_cuts["low"],
+ self.opt_result.theta_cuts["high"][-1],
+ ),
+ fov_offset_bins=u.Quantity(
+ [
+ self.opt_result.valid_offset.min,
+ self.opt_result.valid_offset.max,
+ ]
+ ).reshape(-1),
+ )
+ )
+ return hdus
+
+ def _make_benchmark_hdus(self, hdus):
+ hdus.append(
+ self.bias_resolution.make_bias_resolution_hdu(
+ events=self.signal_events[self.signal_events["selected"]],
+ )
+ )
+ hdus.append(
+ self.angular_resolution.make_angular_resolution_hdu(
+ events=self.signal_events[self.signal_events["selected_gh"]],
+ )
+ )
+ if self.do_background:
+ if self.opt_result.theta_cuts is None:
+ raise ValueError(
+ "Calculating the point-source sensitivity requires "
+ f"theta cuts, but {self.cuts_file} does not contain any."
+ )
+
+ hdus.append(
+ self.sensitivity.make_sensitivity_hdu(
+ signal_events=self.signal_events[self.signal_events["selected"]],
+ background_events=self.background_events[
+ self.background_events["selected_gh"]
+ ],
+ theta_cut=self.opt_result.theta_cuts,
+ gamma_spectrum=self.gamma_target_spectrum,
+ )
+ )
+ return hdus
+
+ def start(self):
+ """
+ Load events and calculate the irf (and the benchmarks).
+ """
+ reduced_events = dict()
+ for sel in self.particles:
+ if sel.epp.gammaness_classifier != self.opt_result.clf_prefix:
+ raise RuntimeError(
+ "G/H cuts are only valid for gammaness scores predicted by "
+ "the same classifier model. Requested model: %s. "
+ "Model used for g/h cuts: %s."
+ % (
+ sel.epp.gammaness_classifier,
+ self.opt_result.clf_prefix,
+ )
+ )
+
+ if sel.epp.quality_criteria != self.opt_result.precuts.quality_criteria:
+ self.log.warning(
+ "Precuts are different from precuts used for calculating "
+ "g/h / theta cuts. Provided precuts:\n%s. "
+ "\nUsing the same precuts as g/h / theta cuts:\n%s. "
+ % (
+ sel.epp.to_table(functions=True)["criteria", "func"],
+ self.opt_result.precuts.to_table(functions=True)[
+ "criteria", "func"
+ ],
+ )
+ )
+ sel.epp = EventPreProcessor(
+ parent=sel,
+ quality_criteria=self.opt_result.precuts.quality_criteria,
+ )
+
+ self.log.debug("%s Precuts: %s" % (sel.kind, sel.epp.quality_criteria))
+ evs, cnt, meta = sel.load_preselected_events(self.chunk_size, self.obs_time)
+ # Only calculate event weights if background or sensitivity should be calculated.
+ if self.do_background:
+ # Sensitivity is only calculated if do_background is true
+ # and benchmarks_output_path is given.
+ if self.benchmarks_output_path is not None:
+ evs = sel.make_event_weights(
+ evs, meta["spectrum"], self.sensitivity.fov_offset_bins
+ )
+ # If only background should be calculated,
+ # only calculate weights for protons and electrons.
+ elif sel.kind in ("protons", "electrons"):
+ evs = sel.make_event_weights(evs, meta["spectrum"])
+
+ reduced_events[sel.kind] = evs
+ reduced_events[f"{sel.kind}_count"] = cnt
+ reduced_events[f"{sel.kind}_meta"] = meta
+ self.log.debug(
+ "Loaded %d %s events" % (reduced_events[f"{sel.kind}_count"], sel.kind)
+ )
+ if sel.kind == "gammas":
+ self.signal_is_point_like = (
+ meta["sim_info"].viewcone_max - meta["sim_info"].viewcone_min
+ ).value == 0
+
+ if self.signal_is_point_like:
+ errormessage = """The gamma input file contains point-like simulations.
+ Therefore, the IRF can only be calculated at a single point
+ in the FoV, but `fov_offset_n_bins > 1`."""
+
+ if self.edisp.fov_offset_n_bins > 1 or self.aeff.fov_offset_n_bins > 1:
+ raise ToolConfigurationError(errormessage)
+
+ if not self.point_like and self.psf.fov_offset_n_bins > 1:
+ raise ToolConfigurationError(errormessage)
+
+ if self.do_background and self.bkg.fov_offset_n_bins > 1:
+ raise ToolConfigurationError(errormessage)
+
+ if self.benchmarks_output_path is not None and (
+ self.angular_resolution.fov_offset_n_bins > 1
+ or self.bias_resolution.fov_offset_n_bins > 1
+ or self.sensitivity.fov_offset_n_bins > 1
+ ):
+ raise ToolConfigurationError(errormessage)
+
+ reduced_events = self.calculate_selections(reduced_events)
+
+ self.signal_events = reduced_events["gammas"]
+ if self.do_background:
+ if self.electron_file:
+ self.background_events = vstack(
+ [reduced_events["protons"], reduced_events["electrons"]]
+ )
+ else:
+ self.background_events = reduced_events["protons"]
+
+ hdus = [fits.PrimaryHDU()]
+ hdus = self._make_signal_irf_hdus(
+ hdus, reduced_events["gammas_meta"]["sim_info"]
+ )
+ if self.do_background:
+ hdus.append(
+ self.bkg.make_bkg_hdu(
+ self.background_events[self.background_events["selected_gh"]],
+ self.obs_time,
+ )
+ )
+ if "protons" in reduced_events.keys():
+ hdus.append(
+ self.aeff.make_aeff_hdu(
+ events=reduced_events["protons"][
+ reduced_events["protons"]["selected_gh"]
+ ],
+ point_like=self.point_like,
+ signal_is_point_like=False,
+ sim_info=reduced_events["protons_meta"]["sim_info"],
+ extname="EFFECTIVE AREA PROTONS",
+ )
+ )
+ if "electrons" in reduced_events.keys():
+ hdus.append(
+ self.aeff.make_aeff_hdu(
+ events=reduced_events["electrons"][
+ reduced_events["electrons"]["selected_gh"]
+ ],
+ point_like=self.point_like,
+ signal_is_point_like=False,
+ sim_info=reduced_events["electrons_meta"]["sim_info"],
+ extname="EFFECTIVE AREA ELECTRONS",
+ )
+ )
+ self.hdus = hdus
+
+ if self.benchmarks_output_path is not None:
+ b_hdus = [fits.PrimaryHDU()]
+ b_hdus = self._make_benchmark_hdus(b_hdus)
+ self.b_hdus = b_hdus
+
+ def finish(self):
+ """
+ Write the irf (and the benchmarks) to the (respective) output file(s).
+ """
+ self.log.info("Writing outputfile '%s'" % self.output_path)
+ fits.HDUList(self.hdus).writeto(
+ self.output_path,
+ overwrite=self.overwrite,
+ )
+ Provenance().add_output_file(self.output_path, role="IRF")
+ if self.benchmarks_output_path is not None:
+ self.log.info(
+ "Writing benchmark file to '%s'" % self.benchmarks_output_path
+ )
+ fits.HDUList(self.b_hdus).writeto(
+ self.benchmarks_output_path,
+ overwrite=self.overwrite,
+ )
+ Provenance().add_output_file(self.benchmarks_output_path, role="Benchmark")
+
+
+def main():
+ tool = IrfTool()
+ tool.run()
+
+
+if __name__ == "main":
+ main()
diff --git a/src/ctapipe/tools/conftest.py b/src/ctapipe/tools/conftest.py
new file mode 100644
index 00000000000..39effc92055
--- /dev/null
+++ b/src/ctapipe/tools/conftest.py
@@ -0,0 +1,4 @@
+from importlib.util import find_spec
+
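+# Skip collecting the irf tool modules when the optional pyirf dependency is
+# missing, since they raise at import time.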
+if find_spec("pyirf") is None:
+ collect_ignore = ["compute_irf.py", "optimize_event_selection.py"]
diff --git a/src/ctapipe/tools/info.py b/src/ctapipe/tools/info.py
index 70aee3529ab..da0f65f2305 100644
--- a/src/ctapipe/tools/info.py
+++ b/src/ctapipe/tools/info.py
@@ -1,5 +1,6 @@
# Licensed under a 3-clause BSD style license - see LICENSE.rst
-""" print information about ctapipe and its command-line tools. """
+"""print information about ctapipe and its command-line tools."""
+
import logging
import os
import sys
@@ -123,19 +124,15 @@ def _info_tools():
print("the following can be executed by typing ctapipe-:")
print("")
- # TODO: how to get a one-line description or
- # full help text from the docstring or ArgumentParser?
- # This is the function names, we want the command-line names
- # that are defined in setup.py !???
from textwrap import TextWrapper
from ctapipe.tools.utils import get_all_descriptions
- wrapper = TextWrapper(width=80, subsequent_indent=" " * 35)
+ wrapper = TextWrapper(width=80, subsequent_indent=" " * 37)
scripts = get_all_descriptions()
for name, desc in sorted(scripts.items()):
- text = f"{name:<30s} - {desc}"
+ text = f"{name:<33s} - {desc}"
print(wrapper.fill(text))
print("")
print("")
diff --git a/src/ctapipe/tools/optimize_event_selection.py b/src/ctapipe/tools/optimize_event_selection.py
new file mode 100644
index 00000000000..f82165bf852
--- /dev/null
+++ b/src/ctapipe/tools/optimize_event_selection.py
@@ -0,0 +1,233 @@
+"""Tool to generate selections for IRFs production"""
+
+import astropy.units as u
+from astropy.table import vstack
+
+from ..core import Provenance, Tool, traits
+from ..core.traits import AstroQuantity, Integer, classes_with_traits
+from ..irf import EventLoader, Spectra
+from ..irf.optimize import CutOptimizerBase
+
+__all__ = ["EventSelectionOptimizer"]
+
+
+class EventSelectionOptimizer(Tool):
+ "Tool to create optimized cuts for IRF generation"
+
+ name = "ctapipe-optimize-event-selection"
+ description = __doc__
+ examples = """
+ ctapipe-optimize-event-selection \\
+ --gamma-file gamma.dl2.h5 \\
+ --proton-file proton.dl2.h5 \\
+ --electron-file electron.dl2.h5 \\
+ --output cuts.fits
+ """
+
+ gamma_file = traits.Path(
+ default_value=None, directory_ok=False, help="Gamma input filename and path"
+ ).tag(config=True)
+
+ gamma_target_spectrum = traits.UseEnum(
+ Spectra,
+ default_value=Spectra.CRAB_HEGRA,
+ help="Name of the spectrum used for weights of gamma events.",
+ ).tag(config=True)
+
+ proton_file = traits.Path(
+ default_value=None,
+ directory_ok=False,
+ allow_none=True,
+ help=(
+ "Proton input filename and path. "
+ "Not needed, if ``optimization_algorithm = 'PercentileCuts'``."
+ ),
+ ).tag(config=True)
+
+ proton_target_spectrum = traits.UseEnum(
+ Spectra,
+ default_value=Spectra.IRFDOC_PROTON_SPECTRUM,
+ help="Name of the spectrum used for weights of proton events.",
+ ).tag(config=True)
+
+ electron_file = traits.Path(
+ default_value=None,
+ directory_ok=False,
+ allow_none=True,
+ help=(
+ "Electron input filename and path. "
+ "Not needed, if ``optimization_algorithm = 'PercentileCuts'``."
+ ),
+ ).tag(config=True)
+
+ electron_target_spectrum = traits.UseEnum(
+ Spectra,
+ default_value=Spectra.IRFDOC_ELECTRON_SPECTRUM,
+ help="Name of the spectrum used for weights of electron events.",
+ ).tag(config=True)
+
+ chunk_size = Integer(
+ default_value=100000,
+ allow_none=True,
+ help="How many subarray events to load at once when preselecting events.",
+ ).tag(config=True)
+
+ output_path = traits.Path(
+ default_value="./Selection_Cuts.fits",
+ allow_none=False,
+ directory_ok=False,
+ help="Output file storing optimization result",
+ ).tag(config=True)
+
+ obs_time = AstroQuantity(
+ default_value=u.Quantity(50, u.hour),
+ physical_type=u.physical.time,
+ help=(
+ "Observation time in the form `` ``."
+ " This is used for flux normalization when calculating sensitivities."
+ ),
+ ).tag(config=True)
+
+ optimization_algorithm = traits.ComponentName(
+ CutOptimizerBase,
+ default_value="PointSourceSensitivityOptimizer",
+ help="The cut optimization algorithm to be used.",
+ ).tag(config=True)
+
+ aliases = {
+ "gamma-file": "EventSelectionOptimizer.gamma_file",
+ "proton-file": "EventSelectionOptimizer.proton_file",
+ "electron-file": "EventSelectionOptimizer.electron_file",
+ "output": "EventSelectionOptimizer.output_path",
+ "chunk_size": "EventSelectionOptimizer.chunk_size",
+ }
+
+ classes = [EventLoader] + classes_with_traits(CutOptimizerBase)
+
+ def setup(self):
+ """
+ Initialize components from config.
+ """
+ self.optimizer = CutOptimizerBase.from_name(
+ self.optimization_algorithm, parent=self
+ )
+ self.particles = [
+ EventLoader(
+ parent=self,
+ kind="gammas",
+ file=self.gamma_file,
+ target_spectrum=self.gamma_target_spectrum,
+ )
+ ]
+ if self.optimization_algorithm != "PercentileCuts":
+ if not self.proton_file or (
+ self.proton_file and not self.proton_file.exists()
+ ):
+ raise ValueError(
+ "Need a proton file for cut optimization "
+ f"using {self.optimization_algorithm}."
+ )
+
+ self.particles.append(
+ EventLoader(
+ parent=self,
+ kind="protons",
+ file=self.proton_file,
+ target_spectrum=self.proton_target_spectrum,
+ )
+ )
+ if self.electron_file and self.electron_file.exists():
+ self.particles.append(
+ EventLoader(
+ parent=self,
+ kind="electrons",
+ file=self.electron_file,
+ target_spectrum=self.electron_target_spectrum,
+ )
+ )
+ else:
+ self.log.warning("Optimizing cuts without electron file.")
+
+ def start(self):
+ """
+ Load events and optimize g/h (and theta) cuts.
+ """
+ reduced_events = dict()
+ for sel in self.particles:
+ evs, cnt, meta = sel.load_preselected_events(self.chunk_size, self.obs_time)
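+ # Event weights are only needed for the sensitivity-based optimization;
+ # PercentileCuts works on quantiles of the unweighted distributions.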
+ if self.optimization_algorithm == "PointSourceSensitivityOptimizer":
+ evs = sel.make_event_weights(
+ evs,
+ meta["spectrum"],
+ (
+ self.optimizer.min_bkg_fov_offset,
+ self.optimizer.max_bkg_fov_offset,
+ ),
+ )
+
+ reduced_events[sel.kind] = evs
+ reduced_events[f"{sel.kind}_count"] = cnt
+ if sel.kind == "gammas":
+ self.sim_info = meta["sim_info"]
+ self.gamma_spectrum = meta["spectrum"]
+
+ self.signal_events = reduced_events["gammas"]
+
+ if self.optimization_algorithm == "PercentileCuts":
+ self.log.debug("Loaded %d gammas" % reduced_events["gammas_count"])
+ self.log.debug("Keeping %d gammas" % len(reduced_events["gammas"]))
+ self.log.info("Optimizing cuts using %d signal" % len(self.signal_events))
+ else:
+ if "electrons" not in reduced_events.keys():
+ reduced_events["electrons"] = []
+ reduced_events["electrons_count"] = 0
+ self.log.debug(
+ "Loaded %d gammas, %d protons, %d electrons"
+ % (
+ reduced_events["gammas_count"],
+ reduced_events["protons_count"],
+ reduced_events["electrons_count"],
+ )
+ )
+ self.log.debug(
+ "Keeping %d gammas, %d protons, %d electrons"
+ % (
+ len(reduced_events["gammas"]),
+ len(reduced_events["protons"]),
+ len(reduced_events["electrons"]),
+ )
+ )
+ self.background_events = vstack(
+ [reduced_events["protons"], reduced_events["electrons"]]
+ )
+ self.log.info(
+ "Optimizing cuts using %d signal and %d background events"
+ % (len(self.signal_events), len(self.background_events)),
+ )
+
+ result = self.optimizer.optimize_cuts(
+ signal=self.signal_events,
+ background=self.background_events
+ if self.optimization_algorithm != "PercentileCuts"
+ else None,
+ precuts=self.particles[0].epp, # identical precuts for all particle types
+ clf_prefix=self.particles[0].epp.gammaness_classifier,
+ )
+ self.result = result
+
+ def finish(self):
+ """
+ Write optimized cuts to the output file.
+ """
+ self.log.info("Writing results to %s" % self.output_path)
+ Provenance().add_output_file(self.output_path, role="Optimization Result")
+ self.result.write(self.output_path, self.overwrite)
+
+
+def main():
+ tool = EventSelectionOptimizer()
+ tool.run()
+
+
+if __name__ == "main":
+ main()
diff --git a/src/ctapipe/tools/tests/test_compute_irf.py b/src/ctapipe/tools/tests/test_compute_irf.py
new file mode 100644
index 00000000000..da169f8769d
--- /dev/null
+++ b/src/ctapipe/tools/tests/test_compute_irf.py
@@ -0,0 +1,284 @@
+import json
+import logging
+import os
+
+import pytest
+from astropy.io import fits
+
+from ctapipe.core import ToolConfigurationError, run_tool
+
+pytest.importorskip("pyirf")
+
+
+@pytest.fixture(scope="module")
+def dummy_cuts_file(
+ gamma_diffuse_full_reco_file,
+ proton_full_reco_file,
+ event_loader_config_path,
+ irf_tmp_path,
+):
+ from ctapipe.tools.optimize_event_selection import EventSelectionOptimizer
+
+ output_path = irf_tmp_path / "dummy_cuts.fits"
+ run_tool(
+ EventSelectionOptimizer(),
+ argv=[
+ f"--gamma-file={gamma_diffuse_full_reco_file}",
+ f"--proton-file={proton_full_reco_file}",
+ # Use diffuse gammas weighted to electron spectrum as stand-in
+ f"--electron-file={gamma_diffuse_full_reco_file}",
+ f"--output={output_path}",
+ f"--config={event_loader_config_path}",
+ ],
+ )
+ return output_path
+
+
+@pytest.mark.parametrize("include_bkg", (False, True))
+@pytest.mark.parametrize("point_like", (True, False))
+def test_irf_tool(
+ gamma_diffuse_full_reco_file,
+ proton_full_reco_file,
+ event_loader_config_path,
+ dummy_cuts_file,
+ tmp_path,
+ include_bkg,
+ point_like,
+):
+ from ctapipe.tools.compute_irf import IrfTool
+
+ output_path = tmp_path / "irf.fits.gz"
+ output_benchmarks_path = tmp_path / "benchmarks.fits.gz"
+
+ argv = [
+ f"--gamma-file={gamma_diffuse_full_reco_file}",
+ f"--cuts={dummy_cuts_file}",
+ f"--output={output_path}",
+ f"--config={event_loader_config_path}",
+ ]
+ if point_like:
+ argv.append("--point-like")
+
+ if include_bkg:
+ argv.append(f"--proton-file={proton_full_reco_file}")
+ # Use diffuse gammas weighted to electron spectrum as stand-in
+ argv.append(f"--electron-file={gamma_diffuse_full_reco_file}")
+ else:
+ argv.append("--no-do-background")
+
+ ret = run_tool(IrfTool(), argv=argv)
+ assert ret == 0
+
+ assert output_path.exists()
+ assert not output_benchmarks_path.exists()
+ # Readability by gammapy is tested by pyirf tests, so not repeated here
+ with fits.open(output_path) as hdul:
+ assert isinstance(hdul["ENERGY DISPERSION"], fits.BinTableHDU)
+ assert isinstance(hdul["EFFECTIVE AREA"], fits.BinTableHDU)
+ if point_like:
+ assert isinstance(hdul["RAD_MAX"], fits.BinTableHDU)
+ else:
+ assert isinstance(hdul["PSF"], fits.BinTableHDU)
+
+ if include_bkg:
+ assert isinstance(hdul["BACKGROUND"], fits.BinTableHDU)
+
+ os.remove(output_path) # Delete output file
+
+ # Include benchmarks
+ argv.append(f"--benchmark-output={output_benchmarks_path}")
+ ret = run_tool(IrfTool(), argv=argv)
+ assert ret == 0
+
+ assert output_path.exists()
+ assert output_benchmarks_path.exists()
+ with fits.open(output_benchmarks_path) as hdul:
+ assert isinstance(hdul["ENERGY BIAS RESOLUTION"], fits.BinTableHDU)
+ assert isinstance(hdul["ANGULAR RESOLUTION"], fits.BinTableHDU)
+ if include_bkg:
+ assert isinstance(hdul["SENSITIVITY"], fits.BinTableHDU)
+
+
+def test_irf_tool_no_electrons(
+ gamma_diffuse_full_reco_file,
+ proton_full_reco_file,
+ event_loader_config_path,
+ dummy_cuts_file,
+ tmp_path,
+):
+ from ctapipe.tools.compute_irf import IrfTool
+
+ output_path = tmp_path / "irf.fits.gz"
+ output_benchmarks_path = tmp_path / "benchmarks.fits.gz"
+ logpath = tmp_path / "test_irf_tool_no_electrons.log"
+ logger = logging.getLogger("ctapipe.tools.compute_irf")
+ logger.addHandler(logging.FileHandler(logpath))
+
+ ret = run_tool(
+ IrfTool(),
+ argv=[
+ f"--gamma-file={gamma_diffuse_full_reco_file}",
+ f"--proton-file={proton_full_reco_file}",
+ f"--cuts={dummy_cuts_file}",
+ f"--output={output_path}",
+ f"--benchmark-output={output_benchmarks_path}",
+ f"--config={event_loader_config_path}",
+ f"--log-file={logpath}",
+ ],
+ )
+ assert ret == 0
+ assert output_path.exists()
+ assert output_benchmarks_path.exists()
+ assert "Estimating background without electron file." in logpath.read_text()
+
+
+def test_irf_tool_only_gammas(
+ gamma_diffuse_full_reco_file, event_loader_config_path, dummy_cuts_file, tmp_path
+):
+ from ctapipe.tools.compute_irf import IrfTool
+
+ output_path = tmp_path / "irf.fits.gz"
+ output_benchmarks_path = tmp_path / "benchmarks.fits.gz"
+
+ with pytest.raises(
+ ValueError,
+ match="At least a proton file required when specifying `do_background`.",
+ ):
+ run_tool(
+ IrfTool(),
+ argv=[
+ f"--gamma-file={gamma_diffuse_full_reco_file}",
+ f"--cuts={dummy_cuts_file}",
+ f"--output={output_path}",
+ f"--benchmark-output={output_benchmarks_path}",
+ f"--config={event_loader_config_path}",
+ ],
+ raises=True,
+ )
+
+ ret = run_tool(
+ IrfTool(),
+ argv=[
+ f"--gamma-file={gamma_diffuse_full_reco_file}",
+ f"--cuts={dummy_cuts_file}",
+ f"--output={output_path}",
+ f"--benchmark-output={output_benchmarks_path}",
+ f"--config={event_loader_config_path}",
+ "--no-do-background",
+ ],
+ )
+ assert ret == 0
+ assert output_path.exists()
+ assert output_benchmarks_path.exists()
+
+
+# TODO: Add test using point-like gammas
+
+
+def test_point_like_irf_no_theta_cut(
+ gamma_diffuse_full_reco_file,
+ proton_full_reco_file,
+ event_loader_config_path,
+ dummy_cuts_file,
+ tmp_path,
+):
+ from ctapipe.irf import OptimizationResult
+ from ctapipe.tools.compute_irf import IrfTool
+
+ gh_cuts_path = tmp_path / "gh_cuts.fits"
+ cuts = OptimizationResult.read(dummy_cuts_file)
+ cuts.theta_cuts = None
+ cuts.write(gh_cuts_path)
+ assert gh_cuts_path.exists()
+
+ output_path = tmp_path / "irf.fits.gz"
+ output_benchmarks_path = tmp_path / "benchmarks.fits.gz"
+
+ with pytest.raises(
+ ToolConfigurationError,
+ match=r"Computing a point-like IRF requires an \(optimized\) theta cut.",
+ ):
+ run_tool(
+ IrfTool(),
+ argv=[
+ f"--gamma-file={gamma_diffuse_full_reco_file}",
+ f"--proton-file={proton_full_reco_file}",
+ # Use diffuse gammas weighted to electron spectrum as stand-in
+ f"--electron-file={gamma_diffuse_full_reco_file}",
+ f"--cuts={gh_cuts_path}",
+ f"--output={output_path}",
+ f"--benchmark-output={output_benchmarks_path}",
+ f"--config={event_loader_config_path}",
+ "--point-like",
+ ],
+ raises=True,
+ )
+
+
+def test_irf_tool_wrong_cuts(
+ gamma_diffuse_full_reco_file, proton_full_reco_file, dummy_cuts_file, tmp_path
+):
+ from ctapipe.tools.compute_irf import IrfTool
+
+ output_path = tmp_path / "irf.fits.gz"
+ output_benchmarks_path = tmp_path / "benchmarks.fits.gz"
+
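+ # Applying the cuts file without the event loader config it was produced with is expected to fail.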
+ with pytest.raises(RuntimeError):
+ run_tool(
+ IrfTool(),
+ argv=[
+ f"--gamma-file={gamma_diffuse_full_reco_file}",
+ f"--proton-file={proton_full_reco_file}",
+ # Use diffuse gammas weighted to the electron spectrum as a stand-in
+ f"--electron-file={gamma_diffuse_full_reco_file}",
+ f"--cuts={dummy_cuts_file}",
+ f"--output={output_path}",
+ f"--benchmark-output={output_benchmarks_path}",
+ ],
+ raises=True,
+ )
+
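+ # A config whose quality criteria differ from those used for the cuts file should only trigger a warning.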
+ config_path = tmp_path / "config.json"
+ with config_path.open("w") as f:
+ json.dump(
+ {
+ "EventPreProcessor": {
+ "energy_reconstructor": "ExtraTreesRegressor",
+ "geometry_reconstructor": "HillasReconstructor",
+ "gammaness_classifier": "ExtraTreesClassifier",
+ "quality_criteria": [
+ # Deliberately omits a minimum multiplicity criterion, so the precuts differ from those used for the cuts file
+ ("valid classifier", "ExtraTreesClassifier_is_valid"),
+ ("valid geom reco", "HillasReconstructor_is_valid"),
+ ("valid energy reco", "ExtraTreesRegressor_is_valid"),
+ ],
+ }
+ },
+ f,
+ )
+
+ logpath = tmp_path / "test_irf_tool_wrong_cuts.log"
+ logger = logging.getLogger("ctapipe.tools.compute_irf")
+ logger.addHandler(logging.FileHandler(logpath))
+
+ ret = run_tool(
+ IrfTool(),
+ argv=[
+ f"--gamma-file={gamma_diffuse_full_reco_file}",
+ f"--proton-file={proton_full_reco_file}",
+ # Use diffuse gammas weighted to the electron spectrum as a stand-in
+ f"--electron-file={gamma_diffuse_full_reco_file}",
+ f"--cuts={dummy_cuts_file}",
+ f"--output={output_path}",
+ f"--benchmark-output={output_benchmarks_path}",
+ f"--config={config_path}",
+ f"--log-file={logpath}",
+ ],
+ )
+ assert ret == 0
+ assert output_path.exists()
+ assert output_benchmarks_path.exists()
+ assert (
+ "Precuts are different from precuts used for calculating g/h / theta cuts."
+ in logpath.read_text()
+ )
diff --git a/src/ctapipe/tools/tests/test_optimize_event_selection.py b/src/ctapipe/tools/tests/test_optimize_event_selection.py
new file mode 100644
index 00000000000..226043a66c5
--- /dev/null
+++ b/src/ctapipe/tools/tests/test_optimize_event_selection.py
@@ -0,0 +1,118 @@
+import logging
+
+import astropy.units as u
+import pytest
+from astropy.table import QTable
+
+from ctapipe.core import QualityQuery, run_tool
+
+pytest.importorskip("pyirf")
+
+
+def test_cuts_optimization(
+ gamma_diffuse_full_reco_file,
+ proton_full_reco_file,
+ event_loader_config_path,
+ tmp_path,
+):
+ from ctapipe.irf import (
+ OptimizationResult,
+ ResultValidRange,
+ )
+ from ctapipe.tools.optimize_event_selection import EventSelectionOptimizer
+
+ output_path = tmp_path / "cuts.fits"
+
+ argv = [
+ f"--gamma-file={gamma_diffuse_full_reco_file}",
+ f"--proton-file={proton_full_reco_file}",
+ # Use diffuse gammas weighted to the electron spectrum as a stand-in
+ f"--electron-file={gamma_diffuse_full_reco_file}",
+ f"--output={output_path}",
+ f"--config={event_loader_config_path}",
+ ]
+ ret = run_tool(EventSelectionOptimizer(), argv=argv)
+ assert ret == 0
+
+ result = OptimizationResult.read(output_path)
+ assert isinstance(result, OptimizationResult)
+ assert isinstance(result.precuts, QualityQuery)
+ assert isinstance(result.valid_energy, ResultValidRange)
+ assert isinstance(result.valid_offset, ResultValidRange)
+ assert isinstance(result.gh_cuts, QTable)
+ assert result.clf_prefix == "ExtraTreesClassifier"
+ assert "cut" in result.gh_cuts.colnames
+ assert isinstance(result.theta_cuts, QTable)
+ assert "cut" in result.theta_cuts.colnames
+
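+ # Both cut tables are binned in reconstructed energy, so the bin edge columns carry energy units.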
+ for c in ["low", "center", "high"]:
+ assert c in result.gh_cuts.colnames
+ assert result.gh_cuts[c].unit == u.TeV
+ assert c in result.theta_cuts.colnames
+ assert result.theta_cuts[c].unit == u.TeV
+
+
+def test_cuts_opt_no_electrons(
+ gamma_diffuse_full_reco_file,
+ proton_full_reco_file,
+ event_loader_config_path,
+ tmp_path,
+):
+ from ctapipe.tools.optimize_event_selection import EventSelectionOptimizer
+
+ output_path = tmp_path / "cuts.fits"
+ logpath = tmp_path / "test_cuts_opt_no_electrons.log"
+ logger = logging.getLogger("ctapipe.tools.optimize_event_selection")
+ logger.addHandler(logging.FileHandler(logpath))
+
+ ret = run_tool(
+ EventSelectionOptimizer(),
+ argv=[
+ f"--gamma-file={gamma_diffuse_full_reco_file}",
+ f"--proton-file={proton_full_reco_file}",
+ f"--output={output_path}",
+ f"--config={event_loader_config_path}",
+ f"--log-file={logpath}",
+ ],
+ )
+ assert ret == 0
+ assert "Optimizing cuts without electron file." in logpath.read_text()
+
+
+def test_cuts_opt_only_gammas(
+ gamma_diffuse_full_reco_file, event_loader_config_path, tmp_path
+):
+ from ctapipe.tools.optimize_event_selection import EventSelectionOptimizer
+
+ output_path = tmp_path / "cuts.fits"
+
+ with pytest.raises(
+ ValueError,
+ match=(
+ "Need a proton file for cut optimization using "
+ "PointSourceSensitivityOptimizer"
+ ),
+ ):
+ run_tool(
+ EventSelectionOptimizer(),
+ argv=[
+ f"--gamma-file={gamma_diffuse_full_reco_file}",
+ f"--output={output_path}",
+ f"--config={event_loader_config_path}",
+ ],
+ raises=True,
+ )
+
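+ # PercentileCuts is derived from the gamma events alone, so no proton file is needed.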
+ ret = run_tool(
+ EventSelectionOptimizer(),
+ argv=[
+ f"--gamma-file={gamma_diffuse_full_reco_file}",
+ f"--output={output_path}",
+ f"--config={event_loader_config_path}",
+ "--EventSelectionOptimizer.optimization_algorithm=PercentileCuts",
+ ],
+ )
+ assert ret == 0
+ assert output_path.exists()
diff --git a/src/ctapipe/tools/utils.py b/src/ctapipe/tools/utils.py
index 5b1e468db93..69cf6f04326 100644
--- a/src/ctapipe/tools/utils.py
+++ b/src/ctapipe/tools/utils.py
@@ -1,10 +1,13 @@
# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Utils to create scripts and command-line tools"""
+
import argparse
-import importlib
+import ast
import logging
from collections import OrderedDict
from importlib.metadata import distribution
+from importlib.util import find_spec
+from pathlib import Path

import numpy as np
from astropy.table import vstack
@@ -67,15 +70,14 @@ def get_all_descriptions():
tools = get_installed_tools()

descriptions = OrderedDict()
for name, value in tools.items():
- module_name, attr = value.split(":")
- module = importlib.import_module(module_name)
- if hasattr(module, "__doc__") and module.__doc__ is not None:
- try:
- descrip = module.__doc__
- descrip.replace("\n", "")
- descriptions[name] = descrip
- except Exception as err:
- descriptions[name] = f"[Couldn't parse docstring: {err}]"
+ module_name, _ = value.split(":")
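+ # Extract the docstring by parsing the module's source with ast instead of
+ # importing the module, avoiding import cost and import-time side effects.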
+ descrip = ast.get_docstring(
+ ast.parse(Path(find_spec(module_name).origin).read_text())
+ )
+ if descrip is not None:
+ descriptions[name] = descrip.replace("\n", " ")
else:
descriptions[name] = "[no documentation. Please add a docstring]"