Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
Add support for PDO extensions close #58 #68
Browse files Browse the repository at this point in the history
  • Loading branch information
Boneill3 committed Jun 10, 2021
1 parent 2f85fbb commit fe36aa4
Show file tree
Hide file tree
Showing 5 changed files with 2,892 additions and 264 deletions.
20 changes: 6 additions & 14 deletions canopen_monitor/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,20 @@
from .app import App
from .meta import Meta
from .can import MagicCANBus, MessageTable
from .parse import CANOpenParser, load_eds_file, EDS, DataType
from .parse import CANOpenParser, load_eds_files, EDS, DataType


def init_dirs():
os.makedirs(CONFIG_DIR, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True)


def load_eds_files(filepath: str = CACHE_DIR) -> dict:
configs = {}
for file in os.listdir(filepath):
full_path = f'{filepath}/{file}'
if file.lower().endswith(".eds") or file.lower().endswith(".dcf"):
config = load_eds_file(full_path)
configs[config.node_id] = config
return configs


def enable_ecss_time(configs: dict) -> None:
for od in configs:
if '0x2101' in od:
od['0x2101'].data_type = DataType.ECSS_TIME.value
pass
# for od in configs:
# if 0x2101 in od:
# od[0x2101].data_type = DataType.ECSS_TIME.value


def main():
Expand Down Expand Up @@ -61,7 +53,7 @@ def main():
init_dirs()
meta = Meta(CONFIG_DIR, CACHE_DIR)
features = meta.load_features()
eds_configs = load_eds_files()
eds_configs = load_eds_files(CACHE_DIR)
if features.ecss_time:
enable_ecss_time(eds_configs)
mt = MessageTable(CANOpenParser(eds_configs))
Expand Down
3 changes: 2 additions & 1 deletion canopen_monitor/parse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
for parsing CANOpen messages according to Object Definiton files or Electronic
Data Sheet files, provided by the end user.
"""
from .eds import EDS, load_eds_file
from .eds import EDS, load_eds_file, load_eds_files
from .canopen import CANOpenParser
from .utilities import DataType

Expand All @@ -11,4 +11,5 @@
'EDS',
'load_eds_file',
'DataType',
'load_eds_files'
]
123 changes: 110 additions & 13 deletions canopen_monitor/parse/eds.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from __future__ import annotations

import copy
import string
from re import finditer
from typing import Union
from dateutil.parser import parse as dtparse
import os


def camel_to_snake(old_str: str) -> str:
Expand Down Expand Up @@ -108,12 +111,14 @@ def __init__(self, data, index: Union[str, int], is_sub=False):
value = convert_value(value)

self.__setattr__(camel_to_snake(key), value)

"""
Add a subindex to an index object
:param index: The subindex being added
:type Index
:raise ValueError: A subindex has already been added a this subindex
"""

def add(self, index: Index) -> None:
if self.sub_indices.setdefault(int(index.index), index) != index:
raise ValueError
Expand All @@ -124,6 +129,7 @@ def add(self, index: Index) -> None:
:type Index
:raise ValueError: A subindex has already been added a this subindex
"""

def __getitem__(self, key: int):
if key not in self.sub_indices:
raise KeyError(f"{self.index}sub{key}")
Expand All @@ -149,14 +155,90 @@ def convert_value(value: str) -> Union[int, str]:
return value


class EDS:
class OD:
def __init__(self):
self.node_id = None
self.indices = {}
self.device_commissioning = None
# tools section is optional per CiA 306
self.tools = None
self.file_info = None
self.device_info = None
self.dummy_usage = None
# comments section is optional per CiA 306
self.comments = None
self.mandatory_objects = None
self.optional_objects = None
self.manufacturer_objects = None

def extended_pdo_definition(self, offset: int) -> OD:
# TODO: Move to constant with message types
pdo_tx = 0x1A00
pdo_tx_offset = 0x1A00 + (offset * 4)
pdo_rx = 0x1600
pdo_rx_offset = 0x1600 + (offset * 4)
node = OD()
node.node_id = copy.deepcopy(self.node_id)
node.device_commissioning = copy.deepcopy(self.device_commissioning)
node.tools = copy.deepcopy(self.tools)
node.file_info = copy.deepcopy(self.file_info)
node.device_info = copy.deepcopy(self.device_info)
node.dummy_usage = copy.deepcopy(self.dummy_usage)
node.comments = copy.deepcopy(self.dummy_usage)
node.mandatory_objects = copy.deepcopy(self.dummy_usage)
node.optional_objects = copy.deepcopy(self.optional_objects)
node.manufacturer_objects = copy.deepcopy(self.manufacturer_objects)

if (pdo_tx_offset not in self and pdo_rx_offset not in self) or \
(self[pdo_tx_offset].parameter_name != "TPDO mapping parameter"
and self[pdo_rx_offset].parameter_name != "RPDO mapping parameter"):

raise KeyError("Extended PDO definitions not found")

self.get_pdo_offset(node, pdo_tx, pdo_tx_offset)
self.get_pdo_offset(node, pdo_rx, pdo_rx_offset)

return node

def get_pdo_offset(self, node: OD, start: int, offset: int):
while offset in self:
node[start] = copy.deepcopy(self[offset])
start += 1
offset += 1
if start % 4 == 0:
break

def __len__(self) -> int:
return sum(map(lambda x: len(x), self.indices.values()))

def __getitem__(self, key: Union[int, str]) -> Index:
callable = hex if type(key) == int else str
key = callable(key)
if key not in self.indices:
raise KeyError(key[2:])

return self.indices[key]

def __setitem__(self, key, value):
callable = hex if type(key) == int else str
key = callable(key)
self.indices[key] = value

def __contains__(self, item):
callable = hex if type(item) == int else str
item = callable(item)
return item in self.indices


class EDS(OD):
def __init__(self, eds_data: [str]):
"""Parse the array of EDS lines into a dictionary of Metadata/Index
objects.
:param eds_data: The list of raw lines from the EDS file.
:type eds_data: [str]
"""
super().__init__()
self.indices = {}

prev = 0
Expand Down Expand Up @@ -184,24 +266,13 @@ def __init__(self, eds_data: [str]):
Metadata(section[1:]))
prev = i + 1

if hasattr(self, 'device_commissioning'):
if self.device_commissioning is not None:
self.node_id = convert_value(self.device_commissioning.node_id)
elif '0x2101' in self.indices.keys():
self.node_id = self['0x2101'].default_value
else:
self.node_id = None

def __len__(self) -> int:
return sum(map(lambda x: len(x), self.indices.values()))

def __getitem__(self, key: Union[int, str]) -> Index:
callable = hex if type(key) == int else str
key = callable(key)
if key not in self.indices:
raise KeyError(key[2:])

return self.indices[callable(key)]


def load_eds_file(filepath: str) -> EDS:
"""Read in the EDS file, grab the raw lines, strip them of all escaped
Expand All @@ -216,3 +287,29 @@ def load_eds_file(filepath: str) -> EDS:
"""
with open(filepath) as file:
return EDS(list(map(lambda x: x.strip(), file.read().split('\n'))))


def load_eds_files(filepath: str) -> dict:
"""Read a directory of OD files
:param filepath: Directory to load files from
:type filepath: str
:return: dictionary of OD files with node id as key and OD as value
:rtype: dict
"""
configs = {}
for file in os.listdir(filepath):
full_path = f'{filepath}/{file}'
if file.lower().endswith(".eds") or file.lower().endswith(".dcf"):
config = load_eds_file(full_path)
configs[config.node_id] = config
try:
i = 1
while True:
extended_node = config.extended_pdo_definition(i)
configs[config.node_id+i] = extended_node
i += 1
except KeyError:
...

return configs
Loading

0 comments on commit fe36aa4

Please sign in to comment.