diff --git a/integration/test_flow_integration.py b/integration/test_flow_integration.py index 5083b4e0..9beec3bc 100644 --- a/integration/test_flow_integration.py +++ b/integration/test_flow_integration.py @@ -462,3 +462,37 @@ def test_write_multiple_keys_to_v3io_from_csv(setup_teardown_test): assert response.status_code == 200 assert expected == response.output.item + +def test_write_none_time(setup_teardown_test): + + table = Table(setup_teardown_test, V3ioDriver()) + data = pd.DataFrame( + { + "first_name": ["moshe", "yosi"], + "color": ['blue', 'yellow'], + "time": [test_base_time, None] + } + ) + + def set_moshe_time_to_none(data): + if data['first_name'] == 'moshe': + data['time'] = pd.NaT + return data + + controller = build_flow([ + DataframeSource(data, key_field='first_name'), + WriteToTable(table), + Map(set_moshe_time_to_none), + WriteToTable(table), + ]).run() + controller.await_termination() + + response = asyncio.run(get_kv_item(setup_teardown_test, 'yosi')) + expected = {'first_name': 'yosi', 'color': 'yellow'} + assert response.status_code == 200 + assert expected == response.output.item + + response = asyncio.run(get_kv_item(setup_teardown_test, 'moshe')) + expected = {'first_name': 'moshe', 'color': 'blue'} + assert response.status_code == 200 + assert expected == response.output.item diff --git a/storey/drivers.py b/storey/drivers.py index 5f83f8f2..b1a9591f 100644 --- a/storey/drivers.py +++ b/storey/drivers.py @@ -3,6 +3,7 @@ import os from datetime import datetime, timedelta from typing import Optional +import pandas as pd import v3io import v3io.aio.dataplane @@ -223,7 +224,11 @@ def _build_feature_store_update_expression(self, aggregation_element, additional for name, value in additional_data.items(): if name.casefold() in self.saved_engine_words.keys(): name = f'`{name}`' - expressions.append(f'{name}={self._convert_python_obj_to_expression_value(value)}') + expression_value = self._convert_python_obj_to_expression_value(value) + if expression_value: + expressions.append(f'{name}={self._convert_python_obj_to_expression_value(value)}') + else: + expressions.append(f'REMOVE {name}') update_expression = ';'.join(expressions) return update_expression, condition_expression, pending_updates @@ -352,6 +357,8 @@ def _convert_python_obj_to_expression_value(value): elif isinstance(value, bytes): return f"blob('{base64.b64encode(value).decode('ascii')}')" elif isinstance(value, datetime): + if pd.isnull(value): + return None timestamp = value.timestamp() secs = int(timestamp)