Skip to content

Commit

Permalink
[Storey] - Align fixed windows first and last buckets time to the win…
Browse files Browse the repository at this point in the history
…dow size (#251)

Signed-off-by: Eyal Salomon <[email protected]>
  • Loading branch information
Eyal Salomon authored Jun 27, 2021
1 parent 5938394 commit 50c2254
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 9 deletions.
6 changes: 2 additions & 4 deletions storey/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,8 @@ def round_up_time_to_window(self, timestamp):
def get_period_by_time(self, timestamp):
return int(timestamp / self.period_millis) * self.period_millis

def get_window_start_time_by_time(self, reference_timestamp):
window_seconds = int(self.window_millis / 1000)
timestamp_seconds = int(reference_timestamp / 1000)
return int(timestamp_seconds / window_seconds) * window_seconds * 1000
def get_window_start_time_by_time(self, timestamp):
return int(timestamp / self.window_millis) * self.window_millis


class SlidingWindows(WindowsBase):
Expand Down
18 changes: 13 additions & 5 deletions storey/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,6 @@ def get_nearest_window_index_by_timestamp(self, timestamp, window_millis):
def remove_old_values_from_pre_aggregations(self, timestamp):
if self._precalculated_aggregations:
for (aggr_name, current_window_millis), aggr in self._current_aggregate_values.items():
self.is_fixed_window = isinstance(self.explicit_windows, FixedWindows)
if self.is_fixed_window:
previous_window_start_time = \
self.get_window_start_time_from_timestamp(self._last_data_point_timestamp, current_window_millis)
Expand Down Expand Up @@ -1211,10 +1210,19 @@ def advance_window_period(self, advance_to):
aggr_value.reset()
self.buckets.append(bucket_to_reuse)

self.first_bucket_start_time = \
self.first_bucket_start_time + buckets_to_advance * self.period_millis
self.last_bucket_start_time = \
self.last_bucket_start_time + buckets_to_advance * self.period_millis
# fixed windows are advancing in integral window size
if self.is_fixed_window:
window_millis = self.last_bucket_start_time + self.period_millis - self.first_bucket_start_time
windows_to_advance = int((advance_to - self.first_bucket_start_time) / window_millis)
self.first_bucket_start_time = \
self.first_bucket_start_time + windows_to_advance * window_millis
self.last_bucket_start_time = \
self.last_bucket_start_time + windows_to_advance * window_millis
else:
self.first_bucket_start_time = \
self.first_bucket_start_time + buckets_to_advance * self.period_millis
self.last_bucket_start_time = \
self.last_bucket_start_time + buckets_to_advance * self.period_millis

def get_end_bucket(self, timestamp):
if self.is_fixed_window:
Expand Down
71 changes: 71 additions & 0 deletions tests/test_aggregate_by_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,77 @@ def test_fixed_window_aggregation_with_uncommon_windows_flow():
f'actual did not match expected. \n actual: {termination_result} \n expected: {expected}'


def test_fixed_window_aggregation_with_multiple_keys_flow():
time_format = '%Y-%m-%d %H:%M:%S.%f'
columns = ['sample_time', 'signal', 'isotope']
data = [[datetime.strptime('2021-05-30 16:42:15.797000', time_format).replace(tzinfo=timezone.utc), 790.235, 'U235'],
[datetime.strptime('2021-05-30 16:45:15.798000', time_format).replace(tzinfo=timezone.utc), 498.491, 'U235'],
[datetime.strptime('2021-05-30 16:48:15.799000', time_format).replace(tzinfo=timezone.utc), 34650.00343, 'U238'],
[datetime.strptime('2021-05-30 16:51:15.800000', time_format).replace(tzinfo=timezone.utc), 189.823, 'U238'],
[datetime.strptime('2021-05-30 16:54:15.801000', time_format).replace(tzinfo=timezone.utc), 379.524, 'U238'],
[datetime.strptime('2021-05-30 16:57:15.802000', time_format).replace(tzinfo=timezone.utc), 2225.4952, 'U238'],
[datetime.strptime('2021-05-30 17:00:15.803000', time_format).replace(tzinfo=timezone.utc), 1049.0903, 'U235'],
[datetime.strptime('2021-05-30 17:03:15.804000', time_format).replace(tzinfo=timezone.utc), 41905.63447, 'U238'],
[datetime.strptime('2021-05-30 17:06:15.805000', time_format).replace(tzinfo=timezone.utc), 4987.6764, 'U235'],
[datetime.strptime('2021-05-30 17:09:15.806000', time_format).replace(tzinfo=timezone.utc), 67657.11975, 'U235'],
[datetime.strptime('2021-05-30 17:12:15.807000', time_format).replace(tzinfo=timezone.utc), 56173.06327, 'U235'],
[datetime.strptime('2021-05-30 17:15:15.808000', time_format).replace(tzinfo=timezone.utc), 14249.67394, 'U238'],
[datetime.strptime('2021-05-30 17:18:15.809000', time_format).replace(tzinfo=timezone.utc), 656.831, 'U238'],
[datetime.strptime('2021-05-30 17:21:15.810000', time_format).replace(tzinfo=timezone.utc), 5768.4822, 'U235'],
[datetime.strptime('2021-05-30 17:24:15.811000', time_format).replace(tzinfo=timezone.utc), 929.028, 'U235'],
[datetime.strptime('2021-05-30 17:27:15.812000', time_format).replace(tzinfo=timezone.utc), 2585.9646, 'U238'],
[datetime.strptime('2021-05-30 17:30:15.813000', time_format).replace(tzinfo=timezone.utc), 358.918, 'U238']]

df = pd.DataFrame(data, columns=columns)

controller = build_flow([
DataframeSource(df, time_field="sample_time", key_field="isotope"),
AggregateByKey([FieldAggregator("samples", "signal", ["count"],
FixedWindows(['10m', '15m']))], Table("U235_test", NoopDriver())),
Reduce([], lambda acc, x: append_return(acc, x)),
]).run()
termination_result = controller.await_termination()

expected = [
{'samples_count_10m': 1.0, 'samples_count_15m': 1.0,
'sample_time': pd.Timestamp('2021-05-30 16:42:15.797000+0000', tz='UTC'), 'signal': 790.235, 'isotope': 'U235'},
{'samples_count_10m': 2.0, 'samples_count_15m': 1.0,
'sample_time': pd.Timestamp('2021-05-30 16:45:15.798000+0000', tz='UTC'), 'signal': 498.491, 'isotope': 'U235'},
{'samples_count_10m': 1.0, 'samples_count_15m': 1.0,
'sample_time': pd.Timestamp('2021-05-30 16:48:15.799000+0000', tz='UTC'), 'signal': 34650.00343,'isotope': 'U238'},
{'samples_count_10m': 1.0, 'samples_count_15m': 2.0,
'sample_time': pd.Timestamp('2021-05-30 16:51:15.800000+0000', tz='UTC'), 'signal': 189.823, 'isotope': 'U238'},
{'samples_count_10m': 2.0, 'samples_count_15m': 3.0,
'sample_time': pd.Timestamp('2021-05-30 16:54:15.801000+0000', tz='UTC'), 'signal': 379.524, 'isotope': 'U238'},
{'samples_count_10m': 3.0, 'samples_count_15m': 4.0,
'sample_time': pd.Timestamp('2021-05-30 16:57:15.802000+0000', tz='UTC'), 'signal': 2225.4952, 'isotope': 'U238'},
{'samples_count_10m': 1.0, 'samples_count_15m': 1.0,
'sample_time': pd.Timestamp('2021-05-30 17:00:15.803000+0000', tz='UTC'), 'signal': 1049.0903, 'isotope': 'U235'},
{'samples_count_10m': 1.0, 'samples_count_15m': 1.0,
'sample_time': pd.Timestamp('2021-05-30 17:03:15.804000+0000', tz='UTC'), 'signal': 41905.63447, 'isotope': 'U238'},
{'samples_count_10m': 2.0, 'samples_count_15m': 2.0,
'sample_time': pd.Timestamp('2021-05-30 17:06:15.805000+0000', tz='UTC'), 'signal': 4987.6764, 'isotope': 'U235'},
{'samples_count_10m': 3.0, 'samples_count_15m': 3.0,
'sample_time': pd.Timestamp('2021-05-30 17:09:15.806000+0000', tz='UTC'), 'signal': 67657.11975, 'isotope': 'U235'},
{'samples_count_10m': 1.0, 'samples_count_15m': 4.0,
'sample_time': pd.Timestamp('2021-05-30 17:12:15.807000+0000', tz='UTC'), 'signal': 56173.06327, 'isotope': 'U235'},
{'samples_count_10m': 1.0, 'samples_count_15m': 1.0,
'sample_time': pd.Timestamp('2021-05-30 17:15:15.808000+0000', tz='UTC'), 'signal': 14249.67394, 'isotope': 'U238'},
{'samples_count_10m': 2.0, 'samples_count_15m': 2.0,
'sample_time': pd.Timestamp('2021-05-30 17:18:15.809000+0000', tz='UTC'), 'signal': 656.831, 'isotope': 'U238'},
{'samples_count_10m': 1.0, 'samples_count_15m': 1.0,
'sample_time': pd.Timestamp('2021-05-30 17:21:15.810000+0000', tz='UTC'), 'signal': 5768.4822, 'isotope': 'U235'},
{'samples_count_10m': 2.0, 'samples_count_15m': 2.0,
'sample_time': pd.Timestamp('2021-05-30 17:24:15.811000+0000', tz='UTC'), 'signal': 929.028, 'isotope': 'U235'},
{'samples_count_10m': 1.0, 'samples_count_15m': 3.0,
'sample_time': pd.Timestamp('2021-05-30 17:27:15.812000+0000', tz='UTC'), 'signal': 2585.9646, 'isotope': 'U238'},
{'samples_count_10m': 1.0, 'samples_count_15m': 1.0,
'sample_time': pd.Timestamp('2021-05-30 17:30:15.813000+0000', tz='UTC'), 'signal': 358.918, 'isotope': 'U238'}]

assert termination_result == expected, \
f'actual did not match expected. \n actual: {termination_result} \n expected: {expected}'


def test_sliding_window_aggregation_with_uncommon_windows_flow():
time_format = '%Y-%m-%d %H:%M:%S.%f'
columns = ['sample_time', 'signal', 'isotope']
Expand Down

0 comments on commit 50c2254

Please sign in to comment.