Skip to content

Commit

Permalink
ML-673: removing adding of "igz_part" to partitions (#246)
Browse files Browse the repository at this point in the history
* ML-673: removing adding of "igz_part" to partitions

* minor fix
  • Loading branch information
katyakats authored Jun 21, 2021
1 parent 8a782b4 commit 76236a6
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 19 deletions.
4 changes: 2 additions & 2 deletions integration/test_filesystems_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ def test_filter_before_after_partitioned_inner_other_partition(setup_teardown_te
after = pd.Timestamp('2019-07-01 00:00:00')

controller = build_flow([
ParquetSource(out_file, end_filter=before, start_filter=after, filter_column='my_time'),
ParquetSource(out_file, end_filter=before, start_filter=after, filter_column='my_time', columns=columns),
Reduce([], append_and_return)
]).run()
read_back_result = controller.await_termination()
Expand Down Expand Up @@ -465,7 +465,7 @@ def test_filter_before_after_partitioned_outer_other_partition(setup_teardown_te
after = pd.Timestamp('2020-12-30 08:53:00')

controller = build_flow([
ParquetSource(out_file, end_filter=before, start_filter=after, filter_column='my_time'),
ParquetSource(out_file, end_filter=before, start_filter=after, filter_column='my_time', columns=columns),
Reduce([], append_and_return)
]).run()
read_back_result = controller.await_termination()
Expand Down
3 changes: 1 addition & 2 deletions storey/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from .dtypes import _termination_obj, Event, legal_time_units
from .flow import Flow, Complete
from .utils import url_to_file_system, drop_reserved_columns, find_filters
from .utils import url_to_file_system, find_filters


class AwaitableResult:
Expand Down Expand Up @@ -795,5 +795,4 @@ def _init(self):
df = self._read_filtered_parquet(path)
else:
df = pandas.read_parquet(path, columns=self._columns, storage_options=self._storage_options)
drop_reserved_columns(df)
self._dfs.append(df)
6 changes: 1 addition & 5 deletions storey/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,18 +163,14 @@ def _path_from_event(self, event):
else:
val = event.body[col]

is_meta = False
if col.startswith('$'):
col = col[1:]
is_meta = True

if hash_into:
col = f'igzpart_hash{hash_into}_{col}'
col = f'hash{hash_into}_{col}'
if isinstance(val, list):
val = '.'.join(map(str, val))
val = hash(val) % hash_into
elif is_meta:
col = f'igzpart_{col}'

res += f'{col}={val}/'
return res
Expand Down
8 changes: 0 additions & 8 deletions storey/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,3 @@ def find_filters(partitions_time_attributes, start, end, filters, filter_column)
# for start=1.2.2018 08:53:15, end=5.2.2018 16:24:31, this method will append to filters
# [(year=2018, month=2,day<=5, filter_column<5.2.2018 16:24:31)]
_find_filter_helper(partitions_time_attributes, end, "<", "<=", first_uncommon, filters, filter_column)


def drop_reserved_columns(df):
cols_to_drop = []
for col in df.columns:
if col.startswith('igzpart_'):
cols_to_drop.append(col)
df.drop(labels=cols_to_drop, axis=1, inplace=True, errors='ignore')
4 changes: 2 additions & 2 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1708,9 +1708,9 @@ def test_write_to_parquet_partition_by_date(tmpdir):
for i in range(10):
controller.emit([i, f'this is {i}'], event_time=my_time)
expected.append(['2020-02-15', i, f'this is {i}'])
columns = ['igzpart_date', 'my_int', 'my_string']
columns = ['date', 'my_int', 'my_string']
expected = pd.DataFrame(expected, columns=columns, dtype='int64')
expected['igzpart_date'] = expected['igzpart_date'].astype("category")
expected['date'] = expected['date'].astype("category")
controller.terminate()
controller.await_termination()

Expand Down

0 comments on commit 76236a6

Please sign in to comment.