From dc184047b86b6b62c99fa54da3ea75fd427ce75f Mon Sep 17 00:00:00 2001 From: Arkadiusz Cholewinski Date: Thu, 31 Oct 2024 16:25:19 +0100 Subject: [PATCH] tests: pm: Adding tests based on new approach to PM testing. Introduces scripts for powerShiled on stm32l562e_dk board. Adds three new power management pytests based on a new approach. Signed-off-by: Arkadiusz Cholewinski --- .../AbstractPowerMonitor.py | 40 ++ .../PowerShield.py | 488 ++++++++++++++++++ .../PowerShieldConfig.py | 84 +++ .../PowerShieldData.py | 9 + .../SerialHandler.py | 84 +++ .../UnityFunctions.py | 137 +++++ .../pm/power_residency_time/CMakeLists.txt | 9 + .../boards/stm32l562e_dk.overlay | 24 + .../boards/stm32l562e_dk.yaml | 11 + tests/subsys/pm/power_residency_time/prj.conf | 4 + .../power_residency_time/pytest/conftest.py | 33 ++ .../pytest/test_residency_time.py | 38 ++ .../subsys/pm/power_residency_time/src/main.c | 44 ++ .../pm/power_residency_time/testcase.yaml | 14 + tests/subsys/pm/power_states/CMakeLists.txt | 9 + .../power_states/boards/stm32l562e_dk.overlay | 29 ++ .../pm/power_states/boards/stm32l562e_dk.yaml | 11 + tests/subsys/pm/power_states/prj.conf | 5 + .../subsys/pm/power_states/pytest/conftest.py | 33 ++ .../power_states/pytest/test_power_states.py | 38 ++ tests/subsys/pm/power_states/src/main.c | 36 ++ tests/subsys/pm/power_states/testcase.yaml | 14 + .../pm/power_wakeup_timer/CMakeLists.txt | 9 + .../boards/stm32l562e_dk.overlay | 15 + .../boards/stm32l562e_dk.yaml | 11 + tests/subsys/pm/power_wakeup_timer/prj.conf | 7 + .../pm/power_wakeup_timer/pytest/conftest.py | 33 ++ .../pytest/test_wakeup_timer.py | 35 ++ tests/subsys/pm/power_wakeup_timer/src/main.c | 94 ++++ .../pm/power_wakeup_timer/testcase.yaml | 27 + 30 files changed, 1425 insertions(+) create mode 100644 scripts/pm/power_monitor_stm32l562e_dk/AbstractPowerMonitor.py create mode 100644 scripts/pm/power_monitor_stm32l562e_dk/PowerShield.py create mode 100644 scripts/pm/power_monitor_stm32l562e_dk/PowerShieldConfig.py create mode 100644 scripts/pm/power_monitor_stm32l562e_dk/PowerShieldData.py create mode 100644 scripts/pm/power_monitor_stm32l562e_dk/SerialHandler.py create mode 100644 scripts/pm/power_monitor_stm32l562e_dk/UnityFunctions.py create mode 100644 tests/subsys/pm/power_residency_time/CMakeLists.txt create mode 100644 tests/subsys/pm/power_residency_time/boards/stm32l562e_dk.overlay create mode 100644 tests/subsys/pm/power_residency_time/boards/stm32l562e_dk.yaml create mode 100644 tests/subsys/pm/power_residency_time/prj.conf create mode 100644 tests/subsys/pm/power_residency_time/pytest/conftest.py create mode 100644 tests/subsys/pm/power_residency_time/pytest/test_residency_time.py create mode 100644 tests/subsys/pm/power_residency_time/src/main.c create mode 100644 tests/subsys/pm/power_residency_time/testcase.yaml create mode 100644 tests/subsys/pm/power_states/CMakeLists.txt create mode 100644 tests/subsys/pm/power_states/boards/stm32l562e_dk.overlay create mode 100644 tests/subsys/pm/power_states/boards/stm32l562e_dk.yaml create mode 100644 tests/subsys/pm/power_states/prj.conf create mode 100644 tests/subsys/pm/power_states/pytest/conftest.py create mode 100644 tests/subsys/pm/power_states/pytest/test_power_states.py create mode 100644 tests/subsys/pm/power_states/src/main.c create mode 100644 tests/subsys/pm/power_states/testcase.yaml create mode 100644 tests/subsys/pm/power_wakeup_timer/CMakeLists.txt create mode 100644 tests/subsys/pm/power_wakeup_timer/boards/stm32l562e_dk.overlay create mode 100644 tests/subsys/pm/power_wakeup_timer/boards/stm32l562e_dk.yaml create mode 100644 tests/subsys/pm/power_wakeup_timer/prj.conf create mode 100644 tests/subsys/pm/power_wakeup_timer/pytest/conftest.py create mode 100644 tests/subsys/pm/power_wakeup_timer/pytest/test_wakeup_timer.py create mode 100644 tests/subsys/pm/power_wakeup_timer/src/main.c create mode 100644 tests/subsys/pm/power_wakeup_timer/testcase.yaml diff --git a/scripts/pm/power_monitor_stm32l562e_dk/AbstractPowerMonitor.py b/scripts/pm/power_monitor_stm32l562e_dk/AbstractPowerMonitor.py new file mode 100644 index 00000000000000..4ee7eeff99b278 --- /dev/null +++ b/scripts/pm/power_monitor_stm32l562e_dk/AbstractPowerMonitor.py @@ -0,0 +1,40 @@ +# Copyright: (c) 2024, Intel Corporation +# Author: Arkadiusz Cholewinski + +import string +from abc import ABC, abstractmethod + + +class PowerMonitor(ABC): + @abstractmethod + def init(self, device_id: string): + """ + Abstract method to initialize the power monitor. + + Agr: + string: Address of the power monitor + + Return: + bool: True of False. + """ + + @abstractmethod + def measure(self, duration: int): + """ + Abstract method to measure current with specified measurement time. + + Args: + duration (int): The duration of the measurement in seconds. + """ + + @abstractmethod + def get_data(self, duration: int) -> list[float]: + """ + Measure current with specified measurement time. + + Args: + duration (int): The duration of the measurement in seconds. + + Returns: + List[float]: An array of measured current values in amperes. + """ diff --git a/scripts/pm/power_monitor_stm32l562e_dk/PowerShield.py b/scripts/pm/power_monitor_stm32l562e_dk/PowerShield.py new file mode 100644 index 00000000000000..3b5c16b1d08988 --- /dev/null +++ b/scripts/pm/power_monitor_stm32l562e_dk/PowerShield.py @@ -0,0 +1,488 @@ +# Copyright: (c) 2024, Intel Corporation +# Author: Arkadiusz Cholewinski + +import csv +import logging +import queue +import re +import threading +import time + +from scripts.pm.power_monitor_stm32l562e_dk.AbstractPowerMonitor import PowerMonitor +from scripts.pm.power_monitor_stm32l562e_dk.PowerShieldConfig import PowerShieldConf +from scripts.pm.power_monitor_stm32l562e_dk.PowerShieldData import PowerShieldData +from scripts.pm.power_monitor_stm32l562e_dk.SerialHandler import SerialHandler +from scripts.pm.power_monitor_stm32l562e_dk.UnityFunctions import UnityFunctions + + +class PowerShield(PowerMonitor): + def __init__(self): + """ + Initializes the PowerShield. + """ + self.handler = None + self.dataQueue = queue.Queue() + self.acqComplete = False + self.acqStart = False + self.target_voltage = None + self.target_temperature = None + self.acqTimeoutThread = None + self.power_shield_conf = PowerShieldConf() + self.power_shield_data = PowerShieldData() + + def init(self, power_device_path: str): + """ + Initializes the power monitor by setting up a serial handler + with the given device path and a baud rate of 3686400. + """ + self.handler = SerialHandler(power_device_path, 3686400) + self.connect() + # self.reset() + self.take_control() + self.set_voltage(self.power_shield_conf.target_voltage) + self.set_format(self.power_shield_conf.data_format) + self.set_func_mode(self.power_shield_conf.function_mode) + self.target_temperature = self.get_temperature(self.power_shield_conf.temperature_unit) + self.target_voltage = self.get_voltage_level() + + def connect(self): + """Opens the connection using the SerialHandler.""" + self.handler.open() + + def disconnect(self): + """Closes the connection using the SerialHandler.""" + self.handler.close() + + def send_command(self, command: str, expected_ack: str = None, ack: bool = False) -> str: + """ + Sends a command to the device, retrieves the response, + and optionally verifies the acknowledgment. + + :param command: The command to send. + :param expected_ack: The expected acknowledgment response (e.g., "ack htc"). + :return: The response received from the device. + """ + if not self.handler.is_open(): + logging.info(f"Error: Connection is not open. Cannot send command: {command}") + return "" + + logging.debug(f"Sending command: {command}") + self.handler.send_cmd(command) + if ack: + response = self.handler.receive_cmd() + logging.debug(f"Response: {response}") + + # Check if the response contains the expected acknowledgment + if expected_ack and expected_ack not in response: + logging.error(f"Error: Expected acknowledgment '{expected_ack}' not found.") + return "" + + return response + return 0 + + def test_communication(self): + """ + Sends a version command to the device. + """ + if not self.handler.is_open(): + logging.error("Error: Connection is not open. Cannot send version command.") + return "" + command = 'version' + logging.info(f"Sending command: {command}") + self.handler.send_cmd(command) + response = self.handler.receive_cmd() + logging.info(f"Response: {response}") + return response + + def reset(self): + """ + Sends the reset command ('PSRST') to the power monitor device, + closes the connection, waits for the reset process to complete, + and repeatedly attempts to reconnect until successful. + """ + command = "psrst" + + if not self.handler.is_open(): + logging.error("Error: Connection is not open. Cannot reset the device.") + return + + logging.info(f"Sending reset command: {command}") + self.handler.send_cmd(command) + + # Close the connection + self.handler.close() + self.handler.serial_connection = None + + time.sleep(5) + # Attempt to reopen the connection + try: + self.handler.open() + logging.info("Connection reopened after reset.") + except Exception as e: + logging.error(f"Failed to reopen connection after reset: {e}") + + def get_voltage_level(self) -> float: + """ + Sends the 'volt get' command and returns the voltage value as a float. + + :return: The voltage level as a float. + """ + command = 'volt get' + response = self.send_command(command, expected_ack="ack volt get", ack=True) + + # If response contains the expected acknowledgment, extract and return the voltage + if response: + parts = response.split() + try: + if len(parts) >= 5: + # Use regex to find a string that matches the pattern, e.g., "3292-03" + match = re.search(r'(\d+)-(\d+)', parts[5]) + if match: + # Extract the base (3292) and exponent (03) + base = match.group(1) + exponent = match.group(2) + + # Construct the scientific notation string (e.g., 3292e-03) + voltage_str = f"{base}e-{exponent}" + + # Convert the string into a float + voltage = float(voltage_str) + + # Return the voltage as a float + self.target_voltage = round(voltage, 3) + return self.target_voltage + except ValueError: + logging.error("Error: Could not convert temperature value.") + return None + else: + logging.warning("No response for voltage command.") + return None + + def get_temperature(self, unit: str = PowerShieldConf.TemperatureUnit.CELCIUS) -> float: + """ + Sends the temperature command and returns the temperature as a float. + + :param unit: The unit to request the temperature in, either 'degc' or 'degf'. + :return: The temperature value as a float. + """ + # Send the temp command with the unit + response = self.send_command(f"temp {unit}", expected_ack=f"ack temp {unit}", ack=True) + + # If response contains the expected acknowledgment, extract the temperature + if response: + try: + # Example response format: "PowerShield > ack temp degc 28.0" + parts = response.split() + if len(parts) >= 5 and parts[5].replace('.', '', 1).isdigit(): + # Extract temperature and convert to float + self.target_temetarute = float(parts[5]) + logging.info(f"Temperature: {self.target_temetarute} {unit}") + return self.target_temetarute + else: + print("Error: Temperature value not found in response.") + return None + except ValueError: + logging.error("Error: Could not convert temperature value.") + return None + else: + logging.warning("No response for temp command.") + return None + + def take_control(self) -> str: + """ + Sends the 'htc' command and verifies the acknowledgment. + + :return: The acknowledgment response or error message. + """ + return self.send_command("htc", expected_ack="ack htc", ack=True) + + def set_format(self, data_format: str = PowerShieldConf.DataFormat.ASCII_DEC): + """ + Sets the measurement data format. + The format can be either ASCII (decimal) or Binary (hexadecimal). + + :param data_format: The data format to set. + Options are 'ascii_dec' or 'bin_hexa'. + :return: None + """ + # Validate the input format + if data_format not in vars(PowerShieldConf.DataFormat).values(): + logging.warning( + f"Error: Invalid format '{data_format}'. " + "Valid options are 'ascii_dec' or 'bin_hexa'." + ) + return + + command = f"format {data_format}" + response = self.send_command(command, expected_ack=f"ack format {data_format}", ack=True) + + # If response contains the expected acknowledgment, the format was set successfully + if response: + logging.info(f"Data format set to {data_format}.") + else: + logging.warning(f"Failed to set data format to {data_format}.") + + def set_frequency(self, frequency: str): + """ + Sets the sampling frequency for the measurement. + The frequency can be any valid value from the list. + + :param frequency: The sampling frequency to set. + Valid options include: + {100 k, 50 k, 20 k, 10 k, 5 k, 2 k, 1 k, 500, 200, 100, 50, 20, 10, 5, 2, 1}. + + :return: None + """ + # Validate the input frequency + if frequency not in vars(PowerShieldConf.SamplingFrequency).values(): + logging.warning( + f"Error: Invalid frequency '{frequency}'." + "Valid options are:" + "100k, 50k, 20k, 10k, 5k, 2k, 1k, 500, 200, 100, 50, 20, 10, 5, 2, 1." + ) + return + + command = f"freq {frequency}" + response = self.send_command(command, expected_ack=f"ack freq {frequency}", ack=True) + + if response: + logging.info(f"Sampling frequency set to {frequency}.") + else: + logging.warning(f"Failed to set sampling frequency to {frequency}.") + + def set_acquire_time(self, acquire_time: str = '0'): + command = f"acqtime {acquire_time}" + response = self.send_command(command, expected_ack=f"ack acqtime {acquire_time}", ack=True) + + if response: + logging.info(f"Acquisition time set to {acquire_time}.") + else: + logging.warning(f"Failed to set acquisition time to {acquire_time}.") + + def set_voltage(self, voltage: str): + command = f"volt {voltage}" + response = self.send_command(command, expected_ack=f"ack volt {voltage}", ack=True) + + if response: + logging.info(f"Voltage set to {voltage}.") + else: + logging.warning(f"Failed to set voltage to {voltage}.") + + def set_func_mode(self, function_mode: str = PowerShieldConf.FunctionMode.HIGH): + """ + Sets the acquisition mode for current measurement. + The function_mode can be either 'optim' or 'high'. + + - 'optim': Priority on current resolution (100 nA - 10 mA) with max freq at 100 kHz. + - 'high': High current (30 µA - 10 mA), high frequency (50-100 kHz), high resolution. + + :param mode: The acquisition mode. Must be either 'optim' or 'high'. + :return: None + """ + # Validate the input format + if function_mode not in vars(PowerShieldConf.FunctionMode).values(): + logging.warning( + f"Error: Invalid format '{function_mode}'." + "Valid options are 'ascii_dec' or 'bin_hexa'." + ) + return + + command = f"funcmode {function_mode}" + response = self.send_command( + command, expected_ack=f"ack funcmode {function_mode}", ack=True + ) + + if response: + logging.info(f"Data format set to {function_mode}.") + else: + logging.warning(f"Failed to set data format to {function_mode}.") + + def acq_data(self): + """ + Continuously reads data from the serial port and puts it + into a queue until acquisition is complete. + """ + logging.info("Started data acquisition...") + while True: + # Read the first byte + first_byte = self.handler.read_bytes(1) + if len(first_byte) < 1 or self.acqComplete: # Exit conditions + logging.info("Stopping data acquisition...") + return + + # Check if it's metadata + if first_byte == b'\xf0': # Metadata marker + second_byte = self.handler.read_bytes(1) + # Handle metadata types + metadata_type = second_byte[0] + self.handle_metadata(metadata_type) + else: + # Not metadata, treat as data + if self.acqStart: + second_byte = self.handler.read_bytes(1) + data = [] + data.append(first_byte) + if len(second_byte) < 1 or self.acqComplete: + logging.info("Stopping data acquisition...") + return + data.append(second_byte) + amps = UnityFunctions.convert_to_amps( + UnityFunctions.bytes_to_twobyte_values(data) + ) + self.dataQueue.put([amps]) + + def handle_metadata(self, metadata_type): + if metadata_type == 0xF1: + logging.info("Received Metadata: ASCII error message.") + # self.handle_metadata_error() + elif metadata_type == 0xF2: + logging.info("Received Metadata: ASCII information message.") + # self.handle_metadata_info() + elif metadata_type == 0xF3: + logging.info("Received Metadata: Timestamp message.") + self.handle_metadata_timestamp() + self.acqStart = True + elif metadata_type == 0xF4: + logging.info("Received Metadata: End of acquisition tag.") + self.handle_metadata_end() + self.handle_summary() + elif metadata_type == 0xF5: + logging.info("Received Metadata: Overcurrent detected.") + # self.handle_metadata_overcurrent() + else: + logging.warning(f"Unknown Metadata Type: {metadata_type:#04x}") + + def handle_summary(self): + s = "" + while True: + # Read the first byte + x = self.handler.read_bytes(1) + if len(x) < 1 or x == 0xF0: + self.acqComplete = True + return s.replace("\0", "").strip().replace("\r", "").replace("\n\n\n", "\n") + s += str(x, encoding='ascii', errors='ignore') + + def handle_metadata_end(self): + """ + Handle metadata end of acquisition message. + """ + # Read the next 4 bytes + metadata_bytes = self.handler.read_bytes(2) + if len(metadata_bytes) < 2: + print("Incomplete end of acquisition metadata reveived.") + return + # Check for end tags (last 2 bytes) + end_tag_1 = metadata_bytes[0] + end_tag_2 = metadata_bytes[1] + if end_tag_1 != 0xFF or end_tag_2 != 0xFF: + logging.warning("Invalid metadata end tags received.") + return + + def handle_metadata_timestamp(self): + """ + Handle metadata timestamp message. Parses and displays the timestamp and buffer load. + """ + # Read the next 7 bytes (timestamp + buffer load + end tags) + metadata_bytes = self.handler.read_bytes(7) + if len(metadata_bytes) < 7: + logging.warning("Incomplete timestamp metadata received.") + return + + # Parse the timestamp (4 bytes, big-endian) + timestamp_ms = int.from_bytes(metadata_bytes[0:4], byteorder='big', signed=False) + # Parse the buffer Tx load value (1 byte) + buffer_load = metadata_bytes[4] + # Check for end tags (last 2 bytes) + end_tag_1 = metadata_bytes[5] + end_tag_2 = metadata_bytes[6] + if end_tag_1 != 0xFF or end_tag_2 != 0xFF: + logging.warning("Invalid metadata end tags received.") + return + + # Display parsed values + logging.info(f"Metadata Timestamp: {timestamp_ms} ms") + logging.info(f"Buffer Tx Load: {buffer_load}%") + + def start_measurement(self): + """ + Starts the measurement by sending the 'start' command. Once the measurement starts, + data can be received continuously until the 'stop' command is sent. + + :return: None + """ + command = "start" + self.acqComplete = False + self.send_command(command) + + raw_to_file_Thread = threading.Thread( + target=self.raw_to_file, args=(self.power_shield_conf.output_file,) + ) + raw_to_file_Thread.start() + logging.info("Measurement started. Receiving data...") + self.acq_data() + raw_to_file_Thread.join() + + def raw_to_file(self, outputFilePath: str): + # Open a CSV file for writing + with open(outputFilePath, 'w', newline='') as outputFile: + writer = csv.writer(outputFile) + while True: + if self.dataQueue.empty() and bool(self.acqComplete): + outputFile.close() + return + if not self.dataQueue.empty(): + data = self.dataQueue.get() + writer.writerow(data) + outputFile.flush() + else: + time.sleep(0.1) + + def measure(self, time: int, freq: str = None, reset: bool = False): + self.power_shield_conf.acquisition_time = time + _time, self.power_shield_conf.acquisition_time_unit = UnityFunctions.convert_acq_time(time) + + if reset: + self.reset() + self.take_control() + self.set_format(self.power_shield_conf.data_format) + if freq is not None: + self.set_frequency(freq) + else: + self.set_frequency(self.power_shield_conf.sampling_freqency) + self.set_acquire_time( + UnityFunctions.convert_to_scientific_notation( + time=_time, unit=self.power_shield_conf.acquisition_time_unit + ) + ) + self.start_measurement() + + # calculate the data + # self.get_measured_data(measure_unit) + + def get_data(self, unit: str = PowerShieldConf.MeasureUnit.RAW_DATA): + if self.acqComplete: + # Open the CSV file + with open(self.power_shield_conf.output_file) as file: + csv_reader = csv.reader(file) + for row in csv_reader: + self.power_shield_data.data.append(row[0]) + if unit == PowerShieldConf.MeasureUnit.CURRENT_RMS: + self.power_shield_data.current_RMS = UnityFunctions.calculate_rms( + self.power_shield_data.data + ) + return self.power_shield_data.current_RMS + elif unit == PowerShieldConf.MeasureUnit.POWER: + _delta_time = self.power_shield_conf.acquisition_time + self.power_shield_data.power = 0 + for data in self.power_shield_data.data: + self.power_shield_data.power += float( + float(data) * float(_delta_time) * float(self.target_voltage) + ) + return self.power_shield_data.power + elif unit == PowerShieldConf.MeasureUnit.RAW_DATA: + return self.power_shield_data.data + else: + logging.warning("Unknown unit of requested data") + else: + logging.info("Acquisition not complete.") + return None diff --git a/scripts/pm/power_monitor_stm32l562e_dk/PowerShieldConfig.py b/scripts/pm/power_monitor_stm32l562e_dk/PowerShieldConfig.py new file mode 100644 index 00000000000000..105f27d76165c5 --- /dev/null +++ b/scripts/pm/power_monitor_stm32l562e_dk/PowerShieldConfig.py @@ -0,0 +1,84 @@ +# Copyright: (c) 2024, Intel Corporation +# Author: Arkadiusz Cholewinski +class PowerShieldConf: + class PowerMode: + """ + Class representing power mode + """ + + AUTO = "auto" # Power-on when acquisition start + ON = "on" # Power-on manually + OFF = "off" # Power-off manually + + class MeasureUnit: + """ + Class representing measure units. + """ + + VOLTAGE = "voltage" # Target Volatege + CURRENT_RMS = "current_rms" # Current RMS value + POWER = "power" # Total power consumption + RAW_DATA = "rawdata" # Get Raw Data (current probes) + + class TemperatureUnit: + """ + Class representing temperature units. + """ + + CELCIUS = "degc" # Celsius temperature unit + FAHRENHEIT = "degf" # Fahrenheit temperature unit + + class FunctionMode: + """ + Class representing functional modes of a power monitor. + """ + + OPTIM = "optim" # Optimized mode for lower power or efficiency + HIGH = "high" # High performance mode + + class DataFormat: + """ + Class representing different data formats for representation. + """ + + ASCII_DEC = "ascii_dec" # ASCII encoded decimal format + BIN_HEXA = "bin_hexa" # Binary/hexadecimal format + + class SamplingFrequency: + """ + Class representing various sampling frequencies. + """ + + FREQ_100K = "100k" # 100 kHz frequency + FREQ_50K = "50k" # 50 kHz frequency + FREQ_20K = "20k" # 20 kHz frequency + FREQ_10K = "10k" # 10 kHz frequency + FREQ_5K = "5k" # 5 kHz frequency + FREQ_2K = "2k" # 2 kHz frequency + FREQ_1K = "1k" # 1 kHz frequency + FREQ_500 = "500" # 500 Hz frequency + FREQ_200 = "200" # 200 Hz frequency + FREQ_100 = "100" # 100 Hz frequency + FREQ_50 = "50" # 50 Hz frequency + FREQ_20 = "20" # 20 Hz frequency + FREQ_10 = "10" # 10 Hz frequency + FREQ_5 = "5" # 5 Hz frequency + FREQ_2 = "2" # 2 Hz frequency + FREQ_1 = "1" # 1 Hz frequency + + def __init__( + self, + data_format: str = DataFormat.BIN_HEXA, + temperature_unit: str = TemperatureUnit.CELCIUS, + target_voltage: str = "3300m", + function_mode: str = FunctionMode.HIGH, + sampling_frequency: str = SamplingFrequency.FREQ_1K, + output_file: str = "rawData.csv", + ): + self.output_file = output_file + self.sampling_freqency = sampling_frequency + self.data_format = data_format + self.function_mode = function_mode + self.target_voltage = target_voltage + self.temperature_unit = temperature_unit + self.acquisition_time = None diff --git a/scripts/pm/power_monitor_stm32l562e_dk/PowerShieldData.py b/scripts/pm/power_monitor_stm32l562e_dk/PowerShieldData.py new file mode 100644 index 00000000000000..01df97ca6074a2 --- /dev/null +++ b/scripts/pm/power_monitor_stm32l562e_dk/PowerShieldData.py @@ -0,0 +1,9 @@ +# Copyright: (c) 2024, Intel Corporation +# Author: Arkadiusz Cholewinski + + +class PowerShieldData: + def __init__(self): + self.data = [] + self.current_RMS = None + self.power = None diff --git a/scripts/pm/power_monitor_stm32l562e_dk/SerialHandler.py b/scripts/pm/power_monitor_stm32l562e_dk/SerialHandler.py new file mode 100644 index 00000000000000..f1eef4a3dcf936 --- /dev/null +++ b/scripts/pm/power_monitor_stm32l562e_dk/SerialHandler.py @@ -0,0 +1,84 @@ +# Copyright: (c) 2024, Intel Corporation +# Author: Arkadiusz Cholewinski + +import logging + +import serial + + +class SerialHandler: + def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0): + """ + Initializes the class for serial communication. + + :param port: The serial port name (e.g., 'COM1', '/dev/ttyUSB0'). + :param baudrate: The baud rate for the connection (default is 9600). + :param timeout: The timeout for read operations in seconds (default is 1.0). + """ + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self.serial_connection = None + + def open(self): + """ + Opens the serial connection. + """ + if self.serial_connection is None: + try: + self.serial_connection = serial.Serial( + self.port, self.baudrate, timeout=self.timeout + ) + logging.info( + "Connection to %s at %d baud opened successfully.", self.port, self.baudrate + ) + except serial.SerialException as e: + logging.error("Error opening serial port %s: %s", self.port, str(e)) + self.serial_connection = None + + def close(self): + """Closes the serial connection.""" + if self.serial_connection and self.serial_connection.is_open: + self.serial_connection.close() + logging.info("Serial connection closed.") + + def send_cmd(self, cmd: str): + """ + Sends a command to the serial device with a newline, and prints it. + + :param cmd: The command to be sent. + """ + if self.serial_connection and self.serial_connection.is_open: + try: + self.serial_connection.write((cmd + "\r\n").encode('ascii')) + except serial.SerialException as e: + logging.error(f"Error sending command: {e}") + + def read_bytes(self, count: int): + if self.serial_connection: + x = self.serial_connection.read(count) + return x + + def receive_cmd(self) -> str: + """ + Reads data from the serial device until no more data is available. + + :return: The processed received data as a string. + """ + s = "" + if self.serial_connection and self.serial_connection.is_open: + while True: + x = self.serial_connection.read() + if len(x) < 1 or x == 0xF0: + return s.replace("\0", "").strip().replace("\r", "").replace("\n\n\n", "\n") + s += str(x, encoding='ascii', errors='ignore') + + def is_open(self) -> bool: + """Checks if the connection is open.""" + return self.serial_connection and self.serial_connection.is_open + + def __del__(self): + """Closes the connection if the object is deleted.""" + self.close() + # Shut down logging + logging.shutdown() diff --git a/scripts/pm/power_monitor_stm32l562e_dk/UnityFunctions.py b/scripts/pm/power_monitor_stm32l562e_dk/UnityFunctions.py new file mode 100644 index 00000000000000..2c5d75a7acc9fe --- /dev/null +++ b/scripts/pm/power_monitor_stm32l562e_dk/UnityFunctions.py @@ -0,0 +1,137 @@ +# Copyright: (c) 2024, Intel Corporation +# Author: Arkadiusz Cholewinski + +import numpy as np +from scipy import signal + + +class UnityFunctions: + @staticmethod + def convert_acq_time(value): + """ + Converts an acquisition time value to a more readable format with units. + - Converts values to m (milli), u (micro), or leaves them as is for whole numbers. + + :param value: The numeric value to convert. + :return: A tuple with the value and the unit as separate elements. + """ + if value < 1e-3: + # If the value is smaller than 1 millisecond (10^-3), express in micro (u) + return f"{value * 1e6:.0f}", "us" + elif value < 1: + # If the value is smaller than 1 but larger than or equal to 1 milli (10^-3) + return f"{value * 1e3:.0f}", "ms" + else: + # If the value is 1 or larger, express in seconds (s) + return f"{value:.0f}", "s" + + @staticmethod + def calculate_rms(data): + """ + Calculate the Root Mean Square (RMS) of a given data array. + + :param data: List or numpy array containing the data + :return: RMS value + """ + # Convert to a numpy array for easier mathematical operations + data_array = np.array(data, dtype=np.float64) # Convert to float64 to avoid type issues + + # Calculate the RMS value + rms = np.sqrt(np.mean(np.square(data_array))) + return rms + + @staticmethod + def bytes_to_twobyte_values(data): + value = int.from_bytes(data[0], 'big') << 8 | int.from_bytes(data[1], 'big') + return value + + @staticmethod + def convert_to_amps(value): + """ + Convert amps to watts + """ + amps = (value & 4095) * (16 ** (0 - (value >> 12))) + return amps + + @staticmethod + def convert_to_scientific_notation(time: int, unit: str) -> str: + """ + Converts time to scientific notation based on the provided unit. + :param time: The time value to convert. + :param unit: The unit of the time ('us', 'ms', or 's'). + :return: A string representing the time in scientific notation. + """ + if unit == 'us': # microseconds + return f"{time}-6" + elif unit == 'ms': # milliseconds + return f"{time}-3" + elif unit == 's': # seconds + return f"{time}" + else: + raise ValueError("Invalid unit. Use 'us', 'ms', or 's'.") + + @staticmethod + def current_RMS(data, exclude_first_600=True, num_peaks=1): + """ + Function to process a given data array, find peaks, split data into chunks, + and then compute the RMS value for each chunk. The first 600 elements can be excluded + based on the parameter, and the number of peaks to consider is configurable. + + Args: + - data (list or numpy array): The input data for RMS calculation. + - exclude_first_600 (bool): Whether to exclude the first 600 elements of the data. + - num_peaks (int): The number of peaks to consider for chunking. + + Returns: + - rms_values (list): A list of RMS values calculated from the data chunks. + """ + + # Optionally exclude the first 600 elements of the data + if exclude_first_600: + data = data[600:] + + # Convert the data to a list of floats for consistency + data = [float(x) for x in data] + + # Find the peaks in the data using the `find_peaks` function + peaks = signal.find_peaks(data, distance=40, height=0.008)[0] + + # Check if we have enough peaks, otherwise raise an exception + if len(peaks) < num_peaks: + raise ValueError( + f"Not enough peaks detected. Expected at least {num_peaks}, but found {len(peaks)}." + ) + + # Limit the number of peaks based on the `num_peaks` parameter + peaks = peaks[:num_peaks] + + # Add the start (index 0) and end (index of the last data point) to the list of peak indices + indices = np.concatenate(([0], np.array(peaks), [len(data)])) + + # Split the data into chunks based on the peak indices + # with padding of 40 elements at both ends + split_data = [data[indices[i] + 40 : indices[i + 1] - 40] for i in range(len(indices) - 1)] + + # Function to calculate RMS for a given list of data chunks + def calculate_rms(chunks): + """ + Helper function to compute RMS values for each data chunk. + + Args: + - chunks (list): A list of data chunks. + + Returns: + - rms (list): A list of RMS values, one for each data chunk. + """ + rms = [] + for chunk in chunks: + # Calculate RMS by taking the square root of the mean of squared values + rms_value = np.sqrt(np.mean(np.square(chunk))) + rms.append(rms_value) + return rms + + # Calculate RMS for each chunk of data + rms_values = calculate_rms(split_data) + + # Return the calculated RMS values + return rms_values diff --git a/tests/subsys/pm/power_residency_time/CMakeLists.txt b/tests/subsys/pm/power_residency_time/CMakeLists.txt new file mode 100644 index 00000000000000..5c6f384a1092f1 --- /dev/null +++ b/tests/subsys/pm/power_residency_time/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright (c) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20.0) +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) +project(power_residency_time) + +FILE(GLOB app_sources src/*.c) +target_sources(app PRIVATE ${app_sources}) diff --git a/tests/subsys/pm/power_residency_time/boards/stm32l562e_dk.overlay b/tests/subsys/pm/power_residency_time/boards/stm32l562e_dk.overlay new file mode 100644 index 00000000000000..ee4d80f9e903d3 --- /dev/null +++ b/tests/subsys/pm/power_residency_time/boards/stm32l562e_dk.overlay @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2025 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +&stop0{ + compatible = "zephyr,power-state"; + power-state-name = "suspend-to-idle"; + substate-id = <1>; + min-residency-us = <500000>; +}; +&stop1{ + compatible = "zephyr,power-state"; + power-state-name = "suspend-to-idle"; + substate-id = <2>; + min-residency-us = <1000000>; +}; +&stop2{ + compatible = "zephyr,power-state"; + power-state-name = "suspend-to-idle"; + substate-id = <3>; + min-residency-us = <1500000>; +}; diff --git a/tests/subsys/pm/power_residency_time/boards/stm32l562e_dk.yaml b/tests/subsys/pm/power_residency_time/boards/stm32l562e_dk.yaml new file mode 100644 index 00000000000000..f316ecbef249e7 --- /dev/null +++ b/tests/subsys/pm/power_residency_time/boards/stm32l562e_dk.yaml @@ -0,0 +1,11 @@ +power_states: + k_cpu_idle: + rms: 56.0 + state_0: + rms: 4.0 + state_1: + rms: 1.3 + state_2: + rms: 0.27 + active: + rms: 134 diff --git a/tests/subsys/pm/power_residency_time/prj.conf b/tests/subsys/pm/power_residency_time/prj.conf new file mode 100644 index 00000000000000..76be0e2b554363 --- /dev/null +++ b/tests/subsys/pm/power_residency_time/prj.conf @@ -0,0 +1,4 @@ +# Copyright (c) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +CONFIG_PM=y diff --git a/tests/subsys/pm/power_residency_time/pytest/conftest.py b/tests/subsys/pm/power_residency_time/pytest/conftest.py new file mode 100644 index 00000000000000..858705060f3ca9 --- /dev/null +++ b/tests/subsys/pm/power_residency_time/pytest/conftest.py @@ -0,0 +1,33 @@ +# Copyright (c) 2025 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import yaml +from twister_harness import DeviceAdapter + +from scripts.pm.power_monitor_stm32l562e_dk.PowerShield import PowerShield + + +def pytest_addoption(parser): + parser.addoption("--powershield", help="Path to PowerShield device") + parser.addoption("--expected-values", help="Path to yaml file with expected values") + + +def load_power_state_data(path): + with open(path) as file: + return yaml.safe_load(file) + + +@pytest.fixture +def expected_values(request): + expected_values_file = request.config.getoption("--expected-values") + return load_power_state_data(expected_values_file)['power_states'] + + +@pytest.fixture(scope='module') +def power_monitor_measure(dut: DeviceAdapter, request): + powershield_device = request.config.getoption("--powershield") + PM_Device = PowerShield() + PM_Device.init(power_device_path=powershield_device) + PM_Device.measure(time=8) + return PM_Device.get_data() diff --git a/tests/subsys/pm/power_residency_time/pytest/test_residency_time.py b/tests/subsys/pm/power_residency_time/pytest/test_residency_time.py new file mode 100644 index 00000000000000..96fe51f4d97683 --- /dev/null +++ b/tests/subsys/pm/power_residency_time/pytest/test_residency_time.py @@ -0,0 +1,38 @@ +# Copyright (c) 2025 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +import logging + +from scripts.pm.power_monitor_stm32l562e_dk.UnityFunctions import UnityFunctions + + +def test_residency_time(power_monitor_measure, expected_values): + data = power_monitor_measure + pm_states_expected_values = expected_values.items() + + value_in_amps = UnityFunctions.calculate_rms(data, exclude_first_600=True, num_peaks=6) + # Convert to milliamps + value_in_miliamps = [value * 1e4 for value in value_in_amps] + logging.info(f"Current RMS [mA]: {value_in_miliamps}") + + def print_state_rms(state_label, rms_expected_value, rms_measured_value): + tolerance = rms_expected_value / 10 + + logging.info(f"{state_label}:") + logging.info(f"Expected RMS: {rms_expected_value:.2f} mA") + logging.info(f"Tolerance: {tolerance:.2f} mA") + logging.info(f"Current RMS: {rms_measured_value:.2f} mA") + + assert ( + (rms_expected_value - tolerance) < rms_measured_value < (rms_expected_value + tolerance) + ), f"RMS value out of tolerance for {state_label}" + + # Define the state labels + state_labels = ["k_cpu_idle", "state_0", "state_0", "state_1", "state_1", "state_2", "active"] + + # Convert dict_items to dictionary if needed + pm_states_expected_values = dict(pm_states_expected_values) + # Now you can use indexing or key-based access + for state_label, rms_value in zip(state_labels, value_in_miliamps, strict=False): + rms_expected_value = pm_states_expected_values[state_label]['rms'] + print_state_rms(state_label, rms_expected_value, rms_value) diff --git a/tests/subsys/pm/power_residency_time/src/main.c b/tests/subsys/pm/power_residency_time/src/main.c new file mode 100644 index 00000000000000..ae67ab90dbee4e --- /dev/null +++ b/tests/subsys/pm/power_residency_time/src/main.c @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2025 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include + +#define DIFF_RESIDENCY_TIME_STATE_0 400 +#define DIFF_RESIDENCY_TIME_STATE_1 400 +#define DIFF_RESIDENCY_TIME_STATE_2 400 + +int main(void) +{ + + int stop0_min_residency_time = DT_PROP(DT_NODELABEL(stop0), min_residency_us); + int stop1_min_residency_time = DT_PROP(DT_NODELABEL(stop1), min_residency_us); + int stop2_min_residency_time = DT_PROP(DT_NODELABEL(stop2), min_residency_us); + + while (1) { + printk("\nSleep time < min_residency_time of state 0\n"); + k_usleep(stop0_min_residency_time - DIFF_RESIDENCY_TIME_STATE_0); + printk("\nSleep time = min_residency_time of state 0\n"); + k_usleep(stop0_min_residency_time); + printk("\nSleep time < min_residency_time of state 1\n"); + k_usleep(stop1_min_residency_time - DIFF_RESIDENCY_TIME_STATE_1); + printk("\nSleep time = min_residency_time of state 1\n"); + k_usleep(stop1_min_residency_time); + printk("\nSleep time < min_residency_time of state 2\n"); + k_usleep(stop2_min_residency_time - DIFF_RESIDENCY_TIME_STATE_2); + printk("\nSleep time = min_residency_time of state 2\n"); + k_usleep(stop2_min_residency_time); + printk("\nWakeup.\n"); + + /** + * Keeping alive loop + */ + while (1) { + arch_nop(); + } + } + return 0; +} diff --git a/tests/subsys/pm/power_residency_time/testcase.yaml b/tests/subsys/pm/power_residency_time/testcase.yaml new file mode 100644 index 00000000000000..56a5baedc1ffd6 --- /dev/null +++ b/tests/subsys/pm/power_residency_time/testcase.yaml @@ -0,0 +1,14 @@ +# Copyright (c) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +tests: + pm.power_residency_time: + platform_allow: + - stm32l562e_dk + tags: + - pm + harness: pytest + harness_config: + pytest_dut_scope: session + pytest_args: [--powershield=/STMicroelectronics_PowerShield__Virtual_ComPort, + --expected-values=/stm32l562e_dk.yaml] diff --git a/tests/subsys/pm/power_states/CMakeLists.txt b/tests/subsys/pm/power_states/CMakeLists.txt new file mode 100644 index 00000000000000..83e018923b666a --- /dev/null +++ b/tests/subsys/pm/power_states/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright (c) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20.0) +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) +project(power_states) + +FILE(GLOB app_sources src/*.c) +target_sources(app PRIVATE ${app_sources}) diff --git a/tests/subsys/pm/power_states/boards/stm32l562e_dk.overlay b/tests/subsys/pm/power_states/boards/stm32l562e_dk.overlay new file mode 100644 index 00000000000000..d128ff520ca505 --- /dev/null +++ b/tests/subsys/pm/power_states/boards/stm32l562e_dk.overlay @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2025 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ +/ { + zephyr,user { + k_idle_state_min_residency_time = <400000>; + }; +}; + +&stop0{ + compatible = "zephyr,power-state"; + power-state-name = "suspend-to-idle"; + substate-id = <1>; + min-residency-us = <500000>; +}; +&stop1{ + compatible = "zephyr,power-state"; + power-state-name = "suspend-to-idle"; + substate-id = <2>; + min-residency-us = <1000000>; +}; +&stop2{ + compatible = "zephyr,power-state"; + power-state-name = "suspend-to-idle"; + substate-id = <3>; + min-residency-us = <1500000>; +}; diff --git a/tests/subsys/pm/power_states/boards/stm32l562e_dk.yaml b/tests/subsys/pm/power_states/boards/stm32l562e_dk.yaml new file mode 100644 index 00000000000000..f316ecbef249e7 --- /dev/null +++ b/tests/subsys/pm/power_states/boards/stm32l562e_dk.yaml @@ -0,0 +1,11 @@ +power_states: + k_cpu_idle: + rms: 56.0 + state_0: + rms: 4.0 + state_1: + rms: 1.3 + state_2: + rms: 0.27 + active: + rms: 134 diff --git a/tests/subsys/pm/power_states/prj.conf b/tests/subsys/pm/power_states/prj.conf new file mode 100644 index 00000000000000..a4a878acbf1dd0 --- /dev/null +++ b/tests/subsys/pm/power_states/prj.conf @@ -0,0 +1,5 @@ +# Copyright (c) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +CONFIG_GPIO=y +CONFIG_PM=y diff --git a/tests/subsys/pm/power_states/pytest/conftest.py b/tests/subsys/pm/power_states/pytest/conftest.py new file mode 100644 index 00000000000000..5b4726beb42594 --- /dev/null +++ b/tests/subsys/pm/power_states/pytest/conftest.py @@ -0,0 +1,33 @@ +# Copyright (c) 2025 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import yaml +from twister_harness import DeviceAdapter + +from scripts.pm.power_monitor_stm32l562e_dk.PowerShield import PowerShield + + +def pytest_addoption(parser): + parser.addoption("--powershield", help="Path to PowerShield device") + parser.addoption("--expected-values", help="Path to yaml file with expected values") + + +def load_power_state_data(path): + with open(path) as file: + return yaml.safe_load(file) + + +@pytest.fixture +def expected_values(request): + expected_values_file = request.config.getoption("--expected-values") + return load_power_state_data(expected_values_file)['power_states'] + + +@pytest.fixture(scope='module') +def power_monitor_measure(dut: DeviceAdapter, request): + powershield_device = request.config.getoption("--powershield") + PM_Device = PowerShield() + PM_Device.init(power_device_path=powershield_device) + PM_Device.measure(time=6) + return PM_Device.get_data() diff --git a/tests/subsys/pm/power_states/pytest/test_power_states.py b/tests/subsys/pm/power_states/pytest/test_power_states.py new file mode 100644 index 00000000000000..dc819400edbbcb --- /dev/null +++ b/tests/subsys/pm/power_states/pytest/test_power_states.py @@ -0,0 +1,38 @@ +# Copyright (c) 2025 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +import logging + +from scripts.pm.power_monitor_stm32l562e_dk.UnityFunctions import UnityFunctions + + +def test_power_states_without_pm_device(power_monitor_measure, expected_values): + data = power_monitor_measure + pm_states_expected_values = expected_values.items() + + value_in_amps = UnityFunctions.current_RMS(data, exclude_first_600=True, num_peaks=4) + # Convert to milliamps + value_in_miliamps = [value * 1e4 for value in value_in_amps] + logging.info(f"Current RMS [mA]: {value_in_miliamps}") + + def print_state_rms(state_label, rms_expected_value, rms_measured_value): + tolerance = rms_expected_value / 10 + + logging.info(f"{state_label}:") + logging.info(f"Expected RMS: {rms_expected_value:.2f} mA") + logging.info(f"Tolerance: {tolerance:.2f} mA") + logging.info(f"Current RMS: {rms_measured_value:.2f} mA") + + assert ( + (rms_expected_value - tolerance) < rms_measured_value < (rms_expected_value + tolerance) + ), f"RMS value out of tolerance for {state_label}" + + # Define the state labels + state_labels = ["k_cpu_idle", "state_0", "state_1", "state_2", "active"] + + # Convert dict_items to dictionary if needed + pm_states_expected_values = dict(pm_states_expected_values) + # Now you can use indexing or key-based access + for state_label, rms_value in zip(state_labels, value_in_miliamps, strict=False): + rms_expected_value = pm_states_expected_values[state_label]['rms'] + print_state_rms(state_label, rms_expected_value, rms_value) diff --git a/tests/subsys/pm/power_states/src/main.c b/tests/subsys/pm/power_states/src/main.c new file mode 100644 index 00000000000000..fb043b36d780b4 --- /dev/null +++ b/tests/subsys/pm/power_states/src/main.c @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2025 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ +#include + +int main(void) +{ + int k_idle_min_residency_time = + DT_PROP(DT_PATH(zephyr_user), k_idle_state_min_residency_time); + int stop0_min_residency_time = DT_PROP(DT_NODELABEL(stop0), min_residency_us); + int stop1_min_residency_time = DT_PROP(DT_NODELABEL(stop1), min_residency_us); + int stop2_min_residency_time = DT_PROP(DT_NODELABEL(stop2), min_residency_us); + + while (1) { + printk("\nGoing to k_cpu_idle.\n"); + k_usleep(k_idle_min_residency_time); + printk("\nWake Up.\n"); + printk("\nGoing to state 0.\n"); + k_usleep(stop0_min_residency_time); + printk("\nWake Up.\n"); + printk("\nGoing to state 1.\n"); + k_usleep(stop1_min_residency_time); + printk("\nWake Up.\n"); + printk("\nGoing to state 2.\n"); + k_usleep(stop2_min_residency_time); + printk("\nWake Up.\n"); + + /* Anti-sleeping loop */ + while (1) { + arch_nop(); + } + } + return 0; +} diff --git a/tests/subsys/pm/power_states/testcase.yaml b/tests/subsys/pm/power_states/testcase.yaml new file mode 100644 index 00000000000000..b0e263f060b8a2 --- /dev/null +++ b/tests/subsys/pm/power_states/testcase.yaml @@ -0,0 +1,14 @@ +# Copyright (c) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +tests: + pm.power_states: + platform_allow: + - stm32l562e_dk + tags: + - pm + harness: pytest + harness_config: + pytest_dut_scope: session + pytest_args: [--powershield=/STMicroelectronics_PowerShield__Virtual_ComPort, + --expected-values=/stm32l562e_dk.yaml] diff --git a/tests/subsys/pm/power_wakeup_timer/CMakeLists.txt b/tests/subsys/pm/power_wakeup_timer/CMakeLists.txt new file mode 100644 index 00000000000000..1af597083c5560 --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright (c) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20.0) +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) +project(power_wakeup_timer) + +FILE(GLOB app_sources src/*.c) +target_sources(app PRIVATE ${app_sources}) diff --git a/tests/subsys/pm/power_wakeup_timer/boards/stm32l562e_dk.overlay b/tests/subsys/pm/power_wakeup_timer/boards/stm32l562e_dk.overlay new file mode 100644 index 00000000000000..5a7550b22e072c --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/boards/stm32l562e_dk.overlay @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2025 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +&rtc { + compatible = "st,stm32-rtc"; + reg = < 0x40002800 0xc00 >; + interrupts = < 0x2 0x0 >; + clocks = < &rcc 0x58 0x400 >, < &rcc 0x2 0x16890 >; + prescaler = < 0x8000 >; + status = "okay"; + wakeup-source; +}; diff --git a/tests/subsys/pm/power_wakeup_timer/boards/stm32l562e_dk.yaml b/tests/subsys/pm/power_wakeup_timer/boards/stm32l562e_dk.yaml new file mode 100644 index 00000000000000..0cb756aa52607c --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/boards/stm32l562e_dk.yaml @@ -0,0 +1,11 @@ +power_states: + k_cpu_idle: + rms: 56.0 + state_0: + rms: 4.0 + state_1: + rms: 1.3 + state_2: + rms: 0.27 + active: + rms: 94 diff --git a/tests/subsys/pm/power_wakeup_timer/prj.conf b/tests/subsys/pm/power_wakeup_timer/prj.conf new file mode 100644 index 00000000000000..2cb955a58771e5 --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/prj.conf @@ -0,0 +1,7 @@ +# Copyright (c) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +CONFIG_RTC=y +CONFIG_RTC_ALARM=y +CONFIG_RTC_STM32=y +CONFIG_PM=y diff --git a/tests/subsys/pm/power_wakeup_timer/pytest/conftest.py b/tests/subsys/pm/power_wakeup_timer/pytest/conftest.py new file mode 100644 index 00000000000000..381b8c6c10f57b --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/pytest/conftest.py @@ -0,0 +1,33 @@ +# Copyright (c) 2025 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import yaml +from twister_harness import DeviceAdapter + +from scripts.pm.power_monitor_stm32l562e_dk.PowerShield import PowerShield + + +def pytest_addoption(parser): + parser.addoption("--powershield", help="Path to PowerShield device") + parser.addoption("--expected-values", help="Path to yaml file with expected values") + + +def load_power_state_data(path): + with open(path) as file: + return yaml.safe_load(file) + + +@pytest.fixture +def expected_values(request): + expected_values_file = request.config.getoption("--expected-values") + return load_power_state_data(expected_values_file)['power_states'] + + +@pytest.fixture(scope='module') +def power_monitor_measure(dut: DeviceAdapter, request): + powershield_device = request.config.getoption("--powershield") + PM_Device = PowerShield() + PM_Device.init(power_device_path=powershield_device) + PM_Device.measure(time=2) + return PM_Device.get_data() diff --git a/tests/subsys/pm/power_wakeup_timer/pytest/test_wakeup_timer.py b/tests/subsys/pm/power_wakeup_timer/pytest/test_wakeup_timer.py new file mode 100644 index 00000000000000..e3543104de7d25 --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/pytest/test_wakeup_timer.py @@ -0,0 +1,35 @@ +# Copyright (c) 2025 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +from scripts.pm.power_monitor_stm32l562e_dk.UnityFunctions import UnityFunctions + + +def test_wakeup_timer(current_measurement_output, expected_values): + data = current_measurement_output + pm_states_expected_values = expected_values.items() + + value_in_amps = UnityFunctions.current_RMS(data, exclude_first_600=True, num_peaks=1) + # Convert to milliamps + value_in_miliamps = [value * 1e4 for value in value_in_amps] + + def print_state_rms(state_label, rms_expected_value, rms_measured_value): + tolerance = rms_expected_value / 10 + + print(f"{state_label}:") + print(f"Expected RMS: {rms_expected_value:.2f} mA") + print(f"Tolerance: {tolerance:.2f} mA") + print(f"Current RMS: {rms_measured_value:.2f} mA") + + assert ( + (rms_expected_value - tolerance) < rms_measured_value < (rms_expected_value + tolerance) + ), f"RMS value out of tolerance for {state_label}" + + # Define the state labels + state_labels = ["stop_2", "active"] + + # Convert dict_items to dictionary if needed + pm_states_expected_values = dict(pm_states_expected_values) + # Now you can use indexing or key-based access + for state_label, rms_value in zip(state_labels, value_in_miliamps, strict=False): + rms_expected_value = pm_states_expected_values[state_label]['rms'] + print_state_rms(state_label, rms_expected_value, rms_value) diff --git a/tests/subsys/pm/power_wakeup_timer/src/main.c b/tests/subsys/pm/power_wakeup_timer/src/main.c new file mode 100644 index 00000000000000..e20d18db697fd7 --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/src/main.c @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2025 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include + +#define RTC_ALARM_FIELDS \ + (RTC_ALARM_TIME_MASK_SECOND | RTC_ALARM_TIME_MASK_MINUTE | RTC_ALARM_TIME_MASK_HOUR) + +static const struct device *rtc = DEVICE_DT_GET(DT_NODELABEL(rtc)); + +volatile k_tid_t my_tid; + +#define MY_STACK_SIZE 500 +#define MY_PRIORITY 5 + +struct k_thread my_thread_data; +K_THREAD_STACK_DEFINE(my_stack_area, MY_STACK_SIZE); + +void my_entry_point(void *, void *, void *) +{ + printk("Going sleep.\n"); + k_msleep(3000); +} + +/* Fri Jan 01 2021 13:29:50 GMT+0000 */ +static const struct rtc_time rtc_time = { + .tm_sec = 50, + .tm_min = 29, + .tm_hour = 13, + .tm_mday = 1, + .tm_mon = 0, + .tm_year = 121, + .tm_wday = 5, + .tm_yday = 1, + .tm_isdst = -1, + .tm_nsec = 0, +}; + +/* Fri Jan 01 2021 13:29:51 GMT+0000 */ +static const struct rtc_time alarm_time = { + .tm_sec = 51, + .tm_min = 29, + .tm_hour = 13, + .tm_mday = 1, + .tm_mon = 0, + .tm_year = 121, + .tm_wday = 5, + .tm_yday = 1, + .tm_isdst = -1, + .tm_nsec = 0, +}; + +static void wakeup_cb(const struct device *dev, uint16_t id, void *user_data) +{ + printk("Wake up by alarm.\n"); + k_thread_abort(my_tid); +} + +int main(void) +{ + int ret; + uint16_t mask = RTC_ALARM_FIELDS; + + ret = rtc_set_time(rtc, &rtc_time); + if (ret < 0) { + return 0; + } + + ret = rtc_alarm_set_time(rtc, 0, mask, &alarm_time); + if (ret < 0) { + return 0; + } + + ret = rtc_alarm_set_callback(rtc, 0, wakeup_cb, NULL); + if (ret < 0) { + return 0; + } + + printk("Created the thread.\n"); + my_tid = k_thread_create(&my_thread_data, my_stack_area, + K_THREAD_STACK_SIZEOF(my_stack_area), my_entry_point, NULL, NULL, + NULL, MY_PRIORITY, 0, K_NO_WAIT); + + k_thread_join(my_tid, K_FOREVER); + + while (1) { + arch_nop(); + } + return 0; +} diff --git a/tests/subsys/pm/power_wakeup_timer/testcase.yaml b/tests/subsys/pm/power_wakeup_timer/testcase.yaml new file mode 100644 index 00000000000000..083acda8061133 --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/testcase.yaml @@ -0,0 +1,27 @@ +# Copyright (c) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +tests: + pm.power_wakeup_timer: + platform_allow: + - stm32l562e_dk + tags: + - pm + harness: pytest + harness_config: + pytest_dut_scope: session + pytest_args: [--powershield=/STMicroelectronics_PowerShield__Virtual_ComPort, + --expected-values=/stm32l562e_dk.yaml] + pm.power_wakeup_timer.pm_device: + platform_allow: + - stm32l562e_dk + tags: + - pm + extra_args: + - CONFIG_PM_DEVICE=y + - CONFIG_PM_DEVICE_RUNTIME=y + harness: pytest + harness_config: + pytest_dut_scope: session + pytest_args: [--powershield=/STMicroelectronics_PowerShield__Virtual_ComPort, + --expected-values=/stm32l562e_dk.yaml]