Skip to content

Commit

Permalink
tests: pm: Adding tests based on new approach to PM testing.
Browse files Browse the repository at this point in the history
Introduces scripts for powerShiled on stm32l562e_dk board.
Adds three new power management pytests based on a new approach.

Signed-off-by: Arkadiusz Cholewinski <[email protected]>
  • Loading branch information
gbarkadiusz committed Dec 10, 2024
1 parent 124aae3 commit 980a943
Show file tree
Hide file tree
Showing 29 changed files with 1,345 additions and 0 deletions.
42 changes: 42 additions & 0 deletions scripts/pm/power_monitor_stm32l562e_dk/AbstractPowerMonitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright: (c) 2024, Intel Corporation

Check warning on line 1 in scripts/pm/power_monitor_stm32l562e_dk/AbstractPowerMonitor.py

View workflow job for this annotation

GitHub Actions / Run compliance checks on patch series (PR)

C0303

scripts/pm/power_monitor_stm32l562e_dk/AbstractPowerMonitor.py:1 Trailing whitespace (trailing-whitespace)
# Author: Arkadiusz Cholewinski <[email protected]>
# MIT License, see (https://opensource.org/license/mit/)

import string
from typing import List

Check warning on line 6 in scripts/pm/power_monitor_stm32l562e_dk/AbstractPowerMonitor.py

View workflow job for this annotation

GitHub Actions / Run compliance checks on patch series (PR)

W0611

scripts/pm/power_monitor_stm32l562e_dk/AbstractPowerMonitor.py:6 Unused List imported from typing (unused-import)
from abc import ABC, abstractmethod

class PowerMonitor(ABC):

@abstractmethod
def init(self, device_id:string):
"""
Abstract method to initialize the power monitor.
Agr:
string: Address of the power monitor
Return:
bool: True of False.
"""

@abstractmethod
def measure(self, duration:int):
"""
Abstract method to measure current with specified measurement time.
Args:
duration (int): The duration of the measurement in seconds.
"""

@abstractmethod
def get_data(self, duration: int) -> list[float]:
"""
Measure current with specified measurement time.
Args:
duration (int): The duration of the measurement in seconds.
Returns:
List[float]: An array of measured current values in amperes.
"""

Check warning on line 42 in scripts/pm/power_monitor_stm32l562e_dk/AbstractPowerMonitor.py

View workflow job for this annotation

GitHub Actions / Run compliance checks on patch series (PR)

C0304

scripts/pm/power_monitor_stm32l562e_dk/AbstractPowerMonitor.py:42 Final newline missing (missing-final-newline)
469 changes: 469 additions & 0 deletions scripts/pm/power_monitor_stm32l562e_dk/PowerShield.py

Large diffs are not rendered by default.

81 changes: 81 additions & 0 deletions scripts/pm/power_monitor_stm32l562e_dk/PowerShieldConfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright: (c) 2024, Intel Corporation
# Author: Arkadiusz Cholewinski <[email protected]>
# MIT License, see (https://opensource.org/license/mit/)

class PowerShieldConf:

class PowerMode:
"""
Class representing power mode
"""
AUTO = "auto" # Power-on when acquisition start
ON = "on" # Power-on manually
OFF = "off" # Power-off manually

class MeasureUnit:
""""
Class representing measure units.
"""
VOLTAGE = "voltage" # Target Volatege
CURRENT_RMS = "current_rms" # Current RMS value
POWER = "power" # Total power consumption
RAW_DATA = "rawdata" # Get Raw Data (current probes)

class TemperatureUnit:
"""
Class representing temperature units.
"""
CELCIUS = "degc" # Celsius temperature unit
FAHRENHEIT = "degf" # Fahrenheit temperature unit

class FunctionMode:
"""
Class representing functional modes of a power monitor.
"""
OPTIM = "optim" # Optimized mode for lower power or efficiency
HIGH = "high" # High performance mode

class DataFormat:
"""
Class representing different data formats for representation.
"""
ASCII_DEC = "ascii_dec" # ASCII encoded decimal format
BIN_HEXA = "bin_hexa" # Binary/hexadecimal format

