Skip to content

Commit

Permalink
ML-313: in case this is pandas timestamp,set timezone manually (other… (
Browse files Browse the repository at this point in the history
#196)

* ML-313: in case this is pandas timestamp,set timezone manually (otherwise timestamp() gives wrong result)

* pr comments

* fix
  • Loading branch information
katyakats authored Apr 6, 2021
1 parent f7694ac commit be192b8
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
2 changes: 1 addition & 1 deletion integration/test_aggregation_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ def test_aggregate_multiple_keys(setup_teardown_test):
keys = ['first_name', 'last_name']
table = Table(setup_teardown_test, V3ioDriver())
controller = build_flow([
DataframeSource(data, key_field=keys),
DataframeSource(data, key_field=keys, time_field='time'),
AggregateByKey([FieldAggregator("number_of_stuff", "some_data", ["sum"],
SlidingWindows(['1h'], '10m'))],
table),
Expand Down
19 changes: 13 additions & 6 deletions storey/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
import re
from typing import Optional, Union, Callable, List, Dict
import pandas as pd

from .dtypes import EmitEveryEvent, FixedWindows, SlidingWindows, EmitAfterPeriod, EmitAfterWindow, EmitAfterMaxEvent, \
_dict_to_emit_policy, FieldAggregator, EmitPolicy
Expand Down Expand Up @@ -114,6 +115,16 @@ def _parse_aggregates(aggregates):

raise TypeError('aggregates should be a list of FieldAggregator/dictionaries')

def _get_timestamp(self, event):
event_timestamp = event.time
if isinstance(event_timestamp, datetime):
if isinstance(event_timestamp, pd.Timestamp) and event_timestamp.tzinfo is None:
# timestamp for pandas timestamp gives the wrong result in case there is no timezone (ML-313)
local_time_zone = datetime.now().astimezone().tzinfo
event_timestamp = event_timestamp.replace(tzinfo=local_time_zone)
event_timestamp = event_timestamp.timestamp() * 1000
return event_timestamp

async def _do(self, event):
if event == _termination_obj:
self._terminate_worker = True
Expand All @@ -131,9 +142,7 @@ async def _do(self, event):
if self.key_extractor:
key = self.key_extractor(element)

event_timestamp = event.time
if isinstance(event_timestamp, datetime):
event_timestamp = event_timestamp.timestamp() * 1000
event_timestamp = self._get_timestamp(event)

await self._table._lazy_load_key_with_aggregates(key, event_timestamp)
await self._table._aggregate(key, element, event_timestamp)
Expand Down Expand Up @@ -170,9 +179,7 @@ async def _sleep_and_emit(self):

# Emit a single event for the requested key
async def _emit_event(self, key, event):
event_timestamp = event.time
if isinstance(event_timestamp, datetime):
event_timestamp = event_timestamp.timestamp() * 1000
event_timestamp = self._get_timestamp(event)

await self._table._lazy_load_key_with_aggregates(key, event_timestamp)
features = await self._table._get_features(key, event_timestamp)
Expand Down

0 comments on commit be192b8

Please sign in to comment.