Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: for LCLS-I att, give proper error for uninterruptable move #1183

Merged
merged 4 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions docs/source/upcoming_release_notes/1183-fix_att_move_err.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
1183 fix_att_move_err
#####################

API Breaks
----------
- N/A

Features
--------
- Added `PVPositionerNoInterrupt`, a pv positioner base class whose moves
cannot be interrupted (and will loudly complain about any such attempts).

Device Updates
--------------
- N/A

New Devices
-----------
- N/A

Bugfixes
--------
- LCLSI attenuator classes (generated from the `Attenuator` factory function)
will now raise a much clearer error for when they cannot interrupt a
previous move, without trying (and failing) to interrupt the previous move.

Maintenance
-----------
- N/A

Contributors
------------
- zllentz
7 changes: 4 additions & 3 deletions pcdsdevices/attenuator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from ophyd.device import Device
from ophyd.device import DynamicDeviceComponent as DDC
from ophyd.device import FormattedComponent as FCpt
from ophyd.pv_positioner import PVPositioner, PVPositionerPC
from ophyd.pv_positioner import PVPositionerPC
from ophyd.signal import EpicsSignal, EpicsSignalRO, Signal, SignalRO

from . import utils
Expand All @@ -26,6 +26,7 @@
from .interface import (BaseInterface, FltMvInterface, LightpathInOutCptMixin,
LightpathMixin)
from .pmps import TwinCATStatePMPS
from .pv_positioner import PVPositionerNoInterrupt
from .signal import InternalSignal, MultiDerivedSignal, MultiDerivedSignalRO
from .type_hints import OphydDataType, SignalToValue
from .utils import get_status_float, get_status_value
Expand Down Expand Up @@ -134,9 +135,9 @@ def _status_update(self, *args, value, **kwargs):
self.status.put(BladeStateEnum.Unknown, force=True)


class AttBase(FltMvInterface, PVPositioner):
class AttBase(FltMvInterface, PVPositionerNoInterrupt):
"""
Base class for attenuators with fundamental frequency.
Base class for pre-L2SI beam power attenuators.

This is a device that puts an array of filters in or out to achieve a
desired transmission ratio.
Expand Down
113 changes: 112 additions & 1 deletion pcdsdevices/pv_positioner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import Optional
from __future__ import annotations

from typing import Callable, Optional

import numpy as np
from ophyd.device import Component as Cpt
Expand Down Expand Up @@ -160,3 +162,112 @@ def _setup_move(self, position):
def _toggle_done(self):
self.done.put(0, force=True)
self.done.put(1, force=True)


class PVPositionerNoInterrupt(PVPositioner):
"""
A PV positioner whose moves cannot be interrupted.

If we try to start a new move before the previous move completes,
instead we will get a clear error advising us to wait.

Parameters
----------
prefix : str, optional
The device prefix used for all sub-positioners. This is optional as it
may be desirable to specify full PV names for PVPositioners.
limits : 2-element sequence, optional
(low_limit, high_limit)
name : str
The device name
egu : str, optional
The engineering units (EGU) for the position
settle_time : float, optional
The amount of time to wait after moves to report status completion
timeout : float, optional
The default timeout to use for motion requests, in seconds.
tangkong marked this conversation as resolved.
Show resolved Hide resolved

Attributes
----------
setpoint : Signal
The setpoint (request) signal
readback : Signal or None
The readback PV (e.g., encoder position PV)
actuate : Signal or None
The actuation PV to set when movement is requested
actuate_value : any, optional
The actuation value, sent to the actuate signal when motion is
requested
stop_signal : Signal or None
The stop PV to set when motion should be stopped
stop_value : any, optional
The value sent to stop_signal when a stop is requested
done : Signal
A readback value indicating whether motion is finished
done_value : any, optional
The value that the done pv should be when motion has completed
put_complete : bool, optional
If set, the specified PV should allow for asynchronous put completion
to indicate motion has finished. If ``actuate`` is specified, it will be
used for put completion. Otherwise, the ``setpoint`` will be used. See
the `-c` option from ``caput`` for more information.
"""
def __init__(self, *args, **kwargs):
if self.__class__ is PVPositionerNoInterrupt:
raise TypeError(
"PVPositionerNoInterrupt must be subclassed with the correct "
"signals set in the class definition."
)
super().__init__(*args, **kwargs)

def move(
self,
position: float,
wait: bool = True,
timeout: float | None = None,
moved_cb: Callable[[PVPositionerNoInterrupt], None] | None = None,
):
"""
Move to a specified position, optionally waiting for motion to
complete. Unlike the standard move, this will fail with a clear
error message when a move is already in progress.

Parameters
----------
position : float
Position to move to
moved_cb : callable
Call this callback when movement has finished. This callback must
accept one keyword argument: 'obj' which will be set to this
positioner instance.
timeout : float, optional
Maximum time to wait for the motion. If None, the default timeout
for this positioner is used.

Returns
-------
status : MoveStatus

Raises
------
TimeoutError
When motion takes longer than ``timeout``
ValueError
On invalid positions
RuntimeError
If motion fails other than timing out, including when a move is
attempted while another move is already in progress.
"""
if self.moving:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try:
progress = f"Position = {self.position}, goal = {self.setpoint.get()}."
except Exception:
progress = ""
raise RuntimeError(
f"The {self.name} device cannot start a new move because the "
"previous move has not completed. This is not an "
"interruptable positioner. Try waiting after the previous "
f"move or for the move's status to complete. {progress}"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the addition of "progress" here

I understand what the error message means - and it's likely clear enough for the end user as well. 👍

else:
return super().move(position, wait=wait, timeout=timeout, moved_cb=moved_cb)
10 changes: 10 additions & 0 deletions pcdsdevices/tests/test_attenuator.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ def test_attenuator_motion(fake_att):
assert att.setpoint.get() == 0


@pytest.mark.timeout(5)
def test_attenuator_no_interrupt(fake_att):
logger.debug('test_attenuator_no_interrupt')
att = fake_att
# Set as already moving
att.done.sim_put(1)
with pytest.raises(RuntimeError):
att.move(0.5)


@pytest.mark.timeout(5)
def test_attenuator_subscriptions(fake_att):
logger.debug('test_attenuator_subscriptions')
Expand Down
34 changes: 34 additions & 0 deletions pcdsdevices/tests/test_pv_positioner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import pytest
from ophyd.device import Component as Cpt
from ophyd.signal import Signal

from pcdsdevices.pv_positioner import PVPositionerNoInterrupt


class PVPositionerNoInterruptLocal(PVPositionerNoInterrupt):
setpoint = Cpt(Signal)
done = Cpt(Signal)


@pytest.fixture(scope="function")
def pvpos_no(request):
return PVPositionerNoInterruptLocal(name=request.node.name)


def test_pvpos_no_motion(pvpos_no):
pvpos_no.done.put(1)
status = pvpos_no.move(100, wait=False)
# This is kind of silly but let's sim the full move
pvpos_no.done.put(0)
pvpos_no.done.put(1)
assert pvpos_no.setpoint.get() == 100
status.wait(timeout=1.0)
assert status.done
assert status.success


def test_pvpos_no_interrupt(pvpos_no):
# Already moving, can't start a new one
pvpos_no.done.put(0)
with pytest.raises(RuntimeError):
pvpos_no.move(100, wait=False)