Skip to content

Commit

Permalink
fix linting errors
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanpdx committed May 19, 2024
1 parent 3d1dac3 commit 455e682
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 91 deletions.
6 changes: 6 additions & 0 deletions olaf/canopen/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class EmcyCode(IntEnum):


class OdDataType(IntEnum):
"""Object dictionary data types defined by CiA 301."""

BOOLEAN = 0x0001
INTEGER8 = 0x0002
INTEGER16 = 0x0003
Expand All @@ -95,7 +97,9 @@ class OdDataType(IntEnum):
INTEGER64 = 0x0015
UNSIGNED64 = 0x001B

@property
def is_int(self) -> bool:
"""bool: Check if type is a integer type."""
return self.value in [
self.INTEGER8,
self.INTEGER16,
Expand All @@ -108,7 +112,9 @@ def is_int(self) -> bool:
]

def is_float(self) -> bool:
"""bool: Check if type is a float type."""
return self.value in [self.REAL32, self.REAL64]

def is_str(self) -> bool:
"""bool: Check if type is a str type."""
return self.value in [self.VISIBLE_STRING, self.OCTET_STRING]
106 changes: 19 additions & 87 deletions olaf/canopen/master_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@

import canopen
from canopen.sdo import SdoArray, SdoRecord, SdoVariable
from canopen.sdo.exceptions import SdoError
from loguru import logger

from ..canopen.network import CanNetwork, CanNetworkError, CanNetworkState
from ..canopen.network import CanNetwork
from .node import Node

NodeHeartbeatInfo = namedtuple("NodeHeartbeatInfo", ["state", "timestamp", "time_since_boot"])
Expand Down Expand Up @@ -90,73 +89,6 @@ def send_sync(self):

self._network.send_message(0x80, b"", False)

def sdo_read(self, key: Any, index: Union[int, str], subindex: Union[int, str]) -> Any:
"""
Read a value from a remote node's object dictionary using an SDO.
Parameters
----------
key: Any
The dict key for the node to read from.
index: int or str
The index to read from.
subindex: int or str
The subindex to read from.
Raises
------
CanNetworkError
Cannot send a SDO read message when the network is down.
canopen.SdoError
Error with the SDO.
Returns
-------
Any
The value read.
"""

if self._network.status == CanNetworkState.NETWORK_UP:
raise CanNetworkError("network is down cannot send an SDO read message")

if subindex == 0 and isinstance(self._od_db[key][index], canopen.objectdictionary.Variable):
value = self._remote_nodes[key].sdo[index].raw
else:
value = self._remote_nodes[key].sdo[index][subindex].raw

return value

def sdo_write(self, key: Any, index: int, subindex: int, value: Any):
"""
Write a value to a remote node's object dictionary using an SDO.
Parameters
----------
key: Any
The dict key for the node to write to.
index: int
The index to write to.
subindex: int
The subindex to write to.
value: Any
The value to write.
Raises
------
CanNetworkError
Cannot send a SDO write message when the network is down.
canopen.SdoError
Error with the SDO.
"""

if self._network.status == CanNetworkState.NETWORK_UP:
raise CanNetworkError("network is down cannot send an SDO write message")

if subindex == 0 and isinstance(self._od_db[key][index], canopen.objectdictionary.Variable):
self._remote_nodes[key].sdo[index].raw = value
else:
self._remote_nodes[key].sdo[index][subindex].raw = value

@property
def remote_nodes(self) -> dict[Any, canopen.RemoteNode]:
"""dict[Any, canopen.RemoteNode]: All other node as remote node."""
Expand All @@ -167,7 +99,7 @@ def od_db(self) -> dict[Any, canopen.ObjectDictionary]:
"""dict[Any, canopen.ObjectDictionary]: All other node ODs."""
return self._od_db

