Skip to content

Commit

Permalink
tests(#362): add tests for each module (#368)
Browse files Browse the repository at this point in the history
- [ ] I have assigned ranges (e.g. `>=0.1, <0.2`) to all new dependencies (allows dependabot to keep dependency ranges wide for better compatibility)

Fixes #362.

## Notes for reviewers
Reviewers can skip the `.gitignore` change, but should pay attention to the new tests and to the refactor of `flattener.py` from `Flattener` methods into free functions.
  • Loading branch information
MartinBernstorff authored Feb 9, 2024
2 parents 3767ec2 + c63c6a0 commit eb99ff2
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,4 @@ poetry.lock
**/.tmp/

*nbconvert*
.testmondata
.testmondata*
18 changes: 17 additions & 1 deletion src/timeseriesflattener/testing/utils_for_testing.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
"""Utilities for testing."""

from io import StringIO
from typing import Any, List, Optional, Sequence, Union

import numpy as np
import pandas as pd
import polars as pl
from pandas import DataFrame
from pandas.testing import assert_series_equal
from timeseriesflattener import TimeseriesFlattener
Expand All @@ -31,6 +31,22 @@ def convert_cols_with_matching_colnames_to_datetime(
return df


def str_to_pl_df(
    string: str,
    convert_timestamp_to_datetime: bool = True,
    convert_np_nan_to_nan: bool = True,
    convert_str_to_float: bool = False,
) -> pl.DataFrame:
    """Parse a CSV-formatted string into a polars DataFrame.

    Delegates parsing to ``str_to_df`` (pandas-based) and converts the
    result with ``pl.from_pandas``, so every keyword flag behaves exactly
    as it does in ``str_to_df``.
    """
    pandas_df = str_to_df(
        string=string,
        convert_timestamp_to_datetime=convert_timestamp_to_datetime,
        convert_np_nan_to_nan=convert_np_nan_to_nan,
        convert_str_to_float=convert_str_to_float,
    )
    return pl.from_pandas(pandas_df)


def str_to_df(
string: str,
convert_timestamp_to_datetime: bool = True,
Expand Down
3 changes: 3 additions & 0 deletions src/timeseriesflattenerv2/feature_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class TimedeltaFrame:
timedelta_col_name: str = "time_from_prediction_to_value"
value_col_name: str = "value"

def get_timedeltas(self) -> Sequence[dt.timedelta]:
    """Return the timedelta column materialised as a plain Python list.

    The column holds ``value_timestamp - prediction_timestamp``
    differences, so the elements are ``dt.timedelta`` values — the
    previous ``Sequence[dt.datetime]`` annotation was incorrect.
    """
    return self.df.collect().get_column(self.timedelta_col_name).to_list()


ValueSpecification = Union[PredictorSpec, OutcomeSpec]

Expand Down
83 changes: 50 additions & 33 deletions src/timeseriesflattenerv2/flattener.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,47 +68,64 @@ def _horizontally_concatenate_dfs(dfs: Sequence[pl.LazyFrame]) -> pl.LazyFrame:
return pl.concat(dfs, how="horizontal")


def _get_timedelta_frame(
    predictiontime_frame: PredictionTimeFrame, value_frame: ValueFrame
) -> TimedeltaFrame:
    """Join prediction times to values and compute per-row timedeltas.

    Joins the uuid-annotated prediction-time frame to the value frame on
    the entity-id column, then adds a ``time_from_prediction_to_value``
    column equal to ``value_timestamp - prediction_timestamp``.
    """
    # Join the prediction time dataframe with the value frame on entity id.
    joined_frame = predictiontime_frame.to_lazyframe_with_uuid().join(
        value_frame.df, on=predictiontime_frame.entity_id_col_name
    )

    # Get timedelta; negative values mean the value precedes the prediction time.
    timedelta_frame = joined_frame.with_columns(
        (
            pl.col(value_frame.value_timestamp_col_name)
            - pl.col(predictiontime_frame.timestamp_col_name)
        ).alias("time_from_prediction_to_value")
    )

    return TimedeltaFrame(timedelta_frame)

def _process_spec(
    predictiontime_frame: PredictionTimeFrame, spec: ValueSpecification
) -> ValueFrame:
    """Aggregate one value specification across all its lookdistances.

    Builds the timedelta frame for the spec, slices and aggregates it once
    per normalised lookdistance, and horizontally concatenates the
    resulting columns into a single ``ValueFrame`` that preserves the
    spec's value type and column names.
    """
    lookdistances = _normalise_lookdistances(spec)
    timedelta_frame = _get_timedelta_frame(
        predictiontime_frame=predictiontime_frame, value_frame=spec.value_frame
    )

    # One aggregated frame per (lookdistance, aggregator) combination.
    aggregated_value_frames = (
        Iter(lookdistances)
        .map(
            lambda distance: _slice_and_aggregate_spec(
                timedelta_frame=timedelta_frame, distance=distance, aggregators=spec.aggregators
            )
        )
        .flatten()
    )

    return ValueFrame(
        df=_horizontally_concatenate_dfs([f.df for f in aggregated_value_frames.to_list()]),
        value_type=spec.value_frame.value_type,
        entity_id_col_name=spec.value_frame.entity_id_col_name,
        value_timestamp_col_name=spec.value_frame.value_timestamp_col_name,
    )

@dataclass
class Flattener:
    """Flattens timeseries values onto a frame of prediction times."""

    # The prediction times every spec is aggregated relative to.
    predictiontime_frame: PredictionTimeFrame

    def aggregate_timeseries(self, specs: Sequence[ValueSpecification]) -> AggregatedValueFrame:
        """Process each spec and horizontally concatenate the results.

        Each spec is aggregated independently against the shared
        prediction-time frame; the per-spec columns are then joined
        side-by-side into one ``AggregatedValueFrame``.
        """
        dfs = (
            Iter(specs)
            .map(
                lambda spec: _process_spec(
                    predictiontime_frame=self.predictiontime_frame, spec=spec
                )
            )
            .map(lambda x: x.df)
            .to_list()
        )
        return AggregatedValueFrame(df=_horizontally_concatenate_dfs(dfs))
40 changes: 30 additions & 10 deletions src/timeseriesflattenerv2/test_flattener.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
from dataclasses import dataclass

import polars as pl
from timeseriesflattener.testing.utils_for_testing import str_to_df

from timeseriesflattenerv2.flattener import Flattener
from timeseriesflattener.testing.utils_for_testing import str_to_pl_df

from . import flattener
from .feature_specs import (
AggregatedValueFrame,
Aggregator,
Expand All @@ -30,26 +29,24 @@ def apply(self, sliced_frame: SlicedFrame, column_name: str) -> AggregatedValueF


def test_flattener():
    """End-to-end smoke test: one predictor spec flattens to an AggregatedValueFrame."""
    pred_frame = str_to_pl_df(
        """entity_id,pred_timestamp
1,2021-01-03"""
    )

    value_frame = str_to_pl_df(
        """entity_id,value,value_timestamp
1,1,2021-01-01
1,2,2021-01-02
1,3,2021-01-03"""
    )

    result = flattener.Flattener(
        predictiontime_frame=PredictionTimeFrame(df=pred_frame.lazy())
    ).aggregate_timeseries(
        specs=[
            PredictorSpec(
                value_frame=ValueFrame(df=value_frame.lazy(), value_type="test_value"),
                lookbehind_distances=[dt.timedelta(days=1)],
                aggregators=[MeanAggregator()],
                fallbacks=["NaN"],
            )
        ]
    )

    # Only checks the container type; column-level assertions live in narrower tests.
    assert isinstance(result, AggregatedValueFrame)


def test_get_timedelta_frame():
    """Timedeltas are value_timestamp - pred_timestamp, one per value row."""
    pred_frame = str_to_pl_df(
        """entity_id,pred_timestamp
1,2021-01-03"""
    )

    value_frame = str_to_pl_df(
        """entity_id,value,value_timestamp
1,1,2021-01-01
1,2,2021-01-02
1,3,2021-01-03"""
    )

    # Values on Jan 1/2/3 relative to a Jan 3 prediction: -2, -1 and 0 days.
    expected_timedeltas = [dt.timedelta(days=-2), dt.timedelta(days=-1), dt.timedelta(days=0)]

    result = flattener._get_timedelta_frame(
        predictiontime_frame=PredictionTimeFrame(df=pred_frame.lazy()),
        value_frame=ValueFrame(df=value_frame.lazy(), value_type="test_value"),
    )

    assert result.get_timedeltas() == expected_timedeltas

0 comments on commit eb99ff2

Please sign in to comment.