Skip to content

Commit

Permalink
resolve missing type hints (work in progress)
Browse files Browse the repository at this point in the history
  • Loading branch information
ClaasRostock committed Jan 24, 2025
1 parent 38f4358 commit f61fa62
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 46 deletions.
20 changes: 10 additions & 10 deletions src/sim_explorer/assertion.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ def info(self, sym: str, typ: str = "instance") -> str | int:
assert len(self._cases_variables[var]["instances"]) == 1, f"Non-unique instance for variable {var}"
instance = self._cases_variables[var]["instances"][0] # use the unique instance
else:
instance = parts[0] + "".join("_" + x for x in parts[1:])
instance = parts[0] + "".join(f"_{x}" for x in parts[1:])
assert instance in self._cases_variables[var]["instances"], f"No instance {instance} of {var}"
break
if not len(parts):
raise KeyError(f"The symbol {sym} does not seem to represent a registered variable") from None
var = parts.pop() + "_" + var
var = f"{parts.pop()}_{var}"
if typ == "instance": # get the instance
return instance
if typ == "variable": # get the generic variable name
Expand Down Expand Up @@ -107,11 +107,11 @@ def expr(self, key: str, ex: str | None = None) -> str | CodeType:

def make_func(name: str, args: dict, body: str) -> str:
"""Make a python function from the body."""
code = "def _" + name + "("
code = f"def _{name}("
for a in args:
code += a + ", "
code += f"{a}, "
code += "):\n"
code += " return " + body + "\n"
code += f" return {body}" + "\n"
return code

if ex is None: # getter
Expand Down Expand Up @@ -146,7 +146,7 @@ def syms(self, key: str) -> list[str]:
else:
return syms

def expr_get_symbols_functions(self, expr: str) -> tuple[list[str], list[str]]:
def expr_get_symbols_functions(self, expr: str) -> tuple[list[str], list[str]]: # noqa: C901
"""Get the symbols used in the expression.
1. Symbol as listed in expression and function body. In general <instant>_<variable>[<index>]
Expand Down Expand Up @@ -255,7 +255,7 @@ def register_vars(self, variables: dict[str, dict[str, Any]]) -> None:
for inst in info["instances"]:
if len(info["instances"]) == 1: # the instance is unique
_ = self.symbol(key, len(info["names"])) # we allow to use the 'short name' if unique
_ = self.symbol(inst + "_" + key, len(info["names"])) # fully qualified name can always be used
_ = self.symbol(f"{inst}_{key}", len(info["names"]))

def make_locals(self, loc: dict[str, Any]) -> dict[str, Any]:
"""Adapt the locals with 'allowed' functions."""
Expand Down Expand Up @@ -307,9 +307,9 @@ def eval_single(self, key: str, kvargs: dict[str, Any] | list[Any] | tuple[Any,
loc = self.make_locals(locals())
exec(self._compiled[key], loc, loc) # noqa: S102
# print("kvargs", kvargs, self._syms[key], self.expr_get_symbols_functions(key)) # noqa: ERA001
return self._eval(locals()["_" + key], kvargs)
return self._eval(locals()[f"_{key}"], kvargs)

def eval_series(
def eval_series( # noqa: C901, PLR0912
self,
key: str,
data: list[list[int | float | bool]],
Expand Down Expand Up @@ -346,7 +346,7 @@ def eval_series(
argnames = self._syms[key]
loc = self.make_locals(locals())
exec(self._compiled[key], loc, loc) # the function is then available as _<key> among locals() # noqa: S102
func = locals()["_" + key] # scalar function of all used arguments
func = locals()[f"_{key}"]
_temp = self._temporal[key]["type"] if ret is None else Temporal.UNDEFINED

for _row in data:
Expand Down
20 changes: 15 additions & 5 deletions src/sim_explorer/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -854,16 +854,22 @@ class Results:
If "" default file name is used, if None, results are not stored.
"""

def __init__(self, case: Case | str | Path | None = None, file: str | Path | None = None):
def __init__(
self,
case: Case | str | Path | None = None,
file: str | Path | None = None,
) -> None:
self.file: Path | None # None denotes that results are not automatically saved
self.case: Case | None = None
self.res: Json5
if (case is None or isinstance(case, (str, Path))) and file is not None:
self._init_from_existing(file) # instantiating from existing results file (work with data)
elif isinstance(case, Case): # instantiating from cases file (for data collection)
self._init_new(case)
else:
raise ValueError(f"Inconsistent init arguments case:{case}, file:{file}")

