Skip to content

Commit

Permalink
Added a prepare() method to the xspress device.
Browse files Browse the repository at this point in the history
  • Loading branch information
canismarko committed Sep 23, 2024
1 parent fc98554 commit a1e3aa1
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 7 deletions.
20 changes: 16 additions & 4 deletions src/haven/instrument/xspress.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ophyd.areadetector.base import EpicsSignalWithRBV as SignalWithRBV
from ophyd.signal import InternalSignal
from ophyd.status import StatusBase, SubscriptionStatus
from ophyd_async.core import DetectorTrigger, TriggerInfo
from pcdsdevices.signal import MultiDerivedSignal
from pcdsdevices.type_hints import OphydDataType, SignalToValue

Expand Down Expand Up @@ -460,13 +461,27 @@ def walk_fly_signals(self, *, include_lazy=False):
# continue
yield walk

def kickoff(self) -> StatusBase:
def prepare(self, value: TriggerInfo) -> StatusBase:
info = value
# Set the right parameters
trigger_mode = {
DetectorTrigger.internal: self.trigger_modes.INTERNAL,
# DetectorTrigger.edge_trigger: None,
# DetectorTrigger.constant_gate: None,
DetectorTrigger.variable_gate: self.trigger_modes.TTL_VETO_ONLY,
}[value.trigger]
status = self.cam.trigger_mode.set(trigger_mode)
status &= self.cam.num_images.set(info.number)
status &= self.cam.acquire_time.set(info.livetime)
status &= self.cam.acquire_period.set(info.livetime + info.deadtime)
# Set up subscriptions for capturing data
self._fly_data = {}
for walk in self.walk_fly_signals():
sig = walk.item
sig.subscribe(self.save_fly_datum, run=True)
return status

def kickoff(self) -> StatusBase:
# Set up the status for when the detector is ready to fly
def check_acquiring(*, old_value, value, **kwargs):
is_acquiring = value == self.detector_states.ACQUIRE
Expand All @@ -475,9 +490,6 @@ def check_acquiring(*, old_value, value, **kwargs):
return is_acquiring

status = SubscriptionStatus(self.detector_state, check_acquiring)
# Set the right parameters
status &= self.cam.trigger_mode.set(self.trigger_modes.TTL_VETO_ONLY)
status &= self.cam.num_images.set(2**14)
status &= self.acquire.set(self.acquire_states.ACQUIRE)
return status

Expand Down
26 changes: 23 additions & 3 deletions src/haven/tests/test_fluorescence_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import numpy as np
import pytest
from ophyd import OphydObject, Signal
from ophyd_async.core import DetectorTrigger, TriggerInfo

from haven.instrument.dxp import load_dxp_detectors, parse_xmap_buffer
from haven.instrument.xspress import load_xspress_detectors
Expand Down Expand Up @@ -384,8 +385,27 @@ def test_complete_dxp(dxp):
assert status.done


@pytest.fixture()
def trigger_info():
return TriggerInfo(
number=5, trigger=DetectorTrigger.internal, deadtime=0.2, livetime=1.3
)


def test_prepare_xspress(xspress, trigger_info):
"""Check the behavior of the Xspress3 electornic's fly-scan prepare call."""
vortex = xspress
status = vortex.prepare(trigger_info)
status.wait(timeout=3)
assert status.done
assert vortex.cam.trigger_mode.get() == vortex.trigger_modes.INTERNAL
assert vortex.cam.num_images.get() == 5
assert vortex.cam.acquire_time.get() == 1.3
assert vortex.cam.acquire_period.get() == 1.5


def test_kickoff_xspress(xspress):
"""Check the behavior of the Xspress3 electornic's fly-scan complete call."""
"""Check the behavior of the Xspress3 electronics' fly-scan kickoff call."""
vortex = xspress
# Make sure the num_images is included
fly_sigs = [walk.item for walk in vortex.walk_fly_signals()]
Expand All @@ -398,7 +418,6 @@ def test_kickoff_xspress(xspress):
vortex.detector_state.sim_put(vortex.detector_states.ACQUIRE)
status.wait(timeout=3)
assert status.done
assert vortex.cam.trigger_mode.get() == vortex.trigger_modes.TTL_VETO_ONLY
assert vortex.acquire.get() == vortex.acquire_states.ACQUIRE


Expand All @@ -412,9 +431,10 @@ def test_complete_xspress(xspress):
assert status.done


def test_collect_xspress(xspress):
def test_collect_xspress(xspress, trigger_info):
"""Check the Xspress3 collects data during fly-scanning."""
vortex = xspress
vortex.prepare(trigger_info)
# Kick off the detector
status = vortex.kickoff()
vortex.detector_state.sim_put(vortex.detector_states.ACQUIRE)
Expand Down

0 comments on commit a1e3aa1

Please sign in to comment.