Skip to content

Commit

Permalink
Another step in the abstraction of system_interface. variables as fun…
Browse files Browse the repository at this point in the history
…ction
  • Loading branch information
eisDNV committed Jan 17, 2025
1 parent 4c63691 commit d6afd4f
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 77 deletions.
24 changes: 12 additions & 12 deletions src/sim_explorer/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from collections.abc import Callable
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable, List
from typing import Any, Iterable, Sequence

import matplotlib.pyplot as plt
import numpy as np
Expand Down Expand Up @@ -487,7 +487,7 @@ class Cases:
"assertion",
"results_print_type",
)
assertion_results: List[AssertionResult] = []
assertion_results: list[AssertionResult] = []

def __init__(self, spec: str | Path):
self.file = Path(spec) # everything relative to the folder of this file!
Expand Down Expand Up @@ -656,7 +656,7 @@ def case_by_name(self, name: str) -> Case | None:
return found
return None

def disect_variable(self, key: str, err_level: int = 2) -> tuple[str, dict, list | range]:
def disect_variable(self, key: str, err_level: int = 2) -> tuple[str, dict, Sequence]:
"""Extract the variable name, definition and explicit variable range, if relevant
(multi-valued variables, where only some elements are addressed).
ToDo: handle multi-dimensional arrays (tables, ...).
Expand All @@ -668,7 +668,7 @@ def disect_variable(self, key: str, err_level: int = 2) -> tuple[str, dict, list
-------
1. The variable name as defined in the 'variables' section of the spec
2. The variable definition, which the name refers to
3. An Iterable over indices of the variable, i.e. the range
3. An Sequence over indices of the variable, i.e. the range
"""

def handle_error(msg: str, err: Exception | None, level: int):
Expand Down Expand Up @@ -908,7 +908,7 @@ def _header_transform(self, tostring: bool = True):
get_path(res.jspath("$.header.file", str, True), self.file.parent),
)

def add(self, time: float, comp: str, cvar: str, values: tuple | int | float | bool | str):
def add(self, time: float, comp: str, cvar: str, values: Sequence | int | float | bool | str):
"""Add the results of a get action to the results dict for the case.
Args:
Expand All @@ -918,7 +918,7 @@ def add(self, time: float, comp: str, cvar: str, values: tuple | int | float | b
values (PyVal, tuple): the value(s) to record
"""
# print(f"Update ({time}): {comp}: {cvar} : {values}")
_values = values[0] if isinstance(values, (tuple, list)) and len(values) == 1 else values
_values = values[0] if isinstance(values, Sequence) and len(values) == 1 else values
self.res.update("$[" + str(time) + "]" + comp, {cvar: _values})

def save(self, jsfile: str | Path = ""):
Expand Down Expand Up @@ -984,12 +984,12 @@ def retrieve(self, comp_var: Iterable) -> list:
"""Retrieve from results js5-dict the variables and return (times, values).
Args:
comp_var (Iterable): iterable of (<component-name>, <variable_name>[, element])
comp_var (Iterator): iterator over (<component-name>, <variable_name>[, element])
Alternatively, the jspath syntax <component-name>.<variable_name>[[element]] can be used as comp_var.
Time is not explicitly including in comp_var
A record is only included if all variable are found for a given time
Time is not explicitly included in comp_var
A record is only included if all variables are found for a given time
Returns:
Data table (list of lists), time and one column per variable
Data table (list of lists): time and one column per variable
"""
data = []
_comp_var = []
Expand Down Expand Up @@ -1022,11 +1022,11 @@ def retrieve(self, comp_var: Iterable) -> list:
data.append(record)
return data

def plot_time_series(self, comp_var: Iterable, title: str = ""):
def plot_time_series(self, comp_var: Sequence, title: str = ""):
"""Extract the provided alias variables and plot the data found in the same plot.
Args:
comp_var (Iterable): Iterable of (<component-instance>,<variable>) tuples (as used in retrieve)
comp_var (Sequence): Sequence of (<component-instance>,<variable>) tuples (as used in retrieve)
Alternatively, the jspath syntax <component>.<variable> is also accepted
title (str): optional title of the plot
"""
Expand Down
106 changes: 56 additions & 50 deletions src/sim_explorer/system_interface.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from enum import Enum
from pathlib import Path
from typing import Any, TypeAlias
from typing import Any, Sequence, TypeAlias

