From 238746c59aec9d4214f17910eeefe760d0b23d9d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Guti=C3=A9rrez=20Hermosillo=20Muriedas=2C=20Juan=20Pedro?=
Date: Fri, 18 Oct 2024 13:33:25 +0200
Subject: [PATCH 1/4] fix: result returned from perun decorator

---
 docs/usage.rst                  |  3 +++
 perun/core.py                   | 17 ++++++++++++++---
 perun/monitoring/application.py |  9 +++++----
 perun/monitoring/monitor.py     | 28 ++++++++++++++++------------
 4 files changed, 38 insertions(+), 19 deletions(-)

diff --git a/docs/usage.rst b/docs/usage.rst
index f224957..d3aa78d 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -86,6 +86,9 @@ The decorator takes the same options as the monitor command, and can be set usin
 .. caution::
    If the decorated function is run multiple times, perun will behave as if it was run multiple times, initializing everything multiple times. To avoid this overhead, ensure the decorated function is called a single time. If information about a particular function which runs multiple times is needed, check out the :ref:`monitoring functions` section.
 
+.. caution::
+   If perun is configured to run for multiple rounds and the decorated function returns a value, only the result of the last round will be returned.
+
 Application Name and Run ID
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
diff --git a/perun/core.py b/perun/core.py
index 27ab1f3..5439ff6 100644
--- a/perun/core.py
+++ b/perun/core.py
@@ -281,13 +281,18 @@ def mark_event(self, region_id: str):
     def monitor_application(
         self,
         app: Application,
-    ):
+    ) -> Any:
         """Execute coordination, monitoring, post-processing, and reporting steps, in that order.
 
         Parameters
         ----------
         app : Path
             App script file path
+
+        Returns
+        -------
+        Any
+            Result of the last application run; only relevant when the perun decorator is used.
         """
         log.debug(f"Rank {self.comm.Get_rank()} Backends: {pp.pformat(self.backends)}")
 
@@ -314,7 +319,9 @@ def monitor_application(
         self.warmup_round = True
         for i in range(self.config.getint("benchmarking", "warmup_rounds")):
             log.info(f"Warmup run: {i}")
-            status, _ = self._monitor.run_application(str(i), record=False)
+            status, _, last_result = self._monitor.run_application(
+                str(i), record=False
+            )
             if (
                 status == MonitorStatus.FILE_NOT_FOUND
                 or status == MonitorStatus.SCRIPT_ERROR
@@ -325,7 +332,9 @@ def monitor_application(
         multirun_nodes: Dict[str, DataNode] = {}
         self.warmup_round = False
         for i in range(self.config.getint("benchmarking", "rounds")):
-            status, runNode = self._monitor.run_application(str(i), record=True)
+            status, runNode, last_result = self._monitor.run_application(
+                str(i), record=True
+            )
 
             if status == MonitorStatus.SCRIPT_ERROR:
                 if runNode is not None:
@@ -357,6 +366,8 @@ def monitor_application(
         self._export_multirun(multirun_node)
         self._run_postprocess_callbacks(multirun_node)
 
+        return last_result
+
     def _export_multirun(self, multirun_node: DataNode):
         data_out = Path(self.config.get("output", "data_out"))
         app_name = self.config.get("output", "app_name")
diff --git a/perun/monitoring/application.py b/perun/monitoring/application.py
index cc568bb..07a54dc 100644
--- a/perun/monitoring/application.py
+++ b/perun/monitoring/application.py
@@ -7,7 +7,7 @@
 import subprocess
 from configparser import ConfigParser
 from pathlib import Path
-from typing import Callable, Dict, Union
+from typing import Any, Callable, Dict, Union
 
 log = logging.getLogger("perun")
 
@@ -123,9 +123,9 @@ def _cleanup(self):
         for i in range(3):
             gc.collect(i)
 
-    def run(self):
+    def run(self) -> Any:
         """
-        Execute the application.
+        Execute the application. If the app is a callable, return its result.
 
         Raises
         ------
@@ -141,8 +141,9 @@ def run(self):
             )
             self._cleanup()
         elif callable(self._app):
-            self._app(*self._args, **self._kwargs)
+            result = self._app(*self._args, **self._kwargs)
             self._cleanup()
+            return result
         else:
             raise ValueError("Application not found")
 
diff --git a/perun/monitoring/monitor.py b/perun/monitoring/monitor.py
index 3bc0674..a15c7bb 100644
--- a/perun/monitoring/monitor.py
+++ b/perun/monitoring/monitor.py
@@ -8,7 +8,7 @@
 from multiprocessing import Event, Process, Queue
 from multiprocessing.synchronize import Event as EventClass
 from subprocess import Popen
-from typing import Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional, Tuple
 
 from perun.backend.backend import Backend
 from perun.comm import Comm
@@ -109,7 +109,7 @@ def run_application(
         self,
         run_id: str,
         record: bool = True,
-    ) -> Tuple[MonitorStatus, Optional[DataNode]]:
+    ) -> Tuple[MonitorStatus, Optional[DataNode], Any]:
         """
         Run the application and returns the monitor status and data node.
 
@@ -122,8 +122,8 @@
 
         Returns
         -------
-        Tuple[MonitorStatus, Optional[DataNode]]
-            A tuple containing the monitor status and the data node.
+        Tuple[MonitorStatus, Optional[DataNode], Any]
+            A tuple containing the monitor status, the data node, and the application result.
 
         Raises
         ------
@@ -152,7 +152,7 @@
         else:
             try:
                 self.status = MonitorStatus.RUNNING
-                self._app.run()
+                result = self._app.run()
                 self.status = MonitorStatus.PROCESSING
             except SystemExit:
                 self.status = MonitorStatus.PROCESSING
@@ -167,9 +167,11 @@
             s, r = getattr(e, "message", str(e)), getattr(e, "message", repr(e))
             log.error(f"Rank {self._comm.Get_rank()}: {s}")
             log.error(f"Rank {self._comm.Get_rank()}: {r}")
-            return self.status, None
+            return self.status, None, None
 
-    def _run_python_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode]]:
+    def _run_python_app(
+        self, run_id: str
+    ) -> Tuple[MonitorStatus, Optional[DataNode], Any]:
         # 1) Get sensor configuration
         self.sp_ready_event = Event()
         self.start_event = Event()
@@ -212,7 +214,7 @@ def _run_python_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode
         starttime_ns = time.time_ns()
         self.status = MonitorStatus.RUNNING
         try:
-            self._app.run()
+            app_result = self._app.run()
         except SystemExit:
             log.info(
                 "The application exited using exit(), quit() or sys.exit(). This is not the recommended way to exit an application, as it complicates the data collection process. Please refactor your code."
@@ -231,7 +233,7 @@ def _run_python_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode
                 f"Rank {self._comm.Get_rank()}: Set start and stop event forcefully"
             )
             recoveredNodes = self._handle_failed_run()
-            return self.status, recoveredNodes
+            return self.status, recoveredNodes, None
         self.status = MonitorStatus.PROCESSING
         # run_stoptime = datetime.utcnow()
 
@@ -239,9 +241,11 @@ def _run_python_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode
 
         self.stop_event.set()  # type: ignore
         # 4) App finished, stop subrocess and get data
-        return self.status, self._process_single_run(run_id, starttime_ns)
+        return self.status, self._process_single_run(run_id, starttime_ns), app_result
 
-    def _run_binary_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode]]:
+    def _run_binary_app(
+        self, run_id: str
+    ) -> Tuple[MonitorStatus, Optional[DataNode], Any]:
         # 1) Prepare sensors
         (
             timesteps,
@@ -287,7 +291,7 @@ def _run_binary_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode
 
         runNode = DataNode(id=run_id, type=NodeType.RUN, nodes={hostNode.id: hostNode})
         runNode.addRegionData(globalRegions, starttime_ns)
-        return MonitorStatus.SUCCESS, runNode
+        return MonitorStatus.SUCCESS, runNode, None
 
     def _handle_failed_run(self) -> Optional[DataNode]:
         availableRanks = self._comm.check_available_ranks()

From 0d3576bec221e00334d2851425bc7bd16cddeaa7 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Fri, 18 Oct 2024 11:35:06 +0000
Subject: [PATCH 2/4] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 CITATION.cff | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CITATION.cff b/CITATION.cff
index af3826a..b77dc67 100644
--- a/CITATION.cff
+++ b/CITATION.cff
@@ -27,7 +27,7 @@ preferred-citation:
   - given-names: Juan Pedro
     family-names: Gutiérrez Hermosillo Muriedas
     email: juan.muriedas@kit.edu
-    affiliation: Karlsruhe Institute of Technology 
+    affiliation: Karlsruhe Institute of Technology
     orcid: 'https://orcid.org/0000-0001-8439-7145'
   - given-names: Katharina
     family-names: Flügel

From beaac53b9482e754a3afe610010580bd9d50c7d1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Guti=C3=A9rrez=20Hermosillo=20Muriedas=2C=20Juan=20Pedro?=
Date: Fri, 18 Oct 2024 13:42:20 +0200
Subject: [PATCH 3/4] ci: decorator tests return values

---
 tests/scripts/sleep_decorated.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/tests/scripts/sleep_decorated.py b/tests/scripts/sleep_decorated.py
index af1563f..3f93d92 100644
--- a/tests/scripts/sleep_decorated.py
+++ b/tests/scripts/sleep_decorated.py
@@ -1,12 +1,16 @@
+import random
 import time
 
 import perun
 
 
 @perun.perun()
-def sleep():
+def sleep(value):
     time.sleep(10)
+    return value
 
 
 if __name__ == "__main__":
-    sleep()
+    value = random.randint(0, 100)
+    result = sleep(value)
+    assert result == value

From 5ec081153c7709bbbe70531ea6576d470b5836bb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Guti=C3=A9rrez=20Hermosillo=20Muriedas=2C=20Juan=20Pedro?=
Date: Fri, 18 Oct 2024 14:01:24 +0200
Subject: [PATCH 4/4] ci: processing tests

---
 tests/perun/test_processing.py | 132 +++++++++++++++++++++++++++++++++
 1 file changed, 132 insertions(+)
 create mode 100644 tests/perun/test_processing.py

diff --git a/tests/perun/test_processing.py b/tests/perun/test_processing.py
new file mode 100644
index 0000000..f146aae
--- /dev/null
+++ b/tests/perun/test_processing.py
@@ -0,0 +1,132 @@
+from configparser import ConfigParser
+
+import numpy as np
+import pytest
+
+from perun.data_model.data import (
+    DataNode,
+    MetricType,
+    NodeType,
+    RawData,
+)
+from perun.data_model.measurement_type import Magnitude, MetricMetaData, Unit
+from perun.data_model.sensor import DeviceType
+from perun.processing import (
+    getInterpolatedValues,
+    processDataNode,
+    processEnergyData,
+    processSensorData,
+)
+
+
+def test_processEnergyData():
+    raw_data = RawData(
+        timesteps=np.array([0, 1, 2, 3, 4], dtype=np.float32),
+        values=np.array([0, 10, 20, 30, 40], dtype=np.float32),
+        t_md=MetricMetaData(
+            Unit.SECOND,
+            Magnitude.ONE,
+            np.dtype("float32"),
+            np.int32(0),
+            np.int32(100),
+            np.int32(-1),
+        ),
+        v_md=MetricMetaData(
+            Unit.JOULE,
+            Magnitude.ONE,
+            np.dtype("float32"),
+            np.int32(0),
+            np.int32(100),
+            np.int32(-1),
+        ),
+    )
+    energy, power = processEnergyData(raw_data)
+    assert energy == pytest.approx(40.0)
+    assert power == pytest.approx(10.0)
+
+
+def test_processSensorData():
+    raw_data = RawData(
+        timesteps=np.array([0, 1, 2, 3, 4], dtype=np.float32),
+        values=np.array([0, 10, 20, 30, 40], dtype=np.float32),
+        t_md=MetricMetaData(
+            Unit.SECOND,
+            Magnitude.ONE,
+            np.dtype("float32"),
+            np.int32(0),
+            np.int32(100),
+            np.int32(-1),
+        ),
+        v_md=MetricMetaData(
+            Unit.JOULE,
+            Magnitude.ONE,
+            np.dtype("float32"),
+            np.int32(0),
+            np.int32(100),
+            np.int32(-1),
+        ),
+    )
+    sensor_data = DataNode(
+        id="test_node",
+        type=NodeType.SENSOR,
+        raw_data=raw_data,
+        deviceType=DeviceType.CPU,
+    )
+    processed_data = processSensorData(sensor_data)
+    assert MetricType.ENERGY in processed_data.metrics
+    assert MetricType.POWER in processed_data.metrics
+    assert processed_data.metrics[MetricType.ENERGY].value == pytest.approx(40.0)
+    assert processed_data.metrics[MetricType.POWER].value == pytest.approx(10.0)
+
+
+def test_processDataNode():
+    raw_data = RawData(
+        timesteps=np.array([0, 1, 2, 3, 4], dtype=np.float32),
+        values=np.array([0, 10, 20, 30, 40], dtype=np.float32),
+        t_md=MetricMetaData(
+            Unit.SECOND,
+            Magnitude.ONE,
+            np.dtype("float32"),
+            np.int32(0),
+            np.int32(100),
+            np.int32(-1),
+        ),
+        v_md=MetricMetaData(
+            Unit.JOULE,
+            Magnitude.ONE,
+            np.dtype("float32"),
+            np.int32(0),
+            np.int32(100),
+            np.int32(-1),
+        ),
+    )
+    sensor_data = DataNode(
+        id="sensor_node",
+        type=NodeType.SENSOR,
+        raw_data=raw_data,
+        deviceType=DeviceType.CPU,
+    )
+    device_data = DataNode(
+        id="app_node", type=NodeType.DEVICE_GROUP, nodes={"sensor": sensor_data}
+    )
+    config = ConfigParser()
+    config.add_section("post-processing")
+    config.set("post-processing", "power_overhead", "10.0")
+    config.set("post-processing", "pue", "1.5")
+    config.set("post-processing", "emissions_factor", "0.5")
+    config.set("post-processing", "price_factor", "0.1")
+    processed_data = processDataNode(device_data, config, force_process=True)
+    print(processed_data.metrics)
+    assert MetricType.ENERGY in processed_data.metrics
+    assert MetricType.POWER in processed_data.metrics
+    assert processed_data.metrics[MetricType.ENERGY].value == pytest.approx(40.0)
+    assert processed_data.metrics[MetricType.POWER].value == pytest.approx(10.0)
+
+
+def test_getInterpolatedValues():
+    t = np.array([0, 1, 2, 3, 4], dtype=np.float32)
+    x = np.array([0, 10, 20, 30, 40], dtype=np.float32)
+    start, end = np.float32(1), np.float32(3.5)
+    new_t, new_x = getInterpolatedValues(t, x, start, end)
+    assert np.array_equal(new_t, np.array([1, 1, 2, 3, 3.5], dtype=np.float32))
+    assert np.array_equal(new_x, np.array([10, 10, 20, 30, 35], dtype=np.float32))
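
With the series applied, calling a decorated function returns its value just
like an undecorated call. Below is a minimal sketch of the expected behavior;
the function name and values are illustrative, modeled on
tests/scripts/sleep_decorated.py, and assume the default single-round
configuration:

    import time

    import perun


    @perun.perun()
    def work(value):
        # Stand-in for a real workload; perun monitors this call.
        time.sleep(1)
        return value


    if __name__ == "__main__":
        # The decorator now propagates the wrapped function's return value.
        # If perun is configured for multiple rounds, only the last round's
        # result comes back (see the caution added to docs/usage.rst).
        result = work(42)
        assert result == 42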