Skip to content

Commit

Permalink
Merge pull request #1183 from ZLLentz/fix_att_move_err
Browse files Browse the repository at this point in the history
ENH: for LCLS-I att, give proper error for uninterruptable move
  • Loading branch information
ZLLentz authored Nov 10, 2023
2 parents 04b27ca + bf54961 commit f4121f6
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 4 deletions.
33 changes: 33 additions & 0 deletions docs/source/upcoming_release_notes/1183-fix_att_move_err.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
1183 fix_att_move_err
#####################

API Breaks
----------
- N/A

Features
--------
- Added `PVPositionerNoInterrupt`, a pv positioner base class whose moves
cannot be interrupted (and will loudly complain about any such attempts).

Device Updates
--------------
- N/A

New Devices
-----------
- N/A

Bugfixes
--------
- LCLSI attenuator classes (generated from the `Attenuator` factory function)
will now raise a much clearer error for when they cannot interrupt a
previous move, without trying (and failing) to interrupt the previous move.

Maintenance
-----------
- N/A

Contributors
------------
- zllentz
7 changes: 4 additions & 3 deletions pcdsdevices/attenuator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from ophyd.device import Device
from ophyd.device import DynamicDeviceComponent as DDC
from ophyd.device import FormattedComponent as FCpt
from ophyd.pv_positioner import PVPositioner, PVPositionerPC
from ophyd.pv_positioner import PVPositionerPC
from ophyd.signal import EpicsSignal, EpicsSignalRO, Signal, SignalRO

from . import utils
Expand All @@ -26,6 +26,7 @@
from .interface import (BaseInterface, FltMvInterface, LightpathInOutCptMixin,
LightpathMixin)
from .pmps import TwinCATStatePMPS
from .pv_positioner import PVPositionerNoInterrupt
from .signal import InternalSignal, MultiDerivedSignal, MultiDerivedSignalRO
from .type_hints import OphydDataType, SignalToValue
from .utils import get_status_float, get_status_value
Expand Down Expand Up @@ -134,9 +135,9 @@ def _status_update(self, *args, value, **kwargs):
self.status.put(BladeStateEnum.Unknown, force=True)


class AttBase(FltMvInterface, PVPositioner):
class AttBase(FltMvInterface, PVPositionerNoInterrupt):
"""
Base class for attenuators with fundamental frequency.
Base class for pre-L2SI beam power attenuators.
This is a device that puts an array of filters in or out to achieve a
desired transmission ratio.
Expand Down
113 changes: 112 additions & 1 deletion pcdsdevices/pv_positioner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import Optional
from __future__ import annotations

from typing import Callable, Optional

import numpy as np
from ophyd.device import Component as Cpt
Expand Down Expand Up @@ -160,3 +162,112 @@ def _setup_move(self, position):
def _toggle_done(self):
self.done.put(0, force=True)
self.done.put(1, force=True)


class PVPositionerNoInterrupt(PVPositioner):
"""
A PV positioner whose moves cannot be interrupted.
If we try to start a new move before the previous move completes,
instead we will get a clear error advising us to wait.
Parameters
----------
prefix : str, optional
The device prefix used for all sub-positioners. This is optional as it
may be desirable to specify full PV names for PVPositioners.
limits : 2-element sequence, optional
(low_limit, high_limit)
name : str
The device name
egu : str, optional
The engineering units (EGU) for the position
settle_time : float, optional
The amount of time to wait after moves to report status completion
timeout : float, optional
The default timeout to use for motion requests, in seconds.
Attributes
----------
setpoint : Signal
The setpoint (request) signal
readback : Signal or None
The readback PV (e.g., encoder position PV)
actuate : Signal or None
The actuation PV to set when movement is requested
actuate_value : any, optional
The actuation value, sent to the actuate signal when motion is
requested
stop_signal : Signal or None
The stop PV to set when motion should be stopped
stop_value : any, optional
The value sent to stop_signal when a stop is requested
done : Signal
A readback value indicating whether motion is finished
done_value : any, optional
The value that the done pv should be when motion has completed
put_complete : bool, optional
If set, the specified PV should allow for asynchronous put completion
to indicate motion has finished. If ``actuate`` is specified, it will be
used for put completion. Otherwise, the ``setpoint`` will be used. See
the `-c` option from ``caput`` for more information.
"""
def __init__(self, *args, **kwargs):
if self.__class__ is PVPositionerNoInterrupt:
raise TypeError(
"PVPositionerNoInterrupt must be subclassed with the correct "
"signals set in the class definition."
)
super().__init__(*args, **kwargs)

def move(
self,
position: float,
wait: bool = True,
timeout: float | None = None,
moved_cb: Callable[[PVPositionerNoInterrupt], None] | None = None,
):
"""
Move to a specified position, optionally waiting for motion to
complete. Unlike the standard move, this will fail with a clear
error message when a move is already in progress.
Parameters
----------
position : float
Position to move to
moved_cb : callable
Call this callback when movement has finished. This callback must
accept one keyword argument: 'obj' which will be set to this
positioner instance.
timeout : float, optional
Maximum time to wait for the motion. If None, the default timeout
for this positioner is used.
Returns
-------
status : MoveStatus
Raises
------
TimeoutError
When motion takes longer than ``timeout``
ValueError
On invalid positions
RuntimeError
If motion fails other than timing out, including when a move is
attempted while another move is already in progress.
"""
if self.moving:
try:
progress = f"Position = {self.position}, goal = {self.setpoint.get()}."
except Exception:
progress = ""
raise RuntimeError(
f"The {self.name} device cannot start a new move because the "
"previous move has not completed. This is not an "
"interruptable positioner. Try waiting after the previous "
f"move or for the move's status to complete. {progress}"
)
else:
return super().move(position, wait=wait, timeout=timeout, moved_cb=moved_cb)
10 changes: 10 additions & 0 deletions pcdsdevices/tests/test_attenuator.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ def test_attenuator_motion(fake_att):
assert att.setpoint.get() == 0


@pytest.mark.timeout(5)
def test_attenuator_no_interrupt(fake_att):
logger.debug('test_attenuator_no_interrupt')
att = fake_att
# Set as already moving
att.done.sim_put(1)
with pytest.raises(RuntimeError):
att.move(0.5)


@pytest.mark.timeout(5)
def test_attenuator_subscriptions(fake_att):
logger.debug('test_attenuator_subscriptions')
Expand Down
34 changes: 34 additions & 0 deletions pcdsdevices/tests/test_pv_positioner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import pytest
from ophyd.device import Component as Cpt
from ophyd.signal import Signal

from pcdsdevices.pv_positioner import PVPositionerNoInterrupt


class PVPositionerNoInterruptLocal(PVPositionerNoInterrupt):
setpoint = Cpt(Signal)
done = Cpt(Signal)


@pytest.fixture(scope="function")
def pvpos_no(request):
return PVPositionerNoInterruptLocal(name=request.node.name)


def test_pvpos_no_motion(pvpos_no):
pvpos_no.done.put(1)
status = pvpos_no.move(100, wait=False)
# This is kind of silly but let's sim the full move
pvpos_no.done.put(0)
pvpos_no.done.put(1)
assert pvpos_no.setpoint.get() == 100
status.wait(timeout=1.0)
assert status.done
assert status.success


def test_pvpos_no_interrupt(pvpos_no):
# Already moving, can't start a new one
pvpos_no.done.put(0)
with pytest.raises(RuntimeError):
pvpos_no.move(100, wait=False)

0 comments on commit f4121f6

Please sign in to comment.