import numpy as np

Expand Down Expand Up @@ -36,6 +36,9 @@ class SystemInterface:
description (str)="": Optional possibility to provide a system description
log_level (str) = 'fatal': Per default the level is set to 'fatal',
but it can be set to 'trace', 'debug', 'info', 'warning', 'error' or 'fatal' (e.g. for debugging purposes)
**kwargs: Optional possibility to supply additional keyword arguments:
* full_simulator_available=True to overwrite the oposite when called from a superclass
"""

def __init__(
Expand All @@ -44,16 +47,23 @@ def __init__(
name: str | None = None,
description: str = "",
log_level: str = "fatal",
**kwargs,
):
self.structure_file = Path(structure_file)
self.name = name # overwrite if the system includes that
self.description = description # overwrite if the system includes that
self.system_structure = SystemInterface.read_system_structure(self.structure_file)
self._models = self._get_models()
self._models, self.components = self._get_models_components()
# self.simulator=None # derived classes override this to instantiate the system simulator
self.message = "" # possibility to save additional message for (optional) retrieval by client
self.log_level = log_level
self.full_simulator_available = False # only system and components specification available. No simulation!
if "full_simulator_available" in kwargs:
self.full_simulator_available = kwargs["full_simulator_available"]
else:
self.full_simulator_available = False # only system and components specification available. No simulation!
if not self.full_simulator_available: # we need a minimal version of variable info (no full ModelDescription)
for m, info in self._models.items():
self._models[m].update({"variables": self._get_variables(info["source"])})

@property
def path(self):
Expand Down Expand Up @@ -82,36 +92,23 @@ def read_system_structure(file: Path, fmus_exist: bool = True):
assert not fmus_exist or comp["source"].exists(), f"FMU {comp['source']} not found"
return system_structure

@property
def components(self):
"""Return an iterator over all components (instances).
Each component is represented by a dict , together with the stem of their fmu files.
Note: there can be several instances per model (FMU)
def _get_models_components(self) -> tuple[dict, dict]:
"""Get a dict of the models and a dict of components in the system:
{model-name : {'source':<source>, 'components':[component-list], 'variables':{variables-dict}
{component-name : {'model':'model-name, }, ...}.
"""
mods = {}
components = {}
for k, v in self.system_structure["Simulators"].items():
source = v["source"]
yield (k, {"model": source.stem, "source": source})

def _get_models(self) -> dict:
"""Get a dict of the models in the system:
{<name> : {'source':<source>, 'components':[component-list], 'variables':{variables-dict}.
"""
mods = {}
for k, v in self.components:
if v["model"] not in mods:
mods.update(
{
v["model"]: {
"source": v["source"],
"components": [k],
"variables": self._get_variables(v["source"]),
}
}
)
model = source.stem
if model not in mods:
mods.update({model: {"source": source, "components": [k]}})
else:
mods[v["model"]]["components"].append(k)
return mods
mods[model]["components"].append(k)
assert k not in components, f"Duplicate component name {k} related to model {model} encountered"
components.update({k: model})
return (mods, components)