def _init_from_existing(self, file: str | Path):
def _init_from_existing(self, file: str | Path) -> None:
self.file = Path(file)
assert self.file.exists(), f"File {file} is expected to exist."
self.res = Json5(self.file)
Expand All @@ -872,13 +878,17 @@ def _init_from_existing(self, file: str | Path):
cases = Cases(Path(case))
except ValueError:
raise CaseInitError(f"Cases {Path(case)} instantiation error") from ValueError
self.case: Case | None = cases.case_by_name(name=self.res.jspath(path="$.header.case", typ=str, errorMsg=True))
self.case = cases.case_by_name(name=self.res.jspath(path="$.header.case", typ=str, errorMsg=True))
assert isinstance(self.case, Case), f"Case {self.res.jspath('$.header.case', str, True)} not found"
assert isinstance(self.case.cases, Cases), "Cases object not defined"
self._header_transform(False)
self._header_transform(tostring=False)
self.case.add_results_object(self) # make Results object known to self.case

def _init_new(self, case: Case, file: str | Path | None = ""):
def _init_new(
self,
case: Case,
file: str | Path | None = "",
) -> None:
assert isinstance(case, Case), f"Case object expected as 'case' in Results. Found {type(case)}"
self.case = case
if file is not None: # use that for storing results data as Json5
Expand Down
52 changes: 35 additions & 17 deletions src/sim_explorer/system_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(
description: str = "",
log_level: str = "fatal",
**kwargs: Any, # noqa: ANN401
):
) -> None:
self.structure_file = Path(structure_file)
self.name = name # overwrite if the system includes that
self.description = description # overwrite if the system includes that
Expand All @@ -76,6 +76,7 @@ def path(self) -> Path:
@staticmethod
def read_system_structure(
file: Path,
*,
fmus_exist: bool = True,
) -> dict[str, Any]:
"""Read the systemStructure file and perform checks.
Expand Down Expand Up @@ -338,7 +339,7 @@ def default_initial(
return "calculated" if only_default else ("calculated", "exact", "approx")
return init if only_default else (init,)

def allowed_action(
def allowed_action( # noqa: C901
self,
action: str,
comp: int | str,
Expand Down Expand Up @@ -513,7 +514,14 @@ def _add_set(
# new set action
actions[time].append((cvar, comp, refs, values))

def _add_get(self, actions: dict, time: float, cvar: str, comp: str, cvar_info: dict):
def _add_get(
self,
actions: dict[float, list[tuple[str, str, Any]]],
time: float,
cvar: str,
comp: str,
cvar_info: dict[str, Any],
) -> None:
"""Perform final processing and add the get action to the list (if appropriate).
Properties of get actions:
Expand All @@ -534,17 +542,17 @@ def _add_get(self, actions: dict, time: float, cvar: str, comp: str, cvar_info:
return # the get action is already registered
actions[time].append((cvar, comp, cvar_info["refs"]))

def add_actions(
def add_actions( # noqa: PLR0913
self,
actions: dict,
actions: dict[float, list[tuple[str, str, Any]]],
act_type: str,
cvar: str,
cvar_info: dict,
values: tuple | None,
cvar_info: dict[str, Any],
values: tuple[TValue] | None,
at_time: float,
stoptime: float,
rng: tuple[int, ...] | None = None,
):
) -> None:
"""Add specified actions to the provided action dict.
The action list is simulator-agnostic and need 'compilation' before they are used in a simulation.
Expand All @@ -566,15 +574,27 @@ def add_actions(
where value-list and rng are only present for set actions
at-time=-1 for get actions denote step actions
"""
assert isinstance(at_time, (float, int)), f"Actions require a defined time as float. Found {at_time}"
assert isinstance(at_time, float | int), f"Actions require a defined time as float. Found {at_time}"
if at_time not in actions:
actions.update({at_time: []}) # make sure that there is a suitable slot
actions[at_time] = [] # make sure that there is a suitable slot
for comp in cvar_info["instances"]:
if act_type == "get" or (act_type == "step" and at_time == -1): # normal get or step without time spec
self._add_get(actions, at_time, cvar, comp, cvar_info)
self._add_get(
actions=actions,
time=at_time,
cvar=cvar,
comp=comp,
cvar_info=cvar_info,
)
elif act_type == "step" and at_time >= 0: # step actions with specified interval
for time in np.arange(start=at_time, stop=stoptime, step=at_time):
self._add_get(actions, time, cvar, comp, cvar_info)
self._add_get(
actions=actions,
time=time,
cvar=cvar,
comp=comp,
cvar_info=cvar_info,
)

elif act_type == "set":
assert values is not None, f"Variable {cvar}: Value needed for 'set' actions."
Expand All @@ -584,7 +604,7 @@ def add_actions(
cvar=cvar,
comp=comp,
cvar_info=cvar_info,
values=tuple([cvar_info["type"](x) for x in values]),
values=tuple(cvar_info["type"](x) for x in values),
rng=rng,
)
else:
Expand All @@ -600,14 +620,12 @@ def action_step(self, act_info: tuple[Any, ...], typ: type) -> Callable[..., Any
"""
raise NotImplementedError("The method 'action_step()' cannot be used in SystemInterface") from None

def init_simulator(self):
def init_simulator(self) -> bool:
"""Instantiate and initialize the simulator, so that simulations can be run.
Perforemd separately from __init__ so that it can be repeated before simulation runs.
"""
raise NotImplementedError("The method 'init_simulator()' cannot be used in SystemInterface") from None
return False

def run_until(self, time: int | float):
def run_until(self, time: int | float) -> bool:
"""Instruct the simulator to simulate until the given time."""
raise NotImplementedError("The method 'run_until()' cannot be used in SystemInterface") from None
return False
25 changes: 11 additions & 14 deletions src/sim_explorer/system_interface_osp.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
from typing import Any

from libcosimpy.CosimExecution import CosimExecution
from libcosimpy.CosimLogging import CosimLogLevel, log_output_level # type: ignore
from libcosimpy.CosimManipulator import CosimManipulator # type: ignore
from libcosimpy.CosimObserver import CosimObserver # type: ignore
from libcosimpy.CosimLogging import CosimLogLevel, log_output_level
from libcosimpy.CosimManipulator import CosimManipulator
from libcosimpy.CosimObserver import CosimObserver

from sim_explorer.system_interface import SystemInterface

Expand All @@ -31,19 +31,19 @@ def __init__(
name: str | None = None,
description: str = "",
log_level: str = "fatal",
**kwargs,
):
**kwargs: Any, # noqa: ANN401, ARG002
) -> None:
super().__init__(structure_file, name, description, log_level)
self.full_simulator_available = True # system and components specification + simulation capabilities
# Note: The initialization of the OSP simulator itself is performed in init_simulator()
# Since this needs to be repeated before every simulation

def init_simulator(self):
def init_simulator(self) -> bool:
"""Instantiate and initialize the simulator, so that simulations can be run.
Perforemd separately from __init__ so that it can be repeated before simulation runs.
"""
log_output_level(CosimLogLevel[self.log_level.upper()])
# ck, msg = self._check_system_structure(self.sysconfig)
# ck, msg = self._check_system_structure(self.sysconfig) # noqa: ERA001
# assert ck, msg
assert self.structure_file.exists(), "Simulator initialization requires the structure file."
self.simulator = CosimExecution.from_osp_config_file(str(self.structure_file))
Expand All @@ -60,7 +60,7 @@ def init_simulator(self):
assert self.simulator.status().current_time == 0
return not self.simulator.status().error_code

def _action_func(self, act_type: int, var_type: type):
def _action_func(self, act_type: int, var_type: type) -> Callable[..., Any]:
"""Determine the correct action function and return it."""
if act_type == 0: # initial settings
return {
Expand All @@ -86,15 +86,12 @@ def _action_func(self, act_type: int, var_type: type):

def do_action(self, time: int | float, act_info: tuple[Any, ...], typ: type) -> bool:
"""Do the action described by the tuple using OSP functions."""
if len(act_info) == 4: # set action
if len(act_info) == 4: # set action # noqa: PLR2004
cvar, comp, refs, values = act_info
_comp = self.component_id_from_name(comp)
if time <= 0: # initial setting
func = self._action_func(0, typ)
for r, v in zip(refs, values, strict=False):
if not func(_comp, r, v):
return False
return True
return all(func(_comp, r, v) for r, v in zip(refs, values, strict=False))

return self._action_func(1, typ)(_comp, refs, values)
# get action
Expand All @@ -112,6 +109,6 @@ def action_step(self, act_info: tuple[Any, ...], typ: type) -> Callable[..., Any
_comp = self.component_id_from_name(comp)
return partial(self._action_func(act_type=2, var_type=typ), _comp, refs)

def run_until(self, time: int | float):
def run_until(self, time: int | float) -> bool:
"""Instruct the simulator to simulate until the given time."""
return self.simulator.simulate_until(time)

0 comments on commit f61fa62

Please sign in to comment.