diff --git a/storey/dtypes.py b/storey/dtypes.py index 5c6ded6a..99bdb21c 100644 --- a/storey/dtypes.py +++ b/storey/dtypes.py @@ -190,10 +190,8 @@ def round_up_time_to_window(self, timestamp): def get_period_by_time(self, timestamp): return int(timestamp / self.period_millis) * self.period_millis - def get_window_start_time_by_time(self, reference_timestamp): - window_seconds = int(self.window_millis / 1000) - timestamp_seconds = int(reference_timestamp / 1000) - return int(timestamp_seconds / window_seconds) * window_seconds * 1000 + def get_window_start_time_by_time(self, timestamp): + return int(timestamp / self.window_millis) * self.window_millis class SlidingWindows(WindowsBase): diff --git a/storey/table.py b/storey/table.py index e07ad582..32ef087f 100644 --- a/storey/table.py +++ b/storey/table.py @@ -1162,7 +1162,6 @@ def get_nearest_window_index_by_timestamp(self, timestamp, window_millis): def remove_old_values_from_pre_aggregations(self, timestamp): if self._precalculated_aggregations: for (aggr_name, current_window_millis), aggr in self._current_aggregate_values.items(): - self.is_fixed_window = isinstance(self.explicit_windows, FixedWindows) if self.is_fixed_window: previous_window_start_time = \ self.get_window_start_time_from_timestamp(self._last_data_point_timestamp, current_window_millis) @@ -1211,10 +1210,19 @@ def advance_window_period(self, advance_to): aggr_value.reset() self.buckets.append(bucket_to_reuse) - self.first_bucket_start_time = \ - self.first_bucket_start_time + buckets_to_advance * self.period_millis - self.last_bucket_start_time = \ - self.last_bucket_start_time + buckets_to_advance * self.period_millis + # fixed windows are advancing in integral window size + if self.is_fixed_window: + window_millis = self.last_bucket_start_time + self.period_millis - self.first_bucket_start_time + windows_to_advance = int((advance_to - self.first_bucket_start_time) / window_millis) + self.first_bucket_start_time = \ + self.first_bucket_start_time + windows_to_advance * window_millis + self.last_bucket_start_time = \ + self.last_bucket_start_time + windows_to_advance * window_millis + else: + self.first_bucket_start_time = \ + self.first_bucket_start_time + buckets_to_advance * self.period_millis + self.last_bucket_start_time = \ + self.last_bucket_start_time + buckets_to_advance * self.period_millis def get_end_bucket(self, timestamp): if self.is_fixed_window: diff --git a/tests/test_aggregate_by_key.py b/tests/test_aggregate_by_key.py index e34ee254..c5161271 100644 --- a/tests/test_aggregate_by_key.py +++ b/tests/test_aggregate_by_key.py @@ -664,6 +664,77 @@ def test_fixed_window_aggregation_with_uncommon_windows_flow(): f'actual did not match expected. \n actual: {termination_result} \n expected: {expected}' +def test_fixed_window_aggregation_with_multiple_keys_flow(): + time_format = '%Y-%m-%d %H:%M:%S.%f' + columns = ['sample_time', 'signal', 'isotope'] + data = [[datetime.strptime('2021-05-30 16:42:15.797000', time_format).replace(tzinfo=timezone.utc), 790.235, 'U235'], + [datetime.strptime('2021-05-30 16:45:15.798000', time_format).replace(tzinfo=timezone.utc), 498.491, 'U235'], + [datetime.strptime('2021-05-30 16:48:15.799000', time_format).replace(tzinfo=timezone.utc), 34650.00343, 'U238'], + [datetime.strptime('2021-05-30 16:51:15.800000', time_format).replace(tzinfo=timezone.utc), 189.823, 'U238'], + [datetime.strptime('2021-05-30 16:54:15.801000', time_format).replace(tzinfo=timezone.utc), 379.524, 'U238'], + [datetime.strptime('2021-05-30 16:57:15.802000', time_format).replace(tzinfo=timezone.utc), 2225.4952, 'U238'], + [datetime.strptime('2021-05-30 17:00:15.803000', time_format).replace(tzinfo=timezone.utc), 1049.0903, 'U235'], + [datetime.strptime('2021-05-30 17:03:15.804000', time_format).replace(tzinfo=timezone.utc), 41905.63447, 'U238'], + [datetime.strptime('2021-05-30 17:06:15.805000', time_format).replace(tzinfo=timezone.utc), 4987.6764, 'U235'], + [datetime.strptime('2021-05-30 17:09:15.806000', time_format).replace(tzinfo=timezone.utc), 67657.11975, 'U235'], + [datetime.strptime('2021-05-30 17:12:15.807000', time_format).replace(tzinfo=timezone.utc), 56173.06327, 'U235'], + [datetime.strptime('2021-05-30 17:15:15.808000', time_format).replace(tzinfo=timezone.utc), 14249.67394, 'U238'], + [datetime.strptime('2021-05-30 17:18:15.809000', time_format).replace(tzinfo=timezone.utc), 656.831, 'U238'], + [datetime.strptime('2021-05-30 17:21:15.810000', time_format).replace(tzinfo=timezone.utc), 5768.4822, 'U235'], + [datetime.strptime('2021-05-30 17:24:15.811000', time_format).replace(tzinfo=timezone.utc), 929.028, 'U235'], + [datetime.strptime('2021-05-30 17:27:15.812000', time_format).replace(tzinfo=timezone.utc), 2585.9646, 'U238'], + [datetime.strptime('2021-05-30 17:30:15.813000', time_format).replace(tzinfo=timezone.utc), 358.918, 'U238']] + + df = pd.DataFrame(data, columns=columns) + + controller = build_flow([ + DataframeSource(df, time_field="sample_time", key_field="isotope"), + AggregateByKey([FieldAggregator("samples", "signal", ["count"], + FixedWindows(['10m', '15m']))], Table("U235_test", NoopDriver())), + Reduce([], lambda acc, x: append_return(acc, x)), + ]).run() + termination_result = controller.await_termination() + + expected = [ + {'samples_count_10m': 1.0, 'samples_count_15m': 1.0, + 'sample_time': pd.Timestamp('2021-05-30 16:42:15.797000+0000', tz='UTC'), 'signal': 790.235, 'isotope': 'U235'}, + {'samples_count_10m': 2.0, 'samples_count_15m': 1.0, + 'sample_time': pd.Timestamp('2021-05-30 16:45:15.798000+0000', tz='UTC'), 'signal': 498.491, 'isotope': 'U235'}, + {'samples_count_10m': 1.0, 'samples_count_15m': 1.0, + 'sample_time': pd.Timestamp('2021-05-30 16:48:15.799000+0000', tz='UTC'), 'signal': 34650.00343,'isotope': 'U238'}, + {'samples_count_10m': 1.0, 'samples_count_15m': 2.0, + 'sample_time': pd.Timestamp('2021-05-30 16:51:15.800000+0000', tz='UTC'), 'signal': 189.823, 'isotope': 'U238'}, + {'samples_count_10m': 2.0, 'samples_count_15m': 3.0, + 'sample_time': pd.Timestamp('2021-05-30 16:54:15.801000+0000', tz='UTC'), 'signal': 379.524, 'isotope': 'U238'}, + {'samples_count_10m': 3.0, 'samples_count_15m': 4.0, + 'sample_time': pd.Timestamp('2021-05-30 16:57:15.802000+0000', tz='UTC'), 'signal': 2225.4952, 'isotope': 'U238'}, + {'samples_count_10m': 1.0, 'samples_count_15m': 1.0, + 'sample_time': pd.Timestamp('2021-05-30 17:00:15.803000+0000', tz='UTC'), 'signal': 1049.0903, 'isotope': 'U235'}, + {'samples_count_10m': 1.0, 'samples_count_15m': 1.0, + 'sample_time': pd.Timestamp('2021-05-30 17:03:15.804000+0000', tz='UTC'), 'signal': 41905.63447, 'isotope': 'U238'}, + {'samples_count_10m': 2.0, 'samples_count_15m': 2.0, + 'sample_time': pd.Timestamp('2021-05-30 17:06:15.805000+0000', tz='UTC'), 'signal': 4987.6764, 'isotope': 'U235'}, + {'samples_count_10m': 3.0, 'samples_count_15m': 3.0, + 'sample_time': pd.Timestamp('2021-05-30 17:09:15.806000+0000', tz='UTC'), 'signal': 67657.11975, 'isotope': 'U235'}, + {'samples_count_10m': 1.0, 'samples_count_15m': 4.0, + 'sample_time': pd.Timestamp('2021-05-30 17:12:15.807000+0000', tz='UTC'), 'signal': 56173.06327, 'isotope': 'U235'}, + {'samples_count_10m': 1.0, 'samples_count_15m': 1.0, + 'sample_time': pd.Timestamp('2021-05-30 17:15:15.808000+0000', tz='UTC'), 'signal': 14249.67394, 'isotope': 'U238'}, + {'samples_count_10m': 2.0, 'samples_count_15m': 2.0, + 'sample_time': pd.Timestamp('2021-05-30 17:18:15.809000+0000', tz='UTC'), 'signal': 656.831, 'isotope': 'U238'}, + {'samples_count_10m': 1.0, 'samples_count_15m': 1.0, + 'sample_time': pd.Timestamp('2021-05-30 17:21:15.810000+0000', tz='UTC'), 'signal': 5768.4822, 'isotope': 'U235'}, + {'samples_count_10m': 2.0, 'samples_count_15m': 2.0, + 'sample_time': pd.Timestamp('2021-05-30 17:24:15.811000+0000', tz='UTC'), 'signal': 929.028, 'isotope': 'U235'}, + {'samples_count_10m': 1.0, 'samples_count_15m': 3.0, + 'sample_time': pd.Timestamp('2021-05-30 17:27:15.812000+0000', tz='UTC'), 'signal': 2585.9646, 'isotope': 'U238'}, + {'samples_count_10m': 1.0, 'samples_count_15m': 1.0, + 'sample_time': pd.Timestamp('2021-05-30 17:30:15.813000+0000', tz='UTC'), 'signal': 358.918, 'isotope': 'U238'}] + + assert termination_result == expected, \ + f'actual did not match expected. \n actual: {termination_result} \n expected: {expected}' + + def test_sliding_window_aggregation_with_uncommon_windows_flow(): time_format = '%Y-%m-%d %H:%M:%S.%f' columns = ['sample_time', 'signal', 'isotope']