class SamplingFrequency:
"""
Class representing various sampling frequencies.
"""
FREQ_100K = "100k" # 100 kHz frequency
FREQ_50K = "50k" # 50 kHz frequency
FREQ_20K = "20k" # 20 kHz frequency
FREQ_10K = "10k" # 10 kHz frequency
FREQ_5K = "5k" # 5 kHz frequency
FREQ_2K = "2k" # 2 kHz frequency
FREQ_1K = "1k" # 1 kHz frequency
FREQ_500 = "500" # 500 Hz frequency
FREQ_200 = "200" # 200 Hz frequency
FREQ_100 = "100" # 100 Hz frequency
FREQ_50 = "50" # 50 Hz frequency
FREQ_20 = "20" # 20 Hz frequency
FREQ_10 = "10" # 10 Hz frequency
FREQ_5 = "5" # 5 Hz frequency
FREQ_2 = "2" # 2 Hz frequency
FREQ_1 = "1" # 1 Hz frequency

def __init__(self, data_format: str = DataFormat.BIN_HEXA,
temperature_unit: str = TemperatureUnit.CELCIUS,
target_voltage: str = "3300m",
function_mode: str = FunctionMode.HIGH,
sampling_frequency: str = SamplingFrequency.FREQ_1K,
output_file: str = "rawData.csv",
):

self.output_file = output_file
self.sampling_freqency = sampling_frequency
self.data_format = data_format
self.function_mode = function_mode
self.target_voltage = target_voltage
self.temperature_unit = temperature_unit
self.acquisition_time = None

10 changes: 10 additions & 0 deletions scripts/pm/power_monitor_stm32l562e_dk/PowerShieldData.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Copyright: (c) 2024, Intel Corporation
# Author: Arkadiusz Cholewinski <[email protected]>
# MIT License, see (https://opensource.org/license/mit/)

class PowerShieldData:

def __init__(self):
self.data = []
self.current_RMS = None
self.power = None
81 changes: 81 additions & 0 deletions scripts/pm/power_monitor_stm32l562e_dk/SerialHandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright: (c) 2024, Intel Corporation
# Author: Arkadiusz Cholewinski <[email protected]>
# MIT License, see (https://opensource.org/license/mit/)

import serial
import logging

class SerialHandler:
def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0):
"""
Initializes the class for serial communication.
:param port: The serial port name (e.g., 'COM1', '/dev/ttyUSB0').
:param baudrate: The baud rate for the connection (default is 9600).
:param timeout: The timeout for read operations in seconds (default is 1.0).
"""
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.serial_connection = None

def open(self):
"""
Opens the serial connection.
"""
if self.serial_connection is None:
try:
self.serial_connection = serial.Serial(self.port, self.baudrate, timeout=self.timeout)
logging.info("Connection to %s at %d baud opened successfully.", self.port, self.baudrate)
except serial.SerialException as e:
logging.error("Error opening serial port %s: %s", self.port, str(e))
self.serial_connection = None

def close(self):
"""Closes the serial connection."""
if self.serial_connection and self.serial_connection.is_open:
self.serial_connection.close()
logging.info("Serial connection closed.")

def send_cmd(self, cmd: str):
"""
Sends a command to the serial device with a newline, and prints it.
:param cmd: The command to be sent.
"""
if self.serial_connection and self.serial_connection.is_open:
try:
self.serial_connection.write((cmd + "\r\n").encode('ascii'))
except serial.SerialException as e:
logging.error(f"Error sending command: {e}")

def read_bytes(self, count: int):
if self.serial_connection:
x = self.serial_connection.read(count)
return x


def receive_cmd(self) -> str:
"""
Reads data from the serial device until no more data is available.
:return: The processed received data as a string.
"""
s = ""
if self.serial_connection and self.serial_connection.is_open:
while True:
x = self.serial_connection.read()
if len(x) < 1 or x == 0xf0:
return s.replace("\0", "").strip().replace("\r", "").replace("\n\n\n", "\n")
s += str(x, encoding='ascii', errors='ignore')

def is_open(self) -> bool:
"""Checks if the connection is open."""
return self.serial_connection and self.serial_connection.is_open

def __del__(self):
"""Closes the connection if the object is deleted."""
self.close()
# Shut down logging
logging.shutdown()

72 changes: 72 additions & 0 deletions scripts/pm/power_monitor_stm32l562e_dk/UnityFunctions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright: (c) 2024, Intel Corporation
# Author: Arkadiusz Cholewinski <[email protected]>
# MIT License, see (https://opensource.org/license/mit/)

import numpy as np

class UnityFunctions:

