From b14b44f81f2bd80ce7f0eddbdcfa7a0428023f6b Mon Sep 17 00:00:00 2001 From: Joep Vanlier Date: Wed, 30 Oct 2024 10:40:59 +0100 Subject: [PATCH] focal: improve uniformity API --- lumicks/pylake/calibration.py | 64 +++++++++---------- .../force_calibration/calibration_item.py | 38 +++-------- .../force_calibration/calibration_results.py | 13 ++++ .../detail/calibration_properties.py | 31 +++++++++ .../tests/test_calibration_item.py | 13 ++-- .../tests/test_power_spectrum_calibration.py | 7 ++ .../pylake/piezo_tracking/piezo_tracking.py | 2 +- .../pylake/piezo_tracking/tests/conftest.py | 18 +++--- .../tests/test_channels/test_arithmetic.py | 6 +- .../tests/test_channels/test_channels.py | 44 ++++++------- 10 files changed, 132 insertions(+), 104 deletions(-) diff --git a/lumicks/pylake/calibration.py b/lumicks/pylake/calibration.py index e82829af3..62247da2e 100644 --- a/lumicks/pylake/calibration.py +++ b/lumicks/pylake/calibration.py @@ -5,13 +5,13 @@ from lumicks.pylake.force_calibration.calibration_item import ForceCalibrationItem -def _filter_calibration(time_field, items, start, stop): +def _filter_calibration(items, start, stop): """filter calibration data based on time stamp range [ns]""" if len(items) == 0: return [] def timestamp(x): - return x[time_field] + return x.stop if x.stop else x.applied_at # Pylake items do not have a start and stop (yet) items = sorted(items, key=timestamp) @@ -38,25 +38,22 @@ class ForceCalibrationList: calibration = f.force1x.calibration[1] # Grab a calibration item for force 1x """ - def __init__(self, time_field, items, slice_start=None, slice_stop=None): + def __init__(self, items, slice_start=None, slice_stop=None): """Calibration item Parameters ---------- - time_field : string - name of the field used for time items : list[ForceCalibrationItem] list of force calibration items slice_start, slice_stop : int Start and stop index of the slice associated with these items """ - self._time_field = time_field self._src = items self._slice_start = slice_start self._slice_stop = slice_stop def _with_src(self, _src): - return ForceCalibrationList(self._time_field, _src) + return ForceCalibrationList(_src) def __getitem__(self, item): if isinstance(item, slice): @@ -86,14 +83,13 @@ def filter_calibration(self, start, stop): stop : int time stamp at stop [ns]""" return ForceCalibrationList( - self._time_field, - _filter_calibration(self._time_field, self._src, start, stop), + _filter_calibration(self._src, start, stop), start, stop, ) @staticmethod - def from_field(hdf5, force_channel, time_field="Stop time (ns)") -> "ForceCalibrationList": + def from_field(hdf5, force_channel) -> "ForceCalibrationList": """Fetch force calibration data from the HDF5 file Parameters @@ -102,23 +98,25 @@ def from_field(hdf5, force_channel, time_field="Stop time (ns)") -> "ForceCalibr A Bluelake HDF5 file. force_channel : str Calibration field to access (e.g. "Force 1x"). - time_field : str - Attribute which holds the timestamp of the item (e.g. "Stop time (ns)"). """ if "Calibration" not in hdf5.keys(): - return ForceCalibrationList(time_field=time_field, items=[]) + return ForceCalibrationList(items=[]) - items = [] - for calibration_item in hdf5["Calibration"].values(): - if force_channel in calibration_item: - attrs = dict(calibration_item[force_channel].attrs) - if time_field in attrs.keys(): - # Copy the timestamp at which the calibration was applied into the item - attrs["Timestamp (ns)"] = calibration_item.attrs.get("Timestamp (ns)") - items.append(ForceCalibrationItem(attrs)) + return ForceCalibrationList._from_items( + items=[ + ForceCalibrationItem( + dict(calibration_item[force_channel].attrs) + | {"Timestamp (ns)": calibration_item.attrs.get("Timestamp (ns)")} + ) + for calibration_item in hdf5["Calibration"].values() + if force_channel in calibration_item + ] + ) - return ForceCalibrationList(time_field=time_field, items=items) + @staticmethod + def _from_items(items: list[ForceCalibrationItem]): + return ForceCalibrationList(items=items) def _print_summary(self, tablefmt): def format_timestamp(timestamp): @@ -139,11 +137,15 @@ def format_timestamp(timestamp): ), item.hydrodynamically_correct, item.distance_to_surface is not None, - bool( - self._slice_start - and (item.start >= self._slice_start) - and self._slice_stop - and (item.stop <= self._slice_stop) + ( + bool( + self._slice_start + and (item.start >= self._slice_start) + and self._slice_stop + and (item.stop <= self._slice_stop) + ) + if item.start and item.stop + else False ), ) for idx, item in enumerate(self._src) @@ -169,7 +171,7 @@ def __str__(self): return self._print_summary(tablefmt="text") @staticmethod - def from_dataset(hdf5, n, xy, time_field="Stop time (ns)") -> "ForceCalibrationList": + def from_dataset(hdf5, n, xy) -> "ForceCalibrationList": """Fetch the force calibration data from the HDF5 file Parameters @@ -180,14 +182,10 @@ def from_dataset(hdf5, n, xy, time_field="Stop time (ns)") -> "ForceCalibrationL Trap index. xy : str Force axis (e.g. "x"). - time_field : str - Attribute which holds the timestamp of the item (e.g. "Stop time (ns)"). """ if xy: - return ForceCalibrationList.from_field( - hdf5, force_channel=f"Force {n}{xy}", time_field=time_field - ) + return ForceCalibrationList.from_field(hdf5, force_channel=f"Force {n}{xy}") else: raise NotImplementedError( "Calibration is currently only implemented for single axis data" diff --git a/lumicks/pylake/force_calibration/calibration_item.py b/lumicks/pylake/force_calibration/calibration_item.py index 33802ef56..7313f99c6 100644 --- a/lumicks/pylake/force_calibration/calibration_item.py +++ b/lumicks/pylake/force_calibration/calibration_item.py @@ -1,4 +1,5 @@ import re +import copy from functools import wraps from collections import UserDict @@ -28,11 +29,6 @@ def _get_parameter(self, _, bluelake_key): if bluelake_key in self: return self[bluelake_key] - @property - def applied_at(self): - """Time the calibration was applied in nanoseconds since epoch""" - return self.data.get("Timestamp (ns)") - @property def _fitted_diode(self): """Diode parameters were fitted""" @@ -225,6 +221,13 @@ def _fit_range(self): self.data["Fit range (max.) (Hz)"], ) + def _with_timestamp(self, applied_timestamp): + """Return a copy of this item with a timestamp of when it was applied""" + item = copy.copy(self) + item.data = copy.deepcopy(self.data) + item.data["Timestamp (ns)"] = applied_timestamp + return item + @property def sample_rate(self): """Returns the data sample rate""" @@ -241,28 +244,3 @@ def num_points_per_block(self): """Number of points per block used for spectral down-sampling""" if "Points per block" in self.data: return int(self.data["Points per block"]) # BL returns float which API doesn't accept - - @property - def start(self): - """Starting timestamp of this calibration - - Examples - -------- - :: - - import lumicks.pylake as lk - - f = lk.File("file.h5") - item = f.force1x.calibration[1] # Grab a calibration item for force 1x - - # Slice the data corresponding to this item - calibration_data = f.force1x[item.start : item.stop] - - # or alternatively: - calibration_data = f.force1x[item] - """ - return self.data.get("Start time (ns)") - - @property - def stop(self): - return self.data.get("Stop time (ns)") diff --git a/lumicks/pylake/force_calibration/calibration_results.py b/lumicks/pylake/force_calibration/calibration_results.py index 7af2763ca..c6e833d0f 100644 --- a/lumicks/pylake/force_calibration/calibration_results.py +++ b/lumicks/pylake/force_calibration/calibration_results.py @@ -1,3 +1,5 @@ +import copy + import numpy as np from lumicks.pylake.force_calibration.detail.calibration_properties import ( @@ -48,6 +50,17 @@ def __call__(self, frequency): """ return self.model(frequency, *self.fitted_params) + def _with_timestamp(self, applied_timestamp): + """Return a copy of this item with a timestamp of when it was applied""" + from lumicks.pylake.force_calibration.power_spectrum_calibration import CalibrationParameter + + item = copy.copy(self) + item.params = copy.deepcopy(self.params) + item.params["Timestamp"] = CalibrationParameter( + "Timestamp when item was applied", applied_timestamp, "nanoseconds" + ) + return item + def __contains__(self, key): return key in self.params or key in self.results diff --git a/lumicks/pylake/force_calibration/detail/calibration_properties.py b/lumicks/pylake/force_calibration/detail/calibration_properties.py index 69442659d..13a534cb5 100644 --- a/lumicks/pylake/force_calibration/detail/calibration_properties.py +++ b/lumicks/pylake/force_calibration/detail/calibration_properties.py @@ -581,3 +581,34 @@ def kind(self): return "Active" if self.active_calibration else "Passive" else: return kind if kind is not None else "Unknown" + + @property + def applied_at(self): + """Time the calibration was applied in nanoseconds since epoch""" + return self._get_parameter("Timestamp", "Timestamp (ns)") + + @property + def start(self): + """Starting timestamp of this calibration + + Examples + -------- + :: + + import lumicks.pylake as lk + + f = lk.File("file.h5") + item = f.force1x.calibration[1] # Grab a calibration item for force 1x + + # Slice the data corresponding to this item + calibration_data = f.force1x[item.start : item.stop] + + # or alternatively: + calibration_data = f.force1x[item] + """ + return self._get_parameter("Start time", "Start time (ns)") + + @property + def stop(self): + """Stop time stored in the calibration item""" + return self._get_parameter("Stop time", "Stop time (ns)") diff --git a/lumicks/pylake/force_calibration/tests/test_calibration_item.py b/lumicks/pylake/force_calibration/tests/test_calibration_item.py index 58cb29d20..f6dd20358 100644 --- a/lumicks/pylake/force_calibration/tests/test_calibration_item.py +++ b/lumicks/pylake/force_calibration/tests/test_calibration_item.py @@ -191,6 +191,9 @@ def test_passive_item(compare_to_reference_dict, reference_data, calibration_dat test_name="text_representations_passive", ) + new_item = item._with_timestamp(1696171386701856701) + assert new_item.applied_at == 1696171386701856701 + def test_active_item_fixed_diode(compare_to_reference_dict, calibration_data): item = ForceCalibrationItem(ref_active) @@ -310,12 +313,10 @@ def create_item(t_start, t_stop, **kwargs): create_item(11, 12), create_item(15, 25), ] - items = ForceCalibrationList("Stop time (ns)", fcs) - same_items = ForceCalibrationList("Stop time (ns)", fcs2) - different_item = ForceCalibrationList( - "Stop time (ns)", fcs2[:-1] + [create_item(15, 25, extra=5)] - ) - shorter_list = ForceCalibrationList("Stop time (ns)", fcs[:-1]) + items = ForceCalibrationList(fcs) + same_items = ForceCalibrationList(fcs2) + different_item = ForceCalibrationList(fcs2[:-1] + [create_item(15, 25, extra=5)]) + shorter_list = ForceCalibrationList(fcs[:-1]) assert len(items) == 3 assert items == same_items diff --git a/lumicks/pylake/force_calibration/tests/test_power_spectrum_calibration.py b/lumicks/pylake/force_calibration/tests/test_power_spectrum_calibration.py index 936bc2bdb..65869bd9e 100644 --- a/lumicks/pylake/force_calibration/tests/test_power_spectrum_calibration.py +++ b/lumicks/pylake/force_calibration/tests/test_power_spectrum_calibration.py @@ -224,6 +224,13 @@ def test_bad_fit(reference_calibration_result): assert ps_calibration["backing"].value > bad_calibration["backing"].value +def test_applied_at(reference_calibration_result): + calibration = reference_calibration_result[0] + assert calibration.applied_at is None + new_calibration = calibration._with_timestamp(1696171386701856701) + assert new_calibration.applied_at == 1696171386701856701 + + def test_actual_spectrum(reference_calibration_result): ps_calibration, model, reference_spectrum = reference_calibration_result diff --git a/lumicks/pylake/piezo_tracking/piezo_tracking.py b/lumicks/pylake/piezo_tracking/piezo_tracking.py index 72e5758cd..078a03eef 100644 --- a/lumicks/pylake/piezo_tracking/piezo_tracking.py +++ b/lumicks/pylake/piezo_tracking/piezo_tracking.py @@ -141,7 +141,7 @@ def piezo_track(self, trap_position, force1, force2, downsampling_factor=None): trap_trap_dist = self.trap_calibration(trap_position) bead_displacements = 1e-3 * sum( - sign * force / force.calibration[0]["kappa (pN/nm)"] + sign * force / force.calibration[0].stiffness for force, sign in zip((force1, force2), self._signs) ) diff --git a/lumicks/pylake/piezo_tracking/tests/conftest.py b/lumicks/pylake/piezo_tracking/tests/conftest.py index 950fd201e..2bef7ac05 100644 --- a/lumicks/pylake/piezo_tracking/tests/conftest.py +++ b/lumicks/pylake/piezo_tracking/tests/conftest.py @@ -2,7 +2,7 @@ import pytest from lumicks.pylake.channel import Slice, Continuous, TimeSeries -from lumicks.pylake.calibration import ForceCalibrationList +from lumicks.pylake.calibration import ForceCalibrationItem, ForceCalibrationList from lumicks.pylake.fitting.models import ewlc_odijk_force @@ -77,13 +77,13 @@ def piezo_tracking_test_data(poly_baseline_data, camera_calibration_data): If we assume that the baseline force leads to a real displacement, then our function for the trap position becomes implicit, since the displacement depends on the baseline which in turn depends on the trap position: - + trap_trap_distance = tether_length + 2 * bead_radius + 2 * displacement_um - + And displacement_um is given by (wlc_force + baseline(trap_position)) / stiffness So we solve the following to obtain the trap position: - + displacement = 2 * (wlc_force + baseline(trap_position)) / stiffness 0 = tether_length + 2 * bead_radius + displacement - (trap_position - trap2_ref) """ @@ -95,8 +95,10 @@ def piezo_tracking_test_data(poly_baseline_data, camera_calibration_data): def implicit_trap_position_equation(x): trap_trap_dist = x - trap2_ref - displacement = 2 * (force + baseline(x)) / stiffness_um - return (tether_dist + 2 * bead_radius + displacement - trap_trap_dist) ** 2 + displacement = 2 * (force + baseline(x)) / stiffness_um # noqa: B023 + return ( + tether_dist + 2 * bead_radius + displacement - trap_trap_dist # noqa: B023 + ) ** 2 trap_position.append(minimize_scalar(implicit_trap_position_equation, [12.95, 13.35]).x) @@ -105,8 +107,8 @@ def implicit_trap_position_equation(x): # Add our baseline force (assumption is that the baseline force leads to a real displacement) force_pn = wlc_force + baseline(trap_position.data) - calibration = ForceCalibrationList( - "Stop time (ns)", [{"Stop time (ns)": 1, "kappa (pN/nm)": stiffness}] + calibration = ForceCalibrationList._from_items( + [ForceCalibrationItem({"Stop time (ns)": 1, "kappa (pN/nm)": stiffness})] ) force_1x = Slice(Continuous(force_pn, 0, dt), calibration=calibration) diff --git a/lumicks/pylake/tests/test_channels/test_arithmetic.py b/lumicks/pylake/tests/test_channels/test_arithmetic.py index 95d1c8b23..0dbec37de 100644 --- a/lumicks/pylake/tests/test_channels/test_arithmetic.py +++ b/lumicks/pylake/tests/test_channels/test_arithmetic.py @@ -4,12 +4,12 @@ import pytest from lumicks.pylake.channel import Slice, TimeTags, Continuous, TimeSeries -from lumicks.pylake.calibration import ForceCalibrationList +from lumicks.pylake.calibration import ForceCalibrationItem, ForceCalibrationList from lumicks.pylake.detail.value import ValueMixin start = 1 + int(1e18) -calibration = ForceCalibrationList( - "Stop time (ns)", [{"Stop time (ns)": start, "kappa (pN/nm)": 0.45}] +calibration = ForceCalibrationList._from_items( + [ForceCalibrationItem({"Stop time (ns)": start, "kappa (pN/nm)": 0.45})] ) time_series = np.array([1, 2, 3, 4, 5], dtype=np.int64) + int(1e18) slice_continuous_1 = Slice(Continuous([1, 2, 3, 4, 5], start=start, dt=1), calibration=calibration) diff --git a/lumicks/pylake/tests/test_channels/test_channels.py b/lumicks/pylake/tests/test_channels/test_channels.py index 349d941cc..782c03a1d 100644 --- a/lumicks/pylake/tests/test_channels/test_channels.py +++ b/lumicks/pylake/tests/test_channels/test_channels.py @@ -9,7 +9,7 @@ from lumicks.pylake import channel from lumicks.pylake.low_level import make_continuous_slice -from lumicks.pylake.calibration import ForceCalibrationList +from lumicks.pylake.calibration import ForceCalibrationItem, ForceCalibrationList def with_offset(t, start_time=1592916040906356300): @@ -18,16 +18,15 @@ def with_offset(t, start_time=1592916040906356300): def test_calibration_timeseries_channels(): time_field = "Stop time (ns)" - mock_calibration = ForceCalibrationList( - time_field=time_field, + mock_calibration = ForceCalibrationList._from_items( items=[ - {"Calibration Data": 50, time_field: 50}, - {"Calibration Data": 20, time_field: 20}, - {"Calibration Data": 30, time_field: 30}, - {"Calibration Data": 40, time_field: 40}, - {"Calibration Data": 80, time_field: 80}, - {"Calibration Data": 90, time_field: 90}, - {"Calibration Data": 120, time_field: 120}, + ForceCalibrationItem({"Calibration Data": 50, time_field: 50}), + ForceCalibrationItem({"Calibration Data": 20, time_field: 20}), + ForceCalibrationItem({"Calibration Data": 30, time_field: 30}), + ForceCalibrationItem({"Calibration Data": 40, time_field: 40}), + ForceCalibrationItem({"Calibration Data": 80, time_field: 80}), + ForceCalibrationItem({"Calibration Data": 90, time_field: 90}), + ForceCalibrationItem({"Calibration Data": 120, time_field: 120}), ], ) @@ -72,16 +71,15 @@ def test_calibration_timeseries_channels(): def test_calibration_continuous_channels(): time_field = "Stop time (ns)" - mock_calibration = ForceCalibrationList( - time_field=time_field, + mock_calibration = ForceCalibrationList._from_items( items=[ - {"Calibration Data": 50, time_field: 50}, - {"Calibration Data": 20, time_field: 20}, - {"Calibration Data": 30, time_field: 30}, - {"Calibration Data": 40, time_field: 40}, - {"Calibration Data": 80, time_field: 80}, - {"Calibration Data": 90, time_field: 90}, - {"Calibration Data": 120, time_field: 120}, + ForceCalibrationItem({"Calibration Data": 50, time_field: 50}), + ForceCalibrationItem({"Calibration Data": 20, time_field: 20}), + ForceCalibrationItem({"Calibration Data": 30, time_field: 30}), + ForceCalibrationItem({"Calibration Data": 40, time_field: 40}), + ForceCalibrationItem({"Calibration Data": 80, time_field: 80}), + ForceCalibrationItem({"Calibration Data": 90, time_field: 90}), + ForceCalibrationItem({"Calibration Data": 120, time_field: 120}), ], ) @@ -263,8 +261,8 @@ def test_timeseries_indexing(): def test_timeseries_mask(): """Test masking operation""" - calibration = ForceCalibrationList( - "Stop time (ns)", [{"Stop time (ns)": 1, "kappa (pN/nm)": 0.45}] + calibration = ForceCalibrationList._from_items( + [ForceCalibrationItem({"Stop time (ns)": 1, "kappa (pN/nm)": 0.45})] ) s = channel.Slice( channel.TimeSeries([14, 15, 16, 17], [4, 5, 6, 7]), @@ -341,8 +339,8 @@ def test_continuous_indexing(): def test_continuous_mask(): """Test masking operation""" - calibration = ForceCalibrationList( - "Stop time (ns)", [{"Stop time (ns)": 1, "kappa (pN/nm)": 0.45}] + calibration = ForceCalibrationList._from_items( + [ForceCalibrationItem({"Stop time (ns)": 1, "kappa (pN/nm)": 0.45})] ) s = channel.Slice( channel.Continuous([14, 15, 16, 17], 4, 1),