diff --git a/olaf/canopen/__init__.py b/olaf/canopen/__init__.py index f880675..353eb4b 100644 --- a/olaf/canopen/__init__.py +++ b/olaf/canopen/__init__.py @@ -80,6 +80,8 @@ class EmcyCode(IntEnum): class OdDataType(IntEnum): + """Object dictionary data types defined by CiA 301.""" + BOOLEAN = 0x0001 INTEGER8 = 0x0002 INTEGER16 = 0x0003 @@ -95,7 +97,9 @@ class OdDataType(IntEnum): INTEGER64 = 0x0015 UNSIGNED64 = 0x001B + @property def is_int(self) -> bool: + """bool: Check if type is a integer type.""" return self.value in [ self.INTEGER8, self.INTEGER16, @@ -108,7 +112,9 @@ def is_int(self) -> bool: ] def is_float(self) -> bool: + """bool: Check if type is a float type.""" return self.value in [self.REAL32, self.REAL64] def is_str(self) -> bool: + """bool: Check if type is a str type.""" return self.value in [self.VISIBLE_STRING, self.OCTET_STRING] diff --git a/olaf/canopen/master_node.py b/olaf/canopen/master_node.py index becd958..b64d61e 100644 --- a/olaf/canopen/master_node.py +++ b/olaf/canopen/master_node.py @@ -6,10 +6,9 @@ import canopen from canopen.sdo import SdoArray, SdoRecord, SdoVariable -from canopen.sdo.exceptions import SdoError from loguru import logger -from ..canopen.network import CanNetwork, CanNetworkError, CanNetworkState +from ..canopen.network import CanNetwork from .node import Node NodeHeartbeatInfo = namedtuple("NodeHeartbeatInfo", ["state", "timestamp", "time_since_boot"]) @@ -90,73 +89,6 @@ def send_sync(self): self._network.send_message(0x80, b"", False) - def sdo_read(self, key: Any, index: Union[int, str], subindex: Union[int, str]) -> Any: - """ - Read a value from a remote node's object dictionary using an SDO. - - Parameters - ---------- - key: Any - The dict key for the node to read from. - index: int or str - The index to read from. - subindex: int or str - The subindex to read from. - - Raises - ------ - CanNetworkError - Cannot send a SDO read message when the network is down. - canopen.SdoError - Error with the SDO. - - Returns - ------- - Any - The value read. - """ - - if self._network.status == CanNetworkState.NETWORK_UP: - raise CanNetworkError("network is down cannot send an SDO read message") - - if subindex == 0 and isinstance(self._od_db[key][index], canopen.objectdictionary.Variable): - value = self._remote_nodes[key].sdo[index].raw - else: - value = self._remote_nodes[key].sdo[index][subindex].raw - - return value - - def sdo_write(self, key: Any, index: int, subindex: int, value: Any): - """ - Write a value to a remote node's object dictionary using an SDO. - - Parameters - ---------- - key: Any - The dict key for the node to write to. - index: int - The index to write to. - subindex: int - The subindex to write to. - value: Any - The value to write. - - Raises - ------ - CanNetworkError - Cannot send a SDO write message when the network is down. - canopen.SdoError - Error with the SDO. - """ - - if self._network.status == CanNetworkState.NETWORK_UP: - raise CanNetworkError("network is down cannot send an SDO write message") - - if subindex == 0 and isinstance(self._od_db[key][index], canopen.objectdictionary.Variable): - self._remote_nodes[key].sdo[index].raw = value - else: - self._remote_nodes[key].sdo[index][subindex].raw = value - @property def remote_nodes(self) -> dict[Any, canopen.RemoteNode]: """dict[Any, canopen.RemoteNode]: All other node as remote node.""" @@ -167,7 +99,7 @@ def od_db(self) -> dict[Any, canopen.ObjectDictionary]: """dict[Any, canopen.ObjectDictionary]: All other node ODs.""" return self._od_db - def sdo_get_obj( + def _sdo_get_obj( self, key: Any, index: Union[int, str], subindex: Union[int, str, None] ) -> [SdoVariable, SdoArray, SdoRecord]: @@ -194,7 +126,7 @@ def sdo_read( ------ NetworkError Cannot send a SDO read message when the network is down. - SdoError + canopen.sdo.exceptions.SdoError Error with the SDO. Returns @@ -203,7 +135,7 @@ def sdo_read( The value read. """ - return self.sdo_get_obj(key, index, subindex).phys + return self._sdo_get_obj(key, index, subindex).phys def sdo_read_bitfield( self, key: Any, index: Union[int, str], subindex: Union[int, str, None], field: str @@ -224,7 +156,7 @@ def sdo_read_bitfield( ------ NetworkError Cannot send a SDO read message when the network is down. - SdoError + canopen.sdo.exceptions.SdoError Error with the SDO. Returns @@ -233,7 +165,7 @@ def sdo_read_bitfield( The field value. """ - obj = self.sdo_get_obj(key, index, subindex) + obj = self._sdo_get_obj(key, index, subindex) bits = obj.od.bit_definitions[field] value = 0 @@ -262,7 +194,7 @@ def sdo_read_enum( ------ NetworkError Cannot send a SDO read message when the network is down. - SdoError + canopen.sdo.exceptions.SdoError Error with the SDO. Returns @@ -271,7 +203,7 @@ def sdo_read_enum( The enum str value. """ - obj = self.sdo_get_obj(key, index, subindex) + obj = self._sdo_get_obj(key, index, subindex) obj_value = obj.phys return obj.od.value_descriptions[obj_value] @@ -300,11 +232,11 @@ def sdo_write( ------ NetworkError Cannot send a SDO write message when the network is down. - canopen.SdoError + canopen.sdo.exceptions.SdoError Error with the SDO. """ - obj = self.sdo_get_obj(key, index, subindex) + obj = self._sdo_get_obj(key, index, subindex) obj.phys = value def sdo_write_bitfield( @@ -334,14 +266,12 @@ def sdo_write_bitfield( Raises ------ NetworkError - Cannot send a SDO write message when the network is down. - SdoError + Cannot send a SDO read message when the network is down. + canopen.sdo.exceptions.SdoError Error with the SDO. """ - obj = self.sdo_get_obj(key, index, subindex) - - obj = self.sdo_get_obj(key, index, subindex) + obj = self._sdo_get_obj(key, index, subindex) bits = obj.od.bit_definitions[field] offset = min(bits) @@ -374,12 +304,12 @@ def sdo_write_enum( Raises ------ NetworkError - Cannot send a SDO write message when the network is down. - SdoError + Cannot send a SDO read message when the network is down. + canopen.sdo.exceptions.SdoError Error with the SDO. """ - obj = self.sdo_get_obj(key, index, subindex) + obj = self._sdo_get_obj(key, index, subindex) tmp = {d: v for v, d in obj.od.value_descriptions} obj.phys = tmp[value] @@ -396,8 +326,10 @@ def send_rpdo(self, rpdo: int, raise_error: bool = True): Raises ------ + ValueError + Invalud rpdo value. NetworkError - Cannot send a RPDO message when the network is down. + Cannot send a RPDO read message when the network is down. """ if rpdo < 1: diff --git a/pyproject.toml b/pyproject.toml index 5b7a14d..69a9ae7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,8 @@ linters = "pycodestyle,pyflakes,pylint,mccabe,mypy,radon" # W0102: Dangerous default value as argument # W0107: Unnecessary pass statement # R0903: Too few public methods -ignore = "E402,C901,C0103,E203,R0912,R0915,R901,R901,R0914,C0413,C0206,R1716,W1514,R0902,R0913,W0707,W0102,W0107,R0903" +# R0904: Too many public methods +ignore = "E402,C901,C0103,E203,R0912,R0915,R901,R901,R0914,C0413,C0206,R1716,W1514,R0902,R0913,W0707,W0102,W0107,R0903,R0904" max_line_length = 100 [[tool.pylama.files]] @@ -82,7 +83,7 @@ path = "*/_version.py" ignore = "C0114" [[tool.mypy.overrides]] -module = "canopen,oresat_configs,psutil" +module = "canopen,oresat_configs,psutil,canopen.sdo,canopen.sdo.exceptions,canopen.objectdictionary" ignore_missing_imports = true [tool.isort] diff --git a/tests/internals/resources/__init__.py b/tests/internals/resources/__init__.py index 7c4d278..d5e5320 100644 --- a/tests/internals/resources/__init__.py +++ b/tests/internals/resources/__init__.py @@ -25,7 +25,7 @@ def __init__(self): self._setup_node() - def send_tpdo(self, tpdo: int): + def send_tpdo(self, tpdo: int, raise_error: bool = True): pass # override to do nothing diff --git a/tests/internals/services/__init__.py b/tests/internals/services/__init__.py index 0d4e1c4..9d6e2c3 100644 --- a/tests/internals/services/__init__.py +++ b/tests/internals/services/__init__.py @@ -25,7 +25,7 @@ def __init__(self): self._setup_node() - def send_tpdo(self, tpdo: int): + def send_tpdo(self, tpdo: int, raise_error: bool = True): pass # override to do nothing