@staticmethod
def convert_acq_time(value):
"""
Converts an acquisition time value to a more readable format with units.
- Converts values to m (milli), u (micro), or leaves them as is for whole numbers.
:param value: The numeric value to convert.
:return: A tuple with the value and the unit as separate elements.
"""
if value < 1e-3:
# If the value is smaller than 1 millisecond (10^-3), express in micro (u)
return f"{value * 1e6:.0f}", "us"
elif value < 1:
# If the value is smaller than 1 but larger than or equal to 1 milli (10^-3), express in milli (m)
return f"{value * 1e3:.0f}", "ms"
else:
# If the value is 1 or larger, express in seconds (s)
return f"{value:.0f}", "s"

@staticmethod
def calculate_rms(data):
"""
Calculate the Root Mean Square (RMS) of a given data array.
:param data: List or numpy array containing the data
:return: RMS value
"""
# Convert to a numpy array for easier mathematical operations
data_array = np.array(data, dtype=np.float64) # Convert to float64 to avoid type issues


# Calculate the RMS value
rms = np.sqrt(np.mean(np.square(data_array)))
return rms

@staticmethod
def bytes_to_twobyte_values(data):
value = int.from_bytes(data[0],'big')<<8 | int.from_bytes(data[1], 'big')
return value

@staticmethod
def convert_to_amps(value):
"""
Convert amps to watts
"""
amps = (value & 4095) * (16**(0 - (value >> 12)))
return amps

@staticmethod
def convert_to_scientific_notation(time: int, unit: str) -> str:
"""
Converts time to scientific notation based on the provided unit.
:param time: The time value to convert.
:param unit: The unit of the time ('us', 'ms', or 's').
:return: A string representing the time in scientific notation.
"""
if unit == 'us': # microseconds
return f"{time}-6"
elif unit == 'ms': # milliseconds
return f"{time}-3"
elif unit == 's': # seconds
return f"{time}"
else:
raise ValueError("Invalid unit. Use 'us', 'ms', or 's'.")
24 changes: 24 additions & 0 deletions tests/subsys/pm/power_residency_time/boards/stm32l562e_dk.overlay
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2024 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/

&stop0{
compatible = "zephyr,power-state";
power-state-name = "suspend-to-idle";
substate-id = <1>;
min-residency-us = <500000>;
};
&stop1{
compatible = "zephyr,power-state";
power-state-name = "suspend-to-idle";
substate-id = <2>;
min-residency-us = <1000000>;
};
&stop2{
compatible = "zephyr,power-state";
power-state-name = "suspend-to-idle";
substate-id = <3>;
min-residency-us = <1500000>;
};
4 changes: 4 additions & 0 deletions tests/subsys/pm/power_residency_time/prj.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright (c) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

CONFIG_PM=y
20 changes: 20 additions & 0 deletions tests/subsys/pm/power_residency_time/pytest/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright (c) 2024 Intel Corporation.
# SPDX-License-Identifier: Apache-2.0

import pytest
import time
from scripts.pm.power_monitor_stm32l562e_dk.PowerShield import PowerShield
from twister_harness import DeviceAdapter

def pytest_addoption(parser):
parser.addoption("--powershield",
help="Path to PowerShield device")

@pytest.fixture(scope='module')
def power_monitor_measure(dut: DeviceAdapter, request):
powershield_device = request.config.getoption("--powershield")
PM_Device = PowerShield()
PM_Device.init(power_device_path = powershield_device)
PM_Device.measure(time=8)
return PM_Device.get_data()

34 changes: 34 additions & 0 deletions tests/subsys/pm/power_residency_time/pytest/current_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright (c) 2024 Intel Corporation.
# SPDX-License-Identifier: Apache-2.0

import numpy as np
from scipy import signal
import logging
def current_RMS(data):
data = data[600:]
data = [float(x) for x in data]
peaks, data = find_peaks(data)
peaks = peaks[0:6]
logging.info(f"Found peaks: {peaks}")
# Adding the start (0) and end (length of data) to the list of indices
indices = np.concatenate(([0], np.array(peaks), [len(data)]))

# Splitting the data based on index ranges
split_data = [data[indices[i]+40:indices[i+1]-40] for i in range(len(indices)-1)]

# Function to calculate RMS for a chunk of data
def calculate_rms(chunks):
rms = []
for chunk in chunks:
rms_value = np.sqrt(np.mean(np.square(chunk)))
rms.append(rms_value)
return rms

# Calculate RMS for each chunk
rms_values = calculate_rms(split_data)

return rms_values

def find_peaks(data):
peaks = signal.find_peaks(data, distance=40, height=0.008)[0]
return peaks, data
Loading

0 comments on commit 980a943

Please sign in to comment.