diff --git a/storey/dtypes.py b/storey/dtypes.py index 03686551..5c6ded6a 100644 --- a/storey/dtypes.py +++ b/storey/dtypes.py @@ -225,7 +225,7 @@ def __init__(self, windows: List[str], period: Optional[str] = None): WindowsBase.__init__(self, period_millis, windows_tuples) def get_window_start_time_by_time(self, timestamp): - return timestamp + return int(timestamp / self.period_millis) * self.period_millis class EmissionType(Enum): diff --git a/tests/test_aggregate_by_key.py b/tests/test_aggregate_by_key.py index a530b339..120c20cc 100644 --- a/tests/test_aggregate_by_key.py +++ b/tests/test_aggregate_by_key.py @@ -467,14 +467,14 @@ def test_sliding_window_aggregations_with_max_values_flow(): actual = controller.await_termination() expected_results = [{'col1': 0, 'num_hours_with_stuff_in_the_last_24h_count_24h': 1}, {'col1': 1, 'num_hours_with_stuff_in_the_last_24h_count_24h': 1}, - {'col1': 2, 'num_hours_with_stuff_in_the_last_24h_count_24h': 1}, - {'col1': 3, 'num_hours_with_stuff_in_the_last_24h_count_24h': 1}, - {'col1': 4, 'num_hours_with_stuff_in_the_last_24h_count_24h': 1}, - {'col1': 5, 'num_hours_with_stuff_in_the_last_24h_count_24h': 1}, + {'col1': 2, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2}, + {'col1': 3, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2}, + {'col1': 4, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2}, + {'col1': 5, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2}, {'col1': 6, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2}, {'col1': 7, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2}, - {'col1': 8, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2}, - {'col1': 9, 'num_hours_with_stuff_in_the_last_24h_count_24h': 2}] + {'col1': 8, 'num_hours_with_stuff_in_the_last_24h_count_24h': 3}, + {'col1': 9, 'num_hours_with_stuff_in_the_last_24h_count_24h': 3}] assert actual == expected_results, \ f'actual did not match expected. \n actual: {actual} \n expected: {expected_results}' @@ -664,6 +664,93 @@ def test_fixed_window_aggregation_with_uncommon_windows_flow(): f'actual did not match expected. \n actual: {termination_result} \n expected: {expected}' +def test_sliding_window_aggregation_with_uncommon_windows_flow(): + time_format = '%Y-%m-%d %H:%M:%S.%f' + columns = ['sample_time', 'signal', 'isotope'] + data = [[datetime.strptime('2021-05-30 16:42:15.797000', time_format).replace(tzinfo=timezone.utc), 790.235, 'U235'], + [datetime.strptime('2021-05-30 16:45:15.798000', time_format).replace(tzinfo=timezone.utc), 498.491, 'U235'], + [datetime.strptime('2021-05-30 16:48:15.799000', time_format).replace(tzinfo=timezone.utc), 34650.00343, 'U235'], + [datetime.strptime('2021-05-30 16:51:15.800000', time_format).replace(tzinfo=timezone.utc), 189.823, 'U235'], + [datetime.strptime('2021-05-30 16:54:15.801000', time_format).replace(tzinfo=timezone.utc), 379.524, 'U235'], + [datetime.strptime('2021-05-30 16:57:15.802000', time_format).replace(tzinfo=timezone.utc), 2225.4952, 'U235'], + [datetime.strptime('2021-05-30 17:00:15.803000', time_format).replace(tzinfo=timezone.utc), 1049.0903, 'U235'], + [datetime.strptime('2021-05-30 17:03:15.804000', time_format).replace(tzinfo=timezone.utc), 41905.63447, 'U235'], + [datetime.strptime('2021-05-30 17:06:15.805000', time_format).replace(tzinfo=timezone.utc), 4987.6764, 'U235'], + [datetime.strptime('2021-05-30 17:09:15.806000', time_format).replace(tzinfo=timezone.utc), 67657.11975, 'U235'], + [datetime.strptime('2021-05-30 17:12:15.807000', time_format).replace(tzinfo=timezone.utc), 56173.06327, 'U235'], + [datetime.strptime('2021-05-30 17:15:15.808000', time_format).replace(tzinfo=timezone.utc), 14249.67394, 'U235'], + [datetime.strptime('2021-05-30 17:18:15.809000', time_format).replace(tzinfo=timezone.utc), 656.831, 'U235'], + [datetime.strptime('2021-05-30 17:21:15.810000', time_format).replace(tzinfo=timezone.utc), 5768.4822, 'U235'], + [datetime.strptime('2021-05-30 17:24:15.811000', time_format).replace(tzinfo=timezone.utc), 929.028, 'U235'], + [datetime.strptime('2021-05-30 17:27:15.812000', time_format).replace(tzinfo=timezone.utc), 2585.9646, 'U235'], + [datetime.strptime('2021-05-30 17:30:15.813000', time_format).replace(tzinfo=timezone.utc), 358.918, 'U235']] + + df = pd.DataFrame(data, columns=columns) + + controller = build_flow([ + DataframeSource(df, time_field="sample_time", key_field="isotope"), + AggregateByKey([FieldAggregator("samples", "signal", ["count"], + SlidingWindows(['15m', '25m', '45m', '1h'], '5m'))], Table("U235_test", NoopDriver())), + Reduce([], lambda acc, x: append_return(acc, x)), + ]).run() + termination_result = controller.await_termination() + + expected = [{'samples_count_15m': 1.0, 'samples_count_25m': 1.0, 'samples_count_45m': 1.0, 'samples_count_1h': 1.0, + 'sample_time': pd.Timestamp('2021-05-30 16:42:15.797000+0000', tz='UTC'), 'signal': 790.235, + 'isotope': 'U235'}, + {'samples_count_15m': 2.0, 'samples_count_25m': 2.0, 'samples_count_45m': 2.0, 'samples_count_1h': 2.0, + 'sample_time': pd.Timestamp('2021-05-30 16:45:15.798000+0000', tz='UTC'), 'signal': 498.491, + 'isotope': 'U235'}, + {'samples_count_15m': 3.0, 'samples_count_25m': 3.0, 'samples_count_45m': 3.0, 'samples_count_1h': 3.0, + 'sample_time': pd.Timestamp('2021-05-30 16:48:15.799000+0000', tz='UTC'), 'signal': 34650.00343, + 'isotope': 'U235'}, + {'samples_count_15m': 4.0, 'samples_count_25m': 4.0, 'samples_count_45m': 4.0, 'samples_count_1h': 4.0, + 'sample_time': pd.Timestamp('2021-05-30 16:51:15.800000+0000', tz='UTC'), 'signal': 189.823, + 'isotope': 'U235'}, + {'samples_count_15m': 5.0, 'samples_count_25m': 5.0, 'samples_count_45m': 5.0, 'samples_count_1h': 5.0, + 'sample_time': pd.Timestamp('2021-05-30 16:54:15.801000+0000', tz='UTC'), 'signal': 379.524, + 'isotope': 'U235'}, + {'samples_count_15m': 5.0, 'samples_count_25m': 6.0, 'samples_count_45m': 6.0, 'samples_count_1h': 6.0, + 'sample_time': pd.Timestamp('2021-05-30 16:57:15.802000+0000', tz='UTC'), 'signal': 2225.4952, + 'isotope': 'U235'}, + {'samples_count_15m': 4.0, 'samples_count_25m': 7.0, 'samples_count_45m': 7.0, 'samples_count_1h': 7.0, + 'sample_time': pd.Timestamp('2021-05-30 17:00:15.803000+0000', tz='UTC'), 'signal': 1049.0903, + 'isotope': 'U235'}, + {'samples_count_15m': 5.0, 'samples_count_25m': 8.0, 'samples_count_45m': 8.0, 'samples_count_1h': 8.0, + 'sample_time': pd.Timestamp('2021-05-30 17:03:15.804000+0000', tz='UTC'), 'signal': 41905.63447, + 'isotope': 'U235'}, + {'samples_count_15m': 4.0, 'samples_count_25m': 8.0, 'samples_count_45m': 9.0, 'samples_count_1h': 9.0, + 'sample_time': pd.Timestamp('2021-05-30 17:06:15.805000+0000', tz='UTC'), 'signal': 4987.6764, + 'isotope': 'U235'}, + {'samples_count_15m': 5.0, 'samples_count_25m': 9.0, 'samples_count_45m': 10.0, 'samples_count_1h': 10.0, + 'sample_time': pd.Timestamp('2021-05-30 17:09:15.806000+0000', tz='UTC'), 'signal': 67657.11975, + 'isotope': 'U235'}, + {'samples_count_15m': 5.0, 'samples_count_25m': 8.0, 'samples_count_45m': 11.0, 'samples_count_1h': 11.0, + 'sample_time': pd.Timestamp('2021-05-30 17:12:15.807000+0000', tz='UTC'), 'signal': 56173.06327, + 'isotope': 'U235'}, + {'samples_count_15m': 4.0, 'samples_count_25m': 7.0, 'samples_count_45m': 12.0, 'samples_count_1h': 12.0, + 'sample_time': pd.Timestamp('2021-05-30 17:15:15.808000+0000', tz='UTC'), 'signal': 14249.67394, + 'isotope': 'U235'}, + {'samples_count_15m': 5.0, 'samples_count_25m': 8.0, 'samples_count_45m': 13.0, 'samples_count_1h': 13.0, + 'sample_time': pd.Timestamp('2021-05-30 17:18:15.809000+0000', tz='UTC'), + 'signal': 656.831, 'isotope': 'U235'}, + {'samples_count_15m': 4.0, 'samples_count_25m': 8.0, 'samples_count_45m': 14.0, 'samples_count_1h': 14.0, + 'sample_time': pd.Timestamp('2021-05-30 17:21:15.810000+0000', tz='UTC'), 'signal': 5768.4822, + 'isotope': 'U235'}, + {'samples_count_15m': 5.0, 'samples_count_25m': 9.0, 'samples_count_45m': 15.0, 'samples_count_1h': 15.0, + 'sample_time': pd.Timestamp('2021-05-30 17:24:15.811000+0000', tz='UTC'), 'signal': 929.028, + 'isotope': 'U235'}, + {'samples_count_15m': 5.0, 'samples_count_25m': 8.0, 'samples_count_45m': 15.0, 'samples_count_1h': 16.0, + 'sample_time': pd.Timestamp('2021-05-30 17:27:15.812000+0000', tz='UTC'), 'signal': 2585.9646, + 'isotope': 'U235'}, + {'samples_count_15m': 4.0, 'samples_count_25m': 7.0, 'samples_count_45m': 14.0, 'samples_count_1h': 17.0, + 'sample_time': pd.Timestamp('2021-05-30 17:30:15.813000+0000', tz='UTC'), 'signal': 358.918, + 'isotope': 'U235'}] + + assert termination_result == expected, \ + f'actual did not match expected. \n actual: {termination_result} \n expected: {expected}' + + def test_emit_max_event_sliding_window_multiple_keys_aggregation_flow(): controller = build_flow([ SyncEmitSource(),