def sdo_get_obj(
def _sdo_get_obj(
self, key: Any, index: Union[int, str], subindex: Union[int, str, None]
) -> [SdoVariable, SdoArray, SdoRecord]:

Expand All @@ -194,7 +126,7 @@ def sdo_read(
------
NetworkError
Cannot send a SDO read message when the network is down.
SdoError
canopen.sdo.exceptions.SdoError
Error with the SDO.
Returns
Expand All @@ -203,7 +135,7 @@ def sdo_read(
The value read.
"""

return self.sdo_get_obj(key, index, subindex).phys
return self._sdo_get_obj(key, index, subindex).phys

def sdo_read_bitfield(
self, key: Any, index: Union[int, str], subindex: Union[int, str, None], field: str
Expand All @@ -224,7 +156,7 @@ def sdo_read_bitfield(
------
NetworkError
Cannot send a SDO read message when the network is down.
SdoError
canopen.sdo.exceptions.SdoError
Error with the SDO.
Returns
Expand All @@ -233,7 +165,7 @@ def sdo_read_bitfield(
The field value.
"""

obj = self.sdo_get_obj(key, index, subindex)
obj = self._sdo_get_obj(key, index, subindex)
bits = obj.od.bit_definitions[field]

value = 0
Expand Down Expand Up @@ -262,7 +194,7 @@ def sdo_read_enum(
------
NetworkError
Cannot send a SDO read message when the network is down.
SdoError
canopen.sdo.exceptions.SdoError
Error with the SDO.
Returns
Expand All @@ -271,7 +203,7 @@ def sdo_read_enum(
The enum str value.
"""

obj = self.sdo_get_obj(key, index, subindex)
obj = self._sdo_get_obj(key, index, subindex)
obj_value = obj.phys
return obj.od.value_descriptions[obj_value]

Expand Down Expand Up @@ -300,11 +232,11 @@ def sdo_write(
------
NetworkError
Cannot send a SDO write message when the network is down.
canopen.SdoError
canopen.sdo.exceptions.SdoError
Error with the SDO.
"""

obj = self.sdo_get_obj(key, index, subindex)
obj = self._sdo_get_obj(key, index, subindex)
obj.phys = value

def sdo_write_bitfield(
Expand Down Expand Up @@ -334,14 +266,12 @@ def sdo_write_bitfield(
Raises
------
NetworkError
Cannot send a SDO write message when the network is down.
SdoError
Cannot send a SDO read message when the network is down.
canopen.sdo.exceptions.SdoError
Error with the SDO.
"""

obj = self.sdo_get_obj(key, index, subindex)

obj = self.sdo_get_obj(key, index, subindex)
obj = self._sdo_get_obj(key, index, subindex)
bits = obj.od.bit_definitions[field]
offset = min(bits)

Expand Down Expand Up @@ -374,12 +304,12 @@ def sdo_write_enum(
Raises
------
NetworkError
Cannot send a SDO write message when the network is down.
SdoError
Cannot send a SDO read message when the network is down.
canopen.sdo.exceptions.SdoError
Error with the SDO.
"""

obj = self.sdo_get_obj(key, index, subindex)
obj = self._sdo_get_obj(key, index, subindex)
tmp = {d: v for v, d in obj.od.value_descriptions}
obj.phys = tmp[value]

Expand All @@ -396,8 +326,10 @@ def send_rpdo(self, rpdo: int, raise_error: bool = True):
Raises
------
ValueError
Invalud rpdo value.
NetworkError
Cannot send a RPDO message when the network is down.
Cannot send a RPDO read message when the network is down.
"""

if rpdo < 1:
Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ linters = "pycodestyle,pyflakes,pylint,mccabe,mypy,radon"
# W0102: Dangerous default value as argument
# W0107: Unnecessary pass statement
# R0903: Too few public methods
ignore = "E402,C901,C0103,E203,R0912,R0915,R901,R901,R0914,C0413,C0206,R1716,W1514,R0902,R0913,W0707,W0102,W0107,R0903"
# R0904: Too many public methods
ignore = "E402,C901,C0103,E203,R0912,R0915,R901,R901,R0914,C0413,C0206,R1716,W1514,R0902,R0913,W0707,W0102,W0107,R0903,R0904"
max_line_length = 100

[[tool.pylama.files]]
Expand All @@ -82,7 +83,7 @@ path = "*/_version.py"
ignore = "C0114"

[[tool.mypy.overrides]]
module = "canopen,oresat_configs,psutil"
module = "canopen,oresat_configs,psutil,canopen.sdo,canopen.sdo.exceptions,canopen.objectdictionary"
ignore_missing_imports = true

[tool.isort]
Expand Down
2 changes: 1 addition & 1 deletion tests/internals/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self):

self._setup_node()

def send_tpdo(self, tpdo: int):
def send_tpdo(self, tpdo: int, raise_error: bool = True):
pass # override to do nothing


Expand Down
2 changes: 1 addition & 1 deletion tests/internals/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self):

self._setup_node()

def send_tpdo(self, tpdo: int):
def send_tpdo(self, tpdo: int, raise_error: bool = True):
pass # override to do nothing


Expand Down

0 comments on commit 455e682

Please sign in to comment.