Skip to content

Commit

Permalink
Aggregations emit NaN values where a value cannot be calculated due to lack of data. (#187)
Browse files Browse the repository at this point in the history

* Aggregations emit NaN values where a value cannot be calculated due to lack of data.

* max_value

* Convert Inf and -Inf to NaN only in calculate_features.
  • Loading branch information
Gal Topper authored Apr 4, 2021
1 parent cc66f6c commit 15afc6e
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 28 deletions.
7 changes: 4 additions & 3 deletions integration/test_aggregation_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def test_query_virtual_aggregations_flow(setup_teardown_test):
controller.terminate()
actual = controller.await_termination()
expected_results = [
{'col1': 0, 'number_of_stuff_avg_24h': 0.0, 'number_of_stuff_stddev_24h': 0, 'number_of_stuff_stdvar_24h': 0},
{'col1': 0, 'number_of_stuff_avg_24h': 0.0, 'number_of_stuff_stddev_24h': math.nan, 'number_of_stuff_stdvar_24h': math.nan},
{'col1': 1, 'number_of_stuff_avg_24h': 0.5, 'number_of_stuff_stddev_24h': math.sqrt(0.5), 'number_of_stuff_stdvar_24h': 0.5},
{'col1': 2, 'number_of_stuff_avg_24h': 1.0, 'number_of_stuff_stddev_24h': 1.0, 'number_of_stuff_stdvar_24h': 1.0},
{'col1': 3, 'number_of_stuff_avg_24h': 1.5, 'number_of_stuff_stddev_24h': math.sqrt(1.6666666666666667),
Expand Down Expand Up @@ -1139,7 +1139,8 @@ def test_aggregate_multiple_keys(setup_teardown_test):
"first_name": ["moshe", "yosi", "yosi"],
"last_name": ["cohen", "levi", "levi"],
"some_data": [1, 2, 3],
"time": [current_time - pd.Timedelta(minutes=25), current_time - pd.Timedelta(minutes=30), current_time - pd.Timedelta(minutes=35)]
"time": [current_time - pd.Timedelta(minutes=25), current_time - pd.Timedelta(minutes=30),
current_time - pd.Timedelta(minutes=35)]
}
)

Expand Down Expand Up @@ -1176,4 +1177,4 @@ def test_aggregate_multiple_keys(setup_teardown_test):
]

assert actual == expected_results, \
f'actual did not match expected. \n actual: {actual} \n expected: {expected_results}'
f'actual did not match expected. \n actual: {actual} \n expected: {expected_results}'
6 changes: 3 additions & 3 deletions storey/aggregation_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ def _avg(args):
count = args[0]
sum = args[1]
if count == 0:
return 0
return math.nan
return sum / count


def _stddev(args):
count = args[0]
if count == 0 or count == 1:
return 0
return math.nan
sum = args[1]
sqr = args[2]

Expand All @@ -67,7 +67,7 @@ def _stddev(args):
def _stdvar(args):
count = args[0]
if count == 0 or count == 1:
return 0
return math.nan
sum = args[1]
sqr = args[2]
return (count * sqr - sum * sum) / (count * (count - 1))
Expand Down
28 changes: 12 additions & 16 deletions storey/table.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from typing import List
import copy
import math
from datetime import datetime
from .drivers import Driver
from .utils import _split_path, get_hashed_key
from .dtypes import FieldAggregator, SlidingWindows, FixedWindows
from typing import List

from .aggregation_utils import is_raw_aggregate, get_virtual_aggregation_func, get_implied_aggregates, get_all_raw_aggregates, \
get_all_raw_aggregates_with_hidden
from .drivers import Driver
from .dtypes import FieldAggregator, SlidingWindows, FixedWindows
from .utils import _split_path, get_hashed_key


class Table:
Expand Down Expand Up @@ -667,9 +669,10 @@ def get_features(self, timestamp):


class AggregationValue:
default_value = None
default_value = math.nan

def __init__(self, max_value=None, set_data=None):
self.value = self.default_value
self.time = datetime.min
self._max_value = max_value
self._set_value = self._set_value_with_max if max_value else self._set_value_without_max
Expand Down Expand Up @@ -723,7 +726,6 @@ class MinValue(AggregationValue):
default_value = float('inf')

def __init__(self, max_value=None, set_data=None):
self.value = max_value or self.default_value
super().__init__(max_value, set_data)

def aggregate(self, time, value):
Expand All @@ -746,7 +748,6 @@ class MaxValue(AggregationValue):
default_value = float('-inf')

def __init__(self, max_value=None, set_data=None):
self.value = self.default_value
super().__init__(max_value, set_data)

def aggregate(self, time, value):
Expand All @@ -762,7 +763,6 @@ class SumValue(AggregationValue):
default_value = 0

def __init__(self, max_value=None, set_data=None):
self.value = self.default_value
super().__init__(max_value, set_data)

def aggregate(self, time, value):
Expand All @@ -774,7 +774,6 @@ class CountValue(AggregationValue):
default_value = 0

def __init__(self, max_value=None, set_data=None):
self.value = self.default_value
super().__init__(max_value, set_data)

def aggregate(self, time, value):
Expand All @@ -786,7 +785,6 @@ class SqrValue(AggregationValue):
default_value = 0

def __init__(self, max_value=None, set_data=None):
self.value = self.default_value
super().__init__(max_value, set_data)

