diff --git a/mxcubecore/BaseHardwareObjects.py b/mxcubecore/BaseHardwareObjects.py index 0b9ec7d2f1..654f6c32d2 100644 --- a/mxcubecore/BaseHardwareObjects.py +++ b/mxcubecore/BaseHardwareObjects.py @@ -472,31 +472,20 @@ def get_object_by_role(self, role: str) -> Union["HardwareObject", None]: Returns: Union[HardwareObject, None]: Hardware object. """ - object = None - obj: Self = self - objects = [] role = str(role).lower() + objects = [o for o in self if o] - while True: - if role in obj._objects_by_role: - return obj._objects_by_role[role] + while objects: + if role in self._objects_by_role: + return self._objects_by_role[role] - for object in obj: - objects.append(object) + try : + self = objects.pop() - try: - obj = objects.pop() - except IndexError: - break - else: - object = obj.get_object_by_role(role) - if object is not None: - return object + except : + if self.get_object_by_role(role): + return self.get_object_by_role(role) - if len(objects) > 0: - obj = objects.pop() - else: - break def objects_names(self) -> List[Union[str, None]]: """Return hardware object names. diff --git a/mxcubecore/HardwareObjects/TangoShutter.py b/mxcubecore/HardwareObjects/TangoShutter.py index 5f85abe05a..ee55744538 100644 --- a/mxcubecore/HardwareObjects/TangoShutter.py +++ b/mxcubecore/HardwareObjects/TangoShutter.py @@ -31,19 +31,29 @@ Open Close State - {"open": "OPEN", "cloded": "CLOSED", "DISABLE" : "DISABLE"} + {"OPEN": "MYOPEN", "NEWSTATE": ["MYSTATE", "BUSY"]} -In this example the tag contains a json dictionary that maps spectific tango shutter states to the -convantional states defined in the TangoShutter Class. This tag is not necessay in cases where the tango shutter states -are all covered by the TangoShuter class conventional states. +In the example the property contains a dictionary that redefines or +adds specific tango shutter states. +When redefining a known state, only the VALUES Enum will be updated. +When defining a new state (new key), the dictionary value should be a +list. The new state is added to both the VALUES and the SPECIFIC_STATES Enum. +Attention: + - do not use tuples or the python json parser will fail! + - make sure only double quotes are used inside the values dictionary. No single quotes (') are allowed ! + - the second element of the list should be a standard HardwareObjectState name + (UNKNOWN, WARNING, BUSY, READY, FAULT, OFF - see in BaseHardwareObjects.py)! +The property is optional. """ -import json + +import logging +import ast from enum import Enum, unique from mxcubecore.HardwareObjects.abstract.AbstractShutter import AbstractShutter from mxcubecore.BaseHardwareObjects import HardwareObjectState -__copyright__ = """ Copyright © 2023 by the MXCuBE collaboration """ +__copyright__ = """ Copyright © by the MXCuBE collaboration """ __license__ = "LGPLv3+" @@ -58,6 +68,7 @@ class TangoShutterStates(Enum): AUTOMATIC = HardwareObjectState.READY, "RUNNING" UNKNOWN = HardwareObjectState.UNKNOWN, "RUNNING" FAULT = HardwareObjectState.WARNING, "FAULT" + STANDBY = HardwareObjectState.WARNING, "STANDBY" class TangoShutter(AbstractShutter): @@ -81,26 +92,18 @@ def init(self): self.state_channel.connect_signal("update", self._update_value) self.update_state() - try: - self.config_values = json.loads(self.get_property("values")) - except: - self.config_values = None - def _update_value(self, value): """Update the value. Args: value(str): The value reported by the state channel. """ - if self.config_values: - value = self.config_values[str(value)] - else: - value = str(value) - - super().update_value(self.value_to_enum(value)) + super().update_value(self.value_to_enum(str(value))) def _initialise_values(self): - """Add the tango states to VALUES""" + """Add specific tango states to VALUES and, if configured + in the xml file, to SPECIFIC_STATES""" values_dict = {item.name: item.value for item in self.VALUES} + states_dict = {item.name: item.value for item in self.SPECIFIC_STATES} values_dict.update( { "MOVING": "MOVING", @@ -109,7 +112,19 @@ def _initialise_values(self): "FAULT": "FAULT", } ) + try: + config_values = ast.literal_eval(self.get_property("values")) + for key, val in config_values.items(): + if isinstance(val, (tuple, list)): + values_dict.update({key: val[1]}) + states_dict.update({key: (HardwareObjectState[val[1]], val[0])}) + else: + values_dict.update({key: val}) + except (ValueError, TypeError) as err: + logging.error(f"Exception in _initialise_values(): {err}") + self.VALUES = Enum("ValueEnum", values_dict) + self.SPECIFIC_STATES = Enum("TangoShutterStates", states_dict) def get_state(self): """Get the device state. @@ -117,25 +132,18 @@ def get_state(self): (enum 'HardwareObjectState'): Device state. """ try: - if self.config_values: - _state = self.config_values[str(self.state_channel.get_value())] - else: - _state = str(self.state_channel.get_value()) - - except (AttributeError, KeyError): + _state = self.get_value().name + return self.SPECIFIC_STATES[_state].value[0] + except (AttributeError, KeyError) as err: + logging.error(f"Exception in get_state(): {err}") return self.STATES.UNKNOWN - return self.SPECIFIC_STATES[_state].value[0] - def get_value(self): """Get the device value Returns: (Enum): Enum member, corresponding to the 'VALUE' or UNKNOWN. """ - if self.config_values: - _val = self.config_values[str(self.state_channel.get_value())] - else: - _val = str(self.state_channel.get_value()) + _val = str(self.state_channel.get_value()) return self.value_to_enum(_val) def _set_value(self, value): diff --git a/test/pytest/test_hwo_tango_shutter.py b/test/pytest/test_hwo_tango_shutter.py new file mode 100644 index 0000000000..8e39199b0e --- /dev/null +++ b/test/pytest/test_hwo_tango_shutter.py @@ -0,0 +1,119 @@ +from typing import Callable +import pytest +import gevent +from gevent import Timeout +from tango import DeviceProxy, DevState +from tango.server import Device, command +from tango.test_context import DeviceTestContext +from mxcubecore.HardwareObjects import TangoShutter + +""" +Test the HardwareObjects.TangoShutter shutter hardware object. +""" + + +VALUES_JSON = """ +{"OPEN": "OPEN", "CLOSED": "CLOSE", "MOVING" : "MOVING"} +""" + + +class Shutter(Device): + """ + Very simple tango shutter device, that only goes between 'open' and 'close' states. + """ + + def __init__(self, *args, **kwargs): + self._is_open = False + super().__init__(*args, **kwargs) + + @command() + def Open(self): + self._is_open = True + + @command() + def Close(self): + self._is_open = False + + def dev_state(self): + return DevState.OPEN if self._is_open else DevState.CLOSE + + +@pytest.fixture +def shutter(): + tangods_test_context = DeviceTestContext(Shutter, process=True) + tangods_test_context.start() + + # + # set up the TangoShutter hardware object + # + hwo_shutter = TangoShutter.TangoShutter("/random_name") + hwo_shutter.tangoname = tangods_test_context.get_device_access() + hwo_shutter.set_property("values", VALUES_JSON) + hwo_shutter.add_channel( + { + "name": "State", + "type": "tango", + }, + "State", + True, + ) + hwo_shutter.add_command( + { + "name": "Open", + "type": "tango", + }, + "Open", + True, + ) + hwo_shutter.add_command( + { + "name": "Close", + "type": "tango", + }, + "Close", + True, + ) + + hwo_shutter.init() + + yield hwo_shutter + + tangods_test_context.stop() + tangods_test_context.join() + + +def _wait_until(condition: Callable, condition_desc: str): + with Timeout(1.2, Exception(f"timed out while waiting for {condition_desc}")): + while not condition(): + gevent.sleep(0.01) + + +def test_open(shutter): + """ + test opening the shutter + """ + dev = DeviceProxy(shutter.tangoname) + + assert dev.State() == DevState.CLOSE + assert not shutter.is_open + + shutter.open() + + _wait_until(lambda: shutter.is_open, "shutter to open") + assert dev.State() == DevState.OPEN + + +def test_close(shutter): + """ + test closing the shutter + """ + dev = DeviceProxy(shutter.tangoname) + dev.Open() + + assert dev.State() == DevState.OPEN + assert shutter.is_open + + shutter.close() + + _wait_until(lambda: not shutter.is_open, "shutter to close") + assert dev.State() == DevState.CLOSE