@property
def models(self) -> dict:
Expand All @@ -127,11 +124,11 @@ def match_components(self, comps: str | tuple[str, ...]) -> tuple[str, tuple]:
collect = []
model = None
for c in comps:
for k, v in self.components:
for k, m in self.components.items():
if match_with_wildcard(c, k):
if model is None:
model = v["model"]
if v["model"] == model and k not in collect:
model = m
if m == model and k not in collect:
collect.append(k)
assert model is not None and len(collect), f"No component match for {comps}"
return (model, tuple(collect))
Expand All @@ -141,7 +138,8 @@ def _get_variables(self, source: Path) -> dict[str, dict]:
Returns
-------
A dictionary of variable {names:info, ...}, where info is a dictionary containing reference, type, causality and variability
A dictionary of variable {names:info, ...},
where info is a dictionary containing reference, type, causality, variability and initial
"""
assert source.exists() and source.suffix == ".fmu", f"FMU file {source} not found or wrong suffix"
md = from_xml(source, sub="modelDescription.xml")
Expand All @@ -157,8 +155,22 @@ def _get_variables(self, source: Path) -> dict[str, dict]:
variables.update({name: var})
return variables

def model_from_component(self, comp: str | int) -> str:
"""Find the model name from the component name or index."""
if isinstance(comp, str):
return self.components[comp]
elif isinstance(comp, int):
for i, mod in enumerate(self.components.values()):
if i == comp:
return mod
return ""
else:
raise AssertionError(f"Unallowed argument {comp} in 'variables'")

def variables(self, comp: str | int) -> dict:
"""Get the registered variables for a given component from the system.
This is the default version which works without the full modelDescription inside self._models.
Can be overridden by super-classes which have the modelDescription available.
Args:
comp (str, int): The component name or its index within the model
Expand All @@ -167,19 +179,13 @@ def variables(self, comp: str | int) -> dict:
-------
A dictionary of variable {names:info, ...}, where info is a dictionary containing reference, type, causality and variability
"""
if isinstance(comp, str):
for k, c in self.components:
if k == comp:
return self.models[c["model"]]["variables"]
elif isinstance(comp, int):
for i, (_, c) in enumerate(self.components):
if i == comp:
return self.models[c["model"]]["variables"]
else:
raise AssertionError(f"Unallowed argument {comp} in 'variables'")
raise KeyError(f"Component {comp} not found. Avalable components: {list(self.components)}") from None
mod = self.model_from_component(comp)
try:
return self.models[mod]["variables"]
except Exception:
raise KeyError(f"Variables for {comp} not found. Components: {list(self.components.keys())}") from None

def variable_iter(self, variables: dict, flt: int | str | tuple | list):
def variable_iter(self, variables: dict, flt: int | str | Sequence):
"""Get the variable dicts of the variables refered to by ids.
Returns: Iterator over the dicts of the selected variables
Expand Down Expand Up @@ -248,14 +254,14 @@ def component_name_from_id(self, idx: int) -> str:
"""Retrieve the component name from the given index.
Return an empty string if not found.
"""
for i, (k, _) in enumerate(self.components):
for i, k in enumerate(self.components.keys()):
if i == idx:
return k
return ""

def component_id_from_name(self, name: str) -> int:
"""Get the component id from the name. -1 if not found."""
for i, (k, _) in enumerate(self.components):
for i, k in enumerate(self.components.keys()):
if k == name:
return i
return -1
Expand Down Expand Up @@ -313,7 +319,7 @@ def default_initial(causality: str, variability: str, only_default: bool = True)
else:
return init if only_default else (init,)

def allowed_action(self, action: str, comp: int | str, var: int | str | tuple | list, time: float):
def allowed_action(self, action: str, comp: int | str, var: int | str | Sequence, time: float):
"""Check whether the action would be allowed according to FMI2 rules, see FMI2.01, p.49.
* if a tuple of variables is provided, the variables shall have equal properties
Expand Down Expand Up @@ -417,9 +423,9 @@ def update_refs_values(
def comp_model_var(self, cref: int, vref: int | tuple[int]):
"""Find the component name and the variable names from the provided reference(s)."""
model = None
for i, (_comp, m) in enumerate(self.components):
for i, (_comp, m) in enumerate(self.components.items()):
if i == cref:
model = m["model"]
model = m
comp = _comp
break
assert model is not None, f"Model for component id {cref} not found"
Expand Down
4 changes: 4 additions & 0 deletions src/sim_explorer/system_interface_osp.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ class SystemInterfaceOSP(SystemInterface):
description (str)="": Optional possibility to provide a system description
log_level (str) = 'fatal': Per default the level is set to 'fatal',
but it can be set to 'trace', 'debug', 'info', 'warning', 'error' or 'fatal' (e.g. for debugging purposes)
**kwargs: Optional possibility to supply additional keyword arguments:
* full_simulator_available=True to overwrite the oposite when called from a superclass
"""

