Skip to content

Commit

Permalink
ML-635: unexpected results aggregating with sliding window (#245)
Browse files Browse the repository at this point in the history
Signed-off-by: Eyal Salomon <[email protected]>
  • Loading branch information
Eyal Salomon authored Jun 21, 2021
1 parent 3aadc5f commit 8a782b4
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 7 deletions.
2 changes: 1 addition & 1 deletion storey/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def __init__(self, windows: List[str], period: Optional[str] = None):
WindowsBase.__init__(self, period_millis, windows_tuples)

def get_window_start_time_by_time(self, timestamp):
return timestamp
return int(timestamp / self.period_millis) * self.period_millis


class EmissionType(Enum):
Expand Down
99 changes: 93 additions & 6 deletions tests/test_aggregate_by_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,14 +467,14 @@ def test_sliding_window_aggregations_with_max_values_flow():
actual = controller.await_termination()
expected_results = [{'col1': 0, 'num_hours_with_stuff_in_the_last_24h_count_24h': 1},
{'col1': 1, 'num_hours_with_stuff_in_the_last_24h_count_24h': 1},
{'col1': 2, 'num_hours_with_stuff_in_the_last_24h_count_24h': 1},
{'col1': 3, 'num_hours_with_stuff_in_the_last_24h_count_24h': 1},
{'col1': 4, 'num_hours_with_stuff_in_the_last_24h_count_24h': 1},
{'col1': 5, 'num_hours_with_stuff_in_the_last_24h_count_24h': 1},
{'col1': 2, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2},
{'col1': 3, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2},
{'col1': 4, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2},
{'col1': 5, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2},
{'col1': 6, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2},
{'col1': 7, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2},
{'col1': 8, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2},
{'col1': 9, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2}]
{'col1': 8, 'num_hours_with_stuff_in_the_last_24h_count_24h': 3},
{'col1': 9, 'num_hours_with_stuff_in_the_last_24h_count_24h': 3}]

assert actual == expected_results, \
f'actual did not match expected. \n actual: {actual} \n expected: {expected_results}'
Expand Down Expand Up @@ -664,6 +664,93 @@ def test_fixed_window_aggregation_with_uncommon_windows_flow():
f'actual did not match expected. \n actual: {termination_result} \n expected: {expected}'


def test_sliding_window_aggregation_with_uncommon_windows_flow():
time_format = '%Y-%m-%d %H:%M:%S.%f'
columns = ['sample_time', 'signal', 'isotope']
data = [[datetime.strptime('2021-05-30 16:42:15.797000', time_format).replace(tzinfo=timezone.utc), 790.235, 'U235'],
[datetime.strptime('2021-05-30 16:45:15.798000', time_format).replace(tzinfo=timezone.utc), 498.491, 'U235'],
[datetime.strptime('2021-05-30 16:48:15.799000', time_format).replace(tzinfo=timezone.utc), 34650.00343, 'U235'],
[datetime.strptime('2021-05-30 16:51:15.800000', time_format).replace(tzinfo=timezone.utc), 189.823, 'U235'],
[datetime.strptime('2021-05-30 16:54:15.801000', time_format).replace(tzinfo=timezone.utc), 379.524, 'U235'],
[datetime.strptime('2021-05-30 16:57:15.802000', time_format).replace(tzinfo=timezone.utc), 2225.4952, 'U235'],
[datetime.strptime('2021-05-30 17:00:15.803000', time_format).replace(tzinfo=timezone.utc), 1049.0903, 'U235'],
[datetime.strptime('2021-05-30 17:03:15.804000', time_format).replace(tzinfo=timezone.utc), 41905.63447, 'U235'],
[datetime.strptime('2021-05-30 17:06:15.805000', time_format).replace(tzinfo=timezone.utc), 4987.6764, 'U235'],
[datetime.strptime('2021-05-30 17:09:15.806000', time_format).replace(tzinfo=timezone.utc), 67657.11975, 'U235'],
[datetime.strptime('2021-05-30 17:12:15.807000', time_format).replace(tzinfo=timezone.utc), 56173.06327, 'U235'],
[datetime.strptime('2021-05-30 17:15:15.808000', time_format).replace(tzinfo=timezone.utc), 14249.67394, 'U235'],
[datetime.strptime('2021-05-30 17:18:15.809000', time_format).replace(tzinfo=timezone.utc), 656.831, 'U235'],
[datetime.strptime('2021-05-30 17:21:15.810000', time_format).replace(tzinfo=timezone.utc), 5768.4822, 'U235'],
[datetime.strptime('2021-05-30 17:24:15.811000', time_format).replace(tzinfo=timezone.utc), 929.028, 'U235'],
[datetime.strptime('2021-05-30 17:27:15.812000', time_format).replace(tzinfo=timezone.utc), 2585.9646, 'U235'],
[datetime.strptime('2021-05-30 17:30:15.813000', time_format).replace(tzinfo=timezone.utc), 358.918, 'U235']]

df = pd.DataFrame(data, columns=columns)

controller = build_flow([
DataframeSource(df, time_field="sample_time", key_field="isotope"),
AggregateByKey([FieldAggregator("samples", "signal", ["count"],
SlidingWindows(['15m', '25m', '45m', '1h'], '5m'))], Table("U235_test", NoopDriver())),
Reduce([], lambda acc, x: append_return(acc, x)),
]).run()
termination_result = controller.await_termination()

