From 6fe230bd5889e4a89215a08c4e0b9130c105f920 Mon Sep 17 00:00:00 2001 From: Lasse Date: Thu, 23 May 2024 09:38:20 +0200 Subject: [PATCH 1/7] fix: validate fallback is compatible with output of aggregator --- src/timeseriesflattener/aggregators.py | 29 +++++++++++++++- .../feature_specs/outcome.py | 10 ++++++ .../feature_specs/predictor.py | 5 +++ .../feature_specs/static.py | 2 +- src/timeseriesflattener/flattener.py | 6 ++-- src/timeseriesflattener/test_aggregators.py | 33 ++++++++++++++++++- 6 files changed, 79 insertions(+), 6 deletions(-) diff --git a/src/timeseriesflattener/aggregators.py b/src/timeseriesflattener/aggregators.py index bb8c24c4..72650054 100644 --- a/src/timeseriesflattener/aggregators.py +++ b/src/timeseriesflattener/aggregators.py @@ -7,8 +7,20 @@ from attr import dataclass +def _validate_compatible_fallback_type_for_aggregator( + aggregator: Aggregator, fallback: str | int | float | None +) -> None: + try: + pl.Series([aggregator.output_type]).fill_null(fallback) + except: + raise ValueError( + f"Invalid fallback value {fallback} for aggregator {aggregator.__class__.__name__}. Fallback of type {type(fallback)} is not compatible with the aggregator's output type of {type(aggregator.output_type)}." + ) + + class Aggregator(ABC): name: str + output_type: float | int | bool @abstractmethod def __call__(self, column_name: str) -> pl.Expr: @@ -22,6 +34,7 @@ class MinAggregator(Aggregator): """Returns the minimum value in the look window.""" name: str = "min" + output_type = float() def __call__(self, column_name: str) -> pl.Expr: return pl.col(column_name).min().alias(self.new_col_name(column_name)) @@ -31,6 +44,7 @@ class MaxAggregator(Aggregator): """Returns the maximum value in the look window.""" name: str = "max" + output_type = float() def __call__(self, column_name: str) -> pl.Expr: return pl.col(column_name).max().alias(self.new_col_name(column_name)) @@ -40,6 +54,7 @@ class MeanAggregator(Aggregator): """Returns the mean value in the look window.""" name: str = "mean" + output_type = float() def __call__(self, column_name: str) -> pl.Expr: return pl.col(column_name).mean().alias(self.new_col_name(column_name)) @@ -49,6 +64,7 @@ class CountAggregator(Aggregator): """Returns the count of non-null values in the look window.""" name: str = "count" + output_type = int() def __call__(self, column_name: str) -> pl.Expr: return pl.col(column_name).count().alias(self.new_col_name(column_name)) @@ -60,6 +76,7 @@ class EarliestAggregator(Aggregator): timestamp_col_name: str name: str = "earliest" + output_type = float() def __call__(self, column_name: str) -> pl.Expr: return ( @@ -76,6 +93,7 @@ class LatestAggregator(Aggregator): timestamp_col_name: str name: str = "latest" + output_type = float() def __call__(self, column_name: str) -> pl.Expr: return ( @@ -90,6 +108,7 @@ class SumAggregator(Aggregator): """Returns the sum of all values in the look window.""" name: str = "sum" + output_type = float() def __call__(self, column_name: str) -> pl.Expr: return pl.col(column_name).sum().alias(self.new_col_name(column_name)) @@ -99,15 +118,17 @@ class VarianceAggregator(Aggregator): """Returns the variance of the values in the look window""" name: str = "var" + output_type = float() def __call__(self, column_name: str) -> pl.Expr: return pl.col(column_name).var().alias(self.new_col_name(column_name)) class HasValuesAggregator(Aggregator): - """Examines whether any values exist in the column. If so, returns 1, else 0.""" + """Examines whether any values exist in the column. If so, returns True, else False.""" name: str = "bool" + output_type = bool() def __call__(self, column_name: str) -> pl.Expr: return ( @@ -126,6 +147,7 @@ class SlopeAggregator(Aggregator): timestamp_col_name: str name: str = "slope" + output_type = float() def __call__(self, column_name: str) -> pl.Expr: # Convert to days for the slope. Arbitrarily chosen to be the number of days since 1970-01-01. @@ -135,3 +157,8 @@ def __call__(self, column_name: str) -> pl.Expr: numerator = pl.corr(x_col, y_col, propagate_nans=True) * y_col.std() denominator = x_col.std() return (numerator / denominator).alias(self.new_col_name(column_name)) + + +if __name__ == "__main__": + agg = MeanAggregator() + print(agg("value")) diff --git a/src/timeseriesflattener/feature_specs/outcome.py b/src/timeseriesflattener/feature_specs/outcome.py index 37806aab..a19b4368 100644 --- a/src/timeseriesflattener/feature_specs/outcome.py +++ b/src/timeseriesflattener/feature_specs/outcome.py @@ -8,6 +8,7 @@ from .._frame_validator import _validate_col_name_columns_exist from .meta import ValueFrame, _lookdistance_to_normalised_lookperiod +from ..aggregators import _validate_compatible_fallback_type_for_aggregator if TYPE_CHECKING: from collections.abc import Sequence @@ -34,6 +35,10 @@ def __post_init__( for lookdistance in lookahead_distances ] _validate_col_name_columns_exist(obj=self) + for aggregator in self.aggregators: + _validate_compatible_fallback_type_for_aggregator( + aggregator=aggregator, fallback=self.fallback + ) @property def df(self) -> pl.LazyFrame: @@ -56,6 +61,11 @@ class BooleanOutcomeSpec: column_prefix: str = "outc" def __post_init__(self, init_frame: TimestampValueFrame): + for aggregator in self.aggregators: + _validate_compatible_fallback_type_for_aggregator( + aggregator=aggregator, fallback=self.fallback + ) + self.normalised_lookperiod = [ _lookdistance_to_normalised_lookperiod(lookdistance=lookdistance, direction="ahead") for lookdistance in self.lookahead_distances diff --git a/src/timeseriesflattener/feature_specs/predictor.py b/src/timeseriesflattener/feature_specs/predictor.py index c5cef219..ff41fcf8 100644 --- a/src/timeseriesflattener/feature_specs/predictor.py +++ b/src/timeseriesflattener/feature_specs/predictor.py @@ -6,6 +6,7 @@ from .._frame_validator import _validate_col_name_columns_exist from .meta import ValueFrame, _lookdistance_to_normalised_lookperiod +from ..aggregators import _validate_compatible_fallback_type_for_aggregator if TYPE_CHECKING: from collections.abc import Sequence @@ -39,6 +40,10 @@ def __post_init__( for lookdistance in lookbehind_distances ] _validate_col_name_columns_exist(obj=self) + for aggregator in self.aggregators: + _validate_compatible_fallback_type_for_aggregator( + aggregator=aggregator, fallback=self.fallback + ) @property def df(self) -> pl.LazyFrame: diff --git a/src/timeseriesflattener/feature_specs/static.py b/src/timeseriesflattener/feature_specs/static.py index 4b6bc725..38d16231 100644 --- a/src/timeseriesflattener/feature_specs/static.py +++ b/src/timeseriesflattener/feature_specs/static.py @@ -32,7 +32,7 @@ class StaticSpec: The value_frame must contain columns: entity_id_col_name: The name of the column containing the entity ids. Must be a string, and the column's values must be strings which are unique. - additional columns containing the values of the static feature. The name of the columns will be used for feature naming. + additional columns containing the values of the static feature. The names of the columns will be used for feature naming. """ value_frame: StaticFrame diff --git a/src/timeseriesflattener/flattener.py b/src/timeseriesflattener/flattener.py index 82887549..626507dc 100644 --- a/src/timeseriesflattener/flattener.py +++ b/src/timeseriesflattener/flattener.py @@ -110,11 +110,11 @@ def aggregate_timeseries( self, specs: Sequence[ValueSpecification], step_size: dt.timedelta | None = None ) -> AggregatedFrame: """Perform the aggregation/flattening. - + Args: specs: The specifications for the features to be created. - step_size: The step size for the aggregation. - If not None, will aggregate prediction times in chunks of step_size. + step_size: The step size for the aggregation. + If not None, will aggregate prediction times in chunks of step_size. Reduce if you encounter memory issues.""" if self.compute_lazily: print( diff --git a/src/timeseriesflattener/test_aggregators.py b/src/timeseriesflattener/test_aggregators.py index ee7ad1ea..299c753b 100644 --- a/src/timeseriesflattener/test_aggregators.py +++ b/src/timeseriesflattener/test_aggregators.py @@ -20,6 +20,7 @@ SlopeAggregator, SumAggregator, VarianceAggregator, + _validate_compatible_fallback_type_for_aggregator, ) from .spec_processors.temporal import _aggregate_masked_frame from .test_flattener import assert_frame_equal @@ -91,7 +92,10 @@ def expected_output(self) -> pl.DataFrame: aggregator=VarianceAggregator(), input_values=[1, 2], expected_output_values=[0.5] ), SingleVarAggregatorExample( - aggregator=HasValuesAggregator(), input_values=[1, 2], expected_output_values=[1], fallback_str="False" + aggregator=HasValuesAggregator(), + input_values=[1, 2], + expected_output_values=[1], + fallback_str="False", ), SingleVarAggregatorExample( aggregator=HasValuesAggregator(), @@ -159,3 +163,30 @@ def test_aggregator(example: AggregatorExampleType): ) assert_frame_equal(result.collect(), example.expected_output) + + +@pytest.mark.parametrize( + ("aggregator", "fallback", "valid_fallback"), + [ + (MeanAggregator(), 1, True), + (MeanAggregator(), np.nan, True), + (HasValuesAggregator(), np.nan, False), + (HasValuesAggregator(), False, True), + (HasValuesAggregator(), 1, False), + ], +) +def test_valid_fallback_for_aggregator( + aggregator: Aggregator, fallback: float | int | bool | None, valid_fallback: bool +): + if valid_fallback: + assert ( + _validate_compatible_fallback_type_for_aggregator( + aggregator=aggregator, fallback=fallback + ) + is None + ) + else: + with pytest.raises(ValueError): + _validate_compatible_fallback_type_for_aggregator( + aggregator=aggregator, fallback=fallback + ) From 95b0ef91859e626d81929e522ca913b622bc098f Mon Sep 17 00:00:00 2001 From: Lasse Date: Thu, 23 May 2024 09:40:40 +0200 Subject: [PATCH 2/7] tests: remove boolean from legacy test --- src/timeseriesflattener/feature_specs/test_from_legacy.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/timeseriesflattener/feature_specs/test_from_legacy.py b/src/timeseriesflattener/feature_specs/test_from_legacy.py index 1316699b..6f34a0dd 100644 --- a/src/timeseriesflattener/feature_specs/test_from_legacy.py +++ b/src/timeseriesflattener/feature_specs/test_from_legacy.py @@ -41,7 +41,6 @@ def test_create_predictorspec_from_legacy(): summed, count, variance, - boolean, change_per_day, ], fallback=[0], From b96c0a4e01ed5cf47d391fa4e11df0088591c2f2 Mon Sep 17 00:00:00 2001 From: Lasse Date: Thu, 23 May 2024 09:42:15 +0200 Subject: [PATCH 3/7] misc --- src/timeseriesflattener/aggregators.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/timeseriesflattener/aggregators.py b/src/timeseriesflattener/aggregators.py index 72650054..5631c857 100644 --- a/src/timeseriesflattener/aggregators.py +++ b/src/timeseriesflattener/aggregators.py @@ -157,8 +157,3 @@ def __call__(self, column_name: str) -> pl.Expr: numerator = pl.corr(x_col, y_col, propagate_nans=True) * y_col.std() denominator = x_col.std() return (numerator / denominator).alias(self.new_col_name(column_name)) - - -if __name__ == "__main__": - agg = MeanAggregator() - print(agg("value")) From 582dd9753c18be14f06d76892e11452255f488f5 Mon Sep 17 00:00:00 2001 From: Lasse Date: Thu, 23 May 2024 09:52:46 +0200 Subject: [PATCH 4/7] tutorial: fix from legacy tutorial --- docs/tutorials/04_from_legacy.ipynb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/tutorials/04_from_legacy.ipynb b/docs/tutorials/04_from_legacy.ipynb index d3274ef3..ec1770a5 100644 --- a/docs/tutorials/04_from_legacy.ipynb +++ b/docs/tutorials/04_from_legacy.ipynb @@ -84,7 +84,7 @@ " summed,\n", " count,\n", " variance,\n", - " boolean,\n", + " # boolean, requires a fallback to be a bool\n", " change_per_day,\n", " ],\n", " fallback=[0],\n", @@ -144,7 +144,7 @@ " summed,\n", " count,\n", " variance,\n", - " boolean,\n", + " # boolean, requires a fallback to be a bool\n", " change_per_day,\n", " ],\n", " fallback=[0],\n", From 76f3252e5ad6ac85736b93977fd69b576737cff3 Mon Sep 17 00:00:00 2001 From: Lasse Date: Thu, 23 May 2024 09:54:50 +0200 Subject: [PATCH 5/7] chore: typo --- docs/tutorials/04_from_legacy.ipynb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/tutorials/04_from_legacy.ipynb b/docs/tutorials/04_from_legacy.ipynb index ec1770a5..55a6a743 100644 --- a/docs/tutorials/04_from_legacy.ipynb +++ b/docs/tutorials/04_from_legacy.ipynb @@ -84,7 +84,7 @@ " summed,\n", " count,\n", " variance,\n", - " # boolean, requires a fallback to be a bool\n", + " # boolean, requires the fallback to be a bool\n", " change_per_day,\n", " ],\n", " fallback=[0],\n", @@ -144,7 +144,7 @@ " summed,\n", " count,\n", " variance,\n", - " # boolean, requires a fallback to be a bool\n", + " # boolean, requires the fallback to be a bool\n", " change_per_day,\n", " ],\n", " fallback=[0],\n", From 46c670727c25ae8e5552a412e867f2e3791dc473 Mon Sep 17 00:00:00 2001 From: Lasse Date: Thu, 23 May 2024 10:40:53 +0200 Subject: [PATCH 6/7] fix: misc --- docs/tutorials/04_from_legacy.ipynb | 1 - src/timeseriesflattener/feature_specs/outcome.py | 9 ++++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/docs/tutorials/04_from_legacy.ipynb b/docs/tutorials/04_from_legacy.ipynb index 55a6a743..b5af93a5 100644 --- a/docs/tutorials/04_from_legacy.ipynb +++ b/docs/tutorials/04_from_legacy.ipynb @@ -41,7 +41,6 @@ "\n", "import pandas as pd\n", "from timeseriesflattener.v1.aggregation_fns import (\n", - " boolean,\n", " change_per_day,\n", " count,\n", " earliest,\n", diff --git a/src/timeseriesflattener/feature_specs/outcome.py b/src/timeseriesflattener/feature_specs/outcome.py index a19b4368..70d48499 100644 --- a/src/timeseriesflattener/feature_specs/outcome.py +++ b/src/timeseriesflattener/feature_specs/outcome.py @@ -61,17 +61,16 @@ class BooleanOutcomeSpec: column_prefix: str = "outc" def __post_init__(self, init_frame: TimestampValueFrame): - for aggregator in self.aggregators: - _validate_compatible_fallback_type_for_aggregator( - aggregator=aggregator, fallback=self.fallback - ) - self.normalised_lookperiod = [ _lookdistance_to_normalised_lookperiod(lookdistance=lookdistance, direction="ahead") for lookdistance in self.lookahead_distances ] self.fallback = 0 + for aggregator in self.aggregators: + _validate_compatible_fallback_type_for_aggregator( + aggregator=aggregator, fallback=self.fallback + ) self.value_frame = ValueFrame( init_df=init_frame.df.with_columns((pl.lit(1)).alias(self.output_name)), From e3276650ad1751d4c325256b91856c94b1567b99 Mon Sep 17 00:00:00 2001 From: Lasse Date: Thu, 23 May 2024 12:51:00 +0200 Subject: [PATCH 7/7] fix: change to type instead of instantiated type --- src/timeseriesflattener/aggregators.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/timeseriesflattener/aggregators.py b/src/timeseriesflattener/aggregators.py index 5631c857..834368a8 100644 --- a/src/timeseriesflattener/aggregators.py +++ b/src/timeseriesflattener/aggregators.py @@ -11,7 +11,7 @@ def _validate_compatible_fallback_type_for_aggregator( aggregator: Aggregator, fallback: str | int | float | None ) -> None: try: - pl.Series([aggregator.output_type]).fill_null(fallback) + pl.Series([aggregator.output_type()]).fill_null(fallback) except: raise ValueError( f"Invalid fallback value {fallback} for aggregator {aggregator.__class__.__name__}. Fallback of type {type(fallback)} is not compatible with the aggregator's output type of {type(aggregator.output_type)}." @@ -20,7 +20,7 @@ def _validate_compatible_fallback_type_for_aggregator( class Aggregator(ABC): name: str - output_type: float | int | bool + output_type: type[float | int | bool] @abstractmethod def __call__(self, column_name: str) -> pl.Expr: @@ -34,7 +34,7 @@ class MinAggregator(Aggregator): """Returns the minimum value in the look window.""" name: str = "min" - output_type = float() + output_type = float def __call__(self, column_name: str) -> pl.Expr: return pl.col(column_name).min().alias(self.new_col_name(column_name)) @@ -44,7 +44,7 @@ class MaxAggregator(Aggregator): """Returns the maximum value in the look window.""" name: str = "max" - output_type = float() + output_type = float def __call__(self, column_name: str) -> pl.Expr: return pl.col(column_name).max().alias(self.new_col_name(column_name)) @@ -54,7 +54,7 @@ class MeanAggregator(Aggregator): """Returns the mean value in the look window.""" name: str = "mean" - output_type = float() + output_type = float def __call__(self, column_name: str) -> pl.Expr: return pl.col(column_name).mean().alias(self.new_col_name(column_name)) @@ -64,7 +64,7 @@ class CountAggregator(Aggregator): """Returns the count of non-null values in the look window.""" name: str = "count" - output_type = int() + output_type = int def __call__(self, column_name: str) -> pl.Expr: return pl.col(column_name).count().alias(self.new_col_name(column_name)) @@ -76,7 +76,7 @@ class EarliestAggregator(Aggregator): timestamp_col_name: str name: str = "earliest" - output_type = float() + output_type = float def __call__(self, column_name: str) -> pl.Expr: return ( @@ -93,7 +93,7 @@ class LatestAggregator(Aggregator): timestamp_col_name: str name: str = "latest" - output_type = float() + output_type = float def __call__(self, column_name: str) -> pl.Expr: return ( @@ -108,7 +108,7 @@ class SumAggregator(Aggregator): """Returns the sum of all values in the look window.""" name: str = "sum" - output_type = float() + output_type = float def __call__(self, column_name: str) -> pl.Expr: return pl.col(column_name).sum().alias(self.new_col_name(column_name)) @@ -118,7 +118,7 @@ class VarianceAggregator(Aggregator): """Returns the variance of the values in the look window""" name: str = "var" - output_type = float() + output_type = float def __call__(self, column_name: str) -> pl.Expr: return pl.col(column_name).var().alias(self.new_col_name(column_name)) @@ -128,7 +128,7 @@ class HasValuesAggregator(Aggregator): """Examines whether any values exist in the column. If so, returns True, else False.""" name: str = "bool" - output_type = bool() + output_type = bool def __call__(self, column_name: str) -> pl.Expr: return ( @@ -147,7 +147,7 @@ class SlopeAggregator(Aggregator): timestamp_col_name: str name: str = "slope" - output_type = float() + output_type = float def __call__(self, column_name: str) -> pl.Expr: # Convert to days for the slope. Arbitrarily chosen to be the number of days since 1970-01-01.