def aggregate(self, time, value):
Expand All @@ -795,10 +793,8 @@ def aggregate(self, time, value):

class LastValue(AggregationValue):
name = 'last'
default_value = None

def __init__(self, max_value=None, set_data=None):
self.value = self.default_value
super().__init__(max_value, set_data)

def aggregate(self, time, value):
Expand All @@ -812,10 +808,8 @@ def get_update_expression(self, old):

class FirstValue(AggregationValue):
name = 'first'
default_value = None

def __init__(self, max_value=None, set_data=None):
self.value = self.default_value
super().__init__(max_value, set_data)
self._first_time = datetime.max

Expand Down Expand Up @@ -1076,8 +1070,10 @@ def get_features(self, timestamp):
# In case our pre aggregates already have the answer
for aggregation_name in self._explicit_raw_aggregations:
for (window_millis, window_str) in self.explicit_windows.windows:
result[f'{self.name}_{aggregation_name}_{window_str}'] = \
self._current_aggregate_values[(aggregation_name, window_millis)].value
value = self._current_aggregate_values[(aggregation_name, window_millis)].value
if value == math.inf or value == -math.inf:
value = math.nan
result[f'{self.name}_{aggregation_name}_{window_str}'] = value

self.augment_virtual_features(result)
return result
Expand Down
12 changes: 6 additions & 6 deletions tests/test_aggregate_by_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ def test_sliding_window_sparse_data():
'number_of_stuff1_max_1h': 0, 'number_of_stuff1_max_24h': 0, 'number_of_stuff1_max_2h': 0,
'number_of_stuff1_min_1h': 0, 'number_of_stuff1_min_24h': 0, 'number_of_stuff1_min_2h': 0,
'number_of_stuff1_sum_1h': 0, 'number_of_stuff1_sum_24h': 0, 'number_of_stuff1_sum_2h': 0,
'number_of_stuff2_avg_1h': 0, 'number_of_stuff2_avg_24h': 0, 'number_of_stuff2_avg_2h': 0,
'number_of_stuff2_max_1h': -math.inf, 'number_of_stuff2_max_24h': -math.inf, 'number_of_stuff2_max_2h': -math.inf,
'number_of_stuff2_min_1h': math.inf, 'number_of_stuff2_min_24h': math.inf, 'number_of_stuff2_min_2h': math.inf,
'number_of_stuff2_avg_1h': math.nan, 'number_of_stuff2_avg_24h': math.nan, 'number_of_stuff2_avg_2h': math.nan,
'number_of_stuff2_max_1h': math.nan, 'number_of_stuff2_max_24h': math.nan, 'number_of_stuff2_max_2h': math.nan,
'number_of_stuff2_min_1h': math.nan, 'number_of_stuff2_min_24h': math.nan, 'number_of_stuff2_min_2h': math.nan,
'number_of_stuff2_sum_1h': 0, 'number_of_stuff2_sum_24h': 0, 'number_of_stuff2_sum_2h': 0},
{'col2': 0, 'number_of_stuff1_avg_1h': 0.0, 'number_of_stuff1_avg_24h': 0.0, 'number_of_stuff1_avg_2h': 0.0,
'number_of_stuff1_max_1h': 0, 'number_of_stuff1_max_24h': 0, 'number_of_stuff1_max_2h': 0,
Expand Down Expand Up @@ -266,9 +266,9 @@ def test_sliding_window_sparse_data_uneven_feature_occurrence():
'number_of_stuff1_max_1h': 0, 'number_of_stuff1_max_24h': 0, 'number_of_stuff1_max_2h': 0,
'number_of_stuff1_min_1h': 0, 'number_of_stuff1_min_24h': 0, 'number_of_stuff1_min_2h': 0,
'number_of_stuff1_sum_1h': 0, 'number_of_stuff1_sum_24h': 0, 'number_of_stuff1_sum_2h': 0,
'number_of_stuff2_avg_1h': 0, 'number_of_stuff2_avg_24h': 0, 'number_of_stuff2_avg_2h': 0,
'number_of_stuff2_max_1h': -math.inf, 'number_of_stuff2_max_24h': -math.inf, 'number_of_stuff2_max_2h': -math.inf,
'number_of_stuff2_min_1h': math.inf, 'number_of_stuff2_min_24h': math.inf, 'number_of_stuff2_min_2h': math.inf,
'number_of_stuff2_avg_1h': math.nan, 'number_of_stuff2_avg_24h': math.nan, 'number_of_stuff2_avg_2h': math.nan,
'number_of_stuff2_max_1h': math.nan, 'number_of_stuff2_max_24h': math.nan, 'number_of_stuff2_max_2h': math.nan,
'number_of_stuff2_min_1h': math.nan, 'number_of_stuff2_min_24h': math.nan, 'number_of_stuff2_min_2h': math.nan,
'number_of_stuff2_sum_1h': 0, 'number_of_stuff2_sum_24h': 0, 'number_of_stuff2_sum_2h': 0},
{'col2': 0, 'number_of_stuff1_avg_1h': 0.0, 'number_of_stuff1_avg_24h': 0.0, 'number_of_stuff1_avg_2h': 0.0,
'number_of_stuff1_max_1h': 0, 'number_of_stuff1_max_24h': 0, 'number_of_stuff1_max_2h': 0,
Expand Down

0 comments on commit 15afc6e

Please sign in to comment.