From e45e5ead138b0a704a13742938d00973dfe92487 Mon Sep 17 00:00:00 2001 From: Mark Wolfman Date: Wed, 23 Oct 2024 16:36:22 -0500 Subject: [PATCH] Converted the analyzer device to use ophyd-async. --- src/haven/devices/analyzer.py | 134 ++++++++++++++++++++++--------- src/haven/tests/test_analyzer.py | 46 +++++++---- 2 files changed, 123 insertions(+), 57 deletions(-) diff --git a/src/haven/devices/analyzer.py b/src/haven/devices/analyzer.py index 0605a8a4..7f72e7c7 100644 --- a/src/haven/devices/analyzer.py +++ b/src/haven/devices/analyzer.py @@ -1,3 +1,4 @@ +import asyncio import logging from scipy import constants @@ -7,14 +8,19 @@ from ophyd import FormattedComponent as FCpt from ophyd import PseudoPositioner, PseudoSingle, Signal from ophyd.pseudopos import pseudo_position_argument, real_position_argument +from ophyd_async.core import Device, soft_signal_rw, soft_signal_r_and_setter from scipy import constants +from .motor import Motor +from ..positioner import Positioner +from .signal import derived_signal_r, derived_signal_rw + log = logging.getLogger(__name__) um_per_mm = 1000 -h = constants.physical_constants['Planck constant in eV/Hz'][0] +h = constants.physical_constants["Planck constant in eV/Hz"][0] c = constants.c @@ -101,7 +107,7 @@ def bragg_to_energy(bragg: float, d: float) -> float: return energy -class Analyzer(PseudoPositioner): +class Analyzer(Device): """A pseudo positioner describing a rowland circle. Real Axes @@ -123,64 +129,112 @@ class Analyzer(PseudoPositioner): def __init__( self, - x_motor_pv: str, - z_motor_pv: str, - *args, - **kwargs, + *, + x_motor_prefix: str, + z_motor_prefix: str, + name: str = "", ): - self.x_motor_pv = x_motor_pv - self.z_motor_pv = z_motor_pv - super().__init__(*args, **kwargs) + # Create the real motors + self.x = Motor(x_motor_prefix) + self.z = Motor(z_motor_prefix) + # Soft signals for keeping track of the fixed transform properties + self.d_spacing = soft_signal_rw(float, units="Å", precision=4) + self.rowland_diameter = soft_signal_rw(float, units="mm") + self.wedge_angle = soft_signal_rw(float, units="rad") + self.alpha = soft_signal_rw(float, units="rad") + # The actual energy signal that controls the analyzer + self.energy = EnergyPositioner(xtal=self) + super().__init__(name=name) # Other signals - d_spacing: Signal = Cpt(Signal, name="d_spacing") # In Å - rowland_diameter: Signal = Cpt(Signal, name="rowland_diameter") # In mm - wedge_angle: Signal = Cpt(Signal, name="wedge_angle") # In radians - alpha: Signal = Cpt(Signal, name="alpha") + # d_spacing: Signal = Cpt(Signal, name="d_spacing") # In Å + # rowland_diameter: Signal = Cpt(Signal, name="rowland_diameter") # In mm + # wedge_angle: Signal = Cpt(Signal, name="wedge_angle") # In radians + # alpha: Signal = Cpt(Signal, name="alpha") # Pseudo axes - energy: PseudoSingle = Cpt(PseudoSingle, name="energy", limits=(0, 1000)) + # energy: PseudoSingle = Cpt(PseudoSingle, name="energy", limits=(0, 1000)) # Real axes - x: EpicsMotor = FCpt(EpicsMotor, "{x_motor_pv}", name="x") - z: EpicsMotor = FCpt(EpicsMotor, "{z_motor_pv}", name="z") + # x: EpicsMotor = FCpt(EpicsMotor, "{x_motor_pv}", name="x") + # z: EpicsMotor = FCpt(EpicsMotor, "{z_motor_pv}", name="z") + + +class EnergyPositioner(Positioner): + """Positions the energy of an analyzer crystal.""" + put_complete = True + + def __init__(self, *, xtal: Analyzer, name: str = ""): + xtal_signals = { + "D": xtal.rowland_diameter, + "d": xtal.d_spacing, + "beta": xtal.wedge_angle, + "alpha": xtal.alpha, + # "x": xtal.x, + # "z": xtal.z, + } + self.setpoint = derived_signal_rw( + float, + units="eV", + derived_from=dict( + x=xtal.x.user_setpoint, z=xtal.z.user_setpoint, **xtal_signals + ), + forward=self.forward, + inverse=self.inverse, + ) + self.readback = derived_signal_r( + float, + units="eV", + derived_from=dict( + x=xtal.x.user_readback, z=xtal.z.user_readback, **xtal_signals + ), + inverse=self.inverse, + ) + # Metadata + self.velocity, _ = soft_signal_r_and_setter(float, initial_value=1) + self.units, _ = soft_signal_r_and_setter(str, initial_value="eV") + self.precision, _ = soft_signal_r_and_setter(int, initial_value=3) - @pseudo_position_argument - def forward(self, pseudo_pos): + async def forward(self, value, D, d, beta, alpha, x, z): """Run a forward (pseudo -> real) calculation""" - # Convert distance to microns and degrees to radians - energy = pseudo_pos.energy + # Resolve the dependent signals into their values + energy = value + D, d, beta, alpha = await asyncio.gather( + D.get_value(), + d.get_value(), + beta.get_value(), + alpha.get_value(), + ) # Step 0: convert energy to bragg angle - bragg = energy_to_bragg(energy, d=self.d_spacing.get()) + bragg = energy_to_bragg(energy, d=d) # Step 1: Convert energy params to geometry params - D = self.rowland_diameter.get(use_monitor=True) - alpha = self.alpha.get() theta_M = bragg + alpha rho = D * np.sin(theta_M) # Step 2: Convert geometry params to motor positions - beta = self.wedge_angle.get(use_monitor=True) - z = rho * np.cos(theta_M) / np.cos(beta) - x = -z * np.sin(beta) + rho * np.sin(theta_M) + z_val = rho * np.cos(theta_M) / np.cos(beta) + x_val = -z_val * np.sin(beta) + rho * np.sin(theta_M) # Report the calculated result - return self.RealPosition( - x=x, - z=z, - ) + return { + x: x_val, + z: z_val, + } - @real_position_argument - def inverse(self, real_pos): + def inverse(self, values, D, d, beta, alpha, x, z): """Run an inverse (real -> pseudo) calculation""" - beta = self.wedge_angle.get(use_monitor=True) - x = real_pos.x - z = real_pos.z + # Resolve signals into their values + x = values[x] + z = values[z] + D = values[D] + d = values[d] + beta = values[beta] + alpha = values[alpha] # Step 1: Convert motor positions to geometry parameters - theta_M = np.arctan2((x + z * np.sin(beta)) , (z * np.cos(beta))) + theta_M = np.arctan2((x + z * np.sin(beta)), (z * np.cos(beta))) rho = z * np.cos(beta) / np.cos(theta_M) - alpha = self.alpha.get() + # Step 1: Convert geometry params to energy bragg = theta_M - alpha - print(f"{x=}, {z=}, {beta=}, {theta_M=}, {rho=}, {bragg=}, {alpha=}") - energy = bragg_to_energy(bragg, d=self.d_spacing.get()) - return self.PseudoPosition(energy=energy) + energy = bragg_to_energy(bragg, d=d) + return energy # ----------------------------------------------------------------------------- diff --git a/src/haven/tests/test_analyzer.py b/src/haven/tests/test_analyzer.py index 0c7a21f5..bc7292c2 100644 --- a/src/haven/tests/test_analyzer.py +++ b/src/haven/tests/test_analyzer.py @@ -3,6 +3,7 @@ import numpy as np import pytest from ophyd.sim import make_fake_device +from ophyd_async.core import get_mock_put, set_mock_value from haven.devices import analyzer @@ -70,40 +71,51 @@ def test_wavelength_to_bragg(theta, d_spacing, wavelength): @pytest.fixture() -def xtal(sim_registry): - FakeAnalyzer = make_fake_device(analyzer.Analyzer) - xtal = FakeAnalyzer(name="analyzer", x_motor_pv="", z_motor_pv="") +async def xtal(sim_registry): + # Create the analyzer documents + xtal = analyzer.Analyzer(name="analyzer", x_motor_prefix="", z_motor_prefix="") + await xtal.connect(mock=True) # Set default values for xtal parameters d = 1.637 * 1e-10 # Si 311 converted to meters - xtal.d_spacing.set(d).wait() - xtal.rowland_diameter.set(0.500).wait() + await xtal.d_spacing.set(d) + await xtal.rowland_diameter.set(0.500) return xtal @pytest.mark.parametrize("bragg,alpha,beta,z,x", analyzer_values) -def test_rowland_circle_forward(xtal, bragg, alpha, beta, x, z): - xtal.wedge_angle.set(np.radians(beta)).wait() - xtal.alpha.set(np.radians(alpha)).wait() - d = xtal.d_spacing.get() +async def test_rowland_circle_forward(xtal, bragg, alpha, beta, x, z): + # Set up sensible values for current positions + set_mock_value(xtal.wedge_angle, np.radians(beta)) + set_mock_value(xtal.alpha, np.radians(alpha)) + set_mock_value(xtal.x.user_readback, 0) + set_mock_value(xtal.z.user_readback, 0) + d = await xtal.d_spacing.get_value() bragg = np.radians(bragg) energy = analyzer.bragg_to_energy(bragg, d=d) # Check the result is correct (convert cm -> m) expected = (x / 100, z / 100) - actual = xtal.forward(energy) - assert actual == pytest.approx(expected, rel=0.01) + await xtal.energy.set(energy) + x_mock = get_mock_put(xtal.x.user_setpoint) + x_mock.assert_called_once() + assert x_mock.call_args.args[0] == pytest.approx(x/100, abs=0.0001) + z_mock = get_mock_put(xtal.z.user_setpoint) + z_mock.assert_called_once() + assert z_mock.call_args.args[0] == pytest.approx(z/100, abs=0.0001) @pytest.mark.parametrize("bragg,alpha,beta,z,x", analyzer_values) -def test_rowland_circle_inverse(xtal, bragg, alpha, beta, x, z): - xtal.wedge_angle.set(np.radians(beta)).wait() - xtal.alpha.set(np.radians(alpha)).wait() +async def test_rowland_circle_inverse(xtal, bragg, alpha, beta, x, z): + set_mock_value(xtal.wedge_angle, np.radians(beta)) + set_mock_value(xtal.alpha, np.radians(alpha)) # Calculate the expected answer bragg = np.radians(bragg) - d = xtal.d_spacing.get() + d = await xtal.d_spacing.get_value() expected_energy = analyzer.bragg_to_energy(bragg, d=d) # Compare to the calculated inverse - actual = xtal.inverse(x, z) - assert actual[0] == pytest.approx(expected_energy, abs=0.2) + set_mock_value(xtal.x.user_readback, x) + set_mock_value(xtal.z.user_readback, z) + new_energy = await xtal.energy.readback.get_value() + assert new_energy == pytest.approx(expected_energy, abs=0.2) # -----------------------------------------------------------------------------