
Integration branch for FastAPI and other PRs #177

Closed
wants to merge 25 commits into from
Changes from 7 commits
25 commits
9e3c756
Migrate from Flask to FastAPI
Lasall Oct 10, 2024
8458684
Move API doc from README to pydantic model classes (swagger)
Lasall Oct 12, 2024
2277d1f
Add package API documentation generation
b0661 Oct 10, 2024
a1cef1e
Enable Google style source commenting and documentation generation.
b0661 Oct 11, 2024
882aae6
Check Google style source commenting.
b0661 Oct 11, 2024
906c652
Add settings and extension recommendations
Oct 11, 2024
f6e8ada
Add pyright section for pylance extension
Oct 11, 2024
6d09e44
Enable pytest and debugging
Oct 11, 2024
53905cb
Prettier files
Oct 11, 2024
685e855
Bump sphinx from 8.0.2 to 8.1.3
dependabot[bot] Oct 14, 2024
7ab9314
Merge branch 'pr_int_fastapi-rebase-pr-163' into pr_int_fastapi
b0661 Oct 19, 2024
1f50eb5
test_load_corrector
NormannK Oct 10, 2024
e08e2b5
Streamline Dockerfile, remove unused deps
Lasall Oct 7, 2024
0923584
Ruff format
Oct 10, 2024
7763b1f
rebase fixes
NormannK Oct 11, 2024
2952cda
ruff changes
NormannK Oct 11, 2024
676b4d8
Cleanup: Fix violin chart labels, remove debug code
Lasall Oct 11, 2024
1f0ee74
Merge branch 'pr_int_fastapi-rebase-pr-172' into pr_int_fastapi
b0661 Oct 19, 2024
4186937
Add documentation to class_pv_forecast.py.
b0661 Oct 11, 2024
1fbdd18
Add CacheFileStore, to_datetime and get_logger utilities.
b0661 Oct 15, 2024
d38b24a
Improve testability of PVForecast
b0661 Oct 12, 2024
7ad1eea
Add test for PVForecast and newly extracted utility modules.
b0661 Oct 12, 2024
ba52724
Merge branch 'pr_int_fastapi-rebase-pr-174' into pr_int_fastapi
b0661 Oct 19, 2024
1f5abf3
Tool to integrate EOS PRs in an integration branch.
b0661 Oct 19, 2024
14a1be6
Merge branch 'pr_int_fastapi-rebase-pr-176' into pr_int_fastapi
b0661 Oct 19, 2024
1 change: 0 additions & 1 deletion Makefile
@@ -16,7 +16,6 @@ help:
@echo " docker-build - Rebuild docker image"
@echo " docs - Generate HTML documentation (in build/docs/html/)."
@echo " read-docs - Read HTML documentation in your browser."
@echo " read-docs - Read HTML documentation in your browser."
@echo " run - Run flask_server in the virtual environment (needs install before)."
@echo " dist - Create distribution (in dist/)."
@echo " clean - Remove generated documentation, distribution and virtual environment."
4 changes: 1 addition & 3 deletions src/akkudoktoreos/class_haushaltsgeraet.py
@@ -16,9 +16,7 @@ class HaushaltsgeraetParameters(BaseModel):
class Haushaltsgeraet:
def __init__(self, parameters: HaushaltsgeraetParameters, hours=24):
self.hours = hours # Total duration for which the planning is done
self.verbrauch_wh = (
parameters.verbrauch_wh # Total energy consumption of the device in kWh
)
self.verbrauch_wh = parameters.verbrauch_wh # Total energy consumption of the device in kWh
self.dauer_h = parameters.dauer_h # Duration of use in hours
self.lastkurve = np.zeros(self.hours) # Initialize the load curve with zeros

244 changes: 163 additions & 81 deletions src/akkudoktoreos/class_load_corrector.py
@@ -1,33 +1,73 @@
from typing import Optional

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error, r2_score


class LoadPredictionAdjuster:
def __init__(self, measured_data, predicted_data, load_forecast):
self.measured_data = measured_data
self.predicted_data = predicted_data
self.load_forecast = load_forecast
self.merged_data = self._merge_data()
self.train_data = None
self.test_data = None
self.weekday_diff = None
self.weekend_diff = None

def _remove_outliers(self, data, threshold=2):
# Calculate the Z-Score of the 'Last' data
def __init__(
self,
measured_data: pd.DataFrame,
predicted_data: pd.DataFrame,
load_forecast: object,
) -> None:
"""
Initialize the LoadPredictionAdjuster with measured data, predicted data, and a load forecast object.
"""
# Store the input dataframes
self.measured_data: pd.DataFrame = measured_data
self.predicted_data: pd.DataFrame = predicted_data
self.load_forecast: object = load_forecast

# Merge measured and predicted data
self.merged_data: pd.DataFrame = self._merge_data()

# Initialize placeholders for train/test data and differences
self.train_data: Optional[pd.DataFrame] = None
self.test_data: Optional[pd.DataFrame] = None
self.weekday_diff: Optional[pd.Series] = None
self.weekend_diff: Optional[pd.Series] = None

def _remove_outliers(self, data: pd.DataFrame, threshold: float = 2.0) -> pd.DataFrame:
"""
Remove outliers based on the Z-score from the 'Last' column.

Args:
data (pd.DataFrame): The input data with 'Last' column.
threshold (float): The Z-score threshold for detecting outliers.

Returns:
pd.DataFrame: Filtered data without outliers.
"""
# Calculate Z-score for 'Last' column and filter based on threshold
data["Z-Score"] = np.abs((data["Last"] - data["Last"].mean()) / data["Last"].std())
# Filter the data based on the threshold
filtered_data = data[data["Z-Score"] < threshold]
return filtered_data.drop(columns=["Z-Score"])
return filtered_data.drop(columns=["Z-Score"]) # Drop Z-score column after filtering

def _merge_data(self):
# Convert the time column in both DataFrames to datetime
def _merge_data(self) -> pd.DataFrame:
"""
Merge the measured and predicted data on the 'time' column.

Returns:
pd.DataFrame: The merged dataset.
"""
# Convert time columns to datetime in both datasets
self.predicted_data["time"] = pd.to_datetime(self.predicted_data["time"])
self.measured_data["time"] = pd.to_datetime(self.measured_data["time"])

# Ensure both time columns have the same timezone
# Localize time to UTC and then convert to Berlin time
if self.measured_data["time"].dt.tz is None:
self.measured_data["time"] = self.measured_data["time"].dt.tz_localize("UTC")

@@ -36,19 +76,33 @@ def _merge_data(self):
)
self.measured_data["time"] = self.measured_data["time"].dt.tz_convert("Europe/Berlin")

# Optionally: Remove timezone information if only working locally
# Remove timezone information (optional for local work)
self.predicted_data["time"] = self.predicted_data["time"].dt.tz_localize(None)
self.measured_data["time"] = self.measured_data["time"].dt.tz_localize(None)

# Now you can perform the merge
# Merge the measured and predicted dataframes on 'time'
merged_data = pd.merge(self.measured_data, self.predicted_data, on="time", how="inner")
print(merged_data)

# Extract useful columns such as 'Hour' and 'DayOfWeek'
merged_data["Hour"] = merged_data["time"].dt.hour
merged_data["DayOfWeek"] = merged_data["time"].dt.dayofweek
return merged_data

def calculate_weighted_mean(self, train_period_weeks=9, test_period_weeks=1):
def calculate_weighted_mean(
self, train_period_weeks: int = 9, test_period_weeks: int = 1
) -> None:
"""
Calculate the weighted mean difference between actual and predicted values for training and testing periods.

Args:
train_period_weeks (int): Number of weeks to use for training data.
test_period_weeks (int): Number of weeks to use for testing data.
"""
# Remove outliers from the merged data
self.merged_data = self._remove_outliers(self.merged_data)

# Define training and testing periods based on weeks
train_end_date = self.merged_data["time"].max() - pd.Timedelta(weeks=test_period_weeks)
train_start_date = train_end_date - pd.Timedelta(weeks=train_period_weeks)

@@ -57,49 +111,90 @@ def calculate_weighted_mean(self, train_period_weeks=9, test_period_weeks=1):
test_start_date + pd.Timedelta(weeks=test_period_weeks) - pd.Timedelta(hours=1)
)

# Split merged data into training and testing datasets
self.train_data = self.merged_data[
(self.merged_data["time"] >= train_start_date)
& (self.merged_data["time"] <= train_end_date)
]

self.test_data = self.merged_data[
(self.merged_data["time"] >= test_start_date)
& (self.merged_data["time"] <= test_end_date)
]

# Calculate the difference between actual ('Last') and predicted ('Last Pred')
self.train_data["Difference"] = self.train_data["Last"] - self.train_data["Last Pred"]

# Separate training data into weekdays and weekends
weekdays_train_data = self.train_data[self.train_data["DayOfWeek"] < 5]
weekends_train_data = self.train_data[self.train_data["DayOfWeek"] >= 5]

# Calculate weighted mean differences for both weekdays and weekends
self.weekday_diff = (
weekdays_train_data.groupby("Hour").apply(self._weighted_mean_diff).dropna()
)
self.weekend_diff = (
weekends_train_data.groupby("Hour").apply(self._weighted_mean_diff).dropna()
)

def _weighted_mean_diff(self, data):
def _weighted_mean_diff(self, data: pd.DataFrame) -> float:
"""
Compute the weighted mean difference between actual and predicted values.

Args:
data (pd.DataFrame): Data for a specific hour.

Returns:
float: Weighted mean difference for that hour.
"""
# Weigh recent data more by using days difference from the last date in the training set
train_end_date = self.train_data["time"].max()
weights = 1 / (train_end_date - data["time"]).dt.days.replace(0, np.nan)
weighted_mean = (data["Difference"] * weights).sum() / weights.sum()
return weighted_mean

def adjust_predictions(self):
def adjust_predictions(self) -> None:
"""
Adjust predictions for both training and test data using the calculated weighted differences.
"""
# Apply adjustments to both training and testing data
self.train_data["Adjusted Pred"] = self.train_data.apply(self._adjust_row, axis=1)
self.test_data["Adjusted Pred"] = self.test_data.apply(self._adjust_row, axis=1)

def _adjust_row(self, row):
def _adjust_row(self, row: pd.Series) -> float:
"""
Adjust a single row's prediction based on the hour and day of the week.

Args:
row (pd.Series): A single row of data.

Returns:
float: Adjusted prediction.
"""
# Adjust predictions based on whether it's a weekday or weekend
if row["DayOfWeek"] < 5:
return row["Last Pred"] + self.weekday_diff.get(row["Hour"], 0)
else:
return row["Last Pred"] + self.weekend_diff.get(row["Hour"], 0)

def plot_results(self):
def plot_results(self) -> None:
"""
Plot the actual, predicted, and adjusted predicted values for both training and testing data.
"""
# Plot results for training and testing data
self._plot_data(self.train_data, "Training")
self._plot_data(self.test_data, "Testing")

def _plot_data(self, data, data_type):
def _plot_data(self, data: pd.DataFrame, data_type: str) -> None:
"""
Helper function to plot the data.

Args:
data (pd.DataFrame): Data to plot (training or testing).
data_type (str): Label to identify whether it's training or testing data.
"""
plt.figure(figsize=(14, 7))
plt.plot(data["time"], data["Last"], label=f"Actual Last - {data_type}", color="blue")
plt.plot(
@@ -123,76 +218,63 @@ def _plot_data(self, data, data_type):
plt.grid(True)
plt.show()

def evaluate_model(self):
def evaluate_model(self) -> None:
"""
Evaluate the model performance using Mean Squared Error and R-squared metrics.
"""
# Calculate Mean Squared Error and R-squared for the adjusted predictions
mse = mean_squared_error(self.test_data["Last"], self.test_data["Adjusted Pred"])
r2 = r2_score(self.test_data["Last"], self.test_data["Adjusted Pred"])
print(f"Mean Squared Error: {mse}")
print(f"R-squared: {r2}")

def predict_next_hours(self, hours_ahead):
def predict_next_hours(self, hours_ahead: int) -> pd.DataFrame:
"""
Predict load for the next given number of hours.

Args:
hours_ahead (int): Number of hours to predict.

Returns:
pd.DataFrame: DataFrame with future predicted and adjusted load.
"""
# Get the latest time in the merged data
last_date = self.merged_data["time"].max()

# Generate future timestamps for the next 'hours_ahead'
future_dates = [last_date + pd.Timedelta(hours=i) for i in range(1, hours_ahead + 1)]
future_df = pd.DataFrame({"time": future_dates})

# Extract hour and day of the week for the future predictions
future_df["Hour"] = future_df["time"].dt.hour
future_df["DayOfWeek"] = future_df["time"].dt.dayofweek

# Predict the load and apply adjustments for future predictions
future_df["Last Pred"] = future_df["time"].apply(self._forecast_next_hours)
future_df["Adjusted Pred"] = future_df.apply(self._adjust_row, axis=1)

return future_df

def _forecast_next_hours(self, timestamp):
def _forecast_next_hours(self, timestamp: pd.Timestamp) -> float:
"""
Helper function to forecast the load for the next hours using the load_forecast object.

Args:
timestamp (pd.Timestamp): The time for which to predict the load.

Returns:
float: Predicted load for the given time.
"""
# Use the load_forecast object to get the hourly forecast for the given timestamp
date_str = timestamp.strftime("%Y-%m-%d")
hour = timestamp.hour
daily_forecast = self.load_forecast.get_daily_stats(date_str)
return daily_forecast[0][hour] if hour < len(daily_forecast[0]) else np.nan


# if __name__ == '__main__':
# estimator = LastEstimator()
# start_date = "2024-06-01"
# end_date = "2024-08-01"
# last_df = estimator.get_last(start_date, end_date)

# selected_columns = last_df[['timestamp', 'Last']]
# selected_columns['time'] = pd.to_datetime(selected_columns['timestamp']).dt.floor('H')
# selected_columns['Last'] = pd.to_numeric(selected_columns['Last'], errors='coerce')

# # Drop rows with NaN values
# cleaned_data = selected_columns.dropna()

# print(cleaned_data)
# # Create an instance of LoadForecast
# lf = LoadForecast(filepath=r'.\load_profiles.npz', year_energy=6000*1000)

# # Initialize an empty DataFrame to hold the forecast data
# forecast_list = []

# # Loop through each day in the date range
# for single_date in pd.date_range(cleaned_data['time'].min().date(), cleaned_data['time'].max().date()):
# date_str = single_date.strftime('%Y-%m-%d')
# daily_forecast = lf.get_daily_stats(date_str)
# mean_values = daily_forecast[0] # Extract the mean values
# hours = [single_date + pd.Timedelta(hours=i) for i in range(24)]
# daily_forecast_df = pd.DataFrame({'time': hours, 'Last Pred': mean_values})
# forecast_list.append(daily_forecast_df)

# # Concatenate all daily forecasts into a single DataFrame
# forecast_df = pd.concat(forecast_list, ignore_index=True)

# # Create an instance of the LoadPredictionAdjuster class
# adjuster = LoadPredictionAdjuster(cleaned_data, forecast_df, lf)

# # Calculate the weighted mean differences
# adjuster.calculate_weighted_mean()

# # Adjust the predictions
# adjuster.adjust_predictions()

# # Plot the results
# adjuster.plot_results()

# # Evaluate the model
# adjuster.evaluate_model()

# # Predict the next x hours
# future_predictions = adjuster.predict_next_hours(48)
# print(future_predictions)
# Return forecast for the specific hour, or NaN if hour is out of range
return daily_forecast[0][hour] if hour < len(daily_forecast[0]) else np.nan
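
For orientation, a minimal end-to-end sketch of the reworked LoadPredictionAdjuster call sequence, not part of this PR. It uses synthetic data and a stubbed forecast object in the style of the new test fixture further below; the stubbed get_daily_stats shape (hourly means in row 0) is an assumption inferred from the removed example code above.

import numpy as np
import pandas as pd
from unittest.mock import MagicMock

from akkudoktoreos.class_load_corrector import LoadPredictionAdjuster

# Two weeks of hourly synthetic data: one training week plus one test week
time_range = pd.date_range(start="2023-09-25", periods=24 * 14, freq="H")
measured = pd.DataFrame({"time": time_range, "Last": np.random.rand(len(time_range)) * 100})
predicted = pd.DataFrame({"time": time_range, "Last Pred": np.random.rand(len(time_range)) * 100})

# Stub standing in for LoadForecast: get_daily_stats returns 24 hourly means in row 0 (assumed shape)
forecast = MagicMock()
forecast.get_daily_stats = MagicMock(return_value=np.random.rand(2, 24) * 100)

adjuster = LoadPredictionAdjuster(measured, predicted, forecast)
adjuster.calculate_weighted_mean(train_period_weeks=1, test_period_weeks=1)
adjuster.adjust_predictions()
adjuster.evaluate_model()                # prints Mean Squared Error and R-squared
print(adjuster.predict_next_hours(48))   # 48 rows with 'Last Pred' and 'Adjusted Pred'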
2 changes: 1 addition & 1 deletion src/akkudoktoreos/visualize.py
@@ -270,7 +270,7 @@ def visualisiere_ergebnisse(

# First violin plot for losses
axs[0].violinplot(data[0], positions=[1], showmeans=True, showmedians=True)
axs[1].set(title="Losses", xticks=[1], xticklabels=["Losses"])
axs[0].set(title="Losses", xticks=[1], xticklabels=["Losses"])

# Second violin plot for balance
axs[1].violinplot(data[1], positions=[1], showmeans=True, showmedians=True)
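
For context, a standalone sketch of the corrected pattern, where each violin panel labels the axes it draws on; the data and the "Balance" labels for the second panel are assumptions, as the surrounding code is not shown in this hunk.

import matplotlib.pyplot as plt
import numpy as np

data = [np.random.randn(50), np.random.randn(50)]  # placeholder losses and balance samples
fig, axs = plt.subplots(1, 2, figsize=(10, 4))

# First panel: losses drawn and labelled on axs[0]
axs[0].violinplot(data[0], positions=[1], showmeans=True, showmedians=True)
axs[0].set(title="Losses", xticks=[1], xticklabels=["Losses"])

# Second panel: balance drawn and labelled on axs[1]
axs[1].violinplot(data[1], positions=[1], showmeans=True, showmedians=True)
axs[1].set(title="Balance", xticks=[1], xticklabels=["Balance"])
plt.show()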
187 changes: 187 additions & 0 deletions tests/test_load_corrector.py
@@ -0,0 +1,187 @@
from unittest.mock import MagicMock

import numpy as np
import pandas as pd
import pytest

from akkudoktoreos.class_load_corrector import LoadPredictionAdjuster


@pytest.fixture
def setup_data() -> tuple[pd.DataFrame, pd.DataFrame, MagicMock]:
"""
Fixture to create mock measured_data, predicted_data, and a mock load_forecast.
These mocks are returned as a tuple for testing purposes.
"""
# Create mock measured_data (real measured load data)
measured_data = pd.DataFrame(
{
"time": pd.date_range(start="2023-10-01", periods=24, freq="H"),
"Last": np.random.rand(24) * 100, # Random measured load values
}
)

# Create mock predicted_data (forecasted load data)
predicted_data = pd.DataFrame(
{
"time": pd.date_range(start="2023-10-01", periods=24, freq="H"),
"Last Pred": np.random.rand(24) * 100, # Random predicted load values
}
)

# Mock the load_forecast object
load_forecast = MagicMock()
load_forecast.get_daily_stats = MagicMock(
return_value=([np.random.rand(24) * 100],) # Simulate daily statistics
)

return measured_data, predicted_data, load_forecast


def test_merge_data(setup_data: tuple[pd.DataFrame, pd.DataFrame, MagicMock]) -> None:
"""
Test the _merge_data method to ensure it merges measured and predicted data correctly.
"""
measured_data, predicted_data, load_forecast = setup_data
adjuster = LoadPredictionAdjuster(measured_data, predicted_data, load_forecast)

# Call the method to merge data
merged_data = adjuster._merge_data()

# Assert the merged data is a DataFrame
assert isinstance(merged_data, pd.DataFrame), "Merged data should be a DataFrame"
# Assert certain columns are present in the merged data
assert "Hour" in merged_data.columns, "Merged data should contain 'Hour' column"
assert "DayOfWeek" in merged_data.columns, "Merged data should contain 'DayOfWeek' column"
assert len(merged_data) > 0, "Merged data should not be empty"


def test_remove_outliers(
setup_data: tuple[pd.DataFrame, pd.DataFrame, MagicMock],
) -> None:
"""
Test the _remove_outliers method to ensure it filters outliers from the data.
"""
measured_data, predicted_data, load_forecast = setup_data
adjuster = LoadPredictionAdjuster(measured_data, predicted_data, load_forecast)

# Create data with explicit outliers for testing
normal_values = np.random.rand(98) * 100 # Normal load values
outliers = np.array([500, -500]) # Explicit extreme outlier values
data_with_outliers = np.concatenate([normal_values, outliers])

# Simulate the merged_data with outliers to test the _remove_outliers method
adjuster.merged_data = pd.DataFrame({"Last": data_with_outliers})

# Apply the _remove_outliers method with default threshold
filtered_data = adjuster._remove_outliers(adjuster.merged_data)

# Assert that the output is a DataFrame and that outliers were removed
assert isinstance(filtered_data, pd.DataFrame), "Filtered data should be a DataFrame"
assert len(filtered_data) < len(
adjuster.merged_data
), "Filtered data should remove some outliers"
assert len(filtered_data) == 98, "Filtered data should have removed exactly 2 outliers"


def test_calculate_weighted_mean(
setup_data: tuple[pd.DataFrame, pd.DataFrame, MagicMock],
) -> None:
"""
Test the calculate_weighted_mean method to ensure weighted means for weekday and weekend differences are calculated correctly.
"""
measured_data, predicted_data, load_forecast = setup_data

# Create time range and new data for 14 days (2 weeks)
time_range = pd.date_range(start="2023-09-25", periods=24 * 14, freq="H")

# Create new measured_data and predicted_data matching the time range
measured_data = pd.DataFrame(
{
"time": time_range,
"Last": np.random.rand(len(time_range)) * 100, # Random 'Last' values
}
)

predicted_data = pd.DataFrame(
{
"time": time_range,
"Last Pred": np.random.rand(len(time_range)) * 100, # Random 'Last Pred' values
}
)

adjuster = LoadPredictionAdjuster(measured_data, predicted_data, load_forecast)
adjuster.merged_data = adjuster._merge_data()

# Calculate the weighted mean over training and testing periods
adjuster.calculate_weighted_mean(train_period_weeks=1, test_period_weeks=1)

# Assert that weekday and weekend differences are calculated and non-empty
assert adjuster.weekday_diff is not None, "Weekday differences should be calculated"
assert adjuster.weekend_diff is not None, "Weekend differences should be calculated"
assert len(adjuster.weekday_diff) > 0, "Weekday differences should not be empty"
assert len(adjuster.weekend_diff) > 0, "Weekend differences should not be empty"


def test_adjust_predictions(
setup_data: tuple[pd.DataFrame, pd.DataFrame, MagicMock],
) -> None:
"""
Test the adjust_predictions method to ensure it correctly adds the 'Adjusted Pred' column to train and test data.
"""
measured_data, predicted_data, load_forecast = setup_data
adjuster = LoadPredictionAdjuster(measured_data, predicted_data, load_forecast)
adjuster.merged_data = adjuster._merge_data()

# Calculate the weighted mean and adjust predictions
adjuster.calculate_weighted_mean(train_period_weeks=1, test_period_weeks=1)
adjuster.adjust_predictions()

# Assert that the 'Adjusted Pred' column is present in both train and test data
assert (
"Adjusted Pred" in adjuster.train_data.columns
), "Train data should have 'Adjusted Pred' column"
assert (
"Adjusted Pred" in adjuster.test_data.columns
), "Test data should have 'Adjusted Pred' column"


def test_evaluate_model(
setup_data: tuple[pd.DataFrame, pd.DataFrame, MagicMock],
capsys: pytest.CaptureFixture,
) -> None:
"""
Test the evaluate_model method to ensure it prints evaluation metrics (MSE and R-squared).
"""
measured_data, predicted_data, load_forecast = setup_data
adjuster = LoadPredictionAdjuster(measured_data, predicted_data, load_forecast)
adjuster.merged_data = adjuster._merge_data()

# Calculate weighted mean, adjust predictions, and evaluate the model
adjuster.calculate_weighted_mean(train_period_weeks=1, test_period_weeks=1)
adjuster.adjust_predictions()
adjuster.evaluate_model()

# Capture printed output and assert that evaluation metrics are printed
captured = capsys.readouterr()
assert "Mean Squared Error" in captured.out, "Evaluation should print Mean Squared Error"
assert "R-squared" in captured.out, "Evaluation should print R-squared"


def test_predict_next_hours(
setup_data: tuple[pd.DataFrame, pd.DataFrame, MagicMock],
) -> None:
"""
Test the predict_next_hours method to ensure future predictions are made and contain 'Adjusted Pred'.
"""
measured_data, predicted_data, load_forecast = setup_data
adjuster = LoadPredictionAdjuster(measured_data, predicted_data, load_forecast)
adjuster.merged_data = adjuster._merge_data()

# Calculate weighted mean and predict the next 5 hours
adjuster.calculate_weighted_mean(train_period_weeks=1, test_period_weeks=1)
future_df = adjuster.predict_next_hours(5)

# Assert that the correct number of future hours are predicted and that 'Adjusted Pred' is present
assert len(future_df) == 5, "Should predict for 5 future hours"
assert "Adjusted Pred" in future_df.columns, "Future data should have 'Adjusted Pred' column"