diff --git a/scripts/pm/power_monitor_stm32l562e_dk/AbstractPowerMonitor.py b/scripts/pm/power_monitor_stm32l562e_dk/AbstractPowerMonitor.py new file mode 100644 index 000000000000000..54dd12d99b0c851 --- /dev/null +++ b/scripts/pm/power_monitor_stm32l562e_dk/AbstractPowerMonitor.py @@ -0,0 +1,42 @@ +# Copyright: (c) 2024, Intel Corporation +# Author: Arkadiusz Cholewinski +# MIT License, see (https://opensource.org/license/mit/) + +import string +from typing import List +from abc import ABC, abstractmethod + +class PowerMonitor(ABC): + + @abstractmethod + def init(self, device_id:string): + """ + Abstract method to initialize the power monitor. + + Agr: + string: Address of the power monitor + + Return: + bool: True of False. + """ + + @abstractmethod + def measure(self, duration:int): + """ + Abstract method to measure current with specified measurement time. + + Args: + duration (int): The duration of the measurement in seconds. + """ + + @abstractmethod + def get_data(self, duration: int) -> list[float]: + """ + Measure current with specified measurement time. + + Args: + duration (int): The duration of the measurement in seconds. + + Returns: + List[float]: An array of measured current values in amperes. + """ \ No newline at end of file diff --git a/scripts/pm/power_monitor_stm32l562e_dk/PowerShield.py b/scripts/pm/power_monitor_stm32l562e_dk/PowerShield.py new file mode 100644 index 000000000000000..5bc7ca971e1292a --- /dev/null +++ b/scripts/pm/power_monitor_stm32l562e_dk/PowerShield.py @@ -0,0 +1,469 @@ +# Copyright: (c) 2024, Intel Corporation +# Author: Arkadiusz Cholewinski +# MIT License, see (https://opensource.org/license/mit/) + +import time +import queue +import threading +import csv +import re +import logging + +from scripts.pm.power_monitor_stm32l562e_dk.SerialHandler import * +from scripts.pm.power_monitor_stm32l562e_dk.PowerShieldConfig import * +from scripts.pm.power_monitor_stm32l562e_dk.PowerShieldData import * +from scripts.pm.power_monitor_stm32l562e_dk.UnityFunctions import * +from scripts.pm.power_monitor_stm32l562e_dk.AbstractPowerMonitor import * + +class PowerShield(PowerMonitor): + + def __init__(self): + """ + Initializes the PowerShield. + """ + self.handler = None + self.dataQueue = queue.Queue() + self.acqComplete = False + self.acqStart = False + self.target_voltage = None + self.target_temperature = None + self.acqTimeoutThread = None + self.power_shield_conf = PowerShieldConf() + self.power_shield_data = PowerShieldData() + + + + def init(self, power_device_path: str): + """ + Initializes the power monitor by setting up a serial handler + with the given device path and a baud rate of 3686400. + """ + self.handler = SerialHandler(power_device_path, 3686400) + self.connect() + # self.reset() + self.take_controll() + self.set_voltage(self.power_shield_conf.target_voltage) + self.set_format(self.power_shield_conf.data_format) + self.set_func_mode(self.power_shield_conf.function_mode) + self.target_temperature = self.get_temperature(self.power_shield_conf.temperature_unit) + self.target_voltage = self.get_voltage_level() + + def connect(self): + """Opens the connection using the SerialHandler.""" + self.handler.open() + + def disconnect(self): + """Closes the connection using the SerialHandler.""" + self.handler.close() + + def send_command(self, command: str, expected_ack: str = None, ack: bool = False) -> str: + """ + Sends a command to the device, retrieves the response, and optionally verifies the acknowledgment. + + :param command: The command to send. + :param expected_ack: The expected acknowledgment response (e.g., "ack htc"). + :return: The response received from the device. + """ + if not self.handler.is_open(): + logging.info(f"Error: Connection is not open. Cannot send command: {command}") + return "" + + logging.debug(f"Sending command: {command}") + self.handler.send_cmd(command) + if ack: + response = self.handler.receive_cmd() + logging.debug(f"Response: {response}") + + # Check if the response contains the expected acknowledgment + if expected_ack and expected_ack not in response: + logging.error(f"Error: Expected acknowledgment '{expected_ack}' not found in response.") + return "" + + return response + return 0 + + def test_communication(self): + """ + Sends a version command to the device. + """ + if not self.handler.is_open(): + logging.error(f"Error: Connection is not open. Cannot send version command: {e}") + return "" + command = 'version' + logging.info(f"Sending command: {command}") + self.handler.send_cmd(command) + response = self.handler.receive_cmd() + logging.info(f"Response: {response}") + return response + + def reset(self): + """ + Sends the reset command ('PSRST') to the power monitor device, closes the connection, + waits for the reset process to complete, and repeatedly attempts to reconnect until successful. + """ + command = "psrst" + + if not self.handler.is_open(): + logging.error("Error: Connection is not open. Cannot reset the device.") + return + + logging.info(f"Sending reset command: {command}") + self.handler.send_cmd(command) + + # Close the connection + self.handler.close() + self.handler.serial_connection = None + import time + time.sleep(5) + # Attempt to reopen the connection + try: + self.handler.open() + logging.info("Connection reopened after reset.") + except Exception as e: + logging.error(f"Failed to reopen connection after reset: {e}") + + def get_voltage_level(self) -> float: + """ + Sends the 'volt get' command and returns the voltage value as a float. + + :return: The voltage level as a float. + """ + command = 'volt get' + response = self.send_command(command, expected_ack="ack volt get", ack=True) + + # If response contains the expected acknowledgment, extract and return the voltage + if response: + parts = response.split() + try: + if len(parts) >= 5: + # Use regex to find a string that matches the pattern, e.g., "3292-03" + match = re.search(r'(\d+)-(\d+)', parts[5]) + if match: + # Extract the base (3292) and exponent (03) + base = match.group(1) + exponent = match.group(2) + + # Construct the scientific notation string (e.g., 3292e-03) + voltage_str = f"{base}e-{exponent}" + + # Convert the string into a float + voltage = float(voltage_str) + + # Return the voltage as a float + self.target_voltage = round(voltage, 3) + return self.target_voltage + except ValueError: + logging.error("Error: Could not convert temperature value.") + return None + else: + logging.warning("No response for voltage command.") + return None + + def get_temperature(self, unit: str = PowerShieldConf.TemperatureUnit.CELCIUS) -> float: + """ + Sends the temperature command and returns the temperature as a float. + + :param unit: The unit to request the temperature in, either 'degc' or 'degf'. + :return: The temperature value as a float. + """ + # Send the temp command with the unit + response = self.send_command(f"temp {unit}", expected_ack=f"ack temp {unit}", ack=True) + + # If response contains the expected acknowledgment, extract the temperature + if response: + try: + # Example response format: "PowerShield > ack temp degc 28.0" + parts = response.split() + if len(parts) >= 5 and parts[5].replace('.', '', 1).isdigit(): + self.target_temetarute = float(parts[5]) # Extract temperature and convert to float + logging.info(f"Temperature: {self.target_temetarute} {unit}") + return self.target_temetarute + else: + print("Error: Temperature value not found in response.") + return None + except ValueError: + logging.error("Error: Could not convert temperature value.") + return None + else: + logging.warning("No response for temp command.") + return None + + def take_controll(self) -> str: + """ + Sends the 'htc' command and verifies the acknowledgment. + + :return: The acknowledgment response or error message. + """ + return self.send_command("htc", expected_ack="ack htc", ack=True) + + def set_format(self, data_format: str = PowerShieldConf.DataFormat.ASCII_DEC): + """ + Sets the measurement data format. The format can be either ASCII (decimal) or Binary (hexadecimal). + + :param data_format: The data format to set. Options are 'ascii_dec' or 'bin_hexa'. + :return: None + """ + # Validate the input format + if data_format not in [PowerShieldConf.DataFormat.ASCII_DEC, PowerShieldConf.DataFormat.BIN_HEXA]: + print(f"Error: Invalid format '{data_format}'. Valid options are 'ascii_dec' or 'bin_hexa'.") + return + + command = f"format {data_format}" + response = self.send_command(command, expected_ack=f"ack format {data_format}", ack=True) + + # If response contains the expected acknowledgment, the format was set successfully + if response: + logging.info(f"Data format set to {data_format}.") + else: + logging.warning(f"Failed to set data format to {data_format}.") + + def set_frequency(self, frequency: str): + """ + Sets the sampling frequency for the measurement. The frequency can be any valid value from the list. + + :param frequency: The sampling frequency to set. Valid options include {100 k, 50 k, 20 k, 10 k, 5 k, 2 k, 1 k, 500, 200, 100, 50, 20, 10, 5, 2, 1}. + :return: None + """ + # Validate the input frequency + if frequency not in [PowerShieldConf.SamplingFrequency.FREQ_100K, PowerShieldConf.SamplingFrequency.FREQ_50K, PowerShieldConf.SamplingFrequency.FREQ_20K, + PowerShieldConf.SamplingFrequency.FREQ_10K, PowerShieldConf.SamplingFrequency.FREQ_5K, PowerShieldConf.SamplingFrequency.FREQ_2K, + PowerShieldConf.SamplingFrequency.FREQ_1K, PowerShieldConf.SamplingFrequency.FREQ_500, PowerShieldConf.SamplingFrequency.FREQ_200, + PowerShieldConf.SamplingFrequency.FREQ_100, PowerShieldConf.SamplingFrequency.FREQ_50, PowerShieldConf.SamplingFrequency.FREQ_20, + PowerShieldConf.SamplingFrequency.FREQ_10, PowerShieldConf.SamplingFrequency.FREQ_5, PowerShieldConf.SamplingFrequency.FREQ_2, + PowerShieldConf.SamplingFrequency.FREQ_1]: + logging.warning(f"Error: Invalid frequency '{frequency}'. Valid options are: 100k, 50k, 20k, 10k, 5k, 2k, 1k, 500, 200, 100, 50, 20, 10, 5, 2, 1.") + return + + command = f"freq {frequency}" + response = self.send_command(command, expected_ack=f"ack freq {frequency}", ack=True) + + # If response contains the expected acknowledgment, the frequency was set successfully + if response: + logging.info(f"Sampling frequency set to {frequency}.") + else: + logging.warning(f"Failed to set sampling frequency to {frequency}.") + + def set_acquire_time(self, acquire_time: str = '0'): + command = f"acqtime {acquire_time}" + response = self.send_command(command, expected_ack=f"ack acqtime {acquire_time}", ack=True) + + # If response contains the expected acknowledgment, the acquisition time was set successfully + if response: + logging.info(f"Acquisition time set to {acquire_time}.") + else: + logging.warning(f"Failed to set acquisition time to {acquire_time}.") + + def set_voltage(self, voltage: str): + command = f"volt {voltage}" + response = self.send_command(command, expected_ack=f"ack volt {voltage}", ack=True) + + # If response contains the expected acknowledgment, the voltage was set successfully + if response: + logging.info(f"Voltage set to {voltage}.") + else: + logging.warning(f"Failed to set voltage to {voltage}.") + + def set_func_mode(self, function_mode: str = PowerShieldConf.FunctionMode.HIGH): + """ + Sets the acquisition mode for current measurement. The function_mode can be either 'optim' or 'high'. + + - 'optim': Priority on current resolution (100 nA - 10 mA) with max sampling frequency at 100 kHz. + - 'high': High current (30 µA - 10 mA), high sampling frequency (50-100 kHz), high resolution. + + :param mode: The acquisition mode. Must be either 'optim' or 'high'. + :return: None + """ + # Validate the input format + if function_mode not in [PowerShieldConf.FunctionMode.HIGH, PowerShieldConf.FunctionMode.OPTIM]: + print(f"Error: Invalid format '{function_mode}'. Valid options are 'ascii_dec' or 'bin_hexa'.") + return + + command = f"funcmode {function_mode}" + response = self.send_command(command, expected_ack=f"ack funcmode {function_mode}", ack=True) + + # If response contains the expected acknowledgment, the format was set successfully + if response: + logging.info(f"Data format set to {function_mode}.") + else: + logging.warning(f"Failed to set data format to {function_mode}.") + + def acq_data(self): + """ + Continuously reads data from the serial port and puts it into a queue until acquisition is complete. + """ + logging.info("Started data acquisition...") + while True: + # Read the first byte + first_byte = self.handler.read_bytes(1) + if len(first_byte) < 1 or self.acqComplete: # Exit conditions + logging.info("Stopping data acquisition...") + return + + # Check if it's metadata + if first_byte == b'\xF0': # Metadata marker + second_byte = self.handler.read_bytes(1) + # Handle metadata types + metadata_type = second_byte[0] + self.handle_metadata(metadata_type) + else: + # Not metadata, treat as data + if self.acqStart: + second_byte = self.handler.read_bytes(1) + data = [] + data.append(first_byte) + if len(second_byte)<1 or self.acqComplete: + logging.info("Stopping data acquisition...") + return + data.append(second_byte) + amps = UnityFunctions.convert_to_amps(UnityFunctions.bytes_to_twobyte_values(data)) + self.dataQueue.put([amps]) + + def handle_metadata(self, metadata_type): + if metadata_type == 0xF1: + logging.info("Received Metadata: ASCII error message.") + # self.handle_metadata_error() + elif metadata_type == 0xF2: + logging.info("Received Metadata: ASCII information message.") + # self.handle_metadata_info() + elif metadata_type == 0xF3: + logging.info("Received Metadata: Timestamp message.") + self.handle_metadata_timestamp() + self.acqStart = True + elif metadata_type == 0xF4: + logging.info("Received Metadata: End of acquisition tag.") + self.handle_metadata_end() + summary = self.handle_summary() + elif metadata_type == 0xF5: + logging.info("Received Metadata: Overcurrent detected.") + # self.handle_metadata_overcurrent() + else: + logging.warning(f"Unknown Metadata Type: {metadata_type:#04x}") + + def handle_summary(self): + s = "" + while True: + # Read the first byte + x = self.handler.read_bytes(1) + if len(x) < 1 or x == 0xf0: + self.acqComplete=True + return s.replace("\0", "").strip().replace("\r", "").replace("\n\n\n", "\n") + s += str(x, encoding='ascii', errors='ignore') + + def handle_metadata_end(self): + """ + Handle metadata end of acquisition message. + """ + # Read the next 4 bytes + metadata_bytes = self.handler.read_bytes(2) + if len(metadata_bytes) < 2: + print("Incomplete end of acquisition metadata reveived.") + return + # Check for end tags (last 2 bytes) + end_tag_1 = metadata_bytes[0] + end_tag_2 = metadata_bytes[1] + if end_tag_1 != 0xFF or end_tag_2 != 0xFF: + logging.warning("Invalid metadata end tags received.") + return + + def handle_metadata_timestamp(self): + """ + Handle metadata timestamp message. Parses and displays the timestamp and buffer load. + """ + # Read the next 7 bytes (timestamp + buffer load + end tags) + metadata_bytes = self.handler.read_bytes(7) + if len(metadata_bytes) < 7: + logging.warning("Incomplete timestamp metadata received.") + return + + # Parse the timestamp (4 bytes, big-endian) + timestamp_ms = int.from_bytes(metadata_bytes[0:4], byteorder='big', signed=False) + # Parse the buffer Tx load value (1 byte) + buffer_load = metadata_bytes[4] + # Check for end tags (last 2 bytes) + end_tag_1 = metadata_bytes[5] + end_tag_2 = metadata_bytes[6] + if end_tag_1 != 0xFF or end_tag_2 != 0xFF: + logging.warning("Invalid metadata end tags received.") + return + + # Display parsed values + logging.info(f"Metadata Timestamp: {timestamp_ms} ms") + logging.info(f"Buffer Tx Load: {buffer_load}%") + + def start_measurement(self): + """ + Starts the measurement by sending the 'start' command. Once the measurement starts, + data can be received continuously until the 'stop' command is sent. + + :return: None + """ + command = "start" + self.acqComplete = False + self.send_command(command) + + raw_to_file_Thread = threading.Thread(target=self.raw_to_file , args=(self.power_shield_conf.output_file,)) + raw_to_file_Thread.start() + logging.info("Measurement started. Receiving data...") + self.acq_data() + raw_to_file_Thread.join() + + def raw_to_file(self, outputFilePath: str): + # Open a CSV file for writing + with open(outputFilePath, 'w', newline='') as outputFile: + writer = csv.writer(outputFile) + while True: + if self.dataQueue.empty() and bool(self.acqComplete): + outputFile.close() + return + if not self.dataQueue.empty(): + data = self.dataQueue.get() + writer.writerow(data) + outputFile.flush() + else: + time.sleep(0.1) + + + + def measure(self, time: int, freq: str=None, reset: bool = False): + self.power_shield_conf.acquisition_time = time + _time, self.power_shield_conf.acquisition_time_unit = UnityFunctions.convert_acq_time(time) + + if reset: + self.reset() + + self.take_controll() + self.set_format(self.power_shield_conf.data_format) + if freq != None: + self.set_frequency(freq) + else: + self.set_frequency(self.power_shield_conf.sampling_freqency) + self.set_acquire_time(UnityFunctions.convert_to_scientific_notation(time=_time, unit=self.power_shield_conf.acquisition_time_unit)) + self.start_measurement() + + # calculate the data + # self.get_measured_data(measure_unit) + + + def get_data(self, unit: str = PowerShieldConf.MeasureUnit.RAW_DATA): + if self.acqComplete: + # Open the CSV file + with open(self.power_shield_conf.output_file, mode='r', newline='') as file: + csv_reader = csv.reader(file) + for row in csv_reader: + self.power_shield_data.data.append(row[0]) + if unit == PowerShieldConf.MeasureUnit.CURRENT_RMS: + self.power_shield_data.current_RMS = UnityFunctions.calculate_rms(self.power_shield_data.data) + return self.power_shield_data.current_RMS + elif unit == PowerShieldConf.MeasureUnit.POWER: + _delta_time = self.power_shield_conf.acquisition_time + self.power_shield_data.power = 0 + for data in self.power_shield_data.data: + self.power_shield_data.power += float(float(data)*float(_delta_time)*float(self.target_voltage)) + return self.power_shield_data.power + elif unit == PowerShieldConf.MeasureUnit.RAW_DATA: + return self.power_shield_data.data + else: + logging.warning("Unknown unit of requested data") + else: + logging.info("Acquisition not complete.") + return None \ No newline at end of file diff --git a/scripts/pm/power_monitor_stm32l562e_dk/PowerShieldConfig.py b/scripts/pm/power_monitor_stm32l562e_dk/PowerShieldConfig.py new file mode 100644 index 000000000000000..50dcb141381a357 --- /dev/null +++ b/scripts/pm/power_monitor_stm32l562e_dk/PowerShieldConfig.py @@ -0,0 +1,81 @@ +# Copyright: (c) 2024, Intel Corporation +# Author: Arkadiusz Cholewinski +# MIT License, see (https://opensource.org/license/mit/) + +class PowerShieldConf: + + class PowerMode: + """ + Class representing power mode + """ + AUTO = "auto" # Power-on when acquisition start + ON = "on" # Power-on manually + OFF = "off" # Power-off manually + + class MeasureUnit: + """" + Class representing measure units. + """ + VOLTAGE = "voltage" # Target Volatege + CURRENT_RMS = "current_rms" # Current RMS value + POWER = "power" # Total power consumption + RAW_DATA = "rawdata" # Get Raw Data (current probes) + + class TemperatureUnit: + """ + Class representing temperature units. + """ + CELCIUS = "degc" # Celsius temperature unit + FAHRENHEIT = "degf" # Fahrenheit temperature unit + + class FunctionMode: + """ + Class representing functional modes of a power monitor. + """ + OPTIM = "optim" # Optimized mode for lower power or efficiency + HIGH = "high" # High performance mode + + class DataFormat: + """ + Class representing different data formats for representation. + """ + ASCII_DEC = "ascii_dec" # ASCII encoded decimal format + BIN_HEXA = "bin_hexa" # Binary/hexadecimal format + + class SamplingFrequency: + """ + Class representing various sampling frequencies. + """ + FREQ_100K = "100k" # 100 kHz frequency + FREQ_50K = "50k" # 50 kHz frequency + FREQ_20K = "20k" # 20 kHz frequency + FREQ_10K = "10k" # 10 kHz frequency + FREQ_5K = "5k" # 5 kHz frequency + FREQ_2K = "2k" # 2 kHz frequency + FREQ_1K = "1k" # 1 kHz frequency + FREQ_500 = "500" # 500 Hz frequency + FREQ_200 = "200" # 200 Hz frequency + FREQ_100 = "100" # 100 Hz frequency + FREQ_50 = "50" # 50 Hz frequency + FREQ_20 = "20" # 20 Hz frequency + FREQ_10 = "10" # 10 Hz frequency + FREQ_5 = "5" # 5 Hz frequency + FREQ_2 = "2" # 2 Hz frequency + FREQ_1 = "1" # 1 Hz frequency + + def __init__(self, data_format: str = DataFormat.BIN_HEXA, + temperature_unit: str = TemperatureUnit.CELCIUS, + target_voltage: str = "3300m", + function_mode: str = FunctionMode.HIGH, + sampling_frequency: str = SamplingFrequency.FREQ_1K, + output_file: str = "rawData.csv", + ): + + self.output_file = output_file + self.sampling_freqency = sampling_frequency + self.data_format = data_format + self.function_mode = function_mode + self.target_voltage = target_voltage + self.temperature_unit = temperature_unit + self.acquisition_time = None + \ No newline at end of file diff --git a/scripts/pm/power_monitor_stm32l562e_dk/PowerShieldData.py b/scripts/pm/power_monitor_stm32l562e_dk/PowerShieldData.py new file mode 100644 index 000000000000000..b466ed733512670 --- /dev/null +++ b/scripts/pm/power_monitor_stm32l562e_dk/PowerShieldData.py @@ -0,0 +1,10 @@ +# Copyright: (c) 2024, Intel Corporation +# Author: Arkadiusz Cholewinski +# MIT License, see (https://opensource.org/license/mit/) + +class PowerShieldData: + + def __init__(self): + self.data = [] + self.current_RMS = None + self.power = None \ No newline at end of file diff --git a/scripts/pm/power_monitor_stm32l562e_dk/SerialHandler.py b/scripts/pm/power_monitor_stm32l562e_dk/SerialHandler.py new file mode 100644 index 000000000000000..29e919cc57f4625 --- /dev/null +++ b/scripts/pm/power_monitor_stm32l562e_dk/SerialHandler.py @@ -0,0 +1,81 @@ +# Copyright: (c) 2024, Intel Corporation +# Author: Arkadiusz Cholewinski +# MIT License, see (https://opensource.org/license/mit/) + +import serial +import logging + +class SerialHandler: + def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0): + """ + Initializes the class for serial communication. + + :param port: The serial port name (e.g., 'COM1', '/dev/ttyUSB0'). + :param baudrate: The baud rate for the connection (default is 9600). + :param timeout: The timeout for read operations in seconds (default is 1.0). + """ + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self.serial_connection = None + + def open(self): + """ + Opens the serial connection. + """ + if self.serial_connection is None: + try: + self.serial_connection = serial.Serial(self.port, self.baudrate, timeout=self.timeout) + logging.info("Connection to %s at %d baud opened successfully.", self.port, self.baudrate) + except serial.SerialException as e: + logging.error("Error opening serial port %s: %s", self.port, str(e)) + self.serial_connection = None + + def close(self): + """Closes the serial connection.""" + if self.serial_connection and self.serial_connection.is_open: + self.serial_connection.close() + logging.info("Serial connection closed.") + + def send_cmd(self, cmd: str): + """ + Sends a command to the serial device with a newline, and prints it. + + :param cmd: The command to be sent. + """ + if self.serial_connection and self.serial_connection.is_open: + try: + self.serial_connection.write((cmd + "\r\n").encode('ascii')) + except serial.SerialException as e: + logging.error(f"Error sending command: {e}") + + def read_bytes(self, count: int): + if self.serial_connection: + x = self.serial_connection.read(count) + return x + + + def receive_cmd(self) -> str: + """ + Reads data from the serial device until no more data is available. + + :return: The processed received data as a string. + """ + s = "" + if self.serial_connection and self.serial_connection.is_open: + while True: + x = self.serial_connection.read() + if len(x) < 1 or x == 0xf0: + return s.replace("\0", "").strip().replace("\r", "").replace("\n\n\n", "\n") + s += str(x, encoding='ascii', errors='ignore') + + def is_open(self) -> bool: + """Checks if the connection is open.""" + return self.serial_connection and self.serial_connection.is_open + + def __del__(self): + """Closes the connection if the object is deleted.""" + self.close() + # Shut down logging + logging.shutdown() + diff --git a/scripts/pm/power_monitor_stm32l562e_dk/UnityFunctions.py b/scripts/pm/power_monitor_stm32l562e_dk/UnityFunctions.py new file mode 100644 index 000000000000000..c155bbd998006cd --- /dev/null +++ b/scripts/pm/power_monitor_stm32l562e_dk/UnityFunctions.py @@ -0,0 +1,72 @@ +# Copyright: (c) 2024, Intel Corporation +# Author: Arkadiusz Cholewinski +# MIT License, see (https://opensource.org/license/mit/) + +import numpy as np + +class UnityFunctions: + + @staticmethod + def convert_acq_time(value): + """ + Converts an acquisition time value to a more readable format with units. + - Converts values to m (milli), u (micro), or leaves them as is for whole numbers. + + :param value: The numeric value to convert. + :return: A tuple with the value and the unit as separate elements. + """ + if value < 1e-3: + # If the value is smaller than 1 millisecond (10^-3), express in micro (u) + return f"{value * 1e6:.0f}", "us" + elif value < 1: + # If the value is smaller than 1 but larger than or equal to 1 milli (10^-3), express in milli (m) + return f"{value * 1e3:.0f}", "ms" + else: + # If the value is 1 or larger, express in seconds (s) + return f"{value:.0f}", "s" + + @staticmethod + def calculate_rms(data): + """ + Calculate the Root Mean Square (RMS) of a given data array. + + :param data: List or numpy array containing the data + :return: RMS value + """ + # Convert to a numpy array for easier mathematical operations + data_array = np.array(data, dtype=np.float64) # Convert to float64 to avoid type issues + + + # Calculate the RMS value + rms = np.sqrt(np.mean(np.square(data_array))) + return rms + + @staticmethod + def bytes_to_twobyte_values(data): + value = int.from_bytes(data[0],'big')<<8 | int.from_bytes(data[1], 'big') + return value + + @staticmethod + def convert_to_amps(value): + """ + Convert amps to watts + """ + amps = (value & 4095) * (16**(0 - (value >> 12))) + return amps + + @staticmethod + def convert_to_scientific_notation(time: int, unit: str) -> str: + """ + Converts time to scientific notation based on the provided unit. + :param time: The time value to convert. + :param unit: The unit of the time ('us', 'ms', or 's'). + :return: A string representing the time in scientific notation. + """ + if unit == 'us': # microseconds + return f"{time}-6" + elif unit == 'ms': # milliseconds + return f"{time}-3" + elif unit == 's': # seconds + return f"{time}" + else: + raise ValueError("Invalid unit. Use 'us', 'ms', or 's'.") \ No newline at end of file diff --git a/tests/subsys/pm/power_residency_time/boards/stm32l562e_dk.overlay b/tests/subsys/pm/power_residency_time/boards/stm32l562e_dk.overlay new file mode 100644 index 000000000000000..fdbb9aac2b3634e --- /dev/null +++ b/tests/subsys/pm/power_residency_time/boards/stm32l562e_dk.overlay @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +&stop0{ + compatible = "zephyr,power-state"; + power-state-name = "suspend-to-idle"; + substate-id = <1>; + min-residency-us = <500000>; +}; +&stop1{ + compatible = "zephyr,power-state"; + power-state-name = "suspend-to-idle"; + substate-id = <2>; + min-residency-us = <1000000>; +}; +&stop2{ + compatible = "zephyr,power-state"; + power-state-name = "suspend-to-idle"; + substate-id = <3>; + min-residency-us = <1500000>; +}; diff --git a/tests/subsys/pm/power_residency_time/prj.conf b/tests/subsys/pm/power_residency_time/prj.conf new file mode 100644 index 000000000000000..635c21a0cf51e6f --- /dev/null +++ b/tests/subsys/pm/power_residency_time/prj.conf @@ -0,0 +1,4 @@ +# Copyright (c) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +CONFIG_PM=y diff --git a/tests/subsys/pm/power_residency_time/pytest/conftest.py b/tests/subsys/pm/power_residency_time/pytest/conftest.py new file mode 100644 index 000000000000000..f20d2916a49f74b --- /dev/null +++ b/tests/subsys/pm/power_residency_time/pytest/conftest.py @@ -0,0 +1,20 @@ +# Copyright (c) 2024 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import time +from scripts.pm.power_monitor_stm32l562e_dk.PowerShield import PowerShield +from twister_harness import DeviceAdapter + +def pytest_addoption(parser): + parser.addoption("--powershield", + help="Path to PowerShield device") + +@pytest.fixture(scope='module') +def power_monitor_measure(dut: DeviceAdapter, request): + powershield_device = request.config.getoption("--powershield") + PM_Device = PowerShield() + PM_Device.init(power_device_path = powershield_device) + PM_Device.measure(time=8) + return PM_Device.get_data() + \ No newline at end of file diff --git a/tests/subsys/pm/power_residency_time/pytest/current_analyzer.py b/tests/subsys/pm/power_residency_time/pytest/current_analyzer.py new file mode 100644 index 000000000000000..abf7e1097f10e0f --- /dev/null +++ b/tests/subsys/pm/power_residency_time/pytest/current_analyzer.py @@ -0,0 +1,34 @@ +# Copyright (c) 2024 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +import numpy as np +from scipy import signal +import logging +def current_RMS(data): + data = data[600:] + data = [float(x) for x in data] + peaks, data = find_peaks(data) + peaks = peaks[0:6] + logging.info(f"Found peaks: {peaks}") + # Adding the start (0) and end (length of data) to the list of indices + indices = np.concatenate(([0], np.array(peaks), [len(data)])) + + # Splitting the data based on index ranges + split_data = [data[indices[i]+40:indices[i+1]-40] for i in range(len(indices)-1)] + + # Function to calculate RMS for a chunk of data + def calculate_rms(chunks): + rms = [] + for chunk in chunks: + rms_value = np.sqrt(np.mean(np.square(chunk))) + rms.append(rms_value) + return rms + + # Calculate RMS for each chunk + rms_values = calculate_rms(split_data) + + return rms_values + +def find_peaks(data): + peaks = signal.find_peaks(data, distance=40, height=0.008)[0] + return peaks, data \ No newline at end of file diff --git a/tests/subsys/pm/power_residency_time/pytest/test_residency_time.py b/tests/subsys/pm/power_residency_time/pytest/test_residency_time.py new file mode 100644 index 000000000000000..d1070abce6997d3 --- /dev/null +++ b/tests/subsys/pm/power_residency_time/pytest/test_residency_time.py @@ -0,0 +1,40 @@ +# Copyright (c) 2024 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +import os, logging +from scripts.pm.power_monitor_stm32l562e_dk.PowerShield import PowerShield +from current_analyzer import current_RMS +from twister_harness import DeviceAdapter + +def test_residency_time(power_monitor_measure): + data = power_monitor_measure + # Empirical RMS values in mA with descriptive keys + rms_expected = { + "k_cpu_idle": 56.0, # State 0 + "state 0": 4.0, # State 0 + "state 1": 1.3, # State 1 + "state 2": 0.27, # State 2 + "active": 94 # Active + } + + value_in_amps = current_RMS(data) + # Convert to milliamps + value_in_miliamps = [value * 1e4 for value in value_in_amps] + logging.info(f"Current RMS [mA]: {value_in_miliamps}") + def print_state_rms(state_label, rms_expected_value, rms_measured_value): + tolerance = rms_expected_value / 10 + + logging.info(f"{state_label}:") + logging.info(f"Expected RMS: {rms_expected_value:.2f} mA") + logging.info(f"Tolerance: {tolerance:.2f} mA") + logging.info(f"Current RMS: {rms_measured_value:.2f} mA") + + assert (rms_expected_value - tolerance) < rms_measured_value < (rms_expected_value + tolerance), \ + f"RMS value out of tolerance for {state_label}" + + # Define the state labels + state_labels = ["k_cpu_idle", "state 0", "state 0", "state 1", "state 1", "state 2", "active"] + + # Loop through the states and print RMS values + for state_label, rms_value in zip(state_labels, value_in_miliamps): + print_state_rms(state_label, rms_expected[state_label], rms_value) diff --git a/tests/subsys/pm/power_residency_time/src/main.c b/tests/subsys/pm/power_residency_time/src/main.c new file mode 100644 index 000000000000000..40518a6a8a4be1e --- /dev/null +++ b/tests/subsys/pm/power_residency_time/src/main.c @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include + +#define DIFF_RESIDENCY_TIME_STATE_0 400 +#define DIFF_RESIDENCY_TIME_STATE_1 400 +#define DIFF_RESIDENCY_TIME_STATE_2 400 + +int main(void) +{ + + int stop0_min_residency_time = DT_PROP(DT_NODELABEL(stop0), min_residency_us); + int stop1_min_residency_time = DT_PROP(DT_NODELABEL(stop1), min_residency_us); + int stop2_min_residency_time = DT_PROP(DT_NODELABEL(stop2), min_residency_us); + + while (1) { + printk("\nSleep time < min_residency_time of state 0 \n"); + k_usleep(stop0_min_residency_time-DIFF_RESIDENCY_TIME_STATE_0); + printk("\nSleep time = min_residency_time of state 0 \n"); + k_usleep(stop0_min_residency_time); + printk("\nSleep time < min_residency_time of state 1 \n"); + k_usleep(stop1_min_residency_time-DIFF_RESIDENCY_TIME_STATE_1); + printk("\nSleep time = min_residency_time of state 1 \n"); + k_usleep(stop1_min_residency_time); + printk("\nSleep time < min_residency_time of state 2 \n"); + k_usleep(stop2_min_residency_time-DIFF_RESIDENCY_TIME_STATE_2); + printk("\nSleep time = min_residency_time of state 2 \n"); + k_usleep(stop2_min_residency_time); + printk("\nWakeup. \n"); + + /** + * Keeping alive loop + */ + while(1){ + arch_nop(); + } + } + return 0; +} diff --git a/tests/subsys/pm/power_residency_time/testcase.yaml b/tests/subsys/pm/power_residency_time/testcase.yaml new file mode 100644 index 000000000000000..7c7dddf6e1d2f3d --- /dev/null +++ b/tests/subsys/pm/power_residency_time/testcase.yaml @@ -0,0 +1,13 @@ +# Copyright (c) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +tests: + pm.power_residency_time: + platform_allow: + - stm32l562e_dk + tags: + - pm + harness: pytest + harness_config: + pytest_dut_scope: session + pytest_args: [--powershield=/STMicroelectronics_PowerShield__Virtual_ComPort] diff --git a/tests/subsys/pm/power_states/CMakeLists.txt b/tests/subsys/pm/power_states/CMakeLists.txt new file mode 100644 index 000000000000000..6f6656f5003ae7f --- /dev/null +++ b/tests/subsys/pm/power_states/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright (c) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20.0) +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) +project(power_states) + +FILE(GLOB app_sources src/*.c) +target_sources(app PRIVATE ${app_sources}) diff --git a/tests/subsys/pm/power_states/boards/stm32l562e_dk.overlay b/tests/subsys/pm/power_states/boards/stm32l562e_dk.overlay new file mode 100644 index 000000000000000..968e098b2e5b8e6 --- /dev/null +++ b/tests/subsys/pm/power_states/boards/stm32l562e_dk.overlay @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ +/ { + zephyr,user { + k_idle_state_min_residency_time = <400000>; + }; +}; + +&stop0{ + compatible = "zephyr,power-state"; + power-state-name = "suspend-to-idle"; + substate-id = <1>; + min-residency-us = <500000>; +}; +&stop1{ + compatible = "zephyr,power-state"; + power-state-name = "suspend-to-idle"; + substate-id = <2>; + min-residency-us = <1000000>; +}; +&stop2{ + compatible = "zephyr,power-state"; + power-state-name = "suspend-to-idle"; + substate-id = <3>; + min-residency-us = <1500000>; +}; diff --git a/tests/subsys/pm/power_states/prj.conf b/tests/subsys/pm/power_states/prj.conf new file mode 100644 index 000000000000000..e10b212c1fc0055 --- /dev/null +++ b/tests/subsys/pm/power_states/prj.conf @@ -0,0 +1,5 @@ +# Copyright (c) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +CONFIG_GPIO=y +CONFIG_PM=y diff --git a/tests/subsys/pm/power_states/pytest/conftest.py b/tests/subsys/pm/power_states/pytest/conftest.py new file mode 100644 index 000000000000000..c93c5b073f5d5bc --- /dev/null +++ b/tests/subsys/pm/power_states/pytest/conftest.py @@ -0,0 +1,20 @@ +# Copyright (c) 2024 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import time +from scripts.pm.power_monitor_stm32l562e_dk.PowerShield import PowerShield +from twister_harness import DeviceAdapter + +def pytest_addoption(parser): + parser.addoption("--powershield", + help="Path to PowerShield device") + +@pytest.fixture(scope='module') +def power_monitor_measure(dut: DeviceAdapter, request): + powershield_device = request.config.getoption("--powershield") + PM_Device = PowerShield() + PM_Device.init(power_device_path = powershield_device) + PM_Device.measure(time=6) + return PM_Device.get_data() + \ No newline at end of file diff --git a/tests/subsys/pm/power_states/pytest/current_analyzer.py b/tests/subsys/pm/power_states/pytest/current_analyzer.py new file mode 100644 index 000000000000000..e88aba375ff3d4e --- /dev/null +++ b/tests/subsys/pm/power_states/pytest/current_analyzer.py @@ -0,0 +1,33 @@ +# Copyright (c) 2024 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +import numpy as np +import logging +from scipy import signal + +def current_RMS(data): + data = data[600:] + data = [float(x) for x in data] + peaks, data = find_peaks(data) + peaks = peaks[0:4] + logging.info(f"Found peaks: {peaks}") + # Adding the start (0) and end (length of data) to the list of indices + indices = np.concatenate(([0], np.array(peaks), [len(data)])) + + # Splitting the data based on index ranges + split_data = [data[indices[i]+40:indices[i+1]-40] for i in range(len(indices)-1)] + + # Function to calculate RMS for a chunk of data + def calculate_rms(chunks): + rms = [] + for chunk in chunks: + rms_value = np.sqrt(np.mean(np.square(chunk))) + rms.append(rms_value) + return rms + # Calculate RMS for each chunk + rms_values = calculate_rms(split_data) + return rms_values + +def find_peaks(data): + peaks = signal.find_peaks(data, distance=40, height=0.008)[0] + return peaks, data diff --git a/tests/subsys/pm/power_states/pytest/test_power_states.py b/tests/subsys/pm/power_states/pytest/test_power_states.py new file mode 100644 index 000000000000000..aad2c532e793aa1 --- /dev/null +++ b/tests/subsys/pm/power_states/pytest/test_power_states.py @@ -0,0 +1,40 @@ +# Copyright (c) 2024 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +import os, logging +from scripts.pm.power_monitor_stm32l562e_dk.PowerShield import PowerShield +from current_analyzer import current_RMS +from twister_harness import DeviceAdapter + +def test_power_states_without_pm_device(power_monitor_measure): + data = power_monitor_measure + # Empirical RMS values in mA with descriptive keys + rms_expected = { + "k_cpu_idle": 56.0, # State 0 + "state 0": 4.0, # State 0 + "state 1": 1.3, # State 1 + "state 2": 0.27, # State 2 + "active": 134 # Active + } + + value_in_amps = current_RMS(data) + # Convert to milliamps + value_in_miliamps = [value * 1e4 for value in value_in_amps] + logging.info(f"Current RMS [mA]: {value_in_miliamps}") + def print_state_rms(state_label, rms_expected_value, rms_measured_value): + tolerance = rms_expected_value / 10 + + logging.info(f"{state_label}:") + logging.info(f"Expected RMS: {rms_expected_value:.2f} mA") + logging.info(f"Tolerance: {tolerance:.2f} mA") + logging.info(f"Current RMS: {rms_measured_value:.2f} mA") + + assert (rms_expected_value - tolerance) < rms_measured_value < (rms_expected_value + tolerance), \ + f"RMS value out of tolerance for {state_label}" + + # Define the state labels + state_labels = ["k_cpu_idle", "state 0", "state 1", "state 2", "active"] + + # Loop through the states and print RMS values + for state_label, rms_value in zip(state_labels, value_in_miliamps): + print_state_rms(state_label, rms_expected[state_label], rms_value) diff --git a/tests/subsys/pm/power_states/src/main.c b/tests/subsys/pm/power_states/src/main.c new file mode 100644 index 000000000000000..489c89365136ca4 --- /dev/null +++ b/tests/subsys/pm/power_states/src/main.c @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ +#include + +int main(void) +{ + int k_idle_min_residency_time = DT_PROP(DT_PATH(zephyr_user), k_idle_state_min_residency_time); + int stop0_min_residency_time = DT_PROP(DT_NODELABEL(stop0), min_residency_us); + int stop1_min_residency_time = DT_PROP(DT_NODELABEL(stop1), min_residency_us); + int stop2_min_residency_time = DT_PROP(DT_NODELABEL(stop2), min_residency_us); + + while (1) { + printk("\nGoing to k_cpu_idle.\n"); + k_usleep(k_idle_min_residency_time); + printk("\nWake Up.\n"); + printk("\nGoing to state 0.\n"); + k_usleep(stop0_min_residency_time); + printk("\nWake Up.\n"); + printk("\nGoing to state 1.\n"); + k_usleep(stop1_min_residency_time); + printk("\nWake Up.\n"); + printk("\nGoing to state 2.\n"); + k_usleep(stop2_min_residency_time); + printk("\nWake Up.\n"); + + /* Anti-sleeping loop */ + while (1) { + arch_nop(); + } + } + return 0; +} diff --git a/tests/subsys/pm/power_states/testcase.yaml b/tests/subsys/pm/power_states/testcase.yaml new file mode 100644 index 000000000000000..3f6d769826d0044 --- /dev/null +++ b/tests/subsys/pm/power_states/testcase.yaml @@ -0,0 +1,13 @@ +# Copyright (c) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +tests: + pm.power_states: + platform_allow: + - stm32l562e_dk + tags: + - pm + harness: pytest + harness_config: + pytest_dut_scope: session + pytest_args: [--powershield=/STMicroelectronics_PowerShield__Virtual_ComPort] diff --git a/tests/subsys/pm/power_wakeup_timer/CMakeLists.txt b/tests/subsys/pm/power_wakeup_timer/CMakeLists.txt new file mode 100644 index 000000000000000..e0b2f1d2c834d79 --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright (c) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20.0) +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) +project(power_wakeup_timer) + +FILE(GLOB app_sources src/*.c) +target_sources(app PRIVATE ${app_sources}) diff --git a/tests/subsys/pm/power_wakeup_timer/boards/stm32l562e_dk.overlay b/tests/subsys/pm/power_wakeup_timer/boards/stm32l562e_dk.overlay new file mode 100644 index 000000000000000..8ebef935ab2dd10 --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/boards/stm32l562e_dk.overlay @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +&rtc { + compatible = "st,stm32-rtc"; + reg = < 0x40002800 0xc00 >; + interrupts = < 0x2 0x0 >; + clocks = < &rcc 0x58 0x400 >, < &rcc 0x2 0x16890 >; + prescaler = < 0x8000 >; + status = "okay"; + wakeup-source; +}; diff --git a/tests/subsys/pm/power_wakeup_timer/prj.conf b/tests/subsys/pm/power_wakeup_timer/prj.conf new file mode 100644 index 000000000000000..4ce440062c8f997 --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/prj.conf @@ -0,0 +1,7 @@ +# Copyright (c) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +CONFIG_RTC=y +CONFIG_RTC_ALARM=y +CONFIG_RTC_STM32=y +CONFIG_PM=y diff --git a/tests/subsys/pm/power_wakeup_timer/pytest/conftest.py b/tests/subsys/pm/power_wakeup_timer/pytest/conftest.py new file mode 100644 index 000000000000000..e6b62bbcb93a36f --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/pytest/conftest.py @@ -0,0 +1,19 @@ +# Copyright (c) 2024 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import time +from scripts.pm.power_monitor_stm32l562e_dk.PowerShield import PowerShield +from twister_harness import DeviceAdapter + +def pytest_addoption(parser): + parser.addoption("--powershield", + help="Path to PowerShield device") + +@pytest.fixture(scope='module') +def power_monitor_measure(dut: DeviceAdapter, request): + powershield_device = request.config.getoption("--powershield") + PM_Device = PowerShield() + PM_Device.init(power_device_path = powershield_device) + PM_Device.measure(time=2) + return PM_Device.get_data() diff --git a/tests/subsys/pm/power_wakeup_timer/pytest/current_analyzer.py b/tests/subsys/pm/power_wakeup_timer/pytest/current_analyzer.py new file mode 100644 index 000000000000000..41cdca1d4abb615 --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/pytest/current_analyzer.py @@ -0,0 +1,35 @@ +# Copyright (c) 2024 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +import numpy as np +import logging +from scipy import signal + +def current_RMS(data): + data = data[600:] + data = [float(x) for x in data] + peaks, data = find_peaks(data) + peaks = peaks[0:1] + logging.info(f"Found peaks: {peaks}") + # Adding the start (0) and end (length of data) to the list of indices + indices = np.concatenate(([0], np.array(peaks), [len(data)])) + + # Splitting the data based on index ranges + split_data = [data[indices[i]+40:indices[i+1]-40] for i in range(len(indices)-1)] + + # Function to calculate RMS for a chunk of data + def calculate_rms(chunks): + rms = [] + for chunk in chunks: + rms_value = np.sqrt(np.mean(np.square(chunk))) + rms.append(rms_value) + return rms + + # Calculate RMS for each chunk + rms_values = calculate_rms(split_data) + + return rms_values + +def find_peaks(data): + peaks = signal.find_peaks(data, distance=40, height=0.008)[0] + return peaks, data \ No newline at end of file diff --git a/tests/subsys/pm/power_wakeup_timer/pytest/test_wakeup_timer.py b/tests/subsys/pm/power_wakeup_timer/pytest/test_wakeup_timer.py new file mode 100644 index 000000000000000..392a3aaf4527c8a --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/pytest/test_wakeup_timer.py @@ -0,0 +1,35 @@ +# Copyright (c) 2024 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +from current_analyzer import current_RMS + +def test_wakeup_timer(current_measurement_output): + data = current_measurement_output + # Empirical RMS values in mA with descriptive keys + rms_expected = { + "stop 2": 0.26, # State 0 + "active": 95, # State 1 + } + + value_in_amps = current_RMS(data) + value_in_amps = value_in_amps[0:2] + # Convert to milliamps + value_in_miliamps = [value * 1e4 for value in value_in_amps] + + def print_state_rms(state_label, rms_expected_value, rms_measured_value): + tolerance = rms_expected_value / 10 + + print(f"{state_label}:") + print(f"Expected RMS: {rms_expected_value:.2f} mA") + print(f"Tolerance: {tolerance:.2f} mA") + print(f"Current RMS: {rms_measured_value:.2f} mA") + + assert (rms_expected_value - tolerance) < rms_measured_value < (rms_expected_value + tolerance), \ + f"RMS value out of tolerance for {state_label}" + + # Define the state labels + state_labels = ["stop 2", "active"] + + # Loop through the states and print RMS values + for state_label, rms_value in zip(state_labels, value_in_miliamps): + print_state_rms(state_label, rms_expected[state_label], rms_value) diff --git a/tests/subsys/pm/power_wakeup_timer/src/main.c b/tests/subsys/pm/power_wakeup_timer/src/main.c new file mode 100644 index 000000000000000..e2ee9b2d4b07c9a --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/src/main.c @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include + +#define RTC_ALARM_FIELDS \ + (RTC_ALARM_TIME_MASK_SECOND | RTC_ALARM_TIME_MASK_MINUTE | RTC_ALARM_TIME_MASK_HOUR) + +static const struct device *rtc = DEVICE_DT_GET(DT_NODELABEL(rtc)); + +volatile k_tid_t my_tid; + +#define MY_STACK_SIZE 500 +#define MY_PRIORITY 5 + +struct k_thread my_thread_data; +K_THREAD_STACK_DEFINE(my_stack_area, MY_STACK_SIZE); + +void my_entry_point(void *, void *, void *) +{ + printk("Going sleep.\n"); + k_msleep(3000); +} + +/* Fri Jan 01 2021 13:29:50 GMT+0000 */ +static const struct rtc_time rtc_time = { + .tm_sec = 50, + .tm_min = 29, + .tm_hour = 13, + .tm_mday = 1, + .tm_mon = 0, + .tm_year = 121, + .tm_wday = 5, + .tm_yday = 1, + .tm_isdst = -1, + .tm_nsec = 0, +}; + +/* Fri Jan 01 2021 13:29:51 GMT+0000 */ +static const struct rtc_time alarm_time = { + .tm_sec = 51, + .tm_min = 29, + .tm_hour = 13, + .tm_mday = 1, + .tm_mon = 0, + .tm_year = 121, + .tm_wday = 5, + .tm_yday = 1, + .tm_isdst = -1, + .tm_nsec = 0, +}; + +static void wakeup_cb(const struct device *dev, uint16_t id, void *user_data) +{ + printk("Wake up by alarm.\n"); + k_thread_abort(my_tid); +} + +int main(void) +{ + int ret; + uint16_t mask = RTC_ALARM_FIELDS; + + ret = rtc_set_time(rtc, &rtc_time); + if (ret < 0) { + return 0; + } + + ret = rtc_alarm_set_time(rtc, 0, mask, &alarm_time); + if (ret < 0) { + return 0; + } + + ret = rtc_alarm_set_callback(rtc, 0, wakeup_cb, NULL); + if (ret < 0) { + return 0; + } + + printk("Created the thread.\n"); + my_tid = k_thread_create(&my_thread_data, my_stack_area, + K_THREAD_STACK_SIZEOF(my_stack_area), my_entry_point, NULL, NULL, + NULL, MY_PRIORITY, 0, K_NO_WAIT); + + k_thread_join(my_tid, K_FOREVER); + + while (1) { + arch_nop(); + } + return 0; +} diff --git a/tests/subsys/pm/power_wakeup_timer/testcase.yaml b/tests/subsys/pm/power_wakeup_timer/testcase.yaml new file mode 100644 index 000000000000000..538d0bdba38854d --- /dev/null +++ b/tests/subsys/pm/power_wakeup_timer/testcase.yaml @@ -0,0 +1,13 @@ +# Copyright (c) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +tests: + pm.power_wakeup_timer: + platform_allow: + - stm32l562e_dk + tags: + - pm + harness: pytest + harness_config: + pytest_dut_scope: session + pytest_args: [--powershield=/STMicroelectronics_PowerShield__Virtual_ComPort]