From 76236a63625f67088fe5d6f5bc46e5c5cf4f4866 Mon Sep 17 00:00:00 2001 From: Katya Katsenelenbogen Date: Mon, 21 Jun 2021 17:06:10 +0300 Subject: [PATCH] ML-673: removing adding of "igz_part" to partitions (#246) * ML-673: removing adding of "igz_part" to partitions * minor fix --- integration/test_filesystems_integration.py | 4 ++-- storey/sources.py | 3 +-- storey/targets.py | 6 +----- storey/utils.py | 8 -------- tests/test_flow.py | 4 ++-- 5 files changed, 6 insertions(+), 19 deletions(-) diff --git a/integration/test_filesystems_integration.py b/integration/test_filesystems_integration.py index fa0a251a..2a654a32 100644 --- a/integration/test_filesystems_integration.py +++ b/integration/test_filesystems_integration.py @@ -432,7 +432,7 @@ def test_filter_before_after_partitioned_inner_other_partition(setup_teardown_te after = pd.Timestamp('2019-07-01 00:00:00') controller = build_flow([ - ParquetSource(out_file, end_filter=before, start_filter=after, filter_column='my_time'), + ParquetSource(out_file, end_filter=before, start_filter=after, filter_column='my_time', columns=columns), Reduce([], append_and_return) ]).run() read_back_result = controller.await_termination() @@ -465,7 +465,7 @@ def test_filter_before_after_partitioned_outer_other_partition(setup_teardown_te after = pd.Timestamp('2020-12-30 08:53:00') controller = build_flow([ - ParquetSource(out_file, end_filter=before, start_filter=after, filter_column='my_time'), + ParquetSource(out_file, end_filter=before, start_filter=after, filter_column='my_time', columns=columns), Reduce([], append_and_return) ]).run() read_back_result = controller.await_termination() diff --git a/storey/sources.py b/storey/sources.py index eeb499a8..40e3508d 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -15,7 +15,7 @@ from .dtypes import _termination_obj, Event, legal_time_units from .flow import Flow, Complete -from .utils import url_to_file_system, drop_reserved_columns, find_filters +from .utils import url_to_file_system, find_filters class AwaitableResult: @@ -795,5 +795,4 @@ def _init(self): df = self._read_filtered_parquet(path) else: df = pandas.read_parquet(path, columns=self._columns, storage_options=self._storage_options) - drop_reserved_columns(df) self._dfs.append(df) diff --git a/storey/targets.py b/storey/targets.py index ef4f639e..f22a4610 100644 --- a/storey/targets.py +++ b/storey/targets.py @@ -163,18 +163,14 @@ def _path_from_event(self, event): else: val = event.body[col] - is_meta = False if col.startswith('$'): col = col[1:] - is_meta = True if hash_into: - col = f'igzpart_hash{hash_into}_{col}' + col = f'hash{hash_into}_{col}' if isinstance(val, list): val = '.'.join(map(str, val)) val = hash(val) % hash_into - elif is_meta: - col = f'igzpart_{col}' res += f'{col}={val}/' return res diff --git a/storey/utils.py b/storey/utils.py index 06f13127..cffc111b 100644 --- a/storey/utils.py +++ b/storey/utils.py @@ -255,11 +255,3 @@ def find_filters(partitions_time_attributes, start, end, filters, filter_column) # for start=1.2.2018 08:53:15, end=5.2.2018 16:24:31, this method will append to filters # [(year=2018, month=2,day<=5, filter_column<5.2.2018 16:24:31)] _find_filter_helper(partitions_time_attributes, end, "<", "<=", first_uncommon, filters, filter_column) - - -def drop_reserved_columns(df): - cols_to_drop = [] - for col in df.columns: - if col.startswith('igzpart_'): - cols_to_drop.append(col) - df.drop(labels=cols_to_drop, axis=1, inplace=True, errors='ignore') diff --git a/tests/test_flow.py b/tests/test_flow.py index ef103f1b..e1e7c77d 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -1708,9 +1708,9 @@ def test_write_to_parquet_partition_by_date(tmpdir): for i in range(10): controller.emit([i, f'this is {i}'], event_time=my_time) expected.append(['2020-02-15', i, f'this is {i}']) - columns = ['igzpart_date', 'my_int', 'my_string'] + columns = ['date', 'my_int', 'my_string'] expected = pd.DataFrame(expected, columns=columns, dtype='int64') - expected['igzpart_date'] = expected['igzpart_date'].astype("category") + expected['date'] = expected['date'].astype("category") controller.terminate() controller.await_termination()