From c1d4f6548a9463deaad1457008f8fc74846afbf2 Mon Sep 17 00:00:00 2001 From: Joep Vanlier Date: Wed, 30 Oct 2024 10:42:22 +0100 Subject: [PATCH] channel: support recalibrating the force --- changelog.md | 2 +- .../force_calibration/calibration_items.rst | 6 +- lumicks/pylake/channel.py | 58 ++++++++++++ lumicks/pylake/conftest.py | 22 +++++ .../force_calibration/calibration_item.py | 3 +- .../tests/test_channels/test_channels.py | 88 +++++++++++++++++++ lumicks/pylake/tests/test_file/test_file.py | 19 +--- 7 files changed, 174 insertions(+), 24 deletions(-) diff --git a/changelog.md b/changelog.md index 02c44e948..5ccba478e 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,6 @@ # Changelog -## v1.6.1 | t.b.d. +## v1.7.0 | t.b.d. #### New features diff --git a/docs/tutorial/force_calibration/calibration_items.rst b/docs/tutorial/force_calibration/calibration_items.rst index cc5b00868..2e5ed54ea 100644 --- a/docs/tutorial/force_calibration/calibration_items.rst +++ b/docs/tutorial/force_calibration/calibration_items.rst @@ -160,11 +160,9 @@ We can also see that the residual now should less systematic deviation:: .. image:: figures/residual_better.png -Now that we have our new calibration item, we can recalibrate a slice of force data. -To do so, take the slice and multiply it by the ratio of the old and new calibration factors:: +Now that we have our new calibration item, we can recalibrate a slice of force data:: - correction_factor = recalibrated_hyco.force_sensitivity / old_calibration.force_sensitivity - recalibrated_force1x = force1x_slice * correction_factor + recalibrated_force1x = force1x_slice.recalibrate_force(recalibrated_hyco) plt.figure() force1x_slice.plot() diff --git a/lumicks/pylake/channel.py b/lumicks/pylake/channel.py index 6fa17cd12..d5287a348 100644 --- a/lumicks/pylake/channel.py +++ b/lumicks/pylake/channel.py @@ -239,6 +239,64 @@ def calibration(self) -> list: else: return [] + def recalibrate_force(self, calibration): + """Recalibrate the data in this slice using a new force calibration item + + Parameters + ---------- + calibration : ForceCalibrationItem + The calibration to apply to the data + + Examples + -------- + :: + + import lumicks.pylake as lk + + f = lk.File("passive_calibration.h5") + calibration = f.force1x.calibration[1] # Grab a calibration item for force 1x + + # Slice the data corresponding to the calibration we want to reproduce. + calib_slice = f.force1x[calibration] + + # De-calibrate to volts using the calibration that was active before this slice. + previous_calibration = calib_slice.calibration[0] + calib_slice = calib_slice / previous_calibration.force_sensitivity + + calibration_params = previous_calibration.calibration_params() + new_calibration = lk.calibrate_force(calib_slice.data, **calibration_params) + new_calibration.plot() + + # Make a new calibration, but change the amount of blocking + less_blocking_params = calibration_params | {"num_points_per_block": 200} + less_blocking = lk.calibrate_force(calib_slice.data, **less_blocking_params) + less_blocking.plot() + + # Recalibrate the force channels + recalibrated_force1x = f.force1x.recalibrate_force(less_blocking) + """ + from lumicks.pylake.calibration import ForceCalibrationList + + if not self.calibration: + raise RuntimeError( + "Slice does not contain any calibration items. Is this an unprocessed force " + "channel?" + ) + + # Grab sensitivities. If for whatever reason, we don't know part of the calibration, we + # can't reasonably recalibrate, so we return NaNs for those time points. + force_sensitivities = np.full(len(self.timestamps), np.nan) + for c in self.calibration: + force_sensitivities[self.timestamps >= c.applied_at] = c.force_sensitivity + + return Slice( + self._src._with_data( + self._unpack_other(self) * calibration.force_sensitivity / force_sensitivities + ), + labels=self.labels, + calibration=ForceCalibrationList(items=[calibration._with_timestamp(self.start)]), + ) + @property def sample_rate(self) -> Union[float, None]: """The data frequency for `Continuous` and `TimeSeries` data sources or `None` if it is not diff --git a/lumicks/pylake/conftest.py b/lumicks/pylake/conftest.py index d0e57ec6e..d3746d83a 100644 --- a/lumicks/pylake/conftest.py +++ b/lumicks/pylake/conftest.py @@ -2,6 +2,7 @@ import hashlib import warnings import importlib +import contextlib import numpy as np import pytest @@ -70,6 +71,27 @@ def report(): return reporter +@pytest.fixture() +def mock_datetime(monkeypatch): + class FakeDateTime: + @classmethod + def fromtimestamp(cls, timestamp, *args, **kwargs): + """Inject a timezone""" + return FakeDateTime() + + def strftime(self, str_format): + return str_format + + @contextlib.contextmanager + def fake_datetime(function_location): + # Representation of time is timezone and locale dependent, hence we monkeypatch it + with monkeypatch.context() as m: + m.setattr(function_location, FakeDateTime) + yield m + + return fake_datetime + + @pytest.fixture(autouse=True) def configure_warnings(): # importing scipy submodules on some version of Python diff --git a/lumicks/pylake/force_calibration/calibration_item.py b/lumicks/pylake/force_calibration/calibration_item.py index 7313f99c6..80ca8374c 100644 --- a/lumicks/pylake/force_calibration/calibration_item.py +++ b/lumicks/pylake/force_calibration/calibration_item.py @@ -187,8 +187,7 @@ def calibration_params(self): less_blocking.plot() # Recalibrate the force channels - rf_ratio = less_blocking.force_sensitivity / previous_calibration.force_sensitivity - recalibrated_force1x = f.force1x * rf_ratio + recalibrated_force1x = f.force1x.recalibrate_force(less_blocking) """ return ( self.power_spectrum_params() diff --git a/lumicks/pylake/tests/test_channels/test_channels.py b/lumicks/pylake/tests/test_channels/test_channels.py index 782c03a1d..8635fb5fd 100644 --- a/lumicks/pylake/tests/test_channels/test_channels.py +++ b/lumicks/pylake/tests/test_channels/test_channels.py @@ -1,4 +1,5 @@ import re +import textwrap from collections import namedtuple from dataclasses import dataclass @@ -10,6 +11,7 @@ from lumicks.pylake import channel from lumicks.pylake.low_level import make_continuous_slice from lumicks.pylake.calibration import ForceCalibrationItem, ForceCalibrationList +from lumicks.pylake.force_calibration import power_spectrum_calibration as psc def with_offset(t, start_time=1592916040906356300): @@ -1006,3 +1008,89 @@ def test_low_level_construction(): slc = make_continuous_slice(data, start, int(1e9 / 78125), name="hi", y_label="there") assert slc.labels["title"] == "hi" assert slc.labels["y"] == "there" + + +def test_recalibrate_force_wrong_number_of_calibrations(): + cc = channel.Slice( + channel.Continuous(np.arange(100), int(with_offset(40)), 10), + calibration=ForceCalibrationList([]), + ) + with pytest.raises(RuntimeError, match="Slice does not contain any calibration items"): + cc.recalibrate_force(None) + + +def make_calibration_item(response, applied_at): + return ForceCalibrationItem( + { + "Calibration Data": 50, + "Stop time (ns)": with_offset(50), + "Timestamp (ns)": with_offset(applied_at), + "Response (pN/V)": response, + } + ) + + +def make_calibration_result(calibration_factor): + return psc.CalibrationResults( + model=None, + ps_model=None, + ps_data=None, + params={}, + results={ + "Rf": psc.CalibrationParameter("Rf", calibration_factor, "val"), + "kappa": psc.CalibrationParameter("kappa", 5, "val"), + }, + fitted_params=[], + ) + + +def test_recalibrate_force(mock_datetime): + slc = channel.Slice( + channel.Continuous(np.arange(100), with_offset(40), 10), + calibration=ForceCalibrationList._from_items([make_calibration_item(10, 40)]), + ) + + slc_recalibrated = slc.recalibrate_force(make_calibration_result(5)) + np.testing.assert_allclose(slc.data, np.arange(100)) + np.testing.assert_allclose(slc_recalibrated.data, np.arange(100) * 0.5) + assert slc.calibration[0].force_sensitivity == 10 + assert slc_recalibrated.calibration[0].force_sensitivity == 5 + + with mock_datetime("lumicks.pylake.calibration.datetime.datetime"): + assert str(slc_recalibrated.calibration) == textwrap.dedent( + """\ + # Applied at Kind Stiffness (pN/nm) Force sens. (pN/V) Disp. sens. (µm/V) Hydro Surface Data? + --- ------------ ------- ------------------- -------------------- -------------------- ------- --------- ------- + 0 %x %X Unknown 5 5 N/A False False False""" + ) + slc_recalibrated.calibration._repr_html_() + + slc_recalibrate_twice = slc.recalibrate_force(make_calibration_result(10)) + np.testing.assert_allclose(slc_recalibrated.data, np.arange(100) * 0.5) + np.testing.assert_allclose(slc_recalibrate_twice.data, np.arange(100)) + assert slc_recalibrated.calibration[0].force_sensitivity == 5 + assert slc_recalibrate_twice.calibration[0].force_sensitivity == 10 + + +@pytest.mark.parametrize( + "items, ref_forces", + [ + (((10, 40), (20, 60), (40, 80)), [10, 10, 5, 5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5]), + (((10, 40), (40, 60), (40, 80)), [10, 10, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5]), + (((10, 40), (20, 60)), [10, 10, 5, 5, 5, 5, 5, 5, 5, 5]), + (((10, 40), (20, 61)), [10, 10, 10, 5, 5, 5, 5, 5, 5, 5]), + (((10, 40), (20, 50000)), [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]), + # If there is no calibration at the start, we cannot meaningfully decalibrate, so we + # emit np.nan + ([(20, 60)], [np.nan, np.nan, 5, 5, 5, 5, 5, 5, 5, 5]), + ], +) +def test_recalibrate_force_multi(items, ref_forces): + slc = channel.Slice( + channel.Continuous(np.full(10, 10), with_offset(40), 10), + calibration=ForceCalibrationList._from_items( + [make_calibration_item(response, applied_at) for response, applied_at in items] + ), + ) + slc_recal = slc.recalibrate_force(make_calibration_result(10)) + np.testing.assert_equal(slc_recal.data, np.array(ref_forces)) diff --git a/lumicks/pylake/tests/test_file/test_file.py b/lumicks/pylake/tests/test_file/test_file.py index de7c971c8..51473179f 100644 --- a/lumicks/pylake/tests/test_file/test_file.py +++ b/lumicks/pylake/tests/test_file/test_file.py @@ -75,25 +75,10 @@ def test_redirect_list(h5_file): assert f["Point Scan"]["PointScan1"].start == np.int64(20e9) -def test_calibration_str(h5_file, monkeypatch): - # Representation of time is timezone and locale dependent, hence we monkeypatch it - class FakeDateTime: - @classmethod - def fromtimestamp(cls, timestamp, *args, **kwargs): - """Inject a timezone""" - return FakeDateTime() - - def strftime(self, str_format): - return str_format - +def test_calibration_str(h5_file, mock_datetime): f = pylake.File.from_h5py(h5_file) if f.format_version == 2: - with monkeypatch.context() as m: - m.setattr( - "lumicks.pylake.calibration.datetime.datetime", - FakeDateTime, - ) - + with mock_datetime("lumicks.pylake.calibration.datetime.datetime"): assert str(f.force1x.calibration) == dedent( ( """\