Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pulsar-1 #39

Merged
merged 6 commits into from
Aug 21, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
matched filter class + robust noise estimation
pravirkr committed Jul 17, 2024
commit c52622cb68301314ffdb8f16322576e03c361c94
325 changes: 325 additions & 0 deletions sigpyproc/core/filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,325 @@
from __future__ import annotations

import numpy as np
from astropy.convolution import convolve_fft
from astropy.convolution.kernels import Model1DKernel
from astropy.modeling.models import Box1D, Gaussian1D, Lorentz1D
from astropy.stats import gaussian_fwhm_to_sigma
from matplotlib import pyplot as plt

from sigpyproc.core.stats import estimate_noise_std


class MatchedFilter:
"""Matched filter class for pulse detection.

This class implements a matched filter algorithm to detect pulses in 1D data.

Parameters
----------
data : np.ndarray
Input data array
temp_kind : str, optional
Type of the pulse template, by default "boxcar"
nbins_max : int, optional
Maximum number of bins for template width, by default 32
spacing_factor : float, optional
Factor for spacing between template widths, by default 1.5

Raises
------
ValueError
_description_
"""

def __init__(
self,
data: np.ndarray,
noise_method: str = "iqr",
temp_kind: str = "boxcar",
nbins_max: int = 32,
spacing_factor: float = 1.5,
) -> None:
if data.ndim != 1:
msg = f"Data dimension {data.ndim} is not supported."
raise ValueError(msg)
self._temp_kind = temp_kind
self._noise_method = noise_method
self._data = self._get_norm_data(data)
self._temp_widths = self.get_width_spacing(nbins_max, spacing_factor)
self._temp_bank = [
getattr(Template, f"gen_{self.temp_kind}")(iwidth)
for iwidth in self.temp_widths
]

self._convs = np.array(
[
convolve_fft(self.data, temp.kernel, normalize_kernel=False)
for temp in self.temp_bank
],
)
self._itemp, self._peak_bin = np.unravel_index(
self._convs.argmax(),
self._convs.shape,
)
self._best_temp = self.temp_bank[self._itemp]
self._best_snr = self._convs[self._itemp, self._peak_bin]

@property
def data(self) -> np.ndarray:
return self._data

@property
def noise_method(self) -> str:
return self._noise_method

@property
def temp_kind(self) -> str:
return self._temp_kind

@property
def temp_widths(self) -> np.ndarray:
return self._temp_widths

@property
def temp_bank(self) -> list[Template]:
return self._temp_bank

@property
def convs(self) -> np.ndarray:
return self._convs

@property
def peak_bin(self) -> int:
"""Best match template peak bin (`int`, read-only)."""
return int(self._peak_bin)

@property
def best_temp(self) -> Template:
return self._best_temp

@property
def snr(self) -> float:
"""Signal-to-noise ratio based on best match template on pulse."""
return self._best_snr

@property
def best_model(self) -> np.ndarray:
"""Best match template fit (`np.ndarray`, read-only)."""
return self.snr * np.roll(
self.best_temp.get_padded(self.data.size),
self.peak_bin - self.best_temp.ref_bin,
)

@property
def on_pulse(self) -> tuple[int, int]:
"""Best match template pulse region (`Tuple[int, int]`, read-only)."""
# \TODO check for edge cases
return (
self.peak_bin - round(self.best_temp.width),
self.peak_bin + round(self.best_temp.width),
)

def plot(self) -> plt.Figure:
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(self.data, label="Data")
ax.plot(self.best_model, label="Best Model")
ax.axvline(self.peak_bin, color="r", linestyle="--", label="Peak")
ax.axvspan(*self.on_pulse, alpha=0.2, color="g", label="On Pulse")
ax.set(
xlabel="Bin",
ylabel="Amplitude",
title=f"Matched Filter Result (SNR: {self.snr:.2f})",
)
ax.legend()
fig.tight_layout()
return fig

