@perun decorator returns the last execution result #147

Merged 4 commits on Oct 18, 2024
2 changes: 1 addition & 1 deletion CITATION.cff
@@ -27,7 +27,7 @@ preferred-citation:
- given-names: Juan Pedro
family-names: Gutiérrez Hermosillo Muriedas
email: [email protected]
affiliation: Karlsruhe Institute of Technology
affiliation: Karlsruhe Institute of Technology
orcid: 'https://orcid.org/0000-0001-8439-7145'
- given-names: Katharina
family-names: Flügel
3 changes: 3 additions & 0 deletions docs/usage.rst
@@ -86,6 +86,9 @@ The decorator takes the same options as the monitor command, and can be set using
.. caution::
If the decorated function is called multiple times, perun will behave as if the whole application were run multiple times, re-initializing everything on each call. To avoid this overhead, ensure the decorated function is called a single time. If information about a particular function that runs multiple times is needed, check out the :ref:`monitoring functions` section.

.. caution::
If, due to configuration options, perun is set up to run for multiple rounds and the decorated function returns a value, only the result of the last run will be returned.
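
For illustration, a minimal sketch of this behavior; the decorator usage mirrors tests/scripts/sleep_decorated.py from this PR, and the rounds value of 3 is only an assumed configuration (perun reads it from the benchmarking.rounds option):

import perun

calls = {"n": 0}

# Assumes perun is configured with benchmarking.rounds = 3.
@perun.perun()
def compute():
    calls["n"] += 1
    return calls["n"]

result = compute()  # runs 3 rounds; result == 3, the value from the last round only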


Application Name and Run ID
~~~~~~~~~~~~~~~~~~~~~~~~~~~
17 changes: 14 additions & 3 deletions perun/core.py
@@ -281,13 +281,18 @@
def monitor_application(
self,
app: Application,
):
) -> Any:
"""Execute coordination, monitoring, post-processing, and reporting steps, in that order.

Parameters
----------
app : Application
The application to run and monitor.

Returns
-------
Any
Last result of the application execution; only returned when the perun decorator is used.
"""
log.debug(f"Rank {self.comm.Get_rank()} Backends: {pp.pformat(self.backends)}")

@@ -314,7 +319,9 @@
self.warmup_round = True
for i in range(self.config.getint("benchmarking", "warmup_rounds")):
log.info(f"Warmup run: {i}")
status, _ = self._monitor.run_application(str(i), record=False)
status, _, last_result = self._monitor.run_application(
str(i), record=False
)
if (
status == MonitorStatus.FILE_NOT_FOUND
or status == MonitorStatus.SCRIPT_ERROR
@@ -325,7 +332,9 @@
multirun_nodes: Dict[str, DataNode] = {}
self.warmup_round = False
for i in range(self.config.getint("benchmarking", "rounds")):
status, runNode = self._monitor.run_application(str(i), record=True)
status, runNode, last_result = self._monitor.run_application(
str(i), record=True
)

if status == MonitorStatus.SCRIPT_ERROR:
if runNode is not None:
@@ -357,6 +366,8 @@
self._export_multirun(multirun_node)
self._run_postprocess_callbacks(multirun_node)

return last_result

def _export_multirun(self, multirun_node: DataNode):
data_out = Path(self.config.get("output", "data_out"))
app_name = self.config.get("output", "app_name")
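
To summarize the control flow this file change establishes, here is a simplified, self-contained model; the names mimic Core.monitor_application and ApplicationMonitor.run_application but are plain stand-ins, not perun's real classes:

from typing import Any, Callable, Optional, Tuple

def run_application(app: Callable[[], Any], run_id: str,
                    record: bool = True) -> Tuple[str, Optional[dict], Any]:
    # Mimics the (status, data node, result) triple added in this PR.
    result = app()
    run_node = {"id": run_id} if record else None
    return "SUCCESS", run_node, result

def monitor_application(app: Callable[[], Any], rounds: int = 3) -> Any:
    # Mimics the rounds loop above: each round overwrites last_result,
    # so only the final round's value reaches the caller.
    last_result: Any = None
    for i in range(rounds):
        status, run_node, last_result = run_application(app, str(i))
    return last_result

counter = iter(range(1, 100))
print(monitor_application(lambda: next(counter)))  # prints 3: the third round's value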
9 changes: 5 additions & 4 deletions perun/monitoring/application.py
@@ -7,7 +7,7 @@
import subprocess
from configparser import ConfigParser
from pathlib import Path
from typing import Callable, Dict, Union
from typing import Any, Callable, Dict, Union

log = logging.getLogger("perun")

@@ -123,9 +123,9 @@ def _cleanup(self):
for i in range(3):
gc.collect(i)

def run(self):
def run(self) -> Any:
"""
Execute the application.
Execute the application. If the application is callable, return the function's result.

Raises
------
@@ -141,8 +141,9 @@ def run(self):
)
self._cleanup()
elif callable(self._app):
self._app(*self._args, **self._kwargs)
result = self._app(*self._args, **self._kwargs)
self._cleanup()
return result
else:
raise ValueError("Application not found")

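
Note that only the callable branch propagates a value: when the application is a script path, run() falls through without an explicit return and so yields None, matching _run_binary_app below, which likewise returns None as the result element.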
28 changes: 16 additions & 12 deletions perun/monitoring/monitor.py
@@ -8,7 +8,7 @@
from multiprocessing import Event, Process, Queue
from multiprocessing.synchronize import Event as EventClass
from subprocess import Popen
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple

from perun.backend.backend import Backend
from perun.comm import Comm
@@ -109,7 +109,7 @@
self,
run_id: str,
record: bool = True,
) -> Tuple[MonitorStatus, Optional[DataNode]]:
) -> Tuple[MonitorStatus, Optional[DataNode], Any]:
"""
Run the application and return the monitor status, the data node, and the application result.

@@ -122,8 +122,8 @@

Returns
-------
Tuple[MonitorStatus, Optional[DataNode]]
A tuple containing the monitor status and the data node.
Tuple[MonitorStatus, Optional[DataNode], Any]
A tuple containing the monitor status, the data node, and the application result.

Raises
------
@@ -152,7 +152,7 @@
else:
result = None  # ensure `result` is defined if the app raises before assignment
try:
self.status = MonitorStatus.RUNNING
self._app.run()
result = self._app.run()

self.status = MonitorStatus.PROCESSING
except SystemExit:
self.status = MonitorStatus.PROCESSING
@@ -167,9 +167,11 @@
s, r = getattr(e, "message", str(e)), getattr(e, "message", repr(e))
log.error(f"Rank {self._comm.Get_rank()}: {s}")
log.error(f"Rank {self._comm.Get_rank()}: {r}")
return self.status, None
return self.status, None, result


def _run_python_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode]]:
def _run_python_app(
self, run_id: str
) -> Tuple[MonitorStatus, Optional[DataNode], Any]:
# 1) Get sensor configuration
self.sp_ready_event = Event()
self.start_event = Event()
@@ -212,7 +214,7 @@
starttime_ns = time.time_ns()
self.status = MonitorStatus.RUNNING
app_result = None  # defined even if the app exits via SystemExit before returning
try:
self._app.run()
app_result = self._app.run()
except SystemExit:
log.info(
"The application exited using exit(), quit() or sys.exit(). This is not the recommended way to exit an application, as it complicates the data collection process. Please refactor your code."
@@ -231,17 +233,19 @@
f"Rank {self._comm.Get_rank()}: Set start and stop event forcefully"
)
recoveredNodes = self._handle_failed_run()
return self.status, recoveredNodes
return self.status, recoveredNodes, None


self.status = MonitorStatus.PROCESSING
# run_stoptime = datetime.utcnow()
log.info(f"Rank {self._comm.Get_rank()}: App Stopped")
self.stop_event.set() # type: ignore

# 4) App finished, stop subprocess and get data
return self.status, self._process_single_run(run_id, starttime_ns)
return self.status, self._process_single_run(run_id, starttime_ns), app_result

def _run_binary_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode]]:
def _run_binary_app(
self, run_id: str
) -> Tuple[MonitorStatus, Optional[DataNode], Any]:
# 1) Prepare sensors
(
timesteps,
@@ -287,7 +291,7 @@
runNode = DataNode(id=run_id, type=NodeType.RUN, nodes={hostNode.id: hostNode})
runNode.addRegionData(globalRegions, starttime_ns)

return MonitorStatus.SUCCESS, runNode
return MonitorStatus.SUCCESS, runNode, None

def _handle_failed_run(self) -> Optional[DataNode]:
availableRanks = self._comm.check_available_ranks()
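
As orientation for callers, a hedged fragment (not runnable on its own) showing how the new three-element tuple might be consumed; it assumes an already-constructed ApplicationMonitor named monitor, with setup elided:

status, run_node, result = monitor.run_application("0", record=True)
if status == MonitorStatus.SUCCESS and run_node is not None:
    # run_node is the recorded DataNode; result carries the callable's
    # return value (None for script and binary applications)
    ...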
132 changes: 132 additions & 0 deletions tests/perun/test_processing.py
@@ -0,0 +1,132 @@
from configparser import ConfigParser

import numpy as np
import pytest

from perun.data_model.data import (
DataNode,
MetricType,
NodeType,
RawData,
)
from perun.data_model.measurement_type import Magnitude, MetricMetaData, Unit
from perun.data_model.sensor import DeviceType
from perun.processing import (
getInterpolatedValues,
processDataNode,
processEnergyData,
processSensorData,
)


def test_processEnergyData():
raw_data = RawData(
timesteps=np.array([0, 1, 2, 3, 4], dtype=np.float32),
values=np.array([0, 10, 20, 30, 40], dtype=np.float32),
t_md=MetricMetaData(
Unit.SECOND,
Magnitude.ONE,
np.dtype("float32"),
np.int32(0),
np.int32(100),
np.int32(-1),
),
v_md=MetricMetaData(
Unit.JOULE,
Magnitude.ONE,
np.dtype("float32"),
np.int32(0),
np.int32(100),
np.int32(-1),
),
)
energy, power = processEnergyData(raw_data)
assert energy == pytest.approx(40.0)
assert power == pytest.approx(10.0)
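
# For reference, the expected values follow from the raw data, assuming
# processEnergyData reads the JOULE series as cumulative: energy is the
# 40 J accumulated over the 4 s window, and average power = 40 J / 4 s = 10 W.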


def test_processSensorData():
raw_data = RawData(
timesteps=np.array([0, 1, 2, 3, 4], dtype=np.float32),
values=np.array([0, 10, 20, 30, 40], dtype=np.float32),
t_md=MetricMetaData(
Unit.SECOND,
Magnitude.ONE,
np.dtype("float32"),
np.int32(0),
np.int32(100),
np.int32(-1),
),
v_md=MetricMetaData(
Unit.JOULE,
Magnitude.ONE,
np.dtype("float32"),
np.int32(0),
np.int32(100),
np.int32(-1),
),
)
sensor_data = DataNode(
id="test_node",
type=NodeType.SENSOR,
raw_data=raw_data,
deviceType=DeviceType.CPU,
)
processed_data = processSensorData(sensor_data)
assert MetricType.ENERGY in processed_data.metrics
assert MetricType.POWER in processed_data.metrics
assert sensor_data.metrics[MetricType.ENERGY].value == pytest.approx(40.0)
assert sensor_data.metrics[MetricType.POWER].value == pytest.approx(10.0)


def test_processDataNode():
raw_data = RawData(
timesteps=np.array([0, 1, 2, 3, 4], dtype=np.float32),
values=np.array([0, 10, 20, 30, 40], dtype=np.float32),
t_md=MetricMetaData(
Unit.SECOND,
Magnitude.ONE,
np.dtype("float32"),
np.int32(0),
np.int32(100),
np.int32(-1),
),
v_md=MetricMetaData(
Unit.JOULE,
Magnitude.ONE,
np.dtype("float32"),
np.int32(0),
np.int32(100),
np.int32(-1),
),
)
sensor_data = DataNode(
id="sensor_node",
type=NodeType.SENSOR,
raw_data=raw_data,
deviceType=DeviceType.CPU,
)
device_data = DataNode(
id="app_node", type=NodeType.DEVICE_GROUP, nodes={"sensor": sensor_data}
)
config = ConfigParser()
config.add_section("post-processing")
config.set("post-processing", "power_overhead", "10.0")
config.set("post-processing", "pue", "1.5")
config.set("post-processing", "emissions_factor", "0.5")
config.set("post-processing", "price_factor", "0.1")
processed_data = processDataNode(device_data, config, force_process=True)
print(processed_data.metrics)
assert MetricType.ENERGY in processed_data.metrics
assert MetricType.POWER in processed_data.metrics
assert processed_data.metrics[MetricType.ENERGY].value == pytest.approx(40.0)
assert processed_data.metrics[MetricType.POWER].value == pytest.approx(10.0)
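
# With a single child sensor, the assertions imply that DEVICE_GROUP
# aggregation preserves the child's ENERGY and POWER values; the
# post-processing options set above evidently feed derived metrics
# (emissions, price) rather than altering these two.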


def test_getInterpolatedValues():
t = np.array([0, 1, 2, 3, 4], dtype=np.float32)
x = np.array([0, 10, 20, 30, 40], dtype=np.float32)
start, end = np.float32(1), np.float32(3.5)
new_t, new_x = getInterpolatedValues(t, x, start, end)
assert np.array_equal(new_t, np.array([1, 1, 2, 3, 3.5], dtype=np.float32))
assert np.array_equal(new_x, np.array([10, 10, 20, 30, 35], dtype=np.float32))
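
# The expected arrays show the boundary handling: the series is clipped to
# [start, end] and the endpoint values are linearly interpolated, hence 35 at
# t = 3.5 (halfway between 30 at t = 3 and 40 at t = 4); the duplicated t = 1
# entry carries the interpolated start value alongside the original sample.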
8 changes: 6 additions & 2 deletions tests/scripts/sleep_decorated.py
@@ -1,12 +1,16 @@
import random
import time

import perun


@perun.perun()
def sleep():
def sleep(value):
time.sleep(10)
return value


if __name__ == "__main__":
sleep()
value = random.randint(0, 100)
result = sleep(value)
assert result == value
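
Because sleep deterministically returns its argument, the assertion holds for any number of configured rounds: every round, including the last, returns the same value, which is exactly the behavior this PR adds.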