expected = [{'samples_count_15m': 1.0, 'samples_count_25m': 1.0, 'samples_count_45m': 1.0, 'samples_count_1h': 1.0,
'sample_time': pd.Timestamp('2021-05-30 16:42:15.797000+0000', tz='UTC'), 'signal': 790.235,
'isotope': 'U235'},
{'samples_count_15m': 2.0, 'samples_count_25m': 2.0, 'samples_count_45m': 2.0, 'samples_count_1h': 2.0,
'sample_time': pd.Timestamp('2021-05-30 16:45:15.798000+0000', tz='UTC'), 'signal': 498.491,
'isotope': 'U235'},
{'samples_count_15m': 3.0, 'samples_count_25m': 3.0, 'samples_count_45m': 3.0, 'samples_count_1h': 3.0,
'sample_time': pd.Timestamp('2021-05-30 16:48:15.799000+0000', tz='UTC'), 'signal': 34650.00343,
'isotope': 'U235'},
{'samples_count_15m': 4.0, 'samples_count_25m': 4.0, 'samples_count_45m': 4.0, 'samples_count_1h': 4.0,
'sample_time': pd.Timestamp('2021-05-30 16:51:15.800000+0000', tz='UTC'), 'signal': 189.823,
'isotope': 'U235'},
{'samples_count_15m': 5.0, 'samples_count_25m': 5.0, 'samples_count_45m': 5.0, 'samples_count_1h': 5.0,
'sample_time': pd.Timestamp('2021-05-30 16:54:15.801000+0000', tz='UTC'), 'signal': 379.524,
'isotope': 'U235'},
{'samples_count_15m': 5.0, 'samples_count_25m': 6.0, 'samples_count_45m': 6.0, 'samples_count_1h': 6.0,
'sample_time': pd.Timestamp('2021-05-30 16:57:15.802000+0000', tz='UTC'), 'signal': 2225.4952,
'isotope': 'U235'},
{'samples_count_15m': 4.0, 'samples_count_25m': 7.0, 'samples_count_45m': 7.0, 'samples_count_1h': 7.0,
'sample_time': pd.Timestamp('2021-05-30 17:00:15.803000+0000', tz='UTC'), 'signal': 1049.0903,
'isotope': 'U235'},
{'samples_count_15m': 5.0, 'samples_count_25m': 8.0, 'samples_count_45m': 8.0, 'samples_count_1h': 8.0,
'sample_time': pd.Timestamp('2021-05-30 17:03:15.804000+0000', tz='UTC'), 'signal': 41905.63447,
'isotope': 'U235'},
{'samples_count_15m': 4.0, 'samples_count_25m': 8.0, 'samples_count_45m': 9.0, 'samples_count_1h': 9.0,
'sample_time': pd.Timestamp('2021-05-30 17:06:15.805000+0000', tz='UTC'), 'signal': 4987.6764,
'isotope': 'U235'},
{'samples_count_15m': 5.0, 'samples_count_25m': 9.0, 'samples_count_45m': 10.0, 'samples_count_1h': 10.0,
'sample_time': pd.Timestamp('2021-05-30 17:09:15.806000+0000', tz='UTC'), 'signal': 67657.11975,
'isotope': 'U235'},
{'samples_count_15m': 5.0, 'samples_count_25m': 8.0, 'samples_count_45m': 11.0, 'samples_count_1h': 11.0,
'sample_time': pd.Timestamp('2021-05-30 17:12:15.807000+0000', tz='UTC'), 'signal': 56173.06327,
'isotope': 'U235'},
{'samples_count_15m': 4.0, 'samples_count_25m': 7.0, 'samples_count_45m': 12.0, 'samples_count_1h': 12.0,
'sample_time': pd.Timestamp('2021-05-30 17:15:15.808000+0000', tz='UTC'), 'signal': 14249.67394,
'isotope': 'U235'},
{'samples_count_15m': 5.0, 'samples_count_25m': 8.0, 'samples_count_45m': 13.0, 'samples_count_1h': 13.0,
'sample_time': pd.Timestamp('2021-05-30 17:18:15.809000+0000', tz='UTC'),
'signal': 656.831, 'isotope': 'U235'},
{'samples_count_15m': 4.0, 'samples_count_25m': 8.0, 'samples_count_45m': 14.0, 'samples_count_1h': 14.0,
'sample_time': pd.Timestamp('2021-05-30 17:21:15.810000+0000', tz='UTC'), 'signal': 5768.4822,
'isotope': 'U235'},
{'samples_count_15m': 5.0, 'samples_count_25m': 9.0, 'samples_count_45m': 15.0, 'samples_count_1h': 15.0,
'sample_time': pd.Timestamp('2021-05-30 17:24:15.811000+0000', tz='UTC'), 'signal': 929.028,
'isotope': 'U235'},
{'samples_count_15m': 5.0, 'samples_count_25m': 8.0, 'samples_count_45m': 15.0, 'samples_count_1h': 16.0,
'sample_time': pd.Timestamp('2021-05-30 17:27:15.812000+0000', tz='UTC'), 'signal': 2585.9646,
'isotope': 'U235'},
{'samples_count_15m': 4.0, 'samples_count_25m': 7.0, 'samples_count_45m': 14.0, 'samples_count_1h': 17.0,
'sample_time': pd.Timestamp('2021-05-30 17:30:15.813000+0000', tz='UTC'), 'signal': 358.918,
'isotope': 'U235'}]

assert termination_result == expected, \
f'actual did not match expected. \n actual: {termination_result} \n expected: {expected}'


def test_emit_max_event_sliding_window_multiple_keys_aggregation_flow():
controller = build_flow([
SyncEmitSource(),
Expand Down

0 comments on commit 8a782b4

Please sign in to comment.