Skip to content

Commit

Permalink
ML-1530: changing aggregation_time_attribute_prefix from t_ to _ (#348)
Browse files Browse the repository at this point in the history
* ML-1530: changing aggregation_time_attribute_prefix from t_ to _

* minor fixes
  • Loading branch information
katyakats authored Apr 4, 2022
1 parent 7713f7b commit 28397c2
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 2 deletions.
46 changes: 45 additions & 1 deletion integration/test_aggregation_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pandas as pd

from storey import build_flow, SyncEmitSource, Reduce, Table, V3ioDriver, MapWithState, AggregateByKey, FieldAggregator, \
QueryByKey, NoSqlTarget, Context, DataframeSource, Map
QueryByKey, NoSqlTarget, Context, DataframeSource, Map, CSVSource

from storey.dtypes import SlidingWindows, FixedWindows, EmitAfterMaxEvent, FixedWindowType
from storey.utils import _split_path
Expand Down Expand Up @@ -1645,6 +1645,50 @@ def test_multiple_keys_int(setup_teardown_test):
f'actual did not match expected. \n actual: {actual} \n expected: {expected_results}'


def test_column_begin_t(setup_teardown_test):
    """Round-trip a plain column whose name starts with ``t_`` next to an aggregate.

    Two rows are written through ``AggregateByKey`` + ``NoSqlTarget`` and then
    key ``'a'`` is read back with ``QueryByKey``, requesting both the 1h sliding
    sum and the ``t_col`` column. The query result must contain the original
    ``t_col`` value together with the aggregate.

    NOTE(review): added with ML-1530 (aggregation time attribute prefix changed
    from ``t_`` to ``_``); presumably guards against user columns being mistaken
    for internal time attributes -- confirm against the driver code.
    """
    base_ts = pd.Timestamp(test_base_time)
    key_fields = ['key_column']

    # Both events fall inside the 1h window that ends at test_base_time.
    source_df = pd.DataFrame(
        {
            'key_column': ['a', 'b'],
            'some_data': [1, 2],
            't_col': ['storey', 'rules'],
            'time': [base_ts - pd.Timedelta(minutes=offset) for offset in (25, 30)],
        }
    )

    # --- Write phase: aggregate and persist the rows. ---
    write_table = Table(setup_teardown_test, V3ioDriver())
    sum_over_hour = FieldAggregator('number_of_stuff', 'some_data', ['sum'], SlidingWindows(['1h'], '10m'))
    write_steps = [
        DataframeSource(source_df, key_field=key_fields, time_field='time'),
        AggregateByKey([sum_over_hour], write_table, emit_policy=EmitAfterMaxEvent(1)),
        NoSqlTarget(write_table),
    ]
    controller = build_flow(write_steps).run()
    controller.await_termination()

    # --- Read phase: query through a fresh Table object over the same path. ---
    read_table = Table(setup_teardown_test, V3ioDriver())
    requested_features = ['number_of_stuff_sum_1h', 't_col']
    read_steps = [
        SyncEmitSource(),
        QueryByKey(requested_features, read_table, key=key_fields),
        Reduce([], lambda collected, event: append_return(collected, event)),
    ]
    controller = build_flow(read_steps).run()

    controller.emit({'key_column': 'a'}, key=['a'], event_time=test_base_time)
    controller.terminate()
    actual = controller.await_termination()

    expected_results = [
        {'number_of_stuff_sum_1h': 1.0, 'key_column': 'a', 't_col': 'storey'}
    ]

    assert actual == expected_results, \
        f'actual did not match expected. \n actual: {actual} \n expected: {expected_results}'


def test_aggregate_float_key(setup_teardown_test):
t0 = pd.Timestamp(test_base_time)
data = pd.DataFrame(
Expand Down
2 changes: 1 addition & 1 deletion storey/drivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def __init__(self, webapi: Optional[str] = None, access_key: Optional[str] = Non
self._closed = True

self._aggregation_attribute_prefix = 'aggr_'
self._aggregation_time_attribute_prefix = 't_'
self._aggregation_time_attribute_prefix = '_'
self._error_code_string = "ErrorCode"
self._false_condition_error_code = "16777244"
self._mtime_header_name = 'X-v3io-transaction-verifier'
Expand Down

0 comments on commit 28397c2

Please sign in to comment.