diff --git a/.gitignore b/.gitignore index fdeafadb..de1c9be9 100644 --- a/.gitignore +++ b/.gitignore @@ -168,4 +168,4 @@ poetry.lock **/.tmp/ *nbconvert* -.testmondata +.testmondata* diff --git a/src/timeseriesflattener/testing/utils_for_testing.py b/src/timeseriesflattener/testing/utils_for_testing.py index 77888b1f..0c54102c 100644 --- a/src/timeseriesflattener/testing/utils_for_testing.py +++ b/src/timeseriesflattener/testing/utils_for_testing.py @@ -1,10 +1,10 @@ """Utilities for testing.""" - from io import StringIO from typing import Any, List, Optional, Sequence, Union import numpy as np import pandas as pd +import polars as pl from pandas import DataFrame from pandas.testing import assert_series_equal from timeseriesflattener import TimeseriesFlattener @@ -31,6 +31,22 @@ def convert_cols_with_matching_colnames_to_datetime( return df +def str_to_pl_df( + string: str, + convert_timestamp_to_datetime: bool = True, + convert_np_nan_to_nan: bool = True, + convert_str_to_float: bool = False, +) -> pl.DataFrame: + df = str_to_df( + string=string, + convert_timestamp_to_datetime=convert_timestamp_to_datetime, + convert_np_nan_to_nan=convert_np_nan_to_nan, + convert_str_to_float=convert_str_to_float, + ) + + return pl.from_pandas(df) + + def str_to_df( string: str, convert_timestamp_to_datetime: bool = True, diff --git a/src/timeseriesflattenerv2/feature_specs.py b/src/timeseriesflattenerv2/feature_specs.py index df93da01..abb4ec24 100644 --- a/src/timeseriesflattenerv2/feature_specs.py +++ b/src/timeseriesflattenerv2/feature_specs.py @@ -88,6 +88,9 @@ class TimedeltaFrame: timedelta_col_name: str = "time_from_prediction_to_value" value_col_name: str = "value" + def get_timedeltas(self) -> Sequence[dt.timedelta]: + return self.df.collect().get_column(self.timedelta_col_name).to_list() + ValueSpecification = Union[PredictorSpec, OutcomeSpec] diff --git a/src/timeseriesflattenerv2/flattener.py b/src/timeseriesflattenerv2/flattener.py index 4c503d25..63124dbf 100644 
--- a/src/timeseriesflattenerv2/flattener.py +++ b/src/timeseriesflattenerv2/flattener.py @@ -68,47 +68,64 @@ def _horizontally_concatenate_dfs(dfs: Sequence[pl.LazyFrame]) -> pl.LazyFrame: return pl.concat(dfs, how="horizontal") -@dataclass -class Flattener: - predictiontime_frame: PredictionTimeFrame +def _get_timedelta_frame( + predictiontime_frame: PredictionTimeFrame, value_frame: ValueFrame +) -> TimedeltaFrame: + # Join the prediction time dataframe + joined_frame = predictiontime_frame.to_lazyframe_with_uuid().join( + value_frame.df, on=predictiontime_frame.entity_id_col_name + ) - def _get_timedelta_frame(self, spec: ValueSpecification) -> TimedeltaFrame: - # Join the prediction time dataframe - joined_frame = self.predictiontime_frame.to_lazyframe_with_uuid().join( - spec.value_frame.df, on=self.predictiontime_frame.entity_id_col_name - ) + # Get timedelta + timedelta_frame = joined_frame.with_columns( + ( + pl.col(value_frame.value_timestamp_col_name) + - pl.col(predictiontime_frame.timestamp_col_name) + ).alias("time_from_prediction_to_value") + ) - # Get timedelta - timedelta_frame = joined_frame.with_columns( - ( - pl.col(spec.value_frame.value_timestamp_col_name) - - pl.col(self.predictiontime_frame.timestamp_col_name) - ).alias("time_from_prediction_to_value") - ) + return TimedeltaFrame(timedelta_frame) - return TimedeltaFrame(timedelta_frame) - def _process_spec(self, spec: ValueSpecification) -> ValueFrame: - lookdistances = _normalise_lookdistances(spec) - timedelta_frame = self._get_timedelta_frame(spec) +def _process_spec( + predictiontime_frame: PredictionTimeFrame, spec: ValueSpecification +) -> ValueFrame: + lookdistances = _normalise_lookdistances(spec) + timedelta_frame = _get_timedelta_frame( + predictiontime_frame=predictiontime_frame, value_frame=spec.value_frame + ) - aggregated_value_frames = ( - Iter(lookdistances) - .map( - lambda distance: _slice_and_aggregate_spec( - timedelta_frame=timedelta_frame, distance=distance, 
aggregators=spec.aggregators - ) + aggregated_value_frames = ( + Iter(lookdistances) + .map( + lambda distance: _slice_and_aggregate_spec( + timedelta_frame=timedelta_frame, distance=distance, aggregators=spec.aggregators ) - .flatten() ) + .flatten() + ) + + return ValueFrame( + df=_horizontally_concatenate_dfs([f.df for f in aggregated_value_frames.to_list()]), + value_type=spec.value_frame.value_type, + entity_id_col_name=spec.value_frame.entity_id_col_name, + value_timestamp_col_name=spec.value_frame.value_timestamp_col_name, + ) - return ValueFrame( - df=_horizontally_concatenate_dfs([f.df for f in aggregated_value_frames.to_list()]), - value_type=spec.value_frame.value_type, - entity_id_col_name=spec.value_frame.entity_id_col_name, - value_timestamp_col_name=spec.value_frame.value_timestamp_col_name, - ) + +@dataclass +class Flattener: + predictiontime_frame: PredictionTimeFrame def aggregate_timeseries(self, specs: Sequence[ValueSpecification]) -> AggregatedValueFrame: - dfs = Iter(specs).map(self._process_spec).map(lambda x: x.df).to_list() + dfs = ( + Iter(specs) + .map( + lambda spec: _process_spec( + predictiontime_frame=self.predictiontime_frame, spec=spec + ) + ) + .map(lambda x: x.df) + .to_list() + ) return AggregatedValueFrame(df=_horizontally_concatenate_dfs(dfs)) diff --git a/src/timeseriesflattenerv2/test_flattener.py b/src/timeseriesflattenerv2/test_flattener.py index ae8b2b3e..9689c66c 100644 --- a/src/timeseriesflattenerv2/test_flattener.py +++ b/src/timeseriesflattenerv2/test_flattener.py @@ -2,10 +2,9 @@ from dataclasses import dataclass import polars as pl -from timeseriesflattener.testing.utils_for_testing import str_to_df - -from timeseriesflattenerv2.flattener import Flattener +from timeseriesflattener.testing.utils_for_testing import str_to_pl_df +from . 
import flattener from .feature_specs import ( AggregatedValueFrame, Aggregator, @@ -30,26 +29,24 @@ def apply(self, sliced_frame: SlicedFrame, column_name: str) -> AggregatedValueF def test_flattener(): - pred_frame = str_to_df( + pred_frame = str_to_pl_df( """entity_id,pred_timestamp 1,2021-01-03""" ) - value_frame = str_to_df( + value_frame = str_to_pl_df( """entity_id,value,value_timestamp 1,1,2021-01-01 1,2,2021-01-02 1,3,2021-01-03""" ) - result = Flattener( - predictiontime_frame=PredictionTimeFrame(df=pl.from_pandas(pred_frame).lazy()) + result = flattener.Flattener( + predictiontime_frame=PredictionTimeFrame(df=pred_frame.lazy()) ).aggregate_timeseries( specs=[ PredictorSpec( - value_frame=ValueFrame( - df=pl.from_pandas(value_frame).lazy(), value_type="test_value" - ), + value_frame=ValueFrame(df=value_frame.lazy(), value_type="test_value"), lookbehind_distances=[dt.timedelta(days=1)], aggregators=[MeanAggregator()], fallbacks=["NaN"], @@ -58,3 +55,26 @@ def test_flattener(): ) assert isinstance(result, AggregatedValueFrame) + + +def test_get_timedelta_frame(): + pred_frame = str_to_pl_df( + """entity_id,pred_timestamp + 1,2021-01-03""" + ) + + value_frame = str_to_pl_df( + """entity_id,value,value_timestamp + 1,1,2021-01-01 + 1,2,2021-01-02 + 1,3,2021-01-03""" + ) + + expected_timedeltas = [dt.timedelta(days=-2), dt.timedelta(days=-1), dt.timedelta(days=0)] + + result = flattener._get_timedelta_frame( + predictiontime_frame=PredictionTimeFrame(df=pred_frame.lazy()), + value_frame=ValueFrame(df=value_frame.lazy(), value_type="test_value"), + ) + + assert result.get_timedeltas() == expected_timedeltas