def _get_norm_data(self, data: np.ndarray) -> np.ndarray:
data = np.asarray(data, dtype=np.float32)
median = np.median(data)
noise_std = estimate_noise_std(data, self.noise_method)
return (data - median) / noise_std

@staticmethod
def get_width_spacing(
nbins_max: int,
spacing_factor: float = 1.5,
) -> np.ndarray:
"""Get width spacing for matched filtering.

Parameters
----------
nbins_max : int
Maximum number of bins.
spacing_factor : float, optional
Spacing factor for width, by default 1.5

Returns
-------
np.ndarray
Width spacing for matched filtering.
"""
widths = [1]
while widths[-1] < nbins_max:
next_width = int(max(widths[-1] + 1, spacing_factor * widths[-1]))
if next_width > nbins_max:
break
widths.append(next_width)
return np.array(widths, dtype=np.float32)


class Template:
"""1D pulse template class for matched filtering.

This class represents various pulse shapes as templates for matched filtering
and provides methods to generate and visualize them.

Parameters
----------
kernel : Model1DKernel
Astropy 1D model kernel.
width : float
Width of the pulse template in bins.
kind : str, optional
Type of the pulse template, by default "custom"
"""

def __init__(
self,
kernel: Model1DKernel,
width: float,
kind: str = "custom",
) -> None:
self._kernel = kernel
self._width = width
self._kind = kind

@property
def kernel(self) -> Model1DKernel:
"""Astropy 1D model kernel (`Model1DKernel`, read-only)."""
return self._kernel

@property
def width(self) -> float:
"""Width of the pulse template in bins (`float`, read-only)."""
return self._width

@property
def kind(self) -> str:
"""Type of the pulse template (`str`, read-only)."""
return self._kind

@property
def ref_bin(self) -> int:
"""Reference bin of the pulse template (`int`, read-only)."""
return self.kernel.center[0]

@property
def size(self) -> int:
"""Size of the pulse template (`int`, read-only)."""
return self.kernel.shape[0]

def get_padded(self, size: int) -> np.ndarray:
"""
Pad template to desired size.

Parameters
----------
size: int
Size of the padded pulse template.
"""
if self.size >= size:
msg = f"Template size {self.size} is larger than {size}."
raise ValueError(msg)
return np.pad(self.kernel.array, (0, size - self.size))

def plot(
self,
figsize: tuple[float, float] = (10, 5),
dpi: int = 100,
) -> plt.Figure:
"""Plot the pulse template.

Parameters
----------
figsize : tuple[float, float], optional
Figure size in inches, by default (10, 5)
dpi : int, optional
Dots per inch, by default 100

Returns
-------
plt.Figure
Matplotlib figure object.
"""
fig, ax = plt.subplots(figsize=figsize, dpi=dpi)
ax.bar(range(self.size), self.kernel.array, ec="k", fc="#a6cee3")
ax.axvline(self.ref_bin, ls="--", lw=2, color="k", label="Ref Bin")
ax.legend()
ax.set(
xlim=(-0.5, self.size - 0.5),
xlabel="Bin",
ylabel="Amplitude",
title=str(self),
)
fig.tight_layout()
return fig

@classmethod
def gen_boxcar(cls, width: int) -> Template:
"""
Generate a boxcar pulse template.

Parameters
----------
width: int
Width of the box in bins.
"""
norm = 1 / np.sqrt(width)
temp = Model1DKernel(Box1D(norm, 0, width), x_size=width)
return cls(temp, width, kind="boxcar")

@classmethod
def gen_gaussian(cls, width: float, extent: float = 3.5) -> Template:
"""
Generate a Gaussian pulse template.

Parameters
----------
width: float
FWHM of the Gaussian pulse in bins.

extent: float
Extent of the Gaussian pulse in sigma units, by default 3.5.
"""
stddev = gaussian_fwhm_to_sigma * width
norm = 1 / (np.sqrt(np.sqrt(np.pi) * stddev))
size = int(np.ceil(extent * stddev) * 2 + 1)
temp = Model1DKernel(Gaussian1D(norm, 0, stddev), x_size=size)
return cls(temp, width, kind="gaussian")

