From 15afc6e7c8c57431820595838b6e813f99104631 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Sun, 4 Apr 2021 09:25:28 +0300 Subject: [PATCH] Aggregations emit NaN values where a value cannot be calculated due to lack of data. (#187) * Aggregations emit NaN values where a value cannot be calculated due to lack of data. * max_value * Convert Inf and -Inf to NaN only in calculate_features. --- integration/test_aggregation_integration.py | 7 +++--- storey/aggregation_utils.py | 6 ++--- storey/table.py | 28 +++++++++------------ tests/test_aggregate_by_key.py | 12 ++++----- 4 files changed, 25 insertions(+), 28 deletions(-) diff --git a/integration/test_aggregation_integration.py b/integration/test_aggregation_integration.py index fbe76e1f..29b29b3c 100644 --- a/integration/test_aggregation_integration.py +++ b/integration/test_aggregation_integration.py @@ -123,7 +123,7 @@ def test_query_virtual_aggregations_flow(setup_teardown_test): controller.terminate() actual = controller.await_termination() expected_results = [ - {'col1': 0, 'number_of_stuff_avg_24h': 0.0, 'number_of_stuff_stddev_24h': 0, 'number_of_stuff_stdvar_24h': 0}, + {'col1': 0, 'number_of_stuff_avg_24h': 0.0, 'number_of_stuff_stddev_24h': math.nan, 'number_of_stuff_stdvar_24h': math.nan}, {'col1': 1, 'number_of_stuff_avg_24h': 0.5, 'number_of_stuff_stddev_24h': math.sqrt(0.5), 'number_of_stuff_stdvar_24h': 0.5}, {'col1': 2, 'number_of_stuff_avg_24h': 1.0, 'number_of_stuff_stddev_24h': 1.0, 'number_of_stuff_stdvar_24h': 1.0}, {'col1': 3, 'number_of_stuff_avg_24h': 1.5, 'number_of_stuff_stddev_24h': math.sqrt(1.6666666666666667), @@ -1139,7 +1139,8 @@ def test_aggregate_multiple_keys(setup_teardown_test): "first_name": ["moshe", "yosi", "yosi"], "last_name": ["cohen", "levi", "levi"], "some_data": [1, 2, 3], - "time": [current_time - pd.Timedelta(minutes=25), current_time - pd.Timedelta(minutes=30), current_time - pd.Timedelta(minutes=35)] + "time": [current_time - pd.Timedelta(minutes=25), current_time - pd.Timedelta(minutes=30), + current_time - pd.Timedelta(minutes=35)] } ) @@ -1176,4 +1177,4 @@ def test_aggregate_multiple_keys(setup_teardown_test): ] assert actual == expected_results, \ - f'actual did not match expected. \n actual: {actual} \n expected: {expected_results}' \ No newline at end of file + f'actual did not match expected. \n actual: {actual} \n expected: {expected_results}' diff --git a/storey/aggregation_utils.py b/storey/aggregation_utils.py index f56d05a6..3a9adc3c 100644 --- a/storey/aggregation_utils.py +++ b/storey/aggregation_utils.py @@ -50,14 +50,14 @@ def _avg(args): count = args[0] sum = args[1] if count == 0: - return 0 + return math.nan return sum / count def _stddev(args): count = args[0] if count == 0 or count == 1: - return 0 + return math.nan sum = args[1] sqr = args[2] @@ -67,7 +67,7 @@ def _stddev(args): def _stdvar(args): count = args[0] if count == 0 or count == 1: - return 0 + return math.nan sum = args[1] sqr = args[2] return (count * sqr - sum * sum) / (count * (count - 1)) diff --git a/storey/table.py b/storey/table.py index 1bc86c29..4926f0db 100644 --- a/storey/table.py +++ b/storey/table.py @@ -1,11 +1,13 @@ -from typing import List import copy +import math from datetime import datetime -from .drivers import Driver -from .utils import _split_path, get_hashed_key -from .dtypes import FieldAggregator, SlidingWindows, FixedWindows +from typing import List + from .aggregation_utils import is_raw_aggregate, get_virtual_aggregation_func, get_implied_aggregates, get_all_raw_aggregates, \ get_all_raw_aggregates_with_hidden +from .drivers import Driver +from .dtypes import FieldAggregator, SlidingWindows, FixedWindows +from .utils import _split_path, get_hashed_key class Table: @@ -667,9 +669,10 @@ def get_features(self, timestamp): class AggregationValue: - default_value = None + default_value = math.nan def __init__(self, max_value=None, set_data=None): + self.value = self.default_value self.time = datetime.min self._max_value = max_value self._set_value = self._set_value_with_max if max_value else self._set_value_without_max @@ -723,7 +726,6 @@ class MinValue(AggregationValue): default_value = float('inf') def __init__(self, max_value=None, set_data=None): - self.value = max_value or self.default_value super().__init__(max_value, set_data) def aggregate(self, time, value): @@ -746,7 +748,6 @@ class MaxValue(AggregationValue): default_value = float('-inf') def __init__(self, max_value=None, set_data=None): - self.value = self.default_value super().__init__(max_value, set_data) def aggregate(self, time, value): @@ -762,7 +763,6 @@ class SumValue(AggregationValue): default_value = 0 def __init__(self, max_value=None, set_data=None): - self.value = self.default_value super().__init__(max_value, set_data) def aggregate(self, time, value): @@ -774,7 +774,6 @@ class CountValue(AggregationValue): default_value = 0 def __init__(self, max_value=None, set_data=None): - self.value = self.default_value super().__init__(max_value, set_data) def aggregate(self, time, value): @@ -786,7 +785,6 @@ class SqrValue(AggregationValue): default_value = 0 def __init__(self, max_value=None, set_data=None): - self.value = self.default_value super().__init__(max_value, set_data) def aggregate(self, time, value): @@ -795,10 +793,8 @@ def aggregate(self, time, value): class LastValue(AggregationValue): name = 'last' - default_value = None def __init__(self, max_value=None, set_data=None): - self.value = self.default_value super().__init__(max_value, set_data) def aggregate(self, time, value): @@ -812,10 +808,8 @@ def get_update_expression(self, old): class FirstValue(AggregationValue): name = 'first' - default_value = None def __init__(self, max_value=None, set_data=None): - self.value = self.default_value super().__init__(max_value, set_data) self._first_time = datetime.max @@ -1076,8 +1070,10 @@ def get_features(self, timestamp): # In case our pre aggregates already have the answer for aggregation_name in self._explicit_raw_aggregations: for (window_millis, window_str) in self.explicit_windows.windows: - result[f'{self.name}_{aggregation_name}_{window_str}'] = \ - self._current_aggregate_values[(aggregation_name, window_millis)].value + value = self._current_aggregate_values[(aggregation_name, window_millis)].value + if value == math.inf or value == -math.inf: + value = math.nan + result[f'{self.name}_{aggregation_name}_{window_str}'] = value self.augment_virtual_features(result) return result diff --git a/tests/test_aggregate_by_key.py b/tests/test_aggregate_by_key.py index fdda4bca..fc321f81 100644 --- a/tests/test_aggregate_by_key.py +++ b/tests/test_aggregate_by_key.py @@ -85,9 +85,9 @@ def test_sliding_window_sparse_data(): 'number_of_stuff1_max_1h': 0, 'number_of_stuff1_max_24h': 0, 'number_of_stuff1_max_2h': 0, 'number_of_stuff1_min_1h': 0, 'number_of_stuff1_min_24h': 0, 'number_of_stuff1_min_2h': 0, 'number_of_stuff1_sum_1h': 0, 'number_of_stuff1_sum_24h': 0, 'number_of_stuff1_sum_2h': 0, - 'number_of_stuff2_avg_1h': 0, 'number_of_stuff2_avg_24h': 0, 'number_of_stuff2_avg_2h': 0, - 'number_of_stuff2_max_1h': -math.inf, 'number_of_stuff2_max_24h': -math.inf, 'number_of_stuff2_max_2h': -math.inf, - 'number_of_stuff2_min_1h': math.inf, 'number_of_stuff2_min_24h': math.inf, 'number_of_stuff2_min_2h': math.inf, + 'number_of_stuff2_avg_1h': math.nan, 'number_of_stuff2_avg_24h': math.nan, 'number_of_stuff2_avg_2h': math.nan, + 'number_of_stuff2_max_1h': math.nan, 'number_of_stuff2_max_24h': math.nan, 'number_of_stuff2_max_2h': math.nan, + 'number_of_stuff2_min_1h': math.nan, 'number_of_stuff2_min_24h': math.nan, 'number_of_stuff2_min_2h': math.nan, 'number_of_stuff2_sum_1h': 0, 'number_of_stuff2_sum_24h': 0, 'number_of_stuff2_sum_2h': 0}, {'col2': 0, 'number_of_stuff1_avg_1h': 0.0, 'number_of_stuff1_avg_24h': 0.0, 'number_of_stuff1_avg_2h': 0.0, 'number_of_stuff1_max_1h': 0, 'number_of_stuff1_max_24h': 0, 'number_of_stuff1_max_2h': 0, @@ -266,9 +266,9 @@ def test_sliding_window_sparse_data_uneven_feature_occurrence(): 'number_of_stuff1_max_1h': 0, 'number_of_stuff1_max_24h': 0, 'number_of_stuff1_max_2h': 0, 'number_of_stuff1_min_1h': 0, 'number_of_stuff1_min_24h': 0, 'number_of_stuff1_min_2h': 0, 'number_of_stuff1_sum_1h': 0, 'number_of_stuff1_sum_24h': 0, 'number_of_stuff1_sum_2h': 0, - 'number_of_stuff2_avg_1h': 0, 'number_of_stuff2_avg_24h': 0, 'number_of_stuff2_avg_2h': 0, - 'number_of_stuff2_max_1h': -math.inf, 'number_of_stuff2_max_24h': -math.inf, 'number_of_stuff2_max_2h': -math.inf, - 'number_of_stuff2_min_1h': math.inf, 'number_of_stuff2_min_24h': math.inf, 'number_of_stuff2_min_2h': math.inf, + 'number_of_stuff2_avg_1h': math.nan, 'number_of_stuff2_avg_24h': math.nan, 'number_of_stuff2_avg_2h': math.nan, + 'number_of_stuff2_max_1h': math.nan, 'number_of_stuff2_max_24h': math.nan, 'number_of_stuff2_max_2h': math.nan, + 'number_of_stuff2_min_1h': math.nan, 'number_of_stuff2_min_24h': math.nan, 'number_of_stuff2_min_2h': math.nan, 'number_of_stuff2_sum_1h': 0, 'number_of_stuff2_sum_24h': 0, 'number_of_stuff2_sum_2h': 0}, {'col2': 0, 'number_of_stuff1_avg_1h': 0.0, 'number_of_stuff1_avg_24h': 0.0, 'number_of_stuff1_avg_2h': 0.0, 'number_of_stuff1_max_1h': 0, 'number_of_stuff1_max_24h': 0, 'number_of_stuff1_max_2h': 0,