From 22f75d5c59f3b4e809a424c6c85c7e7d8153f336 Mon Sep 17 00:00:00 2001
From: Amir Mofakhar
Date: Thu, 2 Feb 2023 12:02:36 +0100
Subject: [PATCH] [AP-832] Defined partial sync (#1054)

* implementing defined partial sync
---
 docs/connectors/taps/mysql.rst                |   7 +
 docs/connectors/taps/postgres.rst             |   7 +
 docs/user_guide/resync.rst                    |   6 +
 pipelinewise/cli/commands.py                  |   8 +-
 pipelinewise/cli/config.py                    |   3 +
 pipelinewise/cli/multiprocess.py              |  30 ++
 pipelinewise/cli/pipelinewise.py              | 327 +++++++++++-------
 .../partialsync/mysql_to_snowflake.py         |  61 +++-
 .../partialsync/postgres_to_snowflake.py      |  60 +++-
 pipelinewise/fastsync/partialsync/utils.py    | 117 +++++--
 tests/end_to_end/helpers/assertions.py        |   8 +
 tests/end_to_end/helpers/env.py               |  23 +-
 ...test_defined_partial_sync_mariadb_to_sf.py | 101 ++++++
 .../target_snowflake/tap_postgres/__init__.py |   1 +
 .../test_defined_partial_sync_pg_to_sf.py     |  98 ++++++
 ...ql_to_sf_defined_partial_sync.yml.template |  56 +++
 ...es_to_sf_defined_partial_sync.yml.template |  59 ++++
 .../target_one/tap_one/selection.json         |   6 +-
 tests/units/cli/test_cli.py                   | 153 +++++---
 .../partialsync/test_mysql_to_snowflake.py    | 194 +++++++----
 .../partialsync/test_partial_sync_utils.py    |  54 ++-
 .../partialsync/test_postgres_to_snowflake.py | 199 +++++++----
 tests/units/partialsync/utils.py              |  35 +-
 23 files changed, 1218 insertions(+), 395 deletions(-)
 create mode 100644 pipelinewise/cli/multiprocess.py
 create mode 100644 tests/end_to_end/target_snowflake/tap_mariadb/test_defined_partial_sync_mariadb_to_sf.py
 create mode 100644 tests/end_to_end/target_snowflake/tap_postgres/test_defined_partial_sync_pg_to_sf.py
 create mode 100644 tests/end_to_end/test-project/tap_mysql_to_sf_defined_partial_sync.yml.template
 create mode 100644 tests/end_to_end/test-project/tap_postgres_to_sf_defined_partial_sync.yml.template

diff --git a/docs/connectors/taps/mysql.rst b/docs/connectors/taps/mysql.rst
index f486449f4..4e0e610c9 100644
--- a/docs/connectors/taps/mysql.rst
+++ b/docs/connectors/taps/mysql.rst
@@ -184,6 +184,13 @@ Example YAML for ``tap-mysql``:
         - table_name: "table_two"
           replication_method: "LOG_BASED"     # Important! Log based must be enabled in MySQL
 
+        - table_name: "table_three"
+          replication_method: "LOG_BASED"
+          sync_start_from:                    # Optional, applies to the first sync and fast sync
+            column: "column_name"             # column name to be picked for partial sync with incremental or timestamp value
+            value: "start_value"              # The first sync always starts from column >= value
+            drop_target_table: true           # Optional, drops target table before syncing. default value is false
+
     # You can add as many schemas as you need...
     # Uncomment this if you want replicate tables from multiple schemas
     #- source_schema: "another_schema_in_mysql"
diff --git a/docs/connectors/taps/postgres.rst b/docs/connectors/taps/postgres.rst
index f36d2306c..ec9657f9a 100644
--- a/docs/connectors/taps/postgres.rst
+++ b/docs/connectors/taps/postgres.rst
@@ -198,6 +198,13 @@ Example YAML for ``tap-postgres``:
         - table_name: "table_two"
           replication_method: "LOG_BASED"     # Important! Log based must be enabled in MySQL
 
+        - table_name: "table_three"
+          replication_method: "LOG_BASED"
+          sync_start_from:                    # Optional, applies to the first sync and fast sync
+            column: "column_name"             # column name to be picked for partial sync with incremental or timestamp value
+            value: "start_value"              # The first sync always starts from column >= value
+            drop_target_table: true           # Optional, drops target table before syncing. 
default value is false + # You can add as many schemas as you need... # Uncomment this if you want replicate tables from multiple schemas #- source_schema: "another_schema_in_postgres" diff --git a/docs/user_guide/resync.rst b/docs/user_guide/resync.rst index b901f8a14..3f0484138 100644 --- a/docs/user_guide/resync.rst +++ b/docs/user_guide/resync.rst @@ -34,6 +34,12 @@ add the ``--tables`` argument: list of table names using the ``.`` format. Schema and table names have to be the names in the source database. +.. warning:: + + Based on the tap setting, tables can be fully synced or partial synced if they are defined + as partial synced. + Currently this option is available only for :ref:`tap-mysql` and :ref:`tap-postgres` to Snowflake. + 2. **Partial resync** If you want to partial resync a table from a specific tap then use the ``partial_sync_table`` command diff --git a/pipelinewise/cli/commands.py b/pipelinewise/cli/commands.py index 28d35b231..f43d99965 100644 --- a/pipelinewise/cli/commands.py +++ b/pipelinewise/cli/commands.py @@ -355,7 +355,7 @@ def build_singer_command( # pylint: disable=too-many-arguments -def build_fastsync_partial_command( +def build_partialsync_command( tap: TapParams, target: TargetParams, transform: TransformParams, @@ -364,7 +364,8 @@ def build_fastsync_partial_command( table: str, column: str, start_value: str, - end_value: str = None + end_value: str = None, + drop_target_table: str = None ): """Builds a command that starts a partial sync""" @@ -387,7 +388,8 @@ def build_fastsync_partial_command( f'--table "{table}"', f'--column "{column}"', f'--start_value "{start_value}"', - f'--end_value "{end_value}"' if end_value else None + f'--end_value "{end_value}"' if end_value else None, + f'--drop_target_table {drop_target_table}' if drop_target_table else None ], ) ) diff --git a/pipelinewise/cli/config.py b/pipelinewise/cli/config.py index 3e264b509..dbbae93dc 100644 --- a/pipelinewise/cli/config.py +++ b/pipelinewise/cli/config.py @@ -327,6 +327,7 @@ def generate_selection(cls, tap: Dict) -> List[Dict]: schema_name = schema.get('source_schema') for table in schema.get('tables', []): table_name = table.get('table_name') + sync_start_from = table.get('sync_start_from') replication_method = table.get( 'replication_method', utils.get_tap_default_replication_method(tap) ) @@ -340,6 +341,8 @@ def generate_selection(cls, tap: Dict) -> List[Dict]: # Add replication_key only if replication_method is INCREMENTAL 'replication_key': table.get('replication_key') if replication_method == 'INCREMENTAL' else None, + 'sync_start_from': sync_start_from + if sync_start_from else None } ) ) diff --git a/pipelinewise/cli/multiprocess.py b/pipelinewise/cli/multiprocess.py new file mode 100644 index 000000000..88b674d96 --- /dev/null +++ b/pipelinewise/cli/multiprocess.py @@ -0,0 +1,30 @@ +import multiprocessing +import traceback + + +class Process(multiprocessing.Process): + """ + This is an extension of Process to let catching raised exceptions inside the + process. 
+ """ + def __init__(self, *args, **kwargs): + multiprocessing.Process.__init__(self, *args, **kwargs) + self._pconn, self._cconn = multiprocessing.Pipe() + self._exception = None + + def run(self): + try: + multiprocessing.Process.run(self) + self._cconn.send(None) + except Exception as exp: + t_b = traceback.format_exc() + self._cconn.send((exp, t_b)) + + @property + def exception(self): + """ + Returning exception of the process + """ + if self._pconn.poll(): + self._exception = self._pconn.recv() + return self._exception diff --git a/pipelinewise/cli/pipelinewise.py b/pipelinewise/cli/pipelinewise.py index 86f5e6749..1323d25be 100644 --- a/pipelinewise/cli/pipelinewise.py +++ b/pipelinewise/cli/pipelinewise.py @@ -29,7 +29,9 @@ InvalidConfigException, PartialSyncNotSupportedTypeException, PreRunChecksException ) + from pipelinewise.fastsync.commons.tap_postgres import FastSyncTapPostgres +from pipelinewise.cli.multiprocess import Process FASTSYNC_PAIRS = { ConnectorType.TAP_MYSQL: { @@ -1023,19 +1025,6 @@ def run_tap_singer( profiling_dir=self.profiling_dir, ) - # Do not run if another instance is already running - log_dir = os.path.dirname(self.tap_run_log_file) - if ( - os.path.isdir(log_dir) - and len(utils.search_files(log_dir, patterns=['*.log.running'])) > 0 - ): - self.logger.info( - 'Failed to run. Another instance of the same tap is already running. ' - 'Log file detected in running status at %s', - log_dir, - ) - sys.exit(1) - start = None state = None @@ -1084,7 +1073,7 @@ def run_tap_partialsync(self, tap: TapParams, target: TargetParams, transform: T """Running the tap for partial sync table""" # Build the partial sync executable command - command = commands.build_fastsync_partial_command( + command = commands.build_partialsync_command( tap=tap, target=target, transform=transform, @@ -1093,7 +1082,8 @@ def run_tap_partialsync(self, tap: TapParams, target: TargetParams, transform: T table=self.args.table, column=self.args.column, start_value=self.args.start_value, - end_value=self.args.end_value + end_value=self.args.end_value, + drop_target_table=self.args.drop_target_table ) def add_partialsync_output_to_main_logger(line: str) -> str: @@ -1128,19 +1118,6 @@ def run_tap_fastsync( drop_pg_slot=self.drop_pg_slot, ) - # Do not run if another instance is already running - log_dir = os.path.dirname(self.tap_run_log_file) - if ( - os.path.isdir(log_dir) - and len(utils.search_files(log_dir, patterns=['*.log.running'])) > 0 - ): - self.logger.info( - 'Failed to run. Another instance of the same tap is already running. ' - 'Log file detected in running status at %s', - log_dir, - ) - sys.exit(1) - # Fastsync is running in subprocess. # Collect the formatted logs and log it in the main PipelineWise process as well # Logs are already formatted at this stage so not using logging functions to avoid double formatting. 
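
A minimal sketch of how the new Process wrapper from pipelinewise/cli/multiprocess.py is meant to be used: the child sends (exception, traceback) through a pipe and the parent reads it via the exception property after join(). The worker function below is hypothetical; in this patch, do_sync_tables() applies the same start/join/exception/exitcode pattern with sync_tables_partial_sync and sync_tables_fast_sync as the targets.

    from pipelinewise.cli.multiprocess import Process


    def worker(tables):
        # Hypothetical sync routine: any exception raised here is captured by the
        # wrapper and surfaced to the parent instead of being silently lost.
        if not tables:
            raise ValueError('nothing to sync')


    if __name__ == '__main__':
        proc = Process(target=worker, args=([],))
        proc.start()
        proc.join()
        if proc.exception:
            error, formatted_traceback = proc.exception
            raise Exception(error)            # same re-raise pattern as do_sync_tables()
        if proc.exitcode != 0:                # covers failures not derived from Exception
            raise SystemExit(proc.exitcode)

Because run() catches the exception inside the child, the child's exitcode stays 0 on an ordinary Exception, which is why the parent has to check both exception and exitcode, as do_sync_tables() does.
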
@@ -1185,6 +1162,8 @@ def run_tap(self): 'stream_buffer_size', commands.DEFAULT_STREAM_BUFFER_SIZE ) + not_partial_syned_tables = set() + self.logger.info('Running %s tap in %s target', tap_id, target_id) # Run only if tap enabled @@ -1234,6 +1213,7 @@ def run_tap(self): start_time = datetime.now() try: with pidfile.PIDFile(self.tap['files']['pidfile']): + target_params = TargetParams( target_id=target_id, type=target_type, @@ -1253,24 +1233,20 @@ def run_tap(self): # Run fastsync for FULL_TABLE replication method if len(fastsync_stream_ids) > 0: self.logger.info( - 'Table(s) selected to sync by fastsync: %s', fastsync_stream_ids - ) - self.tap_run_log_file = os.path.join( - log_dir, f'{target_id}-{tap_id}-{current_time}.fastsync.log' - ) - tap_params = TapParams( - tap_id=tap_id, - type=tap_type, - bin=self.tap_bin, - python_bin=self.tap_python_bin, - config=tap_config, - properties=tap_properties_fastsync, - state=tap_state, + 'Table(s) selected to sync by fastsync/partialsync: %s', fastsync_stream_ids ) + self.do_sync_tables() + + # Finding out which partial syn tables are not synced yet for not running singer for them + try: + with open(tap_state, 'r', encoding='utf8') as state_file: + state_dict = json.load(state_file) + except Exception: + state_dict = {} + stored_bookmarks = state_dict.get('bookmarks', {}) + stored_bookmarks_keys = set(stored_bookmarks.keys()) + not_partial_syned_tables = set(singer_stream_ids).difference(stored_bookmarks_keys) - self.run_tap_fastsync( - tap=tap_params, target=target_params, transform=transform_params - ) else: self.logger.info( 'No table available that needs to be sync by fastsync' @@ -1294,6 +1270,8 @@ def run_tap(self): state=tap_state, ) + self._remove_not_partial_synced_tables_from_properties(tap_params, not_partial_syned_tables) + self.run_tap_singer( tap=tap_params, target=target_params, @@ -1342,7 +1320,7 @@ def stop_tap(self, sig=None, frame=None): # Terminate all the processes in the current process' process group. for child in parent.children(recursive=True): - if os.getpgid(child.pid) == pgid: + if os.getpgid(child.pid) == pgid and child.status == 'running': self.logger.info('Sending SIGTERM to child pid %s...', child.pid) child.terminate() try: @@ -1365,7 +1343,10 @@ def stop_tap(self, sig=None, frame=None): sys.exit(1) # Remove pidfile. 
- os.remove(pidfile_path) + try: + os.remove(pidfile_path) + except Exception: + pass # Rename log files from running to terminated status if self.tap_run_log_file: @@ -1379,6 +1360,45 @@ def stop_tap(self, sig=None, frame=None): # pylint: disable=too-many-locals def sync_tables(self): + """ + This method calls do_sync_tables if sync_tables command is chosen + """ + try: + with pidfile.PIDFile(self.tap['files']['pidfile']): + self.do_sync_tables() + except pidfile.AlreadyRunningError as exc: + self.logger.error('Another instance of the tap is already running.') + raise SystemExit(1) from exc + + def do_sync_tables(self): + """ + syncing tables by using fast sync + """ + + selected_tables = self._get_sync_tables_setting_from_selection_file(self.args.tables) + processes_list = [] + if selected_tables['partial_sync']: + self._reset_state_file_for_partial_sync(selected_tables) + partial_sync_process = Process( + target=self.sync_tables_partial_sync, args=(selected_tables['partial_sync'],)) + partial_sync_process.start() + processes_list.append(partial_sync_process) + + if selected_tables['full_sync']: + fast_sync_process = Process( + target=self.sync_tables_fast_sync, args=(selected_tables['full_sync'],)) + fast_sync_process.start() + processes_list.append(fast_sync_process) + + for process in processes_list: + process.join() + if process.exception: + error, _ = process.exception + raise Exception(error) + if process.exitcode != 0: + raise SystemExit(process.exitcode) + + def sync_tables_fast_sync(self, selected_tables): """ Sync every or a list of selected tables from a specific tap. It performs an initial sync and resets the table bookmarks to their new location. @@ -1387,6 +1407,7 @@ def sync_tables(self): available for taps and targets where the native and optimised fastsync component is implemented. 
""" + self.args.tables = ','.join(f'"{x}"' for x in selected_tables) tap_id = self.tap['id'] tap_type = self.tap['type'] target_id = self.target['id'] @@ -1432,50 +1453,45 @@ def sync_tables(self): current_time = datetime.utcnow().strftime('%Y%m%d_%H%M%S') # sync_tables command always using fastsync - with pidfile.PIDFile(self.tap['files']['pidfile']): - self.tap_run_log_file = os.path.join( - log_dir, f'{target_id}-{tap_id}-{current_time}.fastsync.log' - ) + self.tap_run_log_file = os.path.join( + log_dir, f'{target_id}-{tap_id}-{current_time}.fastsync.log' + ) - # Create parameters as NamedTuples - tap_params = TapParams( - tap_id=tap_id, - type=tap_type, - bin=self.tap_bin, - python_bin=self.tap_python_bin, - config=tap_config, - properties=tap_properties, - state=tap_state, - ) + # Create parameters as NamedTuples + tap_params = TapParams( + tap_id=tap_id, + type=tap_type, + bin=self.tap_bin, + python_bin=self.tap_python_bin, + config=tap_config, + properties=tap_properties, + state=tap_state, + ) - target_params = TargetParams( - target_id=target_id, - type=target_type, - bin=self.target_bin, - python_bin=self.target_python_bin, - config=cons_target_config, - ) + target_params = TargetParams( + target_id=target_id, + type=target_type, + bin=self.target_bin, + python_bin=self.target_python_bin, + config=cons_target_config, + ) - transform_params = TransformParams( - bin=self.transform_field_bin, - config=tap_transformation, - python_bin=self.transform_field_python_bin, - tap_id=tap_id, - target_id=target_id, - ) + transform_params = TransformParams( + bin=self.transform_field_bin, + config=tap_transformation, + python_bin=self.transform_field_python_bin, + tap_id=tap_id, + target_id=target_id, + ) - self.run_tap_fastsync( - tap=tap_params, target=target_params, transform=transform_params - ) + self.run_tap_fastsync( + tap=tap_params, target=target_params, transform=transform_params + ) - except pidfile.AlreadyRunningError: - self.logger.error('Another instance of the tap is already running.') - sys.exit(1) - # Delete temp file if there is any except commands.RunCommandException as exc: self.logger.exception(exc) self.send_alert(message=f'Failed to sync tables in {tap_id} tap', exc=exc) - sys.exit(1) + raise SystemExit(1) from exc except PreRunChecksException as exc: raise exc except Exception as exc: @@ -1680,7 +1696,18 @@ def encrypt_string(self): def partial_sync_table(self): """ - Partial Sync Table + This method calls partial sync if partial_sync_table command is chosen + """ + try: + with pidfile.PIDFile(self.tap['files']['pidfile']): + self.sync_tables_partial_sync() + except pidfile.AlreadyRunningError as exc: + self.logger.error('Another instance of the tap is already running.') + raise SystemExit(1) from exc + + def sync_tables_partial_sync(self, defined_tables=None): + """ + Partial Sync Tables """ cons_target_config = None @@ -1707,9 +1734,25 @@ def partial_sync_table(self): self._check_if_complete_tap_configuration(sync_bin, tap_type, target_type) - self._validate_selected_table_and_column() - - self._check_if_state_exists() + if self.args.table != '*': + self._validate_selected_table_and_column() + self._check_if_state_exists() + self.args.drop_target_table = None + else: + table_names = [] + table_columns = [] + table_values = [] + table_drop_targets = [] + for table, sync_settings in defined_tables.items(): + table_names.append(table) + table_columns.append(sync_settings['column']) + table_values.append(str(sync_settings['value'])) + 
table_drop_targets.append(sync_settings.get('drop_target_table')) + + self.args.table = ','.join(table_names) + self.args.column = ','.join(table_columns) + self.args.start_value = ','.join(table_values) + self.args.drop_target_table = ','.join(map(str, table_drop_targets)) # Generate and run the command to run the tap directly tap_config = self.tap['files']['config'] @@ -1729,45 +1772,39 @@ def partial_sync_table(self): log_dir = self.get_tap_log_dir(target_id, tap_id) current_time = datetime.utcnow().strftime('%Y%m%d_%H%M%S') - with pidfile.PIDFile(self.tap['files']['pidfile']): - self.tap_run_log_file = os.path.join( - log_dir, f'{target_id}-{tap_id}-{current_time}.partialsync.log' - ) + self.tap_run_log_file = os.path.join( + log_dir, f'{target_id}-{tap_id}-{current_time}.partialsync.log' + ) - # Create parameters as NamedTuples - tap_params = TapParams( - tap_id=tap_id, - type=tap_type, - bin=self.tap_bin, - python_bin=self.tap_python_bin, - config=tap_config, - properties=tap_properties, - state=tap_state, - ) + # Create parameters as NamedTuples + tap_params = TapParams( + tap_id=tap_id, + type=tap_type, + bin=self.tap_bin, + python_bin=self.tap_python_bin, + config=tap_config, + properties=tap_properties, + state=tap_state, + ) - target_params = TargetParams( - target_id=target_id, - type=target_type, - bin=self.target_bin, - python_bin=self.target_python_bin, - config=cons_target_config, - ) + target_params = TargetParams( + target_id=target_id, + type=target_type, + bin=self.target_bin, + python_bin=self.target_python_bin, + config=cons_target_config, + ) - transform_params = TransformParams( - bin=self.transform_field_bin, - config=tap_transformation, - python_bin=self.transform_field_python_bin, - tap_id=tap_id, - target_id=target_id, - ) + transform_params = TransformParams( + bin=self.transform_field_bin, + config=tap_transformation, + python_bin=self.transform_field_python_bin, + tap_id=tap_id, + target_id=target_id, + ) - self.run_tap_partialsync( - tap=tap_params, target=target_params, transform=transform_params, - ) + self.run_tap_partialsync(tap=tap_params, target=target_params, transform=transform_params) - except pidfile.AlreadyRunningError as exc: - self.logger.error('Another instance of the tap is already running.') - raise SystemExit(1) from exc # Delete temp file if there is any except commands.RunCommandException as exc: self.logger.exception(exc) @@ -1780,11 +1817,39 @@ def partial_sync_table(self): raise SystemExit(1) from exp except Exception as exc: self.send_alert(message=f'Failed to sync tables in {tap_id} tap', exc=exc) + self.logger.exception(exc) raise exc finally: if cons_target_config: utils.silentremove(cons_target_config) + @staticmethod + def _remove_not_partial_synced_tables_from_properties(tap_params, not_synced_tables): + """" Remove partial sync table which are not synced yet from properties """ + with open(tap_params.properties, 'r', encoding='utf8') as properties_temp_file: + properties_temp = json.load(properties_temp_file) + streams = properties_temp.get('streams') + filtered_streams = list(filter(lambda d: d['tap_stream_id'] not in not_synced_tables, streams)) + properties_temp['streams'] = filtered_streams + with open(tap_params.properties, 'w', encoding='utf8') as properties_temp_file: + json.dump(properties_temp, properties_temp_file) + + def _reset_state_file_for_partial_sync(self, selected_tables): + tap_state = self.tap['files']['state'] + try: + with open(tap_state, 'r', encoding='utf8') as state_file: + state_content = 
json.load(state_file) + bookmarks = state_content.get('bookmarks') + except Exception: + bookmarks = None + if bookmarks: + selected_partial_sync_tables = set(selected_tables['partial_sync'].keys()) + selected_partial_sync_tables = {sub.replace('.', '-') for sub in selected_partial_sync_tables} + filtered_bookmarks = dict(filter(lambda k: k[0] not in selected_partial_sync_tables, bookmarks.items())) + state_content['bookmarks'] = filtered_bookmarks + with open(tap_state, 'w', encoding='utf8') as state_file: + json.dump(state_content, state_file) + def _check_supporting_tap_and_target_for_partial_sync(self): tap_type = self.tap['type'] tap_id = self.tap['id'] @@ -1970,8 +2035,6 @@ def _cleanup_tap_state_file(self) -> None: state_file = self.tap['files']['state'] if tables: self._clean_tables_from_bookmarks_in_state_file(state_file, tables) - else: - utils.silentremove(state_file) @staticmethod def _clean_tables_from_bookmarks_in_state_file(state_file_to_clean: str, tables: str) -> None: @@ -1982,7 +2045,7 @@ def _clean_tables_from_bookmarks_in_state_file(state_file_to_clean: str, tables: list_of_tables = tables.split(',') if bookmarks: for table_name in list_of_tables: - bookmarks.pop(table_name, None) + bookmarks.pop(table_name.replace('"', ''), None) state_file.seek(0) json.dump(state_data, state_file) @@ -1993,6 +2056,24 @@ def _clean_tables_from_bookmarks_in_state_file(state_file_to_clean: str, tables: except json.JSONDecodeError: pass + @staticmethod + def _get_fixed_name_of_table(stream_id): + return stream_id.replace('-', '.', 1) + + def _get_sync_tables_setting_from_selection_file(self, tables): + selection = utils.load_json(self.tap['files']['selection']) + selection = selection.get('selection') + all_tables = {'full_sync': [], 'partial_sync': {}} + if selection: + for table in selection: + table_name = self._get_fixed_name_of_table(table['tap_stream_id']) + if tables is None or table_name in tables: + if table.get('sync_start_from'): + all_tables['partial_sync'][table_name] = table['sync_start_from'] + else: + all_tables['full_sync'].append(table_name) + return all_tables + def __check_if_table_is_selected(self, table_in_properties): table_metadata = table_in_properties.get('metadata', []) for metadata in table_metadata: diff --git a/pipelinewise/fastsync/partialsync/mysql_to_snowflake.py b/pipelinewise/fastsync/partialsync/mysql_to_snowflake.py index c39a3b4a7..68f34360e 100644 --- a/pipelinewise/fastsync/partialsync/mysql_to_snowflake.py +++ b/pipelinewise/fastsync/partialsync/mysql_to_snowflake.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 import os +import multiprocessing +from functools import partial from argparse import Namespace from typing import Union @@ -13,25 +15,34 @@ from pipelinewise.fastsync.partialsync import utils from pipelinewise.fastsync.mysql_to_snowflake import REQUIRED_CONFIG_KEYS, tap_type_to_target_type -from pipelinewise.fastsync.partialsync.utils import ( - upload_to_s3, update_state_file, diff_source_target_columns, load_into_snowflake) + LOGGER = Logger().get_logger(__name__) # pylint: disable=too-many-locals -def partial_sync_table(args: Namespace) -> Union[bool, str]: +def partial_sync_table(table: tuple, args: Namespace) -> Union[bool, str]: """Partial sync table for MySQL to Snowflake""" snowflake = FastSyncTargetSnowflake(args.target, args.transform) tap_id = args.target.get('tap_id') try: + table_name = table[0] + start_value = utils.validate_boundary_value(table[1]['start_value']) + end_value = utils.validate_boundary_value(table[1]['end_value']) + 
+ column_name = table[1]['column'] + + drop_target_table = table[1]['drop_target_table'] + args.drop_target_table = drop_target_table + args.table = table_name + mysql = FastSyncTapMySql(args.tap, tap_type_to_target_type) mysql.open_connections() - target_schema = common_utils.get_target_schema(args.target, args.table) - table_dict = common_utils.tablename_to_dict(args.table) + target_schema = common_utils.get_target_schema(args.target, table_name) + table_dict = common_utils.tablename_to_dict(table_name) target_table = table_dict.get('table_name') target_sf = { @@ -41,9 +52,10 @@ def partial_sync_table(args: Namespace) -> Union[bool, str]: 'temp': table_dict.get('temp_table_name') } - snowflake_types = mysql.map_column_types_to_target(args.table) + snowflake_types = mysql.map_column_types_to_target(table_name) # making target table if not exists + snowflake.create_schema(target_schema) snowflake.create_table( target_schema=target_schema, table_name=target_table, @@ -55,14 +67,14 @@ def partial_sync_table(args: Namespace) -> Union[bool, str]: ) source_columns = snowflake_types.get('columns', []) - columns_diff = diff_source_target_columns(target_sf, source_columns=source_columns) + columns_diff = utils.diff_source_target_columns(target_sf, source_columns=source_columns) # Get bookmark - Binlog position or Incremental Key value - bookmark = common_utils.get_bookmark_for_table(args.table, args.properties, mysql) + bookmark = common_utils.get_bookmark_for_table(table_name, args.properties, mysql) - where_clause_sql = f' WHERE {args.column} >= \'{args.start_value}\'' - if args.end_value: - where_clause_sql += f' AND {args.column} <= \'{args.end_value}\'' + where_clause_sql = f' WHERE {column_name} >= \'{start_value}\'' + if end_value: + where_clause_sql += f' AND {column_name} <= \'{end_value}\'' # export data from source file_parts = mysql.export_source_table_data(args, tap_id, where_clause_sql) @@ -75,22 +87,24 @@ def partial_sync_table(args: Namespace) -> Union[bool, str]: primary_keys = snowflake_types.get('primary_key') snowflake.create_schema(target_schema) snowflake.create_table( - target_schema, args.table, source_columns, primary_keys, is_temporary=True + target_schema, table_name, source_columns, primary_keys, is_temporary=True ) mysql.close_connections() size_bytes = sum([os.path.getsize(file_part) for file_part in file_parts]) - _, s3_key_pattern = upload_to_s3(snowflake, file_parts, args.temp_dir) + _, s3_key_pattern = utils.upload_to_s3(snowflake, file_parts, args.temp_dir) - load_into_snowflake(target_sf, args, columns_diff, primary_keys, s3_key_pattern, size_bytes, where_clause_sql) + utils.load_into_snowflake( + target_sf, args, columns_diff, primary_keys, s3_key_pattern, size_bytes, where_clause_sql) - update_state_file(args, bookmark) + if file_parts: + utils.update_state_file(args, bookmark) return True except Exception as exc: LOGGER.exception(exc) - return f'{args.table}: {exc}' + return f'{table_name}: {exc}' def main_impl(): @@ -99,6 +113,8 @@ def main_impl(): args = utils.parse_args_for_partial_sync(REQUIRED_CONFIG_KEYS) start_time = datetime.now() + pool_size = common_utils.get_pool_size(args.tap) + # Log start info LOGGER.info( ''' @@ -113,7 +129,16 @@ def main_impl(): ''', args.table, args.column, args.start_value, args.end_value ) - sync_excs = partial_sync_table(args=args) + sync_tables = utils.get_sync_tables(args) + + with multiprocessing.Pool(pool_size) as proc: + sync_excs = list( + filter( + lambda x: not isinstance(x, bool), + 
proc.map(partial(partial_sync_table, args=args), sync_tables.items()) + ) + ) + if isinstance(sync_excs, bool): sync_excs = None @@ -135,7 +160,7 @@ def main_impl(): ''', args.table, args.column, args.start_value, args.end_value, sync_excs, end_time - start_time ) - if sync_excs is not None: + if len(sync_excs) > 0: raise SystemExit(1) diff --git a/pipelinewise/fastsync/partialsync/postgres_to_snowflake.py b/pipelinewise/fastsync/partialsync/postgres_to_snowflake.py index 4199c9894..0f0155b3d 100644 --- a/pipelinewise/fastsync/partialsync/postgres_to_snowflake.py +++ b/pipelinewise/fastsync/partialsync/postgres_to_snowflake.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 import os +import multiprocessing +from functools import partial from datetime import datetime from typing import Union @@ -9,28 +11,37 @@ from pipelinewise.fastsync.commons.tap_postgres import FastSyncTapPostgres from pipelinewise.fastsync.postgres_to_snowflake import REQUIRED_CONFIG_KEYS, tap_type_to_target_type from pipelinewise.fastsync.commons import utils as common_utils -from pipelinewise.fastsync.partialsync.utils import ( - upload_to_s3, update_state_file, parse_args_for_partial_sync, diff_source_target_columns, load_into_snowflake) +from pipelinewise.fastsync.partialsync import utils from pipelinewise.logger import Logger LOGGER = Logger().get_logger(__name__) # pylint: disable=too-many-locals -def partial_sync_table(args: Namespace) -> Union[bool, str]: +def partial_sync_table(table: tuple, args: Namespace) -> Union[bool, str]: """Partial sync table for Postgres to Snowflake""" snowflake = FastSyncTargetSnowflake(args.target, args.transform) tap_id = args.target.get('tap_id') dbname = args.tap.get('dbname') try: + table_name = table[0] + start_value = utils.validate_boundary_value(table[1]['start_value']) + end_value = utils.validate_boundary_value(table[1]['end_value']) + + column_name = table[1]['column'] + + drop_target_table = table[1]['drop_target_table'] + args.drop_target_table = drop_target_table + args.table = table_name + postgres = FastSyncTapPostgres(args.tap, tap_type_to_target_type) # Get bookmark - Binlog position or Incremental Key value postgres.open_connection() # Get column differences - target_schema = common_utils.get_target_schema(args.target, args.table) - table_dict = common_utils.tablename_to_dict(args.table) + target_schema = common_utils.get_target_schema(args.target, table_name) + table_dict = common_utils.tablename_to_dict(table_name) target_table = table_dict.get('table_name') target_sf = { @@ -40,9 +51,10 @@ def partial_sync_table(args: Namespace) -> Union[bool, str]: 'temp': table_dict.get('temp_table_name') } - snowflake_types = postgres.map_column_types_to_target(args.table) + snowflake_types = postgres.map_column_types_to_target(table_name) # making target table if not exists + snowflake.create_schema(target_schema) snowflake.create_table( target_schema=target_schema, table_name=target_table, @@ -54,13 +66,13 @@ def partial_sync_table(args: Namespace) -> Union[bool, str]: ) source_columns = snowflake_types.get('columns', []) - columns_diff = diff_source_target_columns(target_sf, source_columns=source_columns) + columns_diff = utils.diff_source_target_columns(target_sf, source_columns=source_columns) - bookmark = common_utils.get_bookmark_for_table(args.table, args.properties, postgres, dbname=dbname) + bookmark = common_utils.get_bookmark_for_table(table_name, args.properties, postgres, dbname=dbname) - where_clause_sql = f' WHERE {args.column} >= \'{args.start_value}\'' + 
where_clause_sql = f' WHERE {column_name} >= \'{start_value}\'' if args.end_value: - where_clause_sql += f' AND {args.column} <= \'{args.end_value}\'' + where_clause_sql += f' AND {column_name} <= \'{end_value}\'' file_parts = postgres.export_source_table_data(args, tap_id, where_clause_sql) @@ -73,29 +85,32 @@ def partial_sync_table(args: Namespace) -> Union[bool, str]: primary_keys = snowflake_types.get('primary_key') snowflake.create_schema(target_schema) snowflake.create_table( - target_schema, args.table, source_columns, primary_keys, is_temporary=True + target_schema, table_name, source_columns, primary_keys, is_temporary=True ) postgres.close_connection() size_bytes = sum([os.path.getsize(file_part) for file_part in file_parts]) - _, s3_key_pattern = upload_to_s3(snowflake, file_parts, args.temp_dir) + _, s3_key_pattern = utils.upload_to_s3(snowflake, file_parts, args.temp_dir) - load_into_snowflake(target_sf, args, columns_diff, primary_keys, s3_key_pattern, size_bytes, where_clause_sql) + utils.load_into_snowflake( + target_sf, args, columns_diff, primary_keys, s3_key_pattern, size_bytes, where_clause_sql) - update_state_file(args, bookmark) + if file_parts: + utils.update_state_file(args, bookmark) return True except Exception as exc: LOGGER.exception(exc) - return f'{args.table}: {exc}' + return f'{table_name}: {exc}' def main_impl(): """Main sync logic""" - args = parse_args_for_partial_sync(REQUIRED_CONFIG_KEYS) + args = utils.parse_args_for_partial_sync(REQUIRED_CONFIG_KEYS) start_time = datetime.now() + pool_size = common_utils.get_pool_size(args.tap) # Log start info LOGGER.info( ''' @@ -110,7 +125,16 @@ def main_impl(): ''', args.table, args.column, args.start_value, args.end_value ) - sync_excs = partial_sync_table(args=args) + sync_tables = utils.get_sync_tables(args) + + with multiprocessing.Pool(pool_size) as proc: + sync_excs = list( + filter( + lambda x: not isinstance(x, bool), + proc.map(partial(partial_sync_table, args=args), sync_tables.items()) + ) + ) + if isinstance(sync_excs, bool): sync_excs = None @@ -132,7 +156,7 @@ def main_impl(): ''', args.table, args.column, args.start_value, args.end_value, sync_excs, end_time - start_time ) - if sync_excs is not None: + if len(sync_excs) > 0: raise SystemExit(1) diff --git a/pipelinewise/fastsync/partialsync/utils.py b/pipelinewise/fastsync/partialsync/utils.py index 8fa91a142..0288afb4b 100644 --- a/pipelinewise/fastsync/partialsync/utils.py +++ b/pipelinewise/fastsync/partialsync/utils.py @@ -4,8 +4,12 @@ import os import re +from datetime import datetime +from ast import literal_eval + from typing import Dict, Tuple, List +from pipelinewise.cli.errors import InvalidConfigException from pipelinewise.fastsync.commons import utils as common_utils from pipelinewise.fastsync.commons.target_snowflake import FastSyncTargetSnowflake @@ -46,30 +50,6 @@ def diff_source_target_columns(target_sf: dict, source_columns: list) -> dict: } -def _get_target_columns_info(target_column): - target_columns_dict = {} - list_of_target_column_names = [] - for column in target_column: - list_of_target_column_names.append(column['column_name']) - column_type_str = column['data_type'] - column_type_dict = json.loads(column_type_str) - target_columns_dict[f'"{column["column_name"]}"'] = column_type_dict['type'] - return { - 'column_names': list_of_target_column_names, - 'columns_dict': target_columns_dict - } - - -def _get_source_columns_dict(source_columns): - source_columns_dict = {} - for column in source_columns: - column_info = 
column.split(' ') - column_name = column_info[0] - column_type = ' '.join(column_info[1:]) - source_columns_dict[column_name] = column_type - return source_columns_dict - - def load_into_snowflake(target, args, columns_diff, primary_keys, s3_key_pattern, size_bytes, where_clause_sql): """Loading data from S3 to the temp table in snowflake and then merge it with the target table""" @@ -84,12 +64,16 @@ def load_into_snowflake(target, args, columns_diff, primary_keys, s3_key_pattern snowflake.add_columns(target['schema'], target['table'], columns_diff['added_columns']) added_metadata_columns = ['_SDC_EXTRACTED_AT', '_SDC_BATCHED_AT', '_SDC_DELETED_AT'] - snowflake.merge_tables( - target['schema'], target['temp'], target['table'], - list(columns_diff['source_columns'].keys()) + added_metadata_columns, primary_keys) - if args.target['hard_delete'] is True: - snowflake.partial_hard_delete(target['schema'], target['table'], where_clause_sql) - snowflake.drop_table(target['schema'], target['temp']) + if args.drop_target_table: + snowflake.swap_tables(target['schema'], target['table']) + else: + snowflake.merge_tables( + target['schema'], target['temp'], target['table'], + list(columns_diff['source_columns'].keys()) + added_metadata_columns, primary_keys) + + if args.target['hard_delete'] is True: + snowflake.partial_hard_delete(target['schema'], target['table'], where_clause_sql) + snowflake.drop_table(target['schema'], target['temp']) def update_state_file(args: argparse.Namespace, bookmark: Dict) -> None: @@ -108,6 +92,7 @@ def parse_args_for_partial_sync(required_config_keys: Dict) -> argparse.Namespac parser.add_argument('--column', help='Column for partial sync table') parser.add_argument('--start_value', help='Start value for partial sync table') parser.add_argument('--end_value', help='End value for partial sync table') + parser.add_argument('--drop_target_table', help='Dropping target table before sync') args: argparse.Namespace = parser.parse_args() @@ -134,6 +119,78 @@ def parse_args_for_partial_sync(required_config_keys: Dict) -> argparse.Namespac return args +def validate_boundary_value(string_to_check: str) -> str: + """Validating if the boundary values are valid and there is no injection""" + if not string_to_check: + return string_to_check + + # Validating string and number format + pattern = re.compile(r'[A-Za-z0-9\\.\\-]+') + if re.fullmatch(pattern, string_to_check): + return string_to_check + + # Validating timestamp format + try: + datetime.strptime(string_to_check, '%Y-%m-%d %H:%M:%S') + except ValueError: + try: + datetime.strptime(string_to_check, '%Y-%m-%d') + except ValueError: + raise InvalidConfigException(f'Invalid boundary value: {string_to_check}') from Exception + + return string_to_check + + +def get_sync_tables(args: argparse.Namespace) -> Dict: + """ + getting all needed information of tables for using in partial sync. 
+ """ + table_names = args.table.split(',') + column_names = args.column.split(',') + start_values = args.start_value.split(',') + if args.end_value: + end_values = args.end_value.split(',') + else: + end_values = [None] * len(table_names) + if args.drop_target_table: + drop_target_tables = [literal_eval(x) for x in args.drop_target_table.split(',')] + else: + drop_target_tables = [False] * len(table_names) + sync_tables = {} + for ind, table in enumerate(table_names): + sync_tables[table] = { + 'column': column_names[ind], + 'start_value': start_values[ind], + 'end_value': end_values[ind], + 'drop_target_table': drop_target_tables[ind], + } + return sync_tables + + +def _get_target_columns_info(target_column): + target_columns_dict = {} + list_of_target_column_names = [] + for column in target_column: + list_of_target_column_names.append(column['column_name']) + column_type_str = column['data_type'] + column_type_dict = json.loads(column_type_str) + target_columns_dict[f'"{column["column_name"]}"'] = column_type_dict['type'] + return { + 'column_names': list_of_target_column_names, + 'columns_dict': target_columns_dict + } + + +def _get_source_columns_dict(source_columns): + source_columns_dict = {} + for column in source_columns: + column_info = column.split(' ') + column_name = column_info[0] + column_type = ' '.join(column_info[1:]) + source_columns_dict[column_name] = column_type + return source_columns_dict + + def _get_args_parser_for_partialsync(): parser = argparse.ArgumentParser() parser.add_argument('--tap', help='Tap Config file', required=True) diff --git a/tests/end_to_end/helpers/assertions.py b/tests/end_to_end/helpers/assertions.py index 6ba06eeb7..30dfc2101 100644 --- a/tests/end_to_end/helpers/assertions.py +++ b/tests/end_to_end/helpers/assertions.py @@ -491,6 +491,14 @@ def assert_not_raises(exc_type): raise TestCase.failureException(f'{exc_type.__name__} raised!') +def assert_record_count_in_sf(env, tap_type, table, expected_records, where_clause=''): + """Assert record count in target Snowflake""" + result = env.run_query_target_snowflake( + f'SELECT count(1) FROM ppw_e2e_{tap_type}{env.sf_schema_postfix}."{table.upper()}" {where_clause};' + )[0][0] + assert result == expected_records + + def _get_command_for_partial_sync(tap_parameters, start_value, end_value=None): end_value_command = f' --end_value {end_value}' if end_value else '' command = f'pipelinewise partial_sync_table --tap {tap_parameters["tap"]} --target {tap_parameters["target"]}' \ diff --git a/tests/end_to_end/helpers/env.py b/tests/end_to_end/helpers/env.py index 8887ed882..50bb89917 100644 --- a/tests/end_to_end/helpers/env.py +++ b/tests/end_to_end/helpers/env.py @@ -655,7 +655,7 @@ def delete_record_from_target_snowflake(self, tap_type, table, where_clause): f'DELETE from ppw_e2e_tap_{tap_type}{self.sf_schema_postfix}.{table} {where_clause}' ) - def add_column_into_target_sf(self, tap_type, table, new_column) : + def add_column_into_target_sf(self, tap_type, table, new_column): """Add a record into the target""" self.run_query_target_snowflake( f'ALTER TABLE ppw_e2e_tap_{tap_type}{self.sf_schema_postfix}.{table} ADD {new_column["name"]} int' @@ -682,12 +682,11 @@ def delete_record_from_source(self, tap_type, table, where_clause): f'DELETE FROM {table} {where_clause}' ) - def run_query_on_source(self, tap_type, query): - """Running a query on the source""" - run_query_method = getattr(self, f'run_query_tap_{tap_type}') - run_query_method( - query - ) + def get_source_records_count(self, tap_type, 
table): + """Getting count of records from the source""" + run_query_method = getattr(self, f'run_query_{tap_type.lower()}') + result = run_query_method(f'SELECT count(1) FROM {table}') + return result[0][0] def get_records_from_target_snowflake(self, tap_type, table, column, primary_key): """"Getting all records from a specific table of snowflake target""" @@ -712,3 +711,13 @@ def remove_all_state_files(): """Clean up state files to ensure tests behave the same every time""" for state_file in Path(CONFIG_DIR).glob('**/state.json'): state_file.unlink() + + @staticmethod + def clean_up_temp_dir(): + """Clean up temp folder to ensure tests behave the same every time""" + files = glob.glob(f'{CONFIG_DIR}/tmp/*') + for f in files: + try: + os.remove(f) + except Exception: + pass diff --git a/tests/end_to_end/target_snowflake/tap_mariadb/test_defined_partial_sync_mariadb_to_sf.py b/tests/end_to_end/target_snowflake/tap_mariadb/test_defined_partial_sync_mariadb_to_sf.py new file mode 100644 index 000000000..5c2d17495 --- /dev/null +++ b/tests/end_to_end/target_snowflake/tap_mariadb/test_defined_partial_sync_mariadb_to_sf.py @@ -0,0 +1,101 @@ +from tests.end_to_end.helpers import assertions +from tests.end_to_end.target_snowflake.tap_mariadb import TapMariaDB + +TAP_ID = 'mariadb_to_sf_defined_partial_sync' +TARGET_ID = 'snowflake' + + +class TestDefinedPartialSyncMariaDBToSF(TapMariaDB): + """ + Defined Partial Sync from MariaDB to Snowflake + """ + + # pylint: disable=arguments-differ + def setUp(self): + super().setUp(tap_id=TAP_ID, target_id=TARGET_ID) + + def _manipulate_target_tables(self): + self.e2e_env.run_query_target_snowflake( + f'INSERT INTO ppw_e2e_tap_mysql{self.e2e_env.sf_schema_postfix}.address ' + '(address_id, street_number, supplier_supplier_id, zip_code_zip_code_id) VALUES (1, 1, 1, 1)') + + self.e2e_env.run_query_target_snowflake( + f'DELETE FROM ppw_e2e_tap_mysql{self.e2e_env.sf_schema_postfix}.address ' + 'WHERE address_id=500') + self.e2e_env.run_query_target_snowflake( + f'INSERT INTO ppw_e2e_tap_mysql{self.e2e_env.sf_schema_postfix}.weight_unit ' + "(weight_unit_id, weight_unit_name) VALUES (1, 'foo')") + + self.e2e_env.run_query_target_snowflake( + f'DELETE FROM ppw_e2e_tap_mysql{self.e2e_env.sf_schema_postfix}.weight_unit ' + 'WHERE weight_unit_id=25') + + self.e2e_env.run_query_target_snowflake( + f'DELETE FROM ppw_e2e_tap_mysql{self.e2e_env.sf_schema_postfix}.customers ' + 'WHERE id=15') + + # pylint: disable=invalid-name + def test_defined_partial_sync_mariadb_to_sf(self): + """ + Testing defined partial syn from Mariadb to Snowflake + """ + + from_value_weight = 5 + from_value_address = 400 + # run-tap command + assertions.assert_run_tap_success( + self.tap_id, self.target_id, ['fastsync', 'singer'] + + ) + + # partial sync + + source_records_weight = self.e2e_env.get_source_records_count(self.tap_type, 'weight_unit') + expected_records = source_records_weight - from_value_weight + 1 + assertions.assert_record_count_in_sf(self.e2e_env, self.tap_type, 'weight_unit', expected_records) + assertions.assert_record_count_in_sf( + self.e2e_env, self.tap_type, + 'weight_unit', expected_records, f'WHERE weight_unit_id >= {from_value_weight}') + + # Partial sync + source_records_address = self.e2e_env.get_source_records_count(self.tap_type, 'address') + expected_records = source_records_address - from_value_address + 1 + + assertions.assert_record_count_in_sf(self.e2e_env, self.tap_type, 'address', expected_records) + assertions.assert_record_count_in_sf( + self.e2e_env, 
self.tap_type, 'address', expected_records, f'WHERE address_id >= {from_value_address}') + + # Full fastsync + source_records_customers = self.e2e_env.get_source_records_count(self.tap_type, 'customers') + expected_records = source_records_customers + assertions.assert_record_count_in_sf(self.e2e_env, self.tap_type, 'customers', expected_records) + + self._manipulate_target_tables() + + # sync-tables command + assertions.assert_resync_tables_success(self.tap_id, self.target_id) + + expected_records = source_records_weight - from_value_weight + 1 + assertions.assert_record_count_in_sf(self.e2e_env, self.tap_type, 'weight_unit', expected_records) + assertions.assert_record_count_in_sf( + self.e2e_env, self.tap_type, + 'weight_unit', expected_records, f'WHERE weight_unit_id >= {from_value_weight}') + + # Partial sync + additional_record_in_target = 1 + total_expected_records = source_records_address + additional_record_in_target - from_value_address + 1 + expected_records_greater_than_from_value = source_records_address - from_value_address + 1 + expected_records_less_than_from_value = 1 + assertions.assert_record_count_in_sf(self.e2e_env, self.tap_type, 'address', total_expected_records) + assertions.assert_record_count_in_sf( + self.e2e_env, self.tap_type, + 'address', expected_records_greater_than_from_value, f'WHERE address_id >= {from_value_address}') + + # To test if target table is not dropped + assertions.assert_record_count_in_sf( + self.e2e_env, self.tap_type, + 'address', expected_records_less_than_from_value, f'WHERE address_id < {from_value_address}') + + # Full fastsync + expected_records = source_records_customers + assertions.assert_record_count_in_sf(self.e2e_env, self.tap_type, 'customers', expected_records) diff --git a/tests/end_to_end/target_snowflake/tap_postgres/__init__.py b/tests/end_to_end/target_snowflake/tap_postgres/__init__.py index 3b20f80be..20e54f8aa 100644 --- a/tests/end_to_end/target_snowflake/tap_postgres/__init__.py +++ b/tests/end_to_end/target_snowflake/tap_postgres/__init__.py @@ -9,4 +9,5 @@ class TapPostgres(TargetSnowflake): # pylint: disable=arguments-differ def setUp(self, tap_id: str, target_id: str): super().setUp(tap_id=tap_id, target_id=target_id, tap_type='TAP_POSTGRES') + self.e2e_env.clean_up_temp_dir() self.e2e_env.setup_tap_postgres() diff --git a/tests/end_to_end/target_snowflake/tap_postgres/test_defined_partial_sync_pg_to_sf.py b/tests/end_to_end/target_snowflake/tap_postgres/test_defined_partial_sync_pg_to_sf.py new file mode 100644 index 000000000..dc5794c4d --- /dev/null +++ b/tests/end_to_end/target_snowflake/tap_postgres/test_defined_partial_sync_pg_to_sf.py @@ -0,0 +1,98 @@ +from tests.end_to_end.helpers import assertions +from tests.end_to_end.target_snowflake.tap_postgres import TapPostgres + +TAP_ID = 'pg_to_sf_defined_partial_sync' +TARGET_ID = 'snowflake' + + +class TestDefinedPartialSyncPGToSF(TapPostgres): + """ + Defined Partial Sync from Postgres to Snowflake + """ + + # pylint: disable=arguments-differ + def setUp(self): + super().setUp(tap_id=TAP_ID, target_id=TARGET_ID) + + def _manipulate_target_tables(self): + self.e2e_env.run_query_target_snowflake( + f'INSERT INTO ppw_e2e_tap_postgres{self.e2e_env.sf_schema_postfix}."ORDER" ' + "(id, CVARCHAR) VALUES (1, 'A')") + + self.e2e_env.run_query_target_snowflake( + f'DELETE FROM ppw_e2e_tap_postgres{self.e2e_env.sf_schema_postfix}."ORDER" ' + 'WHERE id=6') + self.e2e_env.run_query_target_snowflake( + f'INSERT INTO ppw_e2e_tap_postgres{self.e2e_env.sf_schema_postfix}.CITY 
' + "(id, name) VALUES (1, 'foo')") + + self.e2e_env.run_query_target_snowflake( + f'DELETE FROM ppw_e2e_tap_postgres{self.e2e_env.sf_schema_postfix}.CITY ' + 'WHERE id=500') + + self.e2e_env.run_query_target_snowflake( + f'DELETE FROM ppw_e2e_tap_postgres{self.e2e_env.sf_schema_postfix}.customers ' + 'WHERE id=15') + + # pylint: disable=invalid-name + def test_defined_partial_sync_pg_to_sf(self): + """ + Testing defined partial syn from Postgres to Snowflake + """ + + from_value_city = 500 + from_value_order = 5 + # run-tap command + assertions.assert_run_tap_success( + self.tap_id, self.target_id, ['fastsync', 'singer'] + + ) + + # partial sync + + source_records_city = self.e2e_env.get_source_records_count(self.tap_type, 'city') + expected_records = source_records_city - from_value_city + 1 + assertions.assert_record_count_in_sf(self.e2e_env, self.tap_type, 'city', expected_records) + assertions.assert_record_count_in_sf( + self.e2e_env, self.tap_type, 'city', expected_records, f'WHERE id >= {from_value_city}') + + # Partial sync + source_records_order = self.e2e_env.get_source_records_count(self.tap_type, '"order"') + expected_records = source_records_order - from_value_order + 1 + + assertions.assert_record_count_in_sf(self.e2e_env, self.tap_type, 'order', expected_records) + assertions.assert_record_count_in_sf( + self.e2e_env, self.tap_type, 'order', expected_records, f'WHERE id >= {from_value_order}') + + # Full fastsync + source_records_customers = self.e2e_env.get_source_records_count(self.tap_type, 'customers') + expected_records = source_records_customers + assertions.assert_record_count_in_sf(self.e2e_env, self.tap_type, 'customers', expected_records) + + self._manipulate_target_tables() + + # sync-tables command + assertions.assert_resync_tables_success(self.tap_id, self.target_id) + + expected_records = source_records_order - from_value_order + 1 + assertions.assert_record_count_in_sf(self.e2e_env, self.tap_type, 'order', expected_records) + assertions.assert_record_count_in_sf( + self.e2e_env, self.tap_type, 'order', expected_records, f'WHERE id >= {from_value_order}') + + # Partial sync + additional_record_in_target = 1 + total_expected_records = source_records_city + additional_record_in_target - from_value_city + 1 + expected_records_greater_than_from_value = source_records_city - from_value_city + 1 + expected_records_less_than_from_value = 1 + assertions.assert_record_count_in_sf(self.e2e_env, self.tap_type, 'city', total_expected_records) + assertions.assert_record_count_in_sf( + self.e2e_env, self.tap_type, + 'city', expected_records_greater_than_from_value, f'WHERE id >= {from_value_city}') + + # To test if target table is not dropped + assertions.assert_record_count_in_sf( + self.e2e_env, self.tap_type, 'city', expected_records_less_than_from_value, f'WHERE id < {from_value_city}') + + # Full fastsync + expected_records = source_records_customers + assertions.assert_record_count_in_sf(self.e2e_env, self.tap_type, 'customers', expected_records) diff --git a/tests/end_to_end/test-project/tap_mysql_to_sf_defined_partial_sync.yml.template b/tests/end_to_end/test-project/tap_mysql_to_sf_defined_partial_sync.yml.template new file mode 100644 index 000000000..f6b5c5f5a --- /dev/null +++ b/tests/end_to_end/test-project/tap_mysql_to_sf_defined_partial_sync.yml.template @@ -0,0 +1,56 @@ +--- + +# ------------------------------------------------------------------------------ +# General Properties +# 
------------------------------------------------------------------------------ +id: "mariadb_to_sf_defined_partial_sync" +name: "MariaDB source test database" +type: "tap-mysql" +owner: "test-runner" + + +# ------------------------------------------------------------------------------ +# Source (Tap) - MySQL connection details +# ------------------------------------------------------------------------------ +db_conn: + host: "${TAP_MYSQL_HOST}" # MySQL host + port: ${TAP_MYSQL_PORT} # MySQL port + user: "${TAP_MYSQL_USER}" # MySQL user + password: "${TAP_MYSQL_PASSWORD}" # Plain string or vault encrypted + dbname: "${TAP_MYSQL_DB}" # MySQL database name + use_gtid: true + engine: mariadb + +# ------------------------------------------------------------------------------ +# Destination (Target) - Target properties +# Connection details should be in the relevant target YAML file +# ------------------------------------------------------------------------------ +target: "snowflake" # ID of the target connector where the data will be loaded +batch_size_rows: 20000 # Batch size for the stream to optimise load performance +stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes + + +# ------------------------------------------------------------------------------ +# Source to target Schema mapping +# ------------------------------------------------------------------------------ +schemas: + - source_schema: "mysql_source_db" + target_schema: "ppw_e2e_tap_mysql${TARGET_SNOWFLAKE_SCHEMA_POSTFIX}" + + tables: + - table_name: "weight_unit" + replication_method: "LOG_BASED" + sync_start_from: + column: "weight_unit_id" + value: "5" + drop_target_table: true + + - table_name: "address" + replication_method: "INCREMENTAL" + replication_key: "date_updated" + sync_start_from: + column: "address_id" + value: 400 + + - table_name: "customers" + replication_method: "LOG_BASED" diff --git a/tests/end_to_end/test-project/tap_postgres_to_sf_defined_partial_sync.yml.template b/tests/end_to_end/test-project/tap_postgres_to_sf_defined_partial_sync.yml.template new file mode 100644 index 000000000..344cfa80a --- /dev/null +++ b/tests/end_to_end/test-project/tap_postgres_to_sf_defined_partial_sync.yml.template @@ -0,0 +1,59 @@ +--- + +# ------------------------------------------------------------------------------ +# General Properties +# ------------------------------------------------------------------------------ +id: "pg_to_sf_defined_partial_sync" +name: "PostgreSQL source test database" +type: "tap-postgres" +owner: "test-runner" + + +# ------------------------------------------------------------------------------ +# Source (Tap) - PostgreSQL connection details +# ------------------------------------------------------------------------------ +db_conn: + host: "${TAP_POSTGRES_HOST}" # PostgreSQL host + logical_poll_total_seconds: 3 # Time out if no LOG_BASED changes received for 3 seconds + port: ${TAP_POSTGRES_PORT} # PostgreSQL port + user: "${TAP_POSTGRES_USER}" # PostgreSQL user + password: "${TAP_POSTGRES_PASSWORD}" # Plain string or vault encrypted + dbname: "${TAP_POSTGRES_DB}" # PostgreSQL database name + + +# ------------------------------------------------------------------------------ +# Destination (Target) - Target properties +# Connection details should be in the relevant target YAML file +# ------------------------------------------------------------------------------ +target: "snowflake" # ID of the target connector where the data will be loaded 
+batch_size_rows: 1000 # Batch size for the stream to optimise load performance +stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes + + +# ------------------------------------------------------------------------------ +# Source to target Schema mapping +# ------------------------------------------------------------------------------ +schemas: + + ### SOURCE SCHEMA: public + - source_schema: "public" + target_schema: "ppw_e2e_tap_postgres${TARGET_SNOWFLAKE_SCHEMA_POSTFIX}" + + tables: + - table_name: "city" + replication_method: "INCREMENTAL" + replication_key: "id" + sync_start_from: + column: "id" + value: "500" + + - table_name: "order" + replication_method: "INCREMENTAL" + replication_key: "id" + sync_start_from: + column: "id" + value: 5 + drop_target_table: true + + - table_name: "customers" + replication_method: "LOG_BASED" diff --git a/tests/units/cli/resources/sample_json_config/target_one/tap_one/selection.json b/tests/units/cli/resources/sample_json_config/target_one/tap_one/selection.json index b2a81b0df..b35b3447f 100644 --- a/tests/units/cli/resources/sample_json_config/target_one/tap_one/selection.json +++ b/tests/units/cli/resources/sample_json_config/target_one/tap_one/selection.json @@ -2,7 +2,11 @@ "selection": [ { "replication_method": "LOG_BASED", - "tap_stream_id": "db_test_mysql-table_one" + "tap_stream_id": "db_test_mysql-table_one", + "sync_start_from": { + "column": "id", + "value": "5" + } }, { "replication_method": "INCREMENTAL", diff --git a/tests/units/cli/test_cli.py b/tests/units/cli/test_cli.py index 0921cb862..6a47871bc 100644 --- a/tests/units/cli/test_cli.py +++ b/tests/units/cli/test_cli.py @@ -30,7 +30,7 @@ # Can't inherit from unittest.TestCase because it breaks pytest fixture # https://github.com/pytest-dev/pytest/issues/2504#issuecomment-308828149 -# pylint: disable=no-self-use,too-many-public-methods,attribute-defined-outside-init,fixme +# pylint: disable=no-self-use,too-many-public-methods,attribute-defined-outside-init,too-many-lines,fixme class TestCli: """ Unit Tests for PipelineWise CLI executable @@ -69,24 +69,41 @@ def _init_for_sync_tables_states_cleanup(tables_arg: str = None) -> PipelineWise return pipelinewise @staticmethod - def _make_sample_state_file(test_state_file: str) -> None: - sample_state_data = { - 'currently_syncing': None, - 'bookmarks': { - 'table1': {'foo': 'bar'}, - 'table2': {'foo': 'bar'}, - 'table3': {'foo': 'bar'} + def _make_sample_state_file(test_state_file: str, content=None) -> None: + if content: + sample_state_data = content + else: + sample_state_data = { + 'currently_syncing': None, + 'bookmarks': { + 'table1': {'foo': 'bar'}, + 'table2': {'foo': 'bar'}, + 'table3': {'foo': 'bar'} + } } - } with open(test_state_file, 'w', encoding='UTF-8') as state_file: json.dump(sample_state_data, state_file) @staticmethod - def _assert_calling_sync_tables(pipelinewise: PipelineWise, side_effect_method: Optional[Callable] = None) -> None: + def _assert_calling_sync_tables(pipelinewise: PipelineWise) -> None: + with patch('pipelinewise.cli.pipelinewise.Process') as mocked_process: + mocked_process.return_value.exception = None + mocked_process.return_value.exitcode = 0 + pipelinewise.sync_tables() + + assert mocked_process.call_args_list == [ + call(target=pipelinewise.sync_tables_partial_sync, args=( + {'db_test_mysql.table_one': {'column': 'id', 'value': '5'}},)), + call(target=pipelinewise.sync_tables_fast_sync, args=(['db_test_mysql.table_two'],)), + ] + + @staticmethod 
+ def _assert_calling_fastsync_tables( + pipelinewise: PipelineWise, selected_tables, side_effect_method: Optional[Callable] = None) -> None: with patch('pipelinewise.cli.pipelinewise.PipelineWise.run_tap_fastsync') as mocked_fastsync: if side_effect_method: mocked_fastsync.side_effect = side_effect_method - pipelinewise.sync_tables() + pipelinewise.sync_tables_fast_sync(selected_tables) mocked_fastsync.assert_called_once() def _assert_import_command(self, args): @@ -109,6 +126,16 @@ def _assert_import_command(self, args): for call_arg in mocked_parallel.call_args_list: assert call_arg[1]['tap']['id'] in expected_taps + def _assert_run_command_exit_with_error_1(self, command): + with patch('pipelinewise.cli.pipelinewise.PipelineWise.run_tap_singer'): + args = CliArgs(target='target_one', tap='tap_one') + pipelinewise = PipelineWise(args, CONFIG_DIR, VIRTUALENVS_DIR) + with pytest.raises(SystemExit) as pytest_wrapped_e: + ppw_command = getattr(pipelinewise, command) + ppw_command() + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + def test_target_dir(self): """Singer target connector config path must be relative to the project config dir""" assert self.pipelinewise.get_target_dir( @@ -371,7 +398,6 @@ def test_merge_same_catalog(self): def test_merge_updated_catalog(self): """Test merging not empty schemas""" - # TODO: Check if pipelinewise.merge_schemas is required at all or not tap_one_catalog = cli.utils.load_json( '{}/resources/sample_json_config/target_one/tap_one/properties.json'.format( os.path.dirname(__file__) @@ -668,6 +694,30 @@ def test_command_stop_tap(self): # Delete test log file os.remove('{}.terminated'.format(pipelinewise.tap_run_log_file)) + def test_command_run_tap_exit_with_error_1_if_fastsync_exception(self): + """Test if run_tap command returns error 1 if exception in fastsync""" + with patch('pipelinewise.cli.pipelinewise.PipelineWise.run_tap_fastsync') as mocked_fastsync: + mocked_fastsync.side_effect = Exception('FOO') + self._assert_run_command_exit_with_error_1('run_tap') + + def test_command_run_tap_exit_with_error_1_if_partial_sync_exception(self): + """Test if run_tap command returns error 1 if exception in partialsync""" + with patch('pipelinewise.cli.pipelinewise.PipelineWise.run_tap_partialsync') as mocked_partial_sync: + mocked_partial_sync.side_effect = Exception('FOO') + self._assert_run_command_exit_with_error_1('run_tap') + + def test_command_sync_tables_exit_with_error_1_if_fast_sync_exception(self): + """Test if sync_tables command returns error 1 if exception in fastsync""" + with patch('pipelinewise.cli.pipelinewise.PipelineWise.run_tap_fastsync') as mocked_fastsync: + mocked_fastsync.side_effect = Exception('FOO') + self._assert_run_command_exit_with_error_1('sync_tables') + + def test_command_sync_tables_exit_with_error_1_if_partial_sync_exception(self): + """Test if sync_tables command returns error 1 if exception in partial sync""" + with patch('pipelinewise.cli.pipelinewise.PipelineWise.run_tap_partialsync') as mocked_partial_sync: + mocked_partial_sync.side_effect = Exception('FOO') + self._assert_run_command_exit_with_error_1('sync_tables') + def test_command_sync_tables(self): """Test run tap command""" args = CliArgs(target='target_one', tap='tap_one') @@ -676,10 +726,13 @@ def test_command_sync_tables(self): # Running sync_tables should detect the tap type and path to the connector # Since the executable is not available in this test then it should fail # TODO: sync discover_tap and run_tap behaviour. 
run_tap sys.exit but discover_tap does not. - with pytest.raises(SystemExit) as pytest_wrapped_e: - pipelinewise.sync_tables() - assert pytest_wrapped_e.type == SystemExit - assert pytest_wrapped_e.value.code == 1 + all_sync_methods = (pipelinewise.sync_tables_partial_sync, pipelinewise.sync_tables_fast_sync) + + for sync_method in all_sync_methods: + with pytest.raises(SystemExit) as pytest_wrapped_e: + sync_method(['foo']) + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 def test_command_sync_tables_cleanup_state_if_file_not_exists_and_no_tables_argument(self): """Testing sync_tables cleanup state if file not exists and there is no tables argument""" @@ -688,42 +741,64 @@ def test_command_sync_tables_cleanup_state_if_file_not_exists_and_no_tables_argu def test_command_sync_tables_cleanup_state_if_file_not_exists_and_tables_argument(self): """Testing sync_tables cleanup state if file not exists and there is tables argument""" - pipelinewise = self._init_for_sync_tables_states_cleanup(tables_arg='table1,table3') + pipelinewise = self._init_for_sync_tables_states_cleanup( + tables_arg='db_test_mysql.table_one,db_test_mysql.table_two') self._assert_calling_sync_tables(pipelinewise) - def test_command_sync_tables_cleanup_state_if_file_exists_and_no_table_argument(self): + def test_do_sync_tables_reset_state_file_for_partial_sync(self): + """Testing if selected partial sync tables are filtered from state file if sync_tables run""" + pipelinewise = self._init_for_sync_tables_states_cleanup() + test_state_file = pipelinewise.tap['files']['state'] + state_content = { + 'currently_syncing': None, + 'bookmarks': { + 'tb1': {'foo': 'bar'}, + 'db_test_mysql-table_one': {'column': 'id', 'value': '5'} + } + } + self._make_sample_state_file(test_state_file, content=state_content) + with patch('pipelinewise.cli.pipelinewise.Process') as mocked_process: + mocked_process.return_value.exception = None + mocked_process.return_value.exitcode = 0 + pipelinewise.do_sync_tables() + + with open(test_state_file, 'r', encoding='utf-8') as state_file: + bookmarks = json.load(state_file) + + assert bookmarks == {'bookmarks': {'tb1': {'foo': 'bar'}}, 'currently_syncing': None} + + def test_fast_sync_tables_cleanup_state_for_selected_tables(self): """Testing sync_tables cleanup state if file exists and there is no table argument""" - def _assert_state_file_is_deleted(*args, **kwargs): + def _assert_state_file(*args, **kwargs): # pylint: disable=unused-argument - assert os.path.isfile(test_state_file) is False + with open(test_state_file, 'r', encoding='utf-8') as state_file: + bookmarks = json.load(state_file) + + assert bookmarks == {'bookmarks': {'table2': {'foo': 'bar'}}, 'currently_syncing': None} pipelinewise = self._init_for_sync_tables_states_cleanup() test_state_file = pipelinewise.tap['files']['state'] self._make_sample_state_file(test_state_file) - self._assert_calling_sync_tables(pipelinewise, _assert_state_file_is_deleted) - def test_command_sync_tables_cleanup_state_if_file_exists_and_table_argument(self): - """Testing sync_tables cleanup state if file exists and there is table argument""" - def _assert_state_file_is_cleaned(*args, **kwargs): - # pylint: disable=unused-argument - expected_state_data = { - 'currently_syncing': None, - 'bookmarks': { - 'table2': {'foo': 'bar'}, - } - } - with open(test_state_file, encoding='UTF-8') as state_file: - state_data = json.load(state_file) - assert state_data == expected_state_data + # TODO: fix side effect! 
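+        # The freshly written state file still holds all three sample bookmarks at this point;
+        # the _assert_state_file side effect then checks that only table2 is left once
+        # table1 and table3 are handed to the fast sync.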
+ original_bookmarks = { + 'bookmarks': { + 'table1': {'foo': 'bar'}, + 'table2': {'foo': 'bar'}, + 'table3': {'foo': 'bar'} + }, + 'currently_syncing': None + } + with open(test_state_file, 'r', encoding='utf-8') as state_file: + bookmarks = json.load(state_file) - pipelinewise = self._init_for_sync_tables_states_cleanup(tables_arg='table1,table3') - test_state_file = pipelinewise.tap['files']['state'] - self._make_sample_state_file(test_state_file) - self._assert_calling_sync_tables(pipelinewise, _assert_state_file_is_cleaned) + assert bookmarks == original_bookmarks + self._assert_calling_fastsync_tables(pipelinewise, ['table1', 'table3'], _assert_state_file) def test_command_sync_tables_cleanup_state_if_file_empty_and_table_argument(self): """Testing sync_tables cleanup state if file empty and there is table argument""" - pipelinewise = self._init_for_sync_tables_states_cleanup(tables_arg='table1,table3') + pipelinewise = self._init_for_sync_tables_states_cleanup( + tables_arg='db_test_mysql.table_one,db_test_mysql.table_two') test_state_file = pipelinewise.tap['files']['state'] with open(test_state_file, 'a', encoding='UTF-8'): pass diff --git a/tests/units/partialsync/test_mysql_to_snowflake.py b/tests/units/partialsync/test_mysql_to_snowflake.py index d78685d65..fde7cc118 100644 --- a/tests/units/partialsync/test_mysql_to_snowflake.py +++ b/tests/units/partialsync/test_mysql_to_snowflake.py @@ -4,8 +4,9 @@ from unittest import TestCase, mock from pipelinewise.fastsync.partialsync import mysql_to_snowflake +from pipelinewise.fastsync.partialsync.utils import parse_args_for_partial_sync from pipelinewise.fastsync.commons.tap_mysql import FastSyncTapMySql -from tests.units.partialsync.utils import PartialSync2SFArgs, run_mysql_to_snowflake +from tests.units.partialsync.utils import PartialSync2SFArgs, get_argv_list class PartialSyncTestCase(TestCase): @@ -17,19 +18,23 @@ def setUp(self) -> None: def test_mysql_to_snowflake_partial_sync_table_if_exception_happens(self): """Test partial sync if an exception raises""" - # TODO: an exception in database connection! args = PartialSync2SFArgs(temp_test_dir='FOO_DIR') exception_message = 'FOO Exception!' 
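+        # partial_sync_table() now receives a (table_name, settings) tuple, matching the
+        # items that main() maps over the multiprocessing pool, alongside the parsed args.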
+ test_table = ('foo', { + 'column': 'foo_column', + 'start_value': '1', + 'end_value': '2', + 'drop_target_table': False, + }) with mock.patch( 'pipelinewise.fastsync.partialsync.mysql_to_snowflake.FastSyncTapMySql.open_connections' ) as mocked_mysql_connection: mocked_mysql_connection.side_effect = Exception(exception_message) - actual_return = mysql_to_snowflake.partial_sync_table(args) + actual_return = mysql_to_snowflake.partial_sync_table(test_table, args) self.assertEqual(f'{args.table}: {exception_message}', actual_return) - def test_export_source_table_data(self): """Test export_source_table_data method""" expected_file_parts = [] @@ -66,9 +71,11 @@ def mocked_copy_table_method(table, filepath, **kwargs): self.assertEqual(2, len(call_args)) self.assertEqual(args.table, call_args[0]) - self.assertRegex(call_args[1], - f'^{args.temp_dir}/pipelinewise_{tap_id}_{args.table}_[0-9]{{8}}-[0-9]{{6}}-[0-9]{{6}}' - f'_partialsync_[0-9A-Z]{{8}}.csv.gz') + self.assertRegex( + call_args[1], + f'^{args.temp_dir}/pipelinewise_{tap_id}_{args.table}_[0-9]{{8}}-[0-9]{{6}}-[0-9]{{6}}' + f'_partialsync_[0-9A-Z]{{8}}.csv.gz' + ) self.assertDictEqual(expected_call_kwargs, call_kwargs) self.assertEqual(len(actual_file_parts), len(expected_file_parts)) @@ -76,40 +83,40 @@ def mocked_copy_table_method(table, filepath, **kwargs): self.assertIn(file_part, actual_file_parts) # pylint: disable=too-many-locals, too-many-arguments - @mock.patch('pipelinewise.fastsync.commons.utils.save_state_file') - @mock.patch('pipelinewise.fastsync.partialsync.mysql_to_snowflake.upload_to_s3') - @mock.patch('pipelinewise.fastsync.commons.utils.get_bookmark_for_table') - @mock.patch('pipelinewise.fastsync.partialsync.mysql_to_snowflake.FastSyncTapMySql') - @mock.patch('pipelinewise.fastsync.partialsync.mysql_to_snowflake.FastSyncTargetSnowflake') - def test_running_partial_sync_mysql_to_snowflake(self, - mocked_fastsync_sf, - mocked_fastsyncmysql, - mocked_bookmark, - mocked_upload_to_s3, - mocked_save_state): + + @mock.patch('pipelinewise.fastsync.partialsync.mysql_to_snowflake.multiprocessing.Pool') + def test_running_partial_sync_mysql_to_snowflake(self, mocked_pool): """Test the whole partial_sync_mysql_to_snowflake module works as expected""" + test_table = {} + expected_args = None + + # pylint: disable=too-few-public-methods + class MockedMultiprocessor: + """"Mocked multiprocessing class""" + @staticmethod + def map(partial_func, itter_param): + """Mocked map method which is used for assertion""" + # Asserting if multiprocess calling is as expected + actual_args = partial_func.keywords['args'] + + assert itter_param == test_table.items() + assert actual_args == expected_args + return [True] + + class PoolClass: + """Mocked pool class""" + def __init__(self, *args, **kwargs): + pass + + def __enter__(self): + return MockedMultiprocessor() + + def __exit__(self, *args, **kwargs): + pass + + mocked_pool.side_effect = PoolClass + with TemporaryDirectory() as temp_directory: - file_size = 5 - file_parts = [f'{temp_directory}/t1', ] - s3_keys = ['FOO_S3_KEYS', ] - s3_key_pattern = 'BAR_S3_KEY_PATTERN' - bookmark = 'foo_bookmark' - maped_column_types_to_target = { - 'columns': ['foo type1', 'bar type2'], - 'primary_key': 'foo_primary' - } - - def export_data_to_file(*args, **kwargs): # pylint: disable=unused-argument - with open(f'{temp_directory}/t1', 'w', encoding='utf8') as exported_file: - exported_file.write('F' * file_size) - - return file_parts - - mocked_upload_to_s3.return_value = (s3_keys, s3_key_pattern) - 
mocked_bookmark.return_value = bookmark - mocked_export_data = mocked_fastsyncmysql.return_value.export_source_table_data - mocked_fastsyncmysql.return_value.map_column_types_to_target.return_value = maped_column_types_to_target - mocked_export_data.side_effect = export_data_to_file table_name = 'foo_table' column = 'foo_column' @@ -131,7 +138,18 @@ def export_data_to_file(*args, **kwargs): # pylint: disable=unused-argument } with self.assertLogs('pipelinewise') as actual_logs: - args_namespace = run_mysql_to_snowflake(arguments) + test_table = { + table_name: { + 'column': column, + 'start_value': start_value, + 'end_value': end_value, + 'drop_target_table': False + } + } + argv_list = get_argv_list(arguments) + with mock.patch('sys.argv', argv_list): + expected_args = parse_args_for_partial_sync(mysql_to_snowflake.REQUIRED_CONFIG_KEYS) + mysql_to_snowflake.main() expected_log_messages = [ [ @@ -147,27 +165,87 @@ def export_data_to_file(*args, **kwargs): # pylint: disable=unused-argument f'Column : {column}', f'Start value : {start_value}', f'End value : {end_value}', - 'Exceptions during table sync : None', + 'Exceptions during table sync : []', ] ] for log_index, log_messages in enumerate(expected_log_messages): for message in log_messages: self.assertIn(message, actual_logs.output[log_index]) - expected_where_clause = f" WHERE {column} >= '{start_value}'" - if end_value: - expected_where_clause += f" AND {column} <= '10'" - mocked_export_data.assert_called_with( - args_namespace, args_namespace.target.get('tap_id'), expected_where_clause - ) - mocked_upload_to_s3.assert_called_with(mocked_fastsync_sf(), file_parts, arguments['temp_dir']) - mocked_fastsync_sf.return_value.merge_tables.assert_called_with( - 'foo_schema', f'{table_name}_temp', table_name, - ['foo', 'bar', '_SDC_EXTRACTED_AT', '_SDC_BATCHED_AT', '_SDC_DELETED_AT'], - maped_column_types_to_target['primary_key'] - ) - mocked_fastsync_sf.return_value.drop_table.assert_called_with('foo_schema', f'{table_name}_temp') - if end_value: - mocked_save_state.assert_not_called() - else: - mocked_save_state.assert_called_with(arguments['state'], table_name, bookmark) + @mock.patch('pipelinewise.fastsync.partialsync.utils.load_into_snowflake') + @mock.patch('pipelinewise.fastsync.partialsync.utils.upload_to_s3') + @mock.patch('pipelinewise.fastsync.commons.utils.save_state_file') + @mock.patch('pipelinewise.fastsync.partialsync.mysql_to_snowflake.FastSyncTapMySql') + @mock.patch('pipelinewise.fastsync.commons.utils.get_bookmark_for_table') + @mock.patch('pipelinewise.fastsync.partialsync.mysql_to_snowflake.FastSyncTargetSnowflake') + def test_mysql_to_snowflake_partial_sync_table(self, + mocked_fastsync_sf, mocked_bookmark, mocked_fastsyncmysql, + mocked_save_state, mocked_upload_to_s3, mocked_load_into_sf): + """Test mysql to sf partial sync table""" + table_name = 'foo' + test_end_values = (None, ) + + for end_value in test_end_values: + args = PartialSync2SFArgs(temp_test_dir='FOO_DIR', end_value=end_value) + test_table = (table_name, { + 'column': 'foo_column', + 'start_value': '1', + 'end_value': end_value, + 'drop_target_table': False, + }) + + with TemporaryDirectory() as temp_directory: + file_size = 5 + file_parts = [f'{temp_directory}/t1', ] + s3_keys = ['FOO_S3_KEYS', ] + s3_key_pattern = 'BAR_S3_KEY_PATTERN' + bookmark = 'foo_bookmark' + maped_column_types_to_target = { + 'columns': ['foo type1', 'bar type2'], + 'primary_key': 'foo_primary' + } + + # pylint: disable=cell-var-from-loop + def export_data_to_file(*args, 
**kwargs): # pylint: disable=unused-argument + with open(f'{temp_directory}/t1', 'w', encoding='utf8') as exported_file: + exported_file.write('F' * file_size) + + return file_parts + + mocked_upload_to_s3.return_value = (s3_keys, s3_key_pattern) + mocked_bookmark.return_value = bookmark + mocked_export_data = mocked_fastsyncmysql.return_value.export_source_table_data + mocked_fastsyncmysql.return_value.map_column_types_to_target.return_value = maped_column_types_to_target + mocked_export_data.side_effect = export_data_to_file + + actual_return = mysql_to_snowflake.partial_sync_table(test_table, args) + self.assertTrue(actual_return) + + target = { + 'schema': 'foo_schema', + 'sf_object': mocked_fastsync_sf(), + 'table': table_name, + 'temp': 'foo_temp' + } + columns_diff = { + 'added_columns': {'bar': 'type2', 'foo': 'type1'}, + 'removed_columns': {}, + 'source_columns': {'bar': 'type2', 'foo': 'type1'}, + 'target_columns': [] + } + + mocked_fastsync_sf.return_value.query.assert_called_with( + 'UPDATE foo_schema."FOO" SET _SDC_DELETEd_AT = CURRENT_TIMESTAMP()' + ' WHERE foo_column >= \'1\' AND _SDC_DELETED_AT IS NULL' + ) + mocked_fastsync_sf.return_value.create_schema.assert_called_with('foo_schema') + mocked_fastsync_sf.return_value.create_table.assert_called_with( + 'foo_schema', 'foo', ['foo type1', 'bar type2'], 'foo_primary', is_temporary=True) + mocked_load_into_sf.assert_called_with( + target, args, columns_diff, maped_column_types_to_target['primary_key'], + s3_key_pattern, file_size, f" WHERE {test_table[1]['column']} >= '{test_table[1]['start_value']}'") + + if end_value: + mocked_save_state.assert_not_called() + else: + mocked_save_state.assert_called_with('state.json', table_name, bookmark) diff --git a/tests/units/partialsync/test_partial_sync_utils.py b/tests/units/partialsync/test_partial_sync_utils.py index db931af84..e2c7c77a5 100644 --- a/tests/units/partialsync/test_partial_sync_utils.py +++ b/tests/units/partialsync/test_partial_sync_utils.py @@ -1,9 +1,9 @@ from unittest import TestCase, mock from tempfile import TemporaryDirectory -from pipelinewise.fastsync.partialsync.utils import load_into_snowflake, upload_to_s3, update_state_file -from pipelinewise.fastsync.partialsync.utils import diff_source_target_columns - +from pipelinewise.fastsync.partialsync.utils import ( + load_into_snowflake, upload_to_s3, update_state_file, diff_source_target_columns, validate_boundary_value) +from pipelinewise.cli.errors import InvalidConfigException from tests.units.partialsync.utils import PartialSync2SFArgs from tests.units.partialsync.resources.test_partial_sync_utils.sample_sf_columns import SAMPLE_OUTPUT_FROM_SF @@ -31,7 +31,7 @@ def test_upload_to_s3(self): # pylint: disable=no-self-use def test_load_into_snowflake_hard_delete(self): - """Test load_into_snowflake method""" + """Test load_into_snowflake method with hard delete""" snowflake = mock.MagicMock() target = { 'sf_object': snowflake, @@ -66,7 +66,7 @@ def test_load_into_snowflake_hard_delete(self): # pylint: disable=no-self-use def test_load_into_snowflake_soft_delete(self): - """Test load_into_snowflake method""" + """Test load_into_snowflake method with soft delete""" snowflake = mock.MagicMock() target = { 'sf_object': snowflake, @@ -98,6 +98,36 @@ def test_load_into_snowflake_soft_delete(self): mock.call.drop_table(target['schema'], target['temp']) ]) + def test_load_into_snowflake_drop_target_table_enabled(self): + """Test load_into_snowflake if drop_target_table is enabled""" + snowflake = mock.MagicMock() + 
target = { + 'sf_object': snowflake, + 'schema': 'FOO_SCHEMA', + 'table': 'FOO_TABLE', + 'temp': 'FOO_TEMP' + } + args = PartialSync2SFArgs( + temp_test_dir='temp_test_dir', start_value='20', end_value='30', hard_delete=False, drop_target_table=True + ) + columns_diff = { + 'added_columns': ['FOO_ADDED_COLUMN'], + 'source_columns': {'FOO_SOURCE_COLUMN': 'FOO_TYPE'} + } + primary_keys = ['FOO_PRIMARY'] + s3_key_pattern = 'FOO_PATTERN' + size_bytes = 3 + where_clause_sql = 'test' + load_into_snowflake(target, args, columns_diff, primary_keys, s3_key_pattern, size_bytes, + where_clause_sql) + + snowflake.assert_has_calls([ + mock.call.copy_to_table(s3_key_pattern, target['schema'], args.table, size_bytes, is_temporary=True), + mock.call.obfuscate_columns(target['schema'], args.table), + mock.call.add_columns(target['schema'], target['table'], columns_diff['added_columns']), + mock.call.swap_tables(target['schema'], target['table']), + ]) + # pylint: disable=no-self-use def test_update_state_file(self): """Test state file updating with and without end value""" @@ -151,3 +181,17 @@ def test_find_diff_columns(self): } actual_output = diff_source_target_columns(target_sf=sample_target_sf, source_columns=sample_source_columns) self.assertDictEqual(actual_output, expected_output) + + def test_validate_boundary_value_works_as_expected(self): + """Testing validate_boundary_value method""" + valid_values = ('foo', '123', '2022-12-11 12:11:13', '2022-12-11', 'foo123', '24.5', 'ABCD-FH11-24', None) + + for test_value in valid_values: + self.assertEqual(test_value, validate_boundary_value(test_value)) + + def test_validate_boundary_value_raises_exception_if_invalid_value(self): + """Test if exception is raised on invalid values""" + invalid_values = (';', 'foo bar', '(foo)', 'foo;bar', 'foo%', '1 2 3', 'foo,bar', '[foo]', '*', '%') + + for test_value in invalid_values: + self.assertRaises(InvalidConfigException, validate_boundary_value, test_value) diff --git a/tests/units/partialsync/test_postgres_to_snowflake.py b/tests/units/partialsync/test_postgres_to_snowflake.py index 7f40d077f..0850f0190 100644 --- a/tests/units/partialsync/test_postgres_to_snowflake.py +++ b/tests/units/partialsync/test_postgres_to_snowflake.py @@ -5,7 +5,9 @@ from pipelinewise.fastsync.partialsync import postgres_to_snowflake from pipelinewise.fastsync.commons.tap_postgres import FastSyncTapPostgres -from tests.units.partialsync.utils import PartialSync2SFArgs, run_postgres_to_snowflake +from pipelinewise.fastsync.partialsync.utils import parse_args_for_partial_sync + +from tests.units.partialsync.utils import PartialSync2SFArgs, get_argv_list class PartialSyncTestCase(TestCase): @@ -17,15 +19,20 @@ def setUp(self) -> None: def test_postgres_to_snowflake_partial_sync_table_if_exception_happens(self): """Test partial sync if an exception raises""" - # TODO: an exception in database connection! args = PartialSync2SFArgs(temp_test_dir='FOO_DIR') exception_message = 'FOO Exception!' 
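+        # Same (table_name, settings) tuple contract as in the MySQL test module.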
+ test_table = ('foo', { + 'column': 'foo_column', + 'start_value': '1', + 'end_value': '2', + 'drop_target_table': False, + }) with mock.patch( 'pipelinewise.fastsync.partialsync.postgres_to_snowflake.FastSyncTapPostgres.open_connection' ) as mocked_postgres_connection: mocked_postgres_connection.side_effect = Exception(exception_message) - actual_return = postgres_to_snowflake.partial_sync_table(args) + actual_return = postgres_to_snowflake.partial_sync_table(test_table, args) self.assertEqual(f'{args.table}: {exception_message}', actual_return) @@ -65,9 +72,11 @@ def mocked_copy_table_method(table, filepath, **kwargs): self.assertEqual(2, len(call_args)) self.assertEqual(args.table, call_args[0]) - self.assertRegex(call_args[1], - f'^{args.temp_dir}/pipelinewise_{tap_id}_{args.table}_[0-9]{{8}}-[0-9]{{6}}-[0-9]{{6}}' - f'_partialsync_[0-9A-Z]{{8}}.csv.gz') + self.assertRegex( + call_args[1], + f'^{args.temp_dir}/pipelinewise_{tap_id}_{args.table}_[0-9]{{8}}-[0-9]{{6}}-[0-9]{{6}}' + f'_partialsync_[0-9A-Z]{{8}}.csv.gz' + ) self.assertDictEqual(expected_call_kwargs, call_kwargs) self.assertEqual(len(actual_file_parts), len(expected_file_parts)) @@ -75,41 +84,40 @@ def mocked_copy_table_method(table, filepath, **kwargs): self.assertIn(file_part, actual_file_parts) # pylint: disable=too-many-locals, too-many-arguments - @mock.patch('pipelinewise.fastsync.commons.utils.save_state_file') - @mock.patch('pipelinewise.fastsync.partialsync.postgres_to_snowflake.upload_to_s3') - @mock.patch('pipelinewise.fastsync.commons.utils.get_bookmark_for_table') - @mock.patch('pipelinewise.fastsync.partialsync.postgres_to_snowflake.FastSyncTapPostgres') - @mock.patch('pipelinewise.fastsync.partialsync.postgres_to_snowflake.FastSyncTargetSnowflake') - def test_running_partial_sync_postgres_to_snowflake(self, - mocked_fastsync_sf, - mocked_fastsyncpostgres, - mocked_bookmark, - mocked_upload_to_s3, - mocked_save_state): + + @mock.patch('pipelinewise.fastsync.partialsync.postgres_to_snowflake.multiprocessing.Pool') + def test_running_partial_sync_postgres_to_snowflake(self, mocked_pool): """Test the whole partial_sync_postgres_to_snowflake module works as expected""" - with TemporaryDirectory() as temp_directory: - file_size = 5 - file_parts = [f'{temp_directory}/t1',] - s3_keys = ['FOO_S3_KEYS',] - s3_key_pattern = 'BAR_S3_KEY_PATTERN' - bookmark = 'foo_bookmark' - maped_column_types_to_target = { - 'columns': ['foo type1', 'bar type2'], - 'primary_key': 'foo_primary' - } - - def export_data_to_file(*args, **kwargs): # pylint: disable=unused-argument - with open(f'{temp_directory}/t1', 'w', encoding='utf8') as exported_file: - exported_file.write('F' * file_size) - - return file_parts - - mocked_upload_to_s3.return_value = (s3_keys, s3_key_pattern) - mocked_bookmark.return_value = bookmark - mocked_export_data = mocked_fastsyncpostgres.return_value.export_source_table_data - mocked_fastsyncpostgres.return_value.map_column_types_to_target.return_value = maped_column_types_to_target - mocked_export_data.side_effect = export_data_to_file + test_table = {} + expected_args = None + # pylint: disable=too-few-public-methods + class MockedMultiprocessor: + """"Mocked multiprocessing class""" + @staticmethod + def map(partial_func, itter_param): + """Mocked map method which is used for assertion""" + # Asserting if multiprocess calling is as expected + actual_args = partial_func.keywords['args'] + + assert itter_param == test_table.items() + assert actual_args == expected_args + return [True] + + class PoolClass: + 
"""Mocked pool class""" + def __init__(self, *args, **kwargs): + pass + + def __enter__(self): + return MockedMultiprocessor() + + def __exit__(self, *args, **kwargs): + pass + + mocked_pool.side_effect = PoolClass + + with TemporaryDirectory() as temp_directory: table_name = 'foo_table' column = 'foo_column' start_value = '1' @@ -130,7 +138,14 @@ def export_data_to_file(*args, **kwargs): # pylint: disable=unused-argument } with self.assertLogs('pipelinewise') as actual_logs: - args_namespace = run_postgres_to_snowflake(arguments) + test_table = { + table_name: {'column': column, 'start_value': start_value, 'end_value': end_value, + 'drop_target_table': False} + } + argv_list = get_argv_list(arguments) + with mock.patch('sys.argv', argv_list): + expected_args = parse_args_for_partial_sync(postgres_to_snowflake.REQUIRED_CONFIG_KEYS) + postgres_to_snowflake.main() expected_log_messages = [ [ @@ -146,33 +161,89 @@ def export_data_to_file(*args, **kwargs): # pylint: disable=unused-argument f'Column : {column}', f'Start value : {start_value}', f'End value : {end_value}', - 'Exceptions during table sync : None', + 'Exceptions during table sync : []', ] ] for log_index, log_messages in enumerate(expected_log_messages): for message in log_messages: self.assertIn(message, actual_logs.output[log_index]) - expected_where_clause = f" WHERE {column} >= '{start_value}'" - if end_value: - expected_where_clause += f" AND {column} <= '10'" - - - mocked_export_data.assert_called_with( - args_namespace, args_namespace.target.get('tap_id'), expected_where_clause - ) - mocked_upload_to_s3.assert_called_with(mocked_fastsync_sf(), file_parts, arguments['temp_dir']) - - mocked_fastsync_sf.return_value.merge_tables.assert_called_with('foo_schema', f'{table_name}_temp', - table_name, - ['foo', 'bar', '_SDC_EXTRACTED_AT', - '_SDC_BATCHED_AT', - '_SDC_DELETED_AT'], - maped_column_types_to_target[ - 'primary_key']) - mocked_fastsync_sf.return_value.drop_table.assert_called_with('foo_schema', f'{table_name}_temp') - - if end_value: - mocked_save_state.assert_not_called() - else: - mocked_save_state.assert_called_with(arguments['state'], table_name, bookmark) + @mock.patch('pipelinewise.fastsync.partialsync.utils.load_into_snowflake') + @mock.patch('pipelinewise.fastsync.partialsync.utils.upload_to_s3') + @mock.patch('pipelinewise.fastsync.commons.utils.save_state_file') + @mock.patch('pipelinewise.fastsync.partialsync.postgres_to_snowflake.FastSyncTapPostgres') + @mock.patch('pipelinewise.fastsync.commons.utils.get_bookmark_for_table') + @mock.patch('pipelinewise.fastsync.partialsync.postgres_to_snowflake.FastSyncTargetSnowflake') + def test_postgres_to_snowflake_partial_sync_table(self, + mocked_fastsync_sf, mocked_bookmark, mocked_fastsyncpg, + mocked_save_state, mocked_upload_to_s3, mocked_load_into_sf): + """Test postgres to sf partial sync table""" + + table_name = 'foo' + + test_end_values = (None, ) + for end_value in test_end_values: + args = PartialSync2SFArgs(temp_test_dir='FOO_DIR', end_value=end_value) + test_table = (table_name, { + 'column': 'foo_column', + 'start_value': '1', + 'end_value': end_value, + 'drop_target_table': False, + }) + + with TemporaryDirectory() as temp_directory: + file_size = 5 + file_parts = [f'{temp_directory}/t1', ] + s3_keys = ['FOO_S3_KEYS', ] + s3_key_pattern = 'BAR_S3_KEY_PATTERN' + bookmark = 'foo_bookmark' + maped_column_types_to_target = { + 'columns': ['foo type1', 'bar type2'], + 'primary_key': 'foo_primary' + } + + # pylint: disable=cell-var-from-loop + def 
export_data_to_file(*args, **kwargs): # pylint: disable=unused-argument + with open(f'{temp_directory}/t1', 'w', encoding='utf8') as exported_file: + exported_file.write('F' * file_size) + + return file_parts + + mocked_upload_to_s3.return_value = (s3_keys, s3_key_pattern) + mocked_bookmark.return_value = bookmark + mocked_export_data = mocked_fastsyncpg.return_value.export_source_table_data + mocked_fastsyncpg.return_value.map_column_types_to_target.return_value = maped_column_types_to_target + mocked_export_data.side_effect = export_data_to_file + + actual_return = postgres_to_snowflake.partial_sync_table(test_table, args) + self.assertTrue(actual_return) + + target = { + 'schema': 'foo_schema', + 'sf_object': mocked_fastsync_sf(), + 'table': table_name, + 'temp': 'foo_temp' + } + columns_diff = { + 'added_columns': {'bar': 'type2', 'foo': 'type1'}, + 'removed_columns': {}, + 'source_columns': {'bar': 'type2', 'foo': 'type1'}, + 'target_columns': [] + } + + mocked_fastsync_sf.return_value.query.assert_called_with( + 'UPDATE foo_schema."FOO" SET _SDC_DELETEd_AT = CURRENT_TIMESTAMP()' + ' WHERE foo_column >= \'1\' AND _SDC_DELETED_AT IS NULL' + ) + mocked_fastsync_sf.return_value.create_schema.assert_called_with('foo_schema') + mocked_fastsync_sf.return_value.create_table.assert_called_with( + 'foo_schema', 'foo', ['foo type1', 'bar type2'], 'foo_primary', is_temporary=True) + + mocked_load_into_sf.assert_called_with( + target, args, columns_diff, maped_column_types_to_target['primary_key'], + s3_key_pattern, file_size, f" WHERE {test_table[1]['column']} >= '{test_table[1]['start_value']}'") + + if end_value: + mocked_save_state.assert_not_called() + else: + mocked_save_state.assert_called_with('state.json', table_name, bookmark) diff --git a/tests/units/partialsync/utils.py b/tests/units/partialsync/utils.py index 762623ef1..176d61f2a 100644 --- a/tests/units/partialsync/utils.py +++ b/tests/units/partialsync/utils.py @@ -1,18 +1,14 @@ import json import os -from argparse import Namespace -from unittest.mock import patch - -from pipelinewise.fastsync.partialsync import mysql_to_snowflake, postgres_to_snowflake -from pipelinewise.fastsync.partialsync.utils import parse_args_for_partial_sync - # pylint: disable=too-many-instance-attributes, too-few-public-methods class PartialSync2SFArgs: """Arguments for using in mysql to snowflake tests""" + # pylint: disable=too-many-arguments def __init__(self, temp_test_dir, table='email', - start_value='FOO_START', end_value='FOO_END', state='state.json', hard_delete=None): + start_value='FOO_START', end_value='FOO_END', state='state.json', + hard_delete=None, drop_target_table=False): resources_dir = f'{os.path.dirname(__file__)}/resources' config_dir = f'{resources_dir}/test_partial_sync' tap_config = self._load_json_config(f'{config_dir}/target_snowflake/tap_mysql/config.json') @@ -32,6 +28,7 @@ def __init__(self, temp_test_dir, table='email', self.temp_dir = temp_test_dir self.properties = properties_config self.state = state + self.drop_target_table = drop_target_table @staticmethod def _load_json_config(file_name): @@ -39,28 +36,8 @@ def _load_json_config(file_name): return json.load(config_file) -def run_mysql_to_snowflake(arguments_dict: dict) -> Namespace: - """Running the mysql_to_snowflake module""" - - argv_list = _get_argv_list(arguments_dict) - with patch('sys.argv', argv_list): - args = parse_args_for_partial_sync(mysql_to_snowflake.REQUIRED_CONFIG_KEYS) - mysql_to_snowflake.main() - - return args - - -def 
run_postgres_to_snowflake(arguments_dict: dict) -> Namespace:
-    """Running PS to SF"""
-    argv_list = _get_argv_list(arguments_dict)
-    with patch('sys.argv', argv_list):
-        args = parse_args_for_partial_sync(postgres_to_snowflake.REQUIRED_CONFIG_KEYS)
-        postgres_to_snowflake.main()
-
-    return args
-
-
-def _get_argv_list(arguments_dict):
+def get_argv_list(arguments_dict):
+    """Build a sys.argv-style argument list from an arguments dict for partial sync tests"""
     argv_list = ['main']
     if arguments_dict.get('tap'):
         argv_list.extend(['--tap', arguments_dict['tap']])
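+    # Flags are appended only when the corresponding key is present in arguments_dict,
+    # so tests can patch sys.argv with exactly the options a given scenario needs.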