@classmethod
def gen_lorentzian(cls, width: float, extent: float = 3.5) -> Template:
"""
Generate a Lorentzian pulse template for given pulse FWHM (bins).

Parameters
----------
width: float
FWHM of the Lorentzian pulse in bins.

extent: float
Extent of the Lorentzian pulse in sigma units, by default 3.5.
"""
stddev = gaussian_fwhm_to_sigma * width
norm = 1 / (np.sqrt((np.pi * width) / 4))
size = int(np.ceil(extent * stddev) * 2 + 1)
temp = Model1DKernel(Lorentz1D(norm, 0, width), x_size=size)
return cls(temp, width, kind="lorentzian")

def __str__(self) -> str:
return f"Template(size={self.size}, kind={self.kind}, width={self.width:.3f})"

def __repr__(self) -> str:
return str(self)
312 changes: 271 additions & 41 deletions sigpyproc/core/stats.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,34 @@
from __future__ import annotations

from typing import Callable

import attrs
import bottleneck as bn
import numpy as np
from astropy import stats as astrostats

from sigpyproc.core import kernels


@attrs.define(auto_attribs=True, slots=True, kw_only=True)
class ZScoreResult:
"""Result of a Z-score calculation.
Attributes
----------
zscores: np.ndarray
Robust Z-scores of the array.
loc: float
Estimated location used for the Z-score calculation.
scale: float | np.ndarray
Estimated scale used for the Z-score calculation.
"""

zscores: np.ndarray
loc: float
scale: float | np.ndarray