def __init__(
Expand All @@ -29,6 +32,7 @@ def __init__(
name: str | None = None,
description: str = "",
log_level: str = "fatal",
**kwargs,
):
super().__init__(structure_file, name, description, log_level)
self.full_simulator_available = True # system and components specification + simulation capabilities
Expand Down
1 change: 0 additions & 1 deletion src/sim_explorer/utils/osp.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ def make_initial_value(var: str, val: bool | int | float | str):
_simulators = ET.Element("Simulators")
if simulators is not None:
for m, props in simulators.items():
print("COMPONENT", m, props)
simulator = ET.Element(
"Simulator",
{
Expand Down
Binary file modified tests/data/Oscillator/DrivingForce.fmu
Binary file not shown.
Binary file modified tests/data/Oscillator/HarmonicOscillator.fmu
Binary file not shown.
5 changes: 3 additions & 2 deletions tests/test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,9 @@ def test_case_set_get(simpletable):
if __name__ == "__main__":
retcode = pytest.main(["-rA", "-v", __file__])
assert retcode == 0, f"Non-zero return code {retcode}"
# import os
# os.chdir(Path(__file__).parent.absolute() / "test_working_directory")
import os

os.chdir(Path(__file__).parent.absolute() / "test_working_directory")
# test_fixture(_simpletable())
# test_case_at_time(_simpletable())
# test_case_range(_simpletable())
Expand Down
6 changes: 3 additions & 3 deletions tests/test_run_mobilecrane.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ def initial_settings():
assert system.exists(), f"OspSystemStructure file {system} not found"
sim = SystemInterfaceOSP(system)
sim.init_simulator()
print("COMP", {k: v for k, v in sim.components})
expected = {k: v for k, v in sim.components}
assert isinstance(expected["mobileCrane"], dict), f"Found components {expected}"
print("COMP", {k: v for k, v in sim.components.items()})
expected = {k: v for k, v in sim.components.items()}
assert isinstance(expected["mobileCrane"], str), f"Found components {expected}"

path = Path(Path(__file__).parent, "data/MobileCrane/MobileCrane.cases")
assert path.exists(), "Cases file not found"
Expand Down
11 changes: 6 additions & 5 deletions tests/test_system_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@ def test_pytype():

def test_interface():
sys = SystemInterface(Path(__file__).parent / "data" / "MobileCrane" / "crane_table.js5")
st = Path(__file__).parent / "data" / "SimpleTable" / "SimpleTable.fmu"
sys.system_structure["Simulators"].update({"simpleTable2": {"source": st, "stepSize": "0.01"}})
assert [k for k, _ in sys.components] == ["simpleTable", "mobileCrane", "simpleTable2"]
# manually adding another SimpleTable to the system
sys._models["SimpleTable"]["components"].append("simpleTable2")
sys.components.update({"simpleTable2": "SimpleTable"})
assert isinstance(sys, SystemInterface)
assert list(sys.components.keys()) == ["simpleTable", "mobileCrane", "simpleTable2"]
assert len(sys.models) == 2
assert tuple(sys.models.keys()) == ("SimpleTable", "MobileCrane"), f"Found:{sys.models}"
m = sys.match_components("simple*")
assert m[0] == "SimpleTable", f"Found {m[0]}"
assert m[1] == ("simpleTable", "simpleTable2")
for k, _ in sys.components:
for k in sys.components.keys():
assert sys.component_name_from_id(sys.component_id_from_name(k)) == k

vars = sys.variables("simpleTable")
assert vars["interpolate"]["causality"] == "parameter"
assert vars["interpolate"]["type"] is bool, f"Found {vars['interpolate']['type']}"
Expand Down
7 changes: 3 additions & 4 deletions tests/test_system_interface_osp.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,13 @@ def test_simulator_from_system_structure():
path = Path(Path(__file__).parent, "data/BouncingBall0/OspSystemStructure.xml")
system = SystemInterfaceOSP(str(path), name="BouncingBall")
assert system.name == "BouncingBall", f"System.name should be BouncingBall. Found {system.name}"
comps = {k: v for (k, v) in system.components}
assert "bb" in comps, f"Instance name 'bb' expected. Found instances {list(comps.keys())}"
assert len(comps) == 3
assert "bb" in system.components, f"Instance name 'bb' expected. Found instances {list(system.components.keys())}"
assert len(system.components) == 3
assert len(system.models) == 1
assert "BouncingBall" in system.models
# system.check_instances_variables()
variables = system.variables("bb")
print(f"g: {variables['g']}")
# print(f"g: {variables['g']}")
assert variables["g"]["reference"] == 5
assert variables["g"]["type"] is float
assert variables["g"]["causality"] == "parameter"
Expand Down

0 comments on commit d6afd4f

Please sign in to comment.