Skip to content

Commit

Permalink
Converted the analyzer device to use ophyd-async.
Browse files Browse the repository at this point in the history
  • Loading branch information
canismarko committed Oct 23, 2024
1 parent 4826aa7 commit e45e5ea
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 57 deletions.
134 changes: 94 additions & 40 deletions src/haven/devices/analyzer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging

from scipy import constants
Expand All @@ -7,14 +8,19 @@
from ophyd import FormattedComponent as FCpt
from ophyd import PseudoPositioner, PseudoSingle, Signal
from ophyd.pseudopos import pseudo_position_argument, real_position_argument
from ophyd_async.core import Device, soft_signal_rw, soft_signal_r_and_setter
from scipy import constants

from .motor import Motor
from ..positioner import Positioner
from .signal import derived_signal_r, derived_signal_rw

log = logging.getLogger(__name__)

um_per_mm = 1000


h = constants.physical_constants['Planck constant in eV/Hz'][0]
h = constants.physical_constants["Planck constant in eV/Hz"][0]
c = constants.c


Expand Down Expand Up @@ -101,7 +107,7 @@ def bragg_to_energy(bragg: float, d: float) -> float:
return energy


class Analyzer(PseudoPositioner):
class Analyzer(Device):
"""A pseudo positioner describing a rowland circle.
Real Axes
Expand All @@ -123,64 +129,112 @@ class Analyzer(PseudoPositioner):

def __init__(
self,
x_motor_pv: str,
z_motor_pv: str,
*args,
**kwargs,
*,
x_motor_prefix: str,
z_motor_prefix: str,
name: str = "",
):
self.x_motor_pv = x_motor_pv
self.z_motor_pv = z_motor_pv
super().__init__(*args, **kwargs)
# Create the real motors
self.x = Motor(x_motor_prefix)
self.z = Motor(z_motor_prefix)
# Soft signals for keeping track of the fixed transform properties
self.d_spacing = soft_signal_rw(float, units="Å", precision=4)
self.rowland_diameter = soft_signal_rw(float, units="mm")
self.wedge_angle = soft_signal_rw(float, units="rad")
self.alpha = soft_signal_rw(float, units="rad")
# The actual energy signal that controls the analyzer
self.energy = EnergyPositioner(xtal=self)
super().__init__(name=name)

# Other signals
d_spacing: Signal = Cpt(Signal, name="d_spacing") # In Å
rowland_diameter: Signal = Cpt(Signal, name="rowland_diameter") # In mm
wedge_angle: Signal = Cpt(Signal, name="wedge_angle") # In radians
alpha: Signal = Cpt(Signal, name="alpha")
# d_spacing: Signal = Cpt(Signal, name="d_spacing") # In Å
# rowland_diameter: Signal = Cpt(Signal, name="rowland_diameter") # In mm
# wedge_angle: Signal = Cpt(Signal, name="wedge_angle") # In radians
# alpha: Signal = Cpt(Signal, name="alpha")

# Pseudo axes
energy: PseudoSingle = Cpt(PseudoSingle, name="energy", limits=(0, 1000))
# energy: PseudoSingle = Cpt(PseudoSingle, name="energy", limits=(0, 1000))

# Real axes
x: EpicsMotor = FCpt(EpicsMotor, "{x_motor_pv}", name="x")
z: EpicsMotor = FCpt(EpicsMotor, "{z_motor_pv}", name="z")
# x: EpicsMotor = FCpt(EpicsMotor, "{x_motor_pv}", name="x")
# z: EpicsMotor = FCpt(EpicsMotor, "{z_motor_pv}", name="z")


class EnergyPositioner(Positioner):
"""Positions the energy of an analyzer crystal."""
put_complete = True

def __init__(self, *, xtal: Analyzer, name: str = ""):
xtal_signals = {
"D": xtal.rowland_diameter,
"d": xtal.d_spacing,
"beta": xtal.wedge_angle,
"alpha": xtal.alpha,
# "x": xtal.x,
# "z": xtal.z,
}
self.setpoint = derived_signal_rw(
float,
units="eV",
derived_from=dict(
x=xtal.x.user_setpoint, z=xtal.z.user_setpoint, **xtal_signals
),
forward=self.forward,
inverse=self.inverse,
)
self.readback = derived_signal_r(
float,
units="eV",
derived_from=dict(
x=xtal.x.user_readback, z=xtal.z.user_readback, **xtal_signals
),
inverse=self.inverse,
)
# Metadata
self.velocity, _ = soft_signal_r_and_setter(float, initial_value=1)
self.units, _ = soft_signal_r_and_setter(str, initial_value="eV")
self.precision, _ = soft_signal_r_and_setter(int, initial_value=3)

@pseudo_position_argument
def forward(self, pseudo_pos):
async def forward(self, value, D, d, beta, alpha, x, z):
"""Run a forward (pseudo -> real) calculation"""
# Convert distance to microns and degrees to radians
energy = pseudo_pos.energy
# Resolve the dependent signals into their values
energy = value
D, d, beta, alpha = await asyncio.gather(
D.get_value(),
d.get_value(),
beta.get_value(),
alpha.get_value(),
)
# Step 0: convert energy to bragg angle
bragg = energy_to_bragg(energy, d=self.d_spacing.get())
bragg = energy_to_bragg(energy, d=d)
# Step 1: Convert energy params to geometry params
D = self.rowland_diameter.get(use_monitor=True)
alpha = self.alpha.get()
theta_M = bragg + alpha
rho = D * np.sin(theta_M)
# Step 2: Convert geometry params to motor positions
beta = self.wedge_angle.get(use_monitor=True)
z = rho * np.cos(theta_M) / np.cos(beta)
x = -z * np.sin(beta) + rho * np.sin(theta_M)
z_val = rho * np.cos(theta_M) / np.cos(beta)
x_val = -z_val * np.sin(beta) + rho * np.sin(theta_M)
# Report the calculated result
return self.RealPosition(
x=x,
z=z,
)
return {
x: x_val,
z: z_val,
}

@real_position_argument
def inverse(self, real_pos):
def inverse(self, values, D, d, beta, alpha, x, z):
"""Run an inverse (real -> pseudo) calculation"""
beta = self.wedge_angle.get(use_monitor=True)
x = real_pos.x
z = real_pos.z
# Resolve signals into their values
x = values[x]
z = values[z]
D = values[D]
d = values[d]
beta = values[beta]
alpha = values[alpha]
# Step 1: Convert motor positions to geometry parameters
theta_M = np.arctan2((x + z * np.sin(beta)) , (z * np.cos(beta)))
theta_M = np.arctan2((x + z * np.sin(beta)), (z * np.cos(beta)))
rho = z * np.cos(beta) / np.cos(theta_M)
alpha = self.alpha.get()
# Step 1: Convert geometry params to energy
bragg = theta_M - alpha
print(f"{x=}, {z=}, {beta=}, {theta_M=}, {rho=}, {bragg=}, {alpha=}")
energy = bragg_to_energy(bragg, d=self.d_spacing.get())
return self.PseudoPosition(energy=energy)
energy = bragg_to_energy(bragg, d=d)
return energy


# -----------------------------------------------------------------------------
Expand Down
46 changes: 29 additions & 17 deletions src/haven/tests/test_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import numpy as np
import pytest
from ophyd.sim import make_fake_device
from ophyd_async.core import get_mock_put, set_mock_value

from haven.devices import analyzer

Expand Down Expand Up @@ -70,40 +71,51 @@ def test_wavelength_to_bragg(theta, d_spacing, wavelength):


@pytest.fixture()
def xtal(sim_registry):
FakeAnalyzer = make_fake_device(analyzer.Analyzer)
xtal = FakeAnalyzer(name="analyzer", x_motor_pv="", z_motor_pv="")
async def xtal(sim_registry):
# Create the analyzer documents
xtal = analyzer.Analyzer(name="analyzer", x_motor_prefix="", z_motor_prefix="")
await xtal.connect(mock=True)
# Set default values for xtal parameters
d = 1.637 * 1e-10 # Si 311 converted to meters
xtal.d_spacing.set(d).wait()
xtal.rowland_diameter.set(0.500).wait()
await xtal.d_spacing.set(d)
await xtal.rowland_diameter.set(0.500)
return xtal


@pytest.mark.parametrize("bragg,alpha,beta,z,x", analyzer_values)
def test_rowland_circle_forward(xtal, bragg, alpha, beta, x, z):
xtal.wedge_angle.set(np.radians(beta)).wait()
xtal.alpha.set(np.radians(alpha)).wait()
d = xtal.d_spacing.get()
async def test_rowland_circle_forward(xtal, bragg, alpha, beta, x, z):
# Set up sensible values for current positions
set_mock_value(xtal.wedge_angle, np.radians(beta))
set_mock_value(xtal.alpha, np.radians(alpha))
set_mock_value(xtal.x.user_readback, 0)
set_mock_value(xtal.z.user_readback, 0)
d = await xtal.d_spacing.get_value()
bragg = np.radians(bragg)
energy = analyzer.bragg_to_energy(bragg, d=d)
# Check the result is correct (convert cm -> m)
expected = (x / 100, z / 100)
actual = xtal.forward(energy)
assert actual == pytest.approx(expected, rel=0.01)
await xtal.energy.set(energy)
x_mock = get_mock_put(xtal.x.user_setpoint)
x_mock.assert_called_once()
assert x_mock.call_args.args[0] == pytest.approx(x/100, abs=0.0001)
z_mock = get_mock_put(xtal.z.user_setpoint)
z_mock.assert_called_once()
assert z_mock.call_args.args[0] == pytest.approx(z/100, abs=0.0001)


@pytest.mark.parametrize("bragg,alpha,beta,z,x", analyzer_values)
def test_rowland_circle_inverse(xtal, bragg, alpha, beta, x, z):
xtal.wedge_angle.set(np.radians(beta)).wait()
xtal.alpha.set(np.radians(alpha)).wait()
async def test_rowland_circle_inverse(xtal, bragg, alpha, beta, x, z):
set_mock_value(xtal.wedge_angle, np.radians(beta))
set_mock_value(xtal.alpha, np.radians(alpha))
# Calculate the expected answer
bragg = np.radians(bragg)
d = xtal.d_spacing.get()
d = await xtal.d_spacing.get_value()
expected_energy = analyzer.bragg_to_energy(bragg, d=d)
# Compare to the calculated inverse
actual = xtal.inverse(x, z)
assert actual[0] == pytest.approx(expected_energy, abs=0.2)
set_mock_value(xtal.x.user_readback, x)
set_mock_value(xtal.z.user_readback, z)
new_energy = await xtal.energy.readback.get_value()
assert new_energy == pytest.approx(expected_energy, abs=0.2)


# -----------------------------------------------------------------------------
Expand Down

0 comments on commit e45e5ea

Please sign in to comment.