def running_filter(
array: np.ndarray,
window: int,
@@ -51,91 +74,298 @@ def running_filter(
raise ValueError(msg)
return filtered_ar[window - 1 :]

def zscore_iqr(array: np.ndarray) -> np.ndarray:
"""Calculate the z-score of an array using the IQR (Interquartile Range).

def estimate_loc(array: np.ndarray, method: str = "median") -> float:
"""Estimate the location of an array.
Parameters
----------
array : :py:obj:`~numpy.ndarray`
The array to calculate the z-score of.
array : np.ndarray
The array to estimate the location of.
method : str, optional
The method to use for estimating the location, by default "median".
Returns
-------
:py:obj:`~numpy.ndarray`
The z-score of the array.
float
The estimated location of the array.
Raises
------
ValueError
If the method is not supported
"""
if method == "median":
loc = np.median(array)
elif method == "mean":
loc = np.mean(array)
else:
msg = f"Method {method} is not supported for estimating location."
raise ValueError(msg)
return loc


def _scale_iqr(array: np.ndarray) -> float:
"""Calculate the normalized Inter-quartile Range (IQR) scale of an array.
Parameters
----------
array : np.ndarray
Input array.
Returns
-------
float
The normalized IQR scale of the array.
"""
q1, median, q3 = np.percentile(array, [25, 50, 75])
iqr = (q3 - q1) / 1.349
diff = array - median
return np.divide(diff, iqr, out=np.zeros_like(diff), where=iqr != 0)
# \scipy.stats.norm.ppf(0.75) - scipy.stats.norm.ppf(0.25)
norm = 1.3489795003921634
q1, q3 = np.percentile(array, [25, 75])
return (q3 - q1) / norm


def zscore_mad(array: np.ndarray) -> np.ndarray:
"""Calculate the z-score of an array using the MAD (Modified z-score).
def _scale_mad(array: np.ndarray) -> float:
"""Calculate the Median Absolute Deviation (MAD) scale of an array.
Parameters
----------
array : :py:obj:`~numpy.typing.ArrayLike`
The array to calculate the modified z-score of.
array : np.ndarray
Input array.
Returns
-------
:py:obj:`~numpy.ndarray`
The modified z-score of the array.
float
The MAD scale of the array.
Notes
-----
The modified z-score is defined as:
https://www.ibm.com/docs/en/cognos-analytics/11.1.0?topic=terms-modified-z-score
https://www.ibm.com/docs/en/cognos-analytics/12.0.0?topic=terms-modified-z-score
"""
scale_mad = 0.6744897501960817 # scipy.stats.norm.ppf(3/4.)
scale_aad = np.sqrt(2 / np.pi)
norm = 0.6744897501960817 # scipy.stats.norm.ppf(0.75)
norm_aad = np.sqrt(2 / np.pi)
diff = array - np.median(array)
mad = np.median(np.abs(diff)) / scale_mad
std = np.mean(np.abs(diff)) / scale_aad if mad == 0 else mad
return np.divide(diff, std, out=np.zeros_like(diff), where=std != 0)
mad = np.median(np.abs(diff)) / norm
if mad == 0:
return np.mean(np.abs(diff)) / norm_aad
return mad


def zscore_double_mad(array: np.ndarray) -> np.ndarray:
"""Calculate the modified z-score of an array using the Double MAD.
def _scale_doublemad(array: np.ndarray) -> np.ndarray:
"""Calculate the Double MAD scale of an array.
Parameters
----------
array : :py:obj:`~numpy.typing.ArrayLike`
The array to calculate the modified z-score of.
array : np.ndarray
Input array.
Returns
-------
:py:obj:`~numpy.ndarray`
The modified z-score of the array.
np.ndarray
The Double MAD scale of the array.
Notes
-----
The Double MAD is defined as:
https://eurekastatistics.com/using-the-median-absolute-deviation-to-find-outliers/
https://aakinshin.net/posts/harrell-davis-double-mad-outlier-detector/
"""
scale_mad = 0.6744897501960817 # scipy.stats.norm.ppf(3/4.)
scale_aad = np.sqrt(2 / np.pi)
array = np.asarray(array)
norm = 0.6744897501960817 # scipy.stats.norm.ppf(0.75)
norm_aad = np.sqrt(2 / np.pi)
med = np.median(array)
diff = array - med
mad_left = np.median(np.abs(diff[array <= med])) / scale_mad
mad_right = np.median(np.abs(diff[array >= med])) / scale_mad

mad_left = np.median(np.abs(diff[array <= med])) / norm
mad_right = np.median(np.abs(diff[array >= med])) / norm
if mad_left == 0:
std_left = np.mean(np.abs(diff[array <= med])) / scale_aad
scale_left = np.mean(np.abs(diff[array <= med])) / norm_aad
else:
std_left = mad_left
scale_left = mad_left

if mad_right == 0:
std_right = np.mean(np.abs(diff[array >= med])) / scale_aad
scale_right = np.mean(np.abs(diff[array >= med])) / norm_aad
else:
std_right = mad_right
scale_right = mad_right

return np.where(array < med, scale_left, scale_right)


def _scale_diffcov(array: np.ndarray) -> float:
"""Calculate the Difference Covariance scale of an array.
Parameters
----------
array : np.ndarray
Input array.
Returns
-------
float
The Difference Covariance scale of the array.
"""
diff = np.diff(array)
return np.sqrt(-np.cov(diff[:-1], diff[1:])[0, 1])


def _scale_biweight(array: np.ndarray) -> float:
"""Calculate the Biweight Midvariance scale of an array.
Parameters
----------
array : np.ndarray
Input array.
Returns
-------
float
The Biweight Midvariance scale of the array.
"""
return astrostats.biweight_scale(array)


def _scale_qn(array: np.ndarray) -> float:
"""Calculate the Normalized Qn scale of an array.
Parameters
----------
array : np.ndarray
Input array.
Returns
-------
float
The Normalized Qn scale of the array.
"""
# \np.sqrt(2) * stats.norm.ppf(5/8)
norm = 0.4506241100243562
h = len(array) // 2 + 1
k = h * (h - 1) // 2
diffs = np.abs(array[:, None] - array)
return np.partition(diffs.ravel(), k - 1)[k - 1] / norm


def _scale_sn(array: np.ndarray) -> float:
"""Calculate the Normalized Sn scale of an array.
Parameters
----------
array : np.ndarray
Input array.
Returns
-------
float
The Normalized Sn scale of the array.
"""
diffs = np.abs(array[:, None] - array)
return 1.1926 * np.median(np.median(diffs, axis=1))


def _scale_gapper(array: np.ndarray) -> float:
"""Calculate the Gapper Estimator scale of an array.
Parameters
----------
array : np.ndarray
Input array.
Returns
-------
float
The Gapper Estimator scale of the array.
"""
n = len(array)
gaps = np.diff(np.sort(array))
weights = np.arange(1, n) * np.arange(n - 1, 0, -1)
return np.dot(weights, gaps) * np.sqrt(np.pi) / (n * (n - 1))


def estimate_scale(array: np.ndarray, method: str = "mad") -> float | np.ndarray:
"""Estimate the scale or standard deviation of an array.
Parameters
----------
array : np.ndarray
The array to estimate the scale of.
method : str, optional
The method to use for estimating the scale, by default "mad".
Returns
-------
float | np.ndarray
The estimated scale of the array.
Raises
------
ValueError
If the method is not supported or if the array is empty.
Notes
-----
https://en.wikipedia.org/wiki/Robust_measures_of_scale
Following methods are supported:
- "iqr": Normalized Inter-quartile Range
- "mad": Median Absolute Deviation
- "diffcov": Difference Covariance
- "biweight": Biweight Midvariance
- "qn": Normalized Qn scale
- "sn": Normalized Sn scale
- "gapper": Gapper Estimator
"""
scale_methods: dict[str, Callable[[np.ndarray], float | np.ndarray]] = {
"iqr": _scale_iqr,
"mad": _scale_mad,
"doublemad": _scale_doublemad,
"diffcov": _scale_diffcov,
"biweight": _scale_biweight,
"qn": _scale_qn,
"sn": _scale_sn,
"gapper": _scale_gapper,
}
array = np.asarray(array)
n = len(array)
if n == 0:
msg = "Cannot estimate noise from an empty array."
raise ValueError(msg)

scale_func = scale_methods.get(method)
if scale_func is None:
msg = f"Method {method} is not supported for estimating scale."
raise ValueError(msg)
return scale_func(array)

std_map = np.where(array < med, std_left, std_right)
return np.divide(diff, std_map, out=np.zeros_like(diff), where=std_map != 0)

def zscore(
array: np.ndarray,
loc_method: str = "median",
scale_method: str = "mad",
) -> ZScoreResult:
"""Calculate robust Z-scores of an array.
Parameters
----------
array : np.ndarray
The array to calculate the Z-score of.
loc_method : str, optional
The method to use for estimating the location, by default "median".
scale_method : str, optional
The method to use for estimating the scale, by default "mad".
Returns
-------
ZScoreResult
The robust Z-scores of the array.
Raises
------
ValueError
If the location or scale method is not supported.
"""
loc = estimate_loc(array, loc_method)
scale = estimate_scale(array, scale_method)
diff = array - loc
zscores = np.divide(diff, scale, out=np.zeros_like(diff), where=scale != 0)
return ZScoreResult(zscores=zscores, loc=loc, scale=scale)


class ChannelStats:
163 changes: 92 additions & 71 deletions sigpyproc/foldedcube.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,40 @@
from __future__ import annotations
import numpy as np

import numpy as np
from numpy import typing as npt

from sigpyproc.params import DM_CONSTANT_LK
from sigpyproc.header import Header
from sigpyproc.params import DM_CONSTANT_LK
from sigpyproc.utils import roll_array


class Profile(np.ndarray):
class Profile:
"""An array class to handle a 1-D pulse profile.
Parameters
----------
input_array : :py:obj:`~numpy.typing.ArrayLike`
1-D array of a pulse profile
data : :py:obj:`~numpy.typing.ArrayLike`
1-D pulse profile
Returns
-------
:py:obj:`~numpy.ndarray`
Pulse profile
1-D Pulse profile
"""

def __new__(cls, input_array: npt.ArrayLike) -> Profile:
"""Create a new 1D Pulse profile."""
return np.asarray(input_array).astype(np.float32, copy=False).view(cls)
def __init__(self, data: npt.ArrayLike, tsamp: float) -> None:
self._data = np.asarray(data, dtype=np.float32)
self._tsamp = tsamp

def __array_finalize__(self, obj):
if obj is None:
return
@property
def data(self) -> np.ndarray:
"""The pulse profile data (`numpy.ndarray`, read-only)."""
return self._data

@property
def tsamp(self) -> float:
"""The sampling time of the profile (`float`, read-only)."""
return self._tsamp

def snr(self):
"""Calculate a rudimentary Signal-to-noise ratio for the profile.
@@ -71,8 +77,8 @@ def _get_baseline(self, width):
return np.hstack((self[: pos - wing], self[pos + wing + 1 :]))


class FoldSlice(np.ndarray):
"""An array class to handle a 2-D slice of :class:`~sigpyproc.foldedcube.FoldedData`.
class FoldSlice:
"""An array class to handle a folded 2-D data slice.
Parameters
----------
@@ -86,13 +92,24 @@ class FoldSlice(np.ndarray):
"""

def __new__(cls, input_array: npt.ArrayLike) -> FoldSlice:
"""Create a new FoldSlice array."""
return np.asarray(input_array).astype(np.float32, copy=False).view(cls)
def __init__(self, data: np.ndarray, tsamp: float) -> None:
self._data = np.asarray(data, dtype=np.float32)
self._tsamp = tsamp

def __array_finalize__(self, obj):
if obj is None:
return
@property
def data(self) -> np.ndarray:
"""The data slice (`numpy.ndarray`, read-only)."""
return self._data

@property
def tsamp(self) -> float:
"""The sampling time of the slice (`float`, read-only)."""
return self._tsamp

@property
def nbins(self) -> int:
"""Number of bins in the slice (`int`, read-only)."""
return self.data.shape[1]

def normalize(self) -> FoldSlice:
"""Normalise the slice by dividing each row by its mean.
@@ -102,7 +119,8 @@ def normalize(self) -> FoldSlice:
:class:`~sigpyproc.foldedcube.FoldSlice`
normalised version of slice
"""
return self / self.mean(axis=1).reshape(self.shape[0], 1)
norm_data = self.data / self.data.mean(axis=1).reshape(self.data.shape[0], 1)
return FoldSlice(norm_data, self.tsamp)

def get_profile(self) -> Profile:
"""Return the pulse profile from the slice.
@@ -112,15 +130,15 @@ def get_profile(self) -> Profile:
:class:`~sigpyproc.foldedcube.Profile`
a pulse profile
"""
return self.sum(axis=0).view(Profile)
return Profile(self.data.sum(axis=0), self.tsamp)


class FoldedData(np.ndarray):
"""An array class to handle a data cube produced by any of the folding methods.
class FoldedData:
"""An array class to handle a folded 3-D data cube.
Parameters
----------
input_array : :py:obj:`~numpy.typing.ArrayLike`
data : :py:obj:`~numpy.ndarray`
3-D array of folded data
header : :class:`~sigpyproc.header.Header`
observational metadata
@@ -142,85 +160,85 @@ class FoldedData(np.ndarray):
(number of subintegrations, number of subbands, number of profile bins)
"""

def __new__(
cls,
input_array: npt.ArrayLike,
header: Header,
def __init__(
self,
data: np.ndarray,
hdr: Header,
period: float,
dm: float,
accel: float = 0,
):
"""Construct Folded Data cube."""
obj = np.asarray(input_array).astype(np.float32, copy=False).view(cls)
obj.header = header
obj.period = period
obj.dm = dm
obj.accel = accel
obj._set_defaults()
return obj

def __array_finalize__(self, obj):
if obj is None:
return
self.header = getattr(obj, "header", None)
self.period = getattr(obj, "period", None)
self.dm = getattr(obj, "dm", None)
self.accel = getattr(obj, "accel", None)
) -> None:
self._data = np.asarray(data, dtype=np.float32)
self._hdr = hdr
self._period = period
self._dm = dm
self._accel = accel
self._check_input()

@property
def data(self) -> np.ndarray:
"""The folded data cube (`numpy.ndarray`, read-only)."""
return self._data

@property
def header(self) -> Header:
"""The observational metadata (`sigpyproc.header.Header`, read-only)."""
return self._hdr

@property
def nints(self) -> int:
def nsubints(self) -> int:
"""Number of subintegrations in the data cube(`int`, read-only)."""
return self.shape[0]
return self.data.shape[0]

@property
def nbands(self) -> int:
def nsubbands(self) -> int:
"""Number of subbands in the data cube (`int`, read-only)."""
return self.shape[1]
return self.data.shape[1]

@property
def nbins(self) -> int:
"""Number of bins in the data cube(`int`, read-only)."""
return self.shape[2]
return self.data.shape[2]

def get_subint(self, nint: int) -> FoldSlice:
"""Return a single subintegration from the data cube.
def get_subint(self, nsubint: int) -> FoldSlice:
"""Get a single subintegration from the data cube.
Parameters
----------
nint : int
subintegration number (n=0 is first subintegration
nsubint : int
subintegration number (n=0 is first subintegration)
Returns
-------
:class:`~sigpyproc.foldedcube.FoldSlice`
a 2-D array containing the subintegration
"""
return self[nint].view(FoldSlice)
return FoldSlice(self.data[nsubint], self.header.tsamp)

def get_subband(self, nint: int) -> FoldSlice:
"""Return a single subband from the data cube.
def get_subband(self, nsubband: int) -> FoldSlice:
"""Get a single subband from the data cube.
Parameters
----------
nint : int
nsubband : int
subband number (n=0 is first subband)
Returns
-------
:class:`~sigpyproc.foldedcube.FoldSlice`
a 2-D array containing the subband
"""
return self[:, nint].view(FoldSlice)
return FoldSlice(self.data[:, nsubband], self.header.tsamp)

def get_profile(self) -> Profile:
"""Return a the data cube summed in time and frequency.
"""Get the summed pulse profile from the data cube.
Returns
-------
:class:`~sigpyproc.foldedcube.Profile`
a 1-D array containing the power as a function of phase (pulse profile)
a 1-D array containing the power as a function of phase
"""
return self.sum(axis=0).sum(axis=0).view(Profile)
return Profile(self.data.sum(axis=0).sum(axis=0), self.header.tsamp)

def get_time_phase(self) -> FoldSlice:
"""Return the data cube collapsed in frequency.
@@ -230,7 +248,7 @@ def get_time_phase(self) -> FoldSlice:
:class:`~sigpyproc.foldedcube.FoldSlice`
a 2-D array containing the time vs. phase plane
"""
return self.sum(axis=1).view(FoldSlice)
return FoldSlice(self.data.sum(axis=1), self.header.tsamp)

def get_freq_phase(self) -> FoldSlice:
"""Return the data cube collapsed in time.
@@ -240,7 +258,7 @@ def get_freq_phase(self) -> FoldSlice:
:class:`~sigpyproc.foldedcube.FoldSlice`
a 2-D array containing the frequency vs. phase plane
"""
return self.sum(axis=0).view(FoldSlice)
return FoldSlice(self.data.sum(axis=0), self.header.tsamp)

def centre(self) -> FoldedData:
"""Roll the data cube to center the pulse."""
@@ -253,13 +271,13 @@ def replace_nan(self):
good_ids = np.where(np.isfinite(self))
self[bad_ids] = np.median(self[good_ids])

def update_dm(self, dm: float) -> None:
"""Install a new DM in the data cube.
def dedisperse(self, dm: float) -> None:
"""Rotate the data cube to remove dispersion delay between subbands.
Parameters
----------
dm : float
the new DM to dedisperse to
New DM to dedisperse to
"""
dmdelays = self._get_dmdelays(dm)
for iint in range(self.nints):
@@ -300,8 +318,8 @@ def _get_dmdelays(self, newdm: float) -> np.ndarray:
drifts = (
delta_dm
* DM_CONSTANT_LK
* ((freqs ** -2) - (self.header.fch1 ** -2))
/ ((self.period / self.nbins))
* ((freqs**-2) - (self.header.fch1**-2))
/ (self.period / self.nbins)
)
drifts = drifts.round().astype("int32")
bin_drifts = drifts - self._fph_shifts
@@ -310,7 +328,10 @@ def _get_dmdelays(self, newdm: float) -> np.ndarray:

def _get_pdelays(self, newperiod: float) -> np.ndarray:
dbins = (
(newperiod / self._period - 1) * self.header.tobs * self.nbins / self._period
(newperiod / self._period - 1)
* self.header.tobs
* self.nbins
/ self._period
)
if dbins == 0:
drifts = -1 * self._tph_shifts
22 changes: 4 additions & 18 deletions sigpyproc/fourierseries.py
Original file line number Diff line number Diff line change
@@ -26,8 +26,6 @@ class PowerSpectrum:
1 dimensional power spectrum
header : :class:`~sigpyproc.header.Header`
header object containing metadata
copy : bool, optional
make a copy of the data, by default False
Returns
-------
@@ -39,13 +37,8 @@ class PowerSpectrum:
Data is converted to 32 bits regardless of original type.
"""

def __init__(
self,
data: npt.ArrayLike,
hdr: Header,
copy: bool | None = None,
) -> None:
self._data = np.asarray(data, dtype=np.float32, copy=copy)
def __init__(self, data: npt.ArrayLike, hdr: Header) -> None:
self._data = np.asarray(data, dtype=np.float32)
self._hdr = hdr
self._check_input()

@@ -179,8 +172,6 @@ class FourierSeries:
1 dimensional fourier series
header : :class:`~sigpyproc.header.Header`
header object containing metadata
copy : bool, optional
make a copy of the data, by default None
Returns
-------
@@ -193,13 +184,8 @@ class FourierSeries:
"""

def __init__(
self,
data: npt.ArrayLike,
hdr: Header,
copy: bool | None = None,
) -> None:
self._data = np.asarray(data, dtype=np.complex64, copy=copy)
def __init__(self, data: npt.ArrayLike, hdr: Header) -> None:
self._data = np.asarray(data, dtype=np.complex64)
self._hdr = hdr
self._check_input()

17 changes: 3 additions & 14 deletions sigpyproc/timeseries.py
Original file line number Diff line number Diff line change
@@ -21,29 +21,18 @@ class TimeSeries:
Parameters
----------
data : :py:obj:`~numpy.typing.ArrayLike`
1 dimensional time series
1-D time series
header : :class:`~sigpyproc.header.Header`
header object containing metadata
copy : bool, optional
make a copy of the data, by default None
Returns
-------
:py:obj:`~numpy.ndarray`
1 dimensional time series array with header metadata
Notes
-----
Data is converted to 32-bit floats regardless of original type.
"""

def __init__(
self,
data: npt.ArrayLike,
hdr: Header,
copy: bool | None = None,
) -> None:
self._data = np.asarray(data, dtype=np.float32, copy=copy)
def __init__(self, data: npt.ArrayLike, hdr: Header) -> None:
self._data = np.asarray(data, dtype=np.float32)
self._hdr = hdr
self._check_input()