diff --git a/src/sim_explorer/case.py b/src/sim_explorer/case.py index 6f1e8b4..d1a3547 100644 --- a/src/sim_explorer/case.py +++ b/src/sim_explorer/case.py @@ -5,7 +5,7 @@ from collections.abc import Callable from datetime import datetime from pathlib import Path -from typing import Any, Iterable, List +from typing import Any, Iterable, Sequence import matplotlib.pyplot as plt import numpy as np @@ -487,7 +487,7 @@ class Cases: "assertion", "results_print_type", ) - assertion_results: List[AssertionResult] = [] + assertion_results: list[AssertionResult] = [] def __init__(self, spec: str | Path): self.file = Path(spec) # everything relative to the folder of this file! @@ -656,7 +656,7 @@ def case_by_name(self, name: str) -> Case | None: return found return None - def disect_variable(self, key: str, err_level: int = 2) -> tuple[str, dict, list | range]: + def disect_variable(self, key: str, err_level: int = 2) -> tuple[str, dict, Sequence]: """Extract the variable name, definition and explicit variable range, if relevant (multi-valued variables, where only some elements are addressed). ToDo: handle multi-dimensional arrays (tables, ...). @@ -668,7 +668,7 @@ def disect_variable(self, key: str, err_level: int = 2) -> tuple[str, dict, list ------- 1. The variable name as defined in the 'variables' section of the spec 2. The variable definition, which the name refers to - 3. An Iterable over indices of the variable, i.e. the range + 3. An Sequence over indices of the variable, i.e. the range """ def handle_error(msg: str, err: Exception | None, level: int): @@ -908,7 +908,7 @@ def _header_transform(self, tostring: bool = True): get_path(res.jspath("$.header.file", str, True), self.file.parent), ) - def add(self, time: float, comp: str, cvar: str, values: tuple | int | float | bool | str): + def add(self, time: float, comp: str, cvar: str, values: Sequence | int | float | bool | str): """Add the results of a get action to the results dict for the case. Args: @@ -918,7 +918,7 @@ def add(self, time: float, comp: str, cvar: str, values: tuple | int | float | b values (PyVal, tuple): the value(s) to record """ # print(f"Update ({time}): {comp}: {cvar} : {values}") - _values = values[0] if isinstance(values, (tuple, list)) and len(values) == 1 else values + _values = values[0] if isinstance(values, Sequence) and len(values) == 1 else values self.res.update("$[" + str(time) + "]" + comp, {cvar: _values}) def save(self, jsfile: str | Path = ""): @@ -984,12 +984,12 @@ def retrieve(self, comp_var: Iterable) -> list: """Retrieve from results js5-dict the variables and return (times, values). Args: - comp_var (Iterable): iterable of (, [, element]) + comp_var (Iterator): iterator over (, [, element]) Alternatively, the jspath syntax .[[element]] can be used as comp_var. - Time is not explicitly including in comp_var - A record is only included if all variable are found for a given time + Time is not explicitly included in comp_var + A record is only included if all variables are found for a given time Returns: - Data table (list of lists), time and one column per variable + Data table (list of lists): time and one column per variable """ data = [] _comp_var = [] @@ -1022,11 +1022,11 @@ def retrieve(self, comp_var: Iterable) -> list: data.append(record) return data - def plot_time_series(self, comp_var: Iterable, title: str = ""): + def plot_time_series(self, comp_var: Sequence, title: str = ""): """Extract the provided alias variables and plot the data found in the same plot. Args: - comp_var (Iterable): Iterable of (,) tuples (as used in retrieve) + comp_var (Sequence): Sequence of (,) tuples (as used in retrieve) Alternatively, the jspath syntax . is also accepted title (str): optional title of the plot """ diff --git a/src/sim_explorer/system_interface.py b/src/sim_explorer/system_interface.py index f6eac95..bb14c11 100644 --- a/src/sim_explorer/system_interface.py +++ b/src/sim_explorer/system_interface.py @@ -1,6 +1,6 @@ from enum import Enum from pathlib import Path -from typing import Any, TypeAlias +from typing import Any, Sequence, TypeAlias import numpy as np @@ -36,6 +36,9 @@ class SystemInterface: description (str)="": Optional possibility to provide a system description log_level (str) = 'fatal': Per default the level is set to 'fatal', but it can be set to 'trace', 'debug', 'info', 'warning', 'error' or 'fatal' (e.g. for debugging purposes) + **kwargs: Optional possibility to supply additional keyword arguments: + + * full_simulator_available=True to overwrite the oposite when called from a superclass """ def __init__( @@ -44,16 +47,23 @@ def __init__( name: str | None = None, description: str = "", log_level: str = "fatal", + **kwargs, ): self.structure_file = Path(structure_file) self.name = name # overwrite if the system includes that self.description = description # overwrite if the system includes that self.system_structure = SystemInterface.read_system_structure(self.structure_file) - self._models = self._get_models() + self._models, self.components = self._get_models_components() # self.simulator=None # derived classes override this to instantiate the system simulator self.message = "" # possibility to save additional message for (optional) retrieval by client self.log_level = log_level - self.full_simulator_available = False # only system and components specification available. No simulation! + if "full_simulator_available" in kwargs: + self.full_simulator_available = kwargs["full_simulator_available"] + else: + self.full_simulator_available = False # only system and components specification available. No simulation! + if not self.full_simulator_available: # we need a minimal version of variable info (no full ModelDescription) + for m, info in self._models.items(): + self._models[m].update({"variables": self._get_variables(info["source"])}) @property def path(self): @@ -82,36 +92,23 @@ def read_system_structure(file: Path, fmus_exist: bool = True): assert not fmus_exist or comp["source"].exists(), f"FMU {comp['source']} not found" return system_structure - @property - def components(self): - """Return an iterator over all components (instances). - Each component is represented by a dict , together with the stem of their fmu files. - - Note: there can be several instances per model (FMU) + def _get_models_components(self) -> tuple[dict, dict]: + """Get a dict of the models and a dict of components in the system: + {model-name : {'source':, 'components':[component-list], 'variables':{variables-dict} + {component-name : {'model':'model-name, }, ...}. """ + mods = {} + components = {} for k, v in self.system_structure["Simulators"].items(): source = v["source"] - yield (k, {"model": source.stem, "source": source}) - - def _get_models(self) -> dict: - """Get a dict of the models in the system: - { : {'source':, 'components':[component-list], 'variables':{variables-dict}. - """ - mods = {} - for k, v in self.components: - if v["model"] not in mods: - mods.update( - { - v["model"]: { - "source": v["source"], - "components": [k], - "variables": self._get_variables(v["source"]), - } - } - ) + model = source.stem + if model not in mods: + mods.update({model: {"source": source, "components": [k]}}) else: - mods[v["model"]]["components"].append(k) - return mods + mods[model]["components"].append(k) + assert k not in components, f"Duplicate component name {k} related to model {model} encountered" + components.update({k: model}) + return (mods, components) @property def models(self) -> dict: @@ -127,11 +124,11 @@ def match_components(self, comps: str | tuple[str, ...]) -> tuple[str, tuple]: collect = [] model = None for c in comps: - for k, v in self.components: + for k, m in self.components.items(): if match_with_wildcard(c, k): if model is None: - model = v["model"] - if v["model"] == model and k not in collect: + model = m + if m == model and k not in collect: collect.append(k) assert model is not None and len(collect), f"No component match for {comps}" return (model, tuple(collect)) @@ -141,7 +138,8 @@ def _get_variables(self, source: Path) -> dict[str, dict]: Returns ------- - A dictionary of variable {names:info, ...}, where info is a dictionary containing reference, type, causality and variability + A dictionary of variable {names:info, ...}, + where info is a dictionary containing reference, type, causality, variability and initial """ assert source.exists() and source.suffix == ".fmu", f"FMU file {source} not found or wrong suffix" md = from_xml(source, sub="modelDescription.xml") @@ -157,8 +155,22 @@ def _get_variables(self, source: Path) -> dict[str, dict]: variables.update({name: var}) return variables + def model_from_component(self, comp: str | int) -> str: + """Find the model name from the component name or index.""" + if isinstance(comp, str): + return self.components[comp] + elif isinstance(comp, int): + for i, mod in enumerate(self.components.values()): + if i == comp: + return mod + return "" + else: + raise AssertionError(f"Unallowed argument {comp} in 'variables'") + def variables(self, comp: str | int) -> dict: """Get the registered variables for a given component from the system. + This is the default version which works without the full modelDescription inside self._models. + Can be overridden by super-classes which have the modelDescription available. Args: comp (str, int): The component name or its index within the model @@ -167,19 +179,13 @@ def variables(self, comp: str | int) -> dict: ------- A dictionary of variable {names:info, ...}, where info is a dictionary containing reference, type, causality and variability """ - if isinstance(comp, str): - for k, c in self.components: - if k == comp: - return self.models[c["model"]]["variables"] - elif isinstance(comp, int): - for i, (_, c) in enumerate(self.components): - if i == comp: - return self.models[c["model"]]["variables"] - else: - raise AssertionError(f"Unallowed argument {comp} in 'variables'") - raise KeyError(f"Component {comp} not found. Avalable components: {list(self.components)}") from None + mod = self.model_from_component(comp) + try: + return self.models[mod]["variables"] + except Exception: + raise KeyError(f"Variables for {comp} not found. Components: {list(self.components.keys())}") from None - def variable_iter(self, variables: dict, flt: int | str | tuple | list): + def variable_iter(self, variables: dict, flt: int | str | Sequence): """Get the variable dicts of the variables refered to by ids. Returns: Iterator over the dicts of the selected variables @@ -248,14 +254,14 @@ def component_name_from_id(self, idx: int) -> str: """Retrieve the component name from the given index. Return an empty string if not found. """ - for i, (k, _) in enumerate(self.components): + for i, k in enumerate(self.components.keys()): if i == idx: return k return "" def component_id_from_name(self, name: str) -> int: """Get the component id from the name. -1 if not found.""" - for i, (k, _) in enumerate(self.components): + for i, k in enumerate(self.components.keys()): if k == name: return i return -1 @@ -313,7 +319,7 @@ def default_initial(causality: str, variability: str, only_default: bool = True) else: return init if only_default else (init,) - def allowed_action(self, action: str, comp: int | str, var: int | str | tuple | list, time: float): + def allowed_action(self, action: str, comp: int | str, var: int | str | Sequence, time: float): """Check whether the action would be allowed according to FMI2 rules, see FMI2.01, p.49. * if a tuple of variables is provided, the variables shall have equal properties @@ -417,9 +423,9 @@ def update_refs_values( def comp_model_var(self, cref: int, vref: int | tuple[int]): """Find the component name and the variable names from the provided reference(s).""" model = None - for i, (_comp, m) in enumerate(self.components): + for i, (_comp, m) in enumerate(self.components.items()): if i == cref: - model = m["model"] + model = m comp = _comp break assert model is not None, f"Model for component id {cref} not found" diff --git a/src/sim_explorer/system_interface_osp.py b/src/sim_explorer/system_interface_osp.py index 5be8266..263cf99 100644 --- a/src/sim_explorer/system_interface_osp.py +++ b/src/sim_explorer/system_interface_osp.py @@ -21,6 +21,9 @@ class SystemInterfaceOSP(SystemInterface): description (str)="": Optional possibility to provide a system description log_level (str) = 'fatal': Per default the level is set to 'fatal', but it can be set to 'trace', 'debug', 'info', 'warning', 'error' or 'fatal' (e.g. for debugging purposes) + **kwargs: Optional possibility to supply additional keyword arguments: + + * full_simulator_available=True to overwrite the oposite when called from a superclass """ def __init__( @@ -29,6 +32,7 @@ def __init__( name: str | None = None, description: str = "", log_level: str = "fatal", + **kwargs, ): super().__init__(structure_file, name, description, log_level) self.full_simulator_available = True # system and components specification + simulation capabilities diff --git a/src/sim_explorer/utils/osp.py b/src/sim_explorer/utils/osp.py index 8f0071c..e5fcfe2 100644 --- a/src/sim_explorer/utils/osp.py +++ b/src/sim_explorer/utils/osp.py @@ -78,7 +78,6 @@ def make_initial_value(var: str, val: bool | int | float | str): _simulators = ET.Element("Simulators") if simulators is not None: for m, props in simulators.items(): - print("COMPONENT", m, props) simulator = ET.Element( "Simulator", { diff --git a/tests/data/Oscillator/DrivingForce.fmu b/tests/data/Oscillator/DrivingForce.fmu index 7efcf5e..c9a42d4 100644 Binary files a/tests/data/Oscillator/DrivingForce.fmu and b/tests/data/Oscillator/DrivingForce.fmu differ diff --git a/tests/data/Oscillator/HarmonicOscillator.fmu b/tests/data/Oscillator/HarmonicOscillator.fmu index c71642d..194c0f2 100644 Binary files a/tests/data/Oscillator/HarmonicOscillator.fmu and b/tests/data/Oscillator/HarmonicOscillator.fmu differ diff --git a/tests/test_case.py b/tests/test_case.py index 5d82dae..7bff43c 100644 --- a/tests/test_case.py +++ b/tests/test_case.py @@ -189,8 +189,9 @@ def test_case_set_get(simpletable): if __name__ == "__main__": retcode = pytest.main(["-rA", "-v", __file__]) assert retcode == 0, f"Non-zero return code {retcode}" - # import os - # os.chdir(Path(__file__).parent.absolute() / "test_working_directory") + import os + + os.chdir(Path(__file__).parent.absolute() / "test_working_directory") # test_fixture(_simpletable()) # test_case_at_time(_simpletable()) # test_case_range(_simpletable()) diff --git a/tests/test_run_mobilecrane.py b/tests/test_run_mobilecrane.py index 297e8fb..83ab5af 100644 --- a/tests/test_run_mobilecrane.py +++ b/tests/test_run_mobilecrane.py @@ -147,9 +147,9 @@ def initial_settings(): assert system.exists(), f"OspSystemStructure file {system} not found" sim = SystemInterfaceOSP(system) sim.init_simulator() - print("COMP", {k: v for k, v in sim.components}) - expected = {k: v for k, v in sim.components} - assert isinstance(expected["mobileCrane"], dict), f"Found components {expected}" + print("COMP", {k: v for k, v in sim.components.items()}) + expected = {k: v for k, v in sim.components.items()} + assert isinstance(expected["mobileCrane"], str), f"Found components {expected}" path = Path(Path(__file__).parent, "data/MobileCrane/MobileCrane.cases") assert path.exists(), "Cases file not found" diff --git a/tests/test_system_interface.py b/tests/test_system_interface.py index e30e876..5dd3dfd 100644 --- a/tests/test_system_interface.py +++ b/tests/test_system_interface.py @@ -33,17 +33,18 @@ def test_pytype(): def test_interface(): sys = SystemInterface(Path(__file__).parent / "data" / "MobileCrane" / "crane_table.js5") - st = Path(__file__).parent / "data" / "SimpleTable" / "SimpleTable.fmu" - sys.system_structure["Simulators"].update({"simpleTable2": {"source": st, "stepSize": "0.01"}}) - assert [k for k, _ in sys.components] == ["simpleTable", "mobileCrane", "simpleTable2"] + # manually adding another SimpleTable to the system + sys._models["SimpleTable"]["components"].append("simpleTable2") + sys.components.update({"simpleTable2": "SimpleTable"}) + assert isinstance(sys, SystemInterface) + assert list(sys.components.keys()) == ["simpleTable", "mobileCrane", "simpleTable2"] assert len(sys.models) == 2 assert tuple(sys.models.keys()) == ("SimpleTable", "MobileCrane"), f"Found:{sys.models}" m = sys.match_components("simple*") assert m[0] == "SimpleTable", f"Found {m[0]}" assert m[1] == ("simpleTable", "simpleTable2") - for k, _ in sys.components: + for k in sys.components.keys(): assert sys.component_name_from_id(sys.component_id_from_name(k)) == k - vars = sys.variables("simpleTable") assert vars["interpolate"]["causality"] == "parameter" assert vars["interpolate"]["type"] is bool, f"Found {vars['interpolate']['type']}" diff --git a/tests/test_system_interface_osp.py b/tests/test_system_interface_osp.py index e6d7290..d971770 100644 --- a/tests/test_system_interface_osp.py +++ b/tests/test_system_interface_osp.py @@ -87,14 +87,13 @@ def test_simulator_from_system_structure(): path = Path(Path(__file__).parent, "data/BouncingBall0/OspSystemStructure.xml") system = SystemInterfaceOSP(str(path), name="BouncingBall") assert system.name == "BouncingBall", f"System.name should be BouncingBall. Found {system.name}" - comps = {k: v for (k, v) in system.components} - assert "bb" in comps, f"Instance name 'bb' expected. Found instances {list(comps.keys())}" - assert len(comps) == 3 + assert "bb" in system.components, f"Instance name 'bb' expected. Found instances {list(system.components.keys())}" + assert len(system.components) == 3 assert len(system.models) == 1 assert "BouncingBall" in system.models # system.check_instances_variables() variables = system.variables("bb") - print(f"g: {variables['g']}") + # print(f"g: {variables['g']}") assert variables["g"]["reference"] == 5 assert variables["g"]["type"] is float assert variables["g"]["causality"] == "parameter"