[AP-742] tap-mysql: detect schema changes during log_based (#495)
* tap-mysql: Detect new/renamed columns during log_based

* Detect new supported columns during tap-mysql log_based runtime
Samira-El authored Aug 5, 2020
1 parent f043979 commit 19ebb6a
Showing 4 changed files with 27 additions and 11 deletions.
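
In short: the tap-mysql version bump adds runtime schema-change detection for log_based replication, and the end-to-end tests now alter the source schema mid-stream and assert that the new columns arrive in the target. A condensed, hedged sketch of the scenario the tests below exercise — the DDL and the ignored blob_col come from the diffs that follow; the wrapper function and its name are hypothetical:

# Hypothetical helper mirroring the DDL issued in test_replicate_mariadb_to_pg below.
# `run_query_tap_mysql` is the MariaDB query runner from the e2e test harness.
def simulate_schema_changes(run_query_tap_mysql):
    # New column of a supported type: must show up in the target.
    run_query_tap_mysql('ALTER TABLE weight_unit ADD COLUMN bool_col BOOL;')
    # New column of an unsupported type (blob): not replicated, so the e2e
    # assertions pass {'blob_col'} as the ignored-columns set.
    run_query_tap_mysql('ALTER TABLE weight_unit ADD COLUMN blob_col BLOB;')
    # Renamed column: the tap must pick up the new name during the log_based run.
    run_query_tap_mysql('ALTER TABLE weight_unit CHANGE COLUMN bool_col is_imperial BOOL;')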
2 changes: 1 addition & 1 deletion singer-connectors/tap-mysql/requirements.txt
@@ -1 +1 @@
-pipelinewise-tap-mysql==1.3.2
+pipelinewise-tap-mysql==1.3.4
18 changes: 13 additions & 5 deletions tests/end_to_end/helpers/assertions.py
@@ -1,7 +1,7 @@
 import os
 import re
 
-from typing import List
+from typing import List, Set, Union
 from pathlib import Path
 
 from . import tasks
@@ -172,13 +172,17 @@ def assert_row_counts_equal(tap_query_runner_fn: callable, target_query_runner_f
 # pylint: disable=too-many-locals
 def assert_all_columns_exist(tap_query_runner_fn: callable,
                              target_query_runner_fn: callable,
-                             colum_type_mapper_fn: callable = None) -> None:
+                             column_type_mapper_fn: callable = None,
+                             ignore_cols: Union[Set, List] = None) -> None:
     """Takes two query runner methods, gets the columns list for every table in both the
     source and target database and tests if every column in source exists in the target database.
     Some taps have unsupported column types and these are not part of the schemas published to the target thus
     target table doesn't have such columns.
     :param tap_query_runner_fn: method to run queries in the first connection
     :param target_query_runner_fn: method to run queries in the second connection
-    :param colum_type_mapper_fn: method to convert source to target column types"""
+    :param column_type_mapper_fn: method to convert source to target column types
+    :param ignore_cols: List or set of columns to ignore if we know target table won't have them"""
     # Generate a map of source and target specific functions
     funcs = _map_tap_to_target_functions(tap_query_runner_fn, target_query_runner_fn)
 
@@ -228,17 +232,21 @@ def _cols_list_to_dict(cols: List) -> dict:
         print(target_cols_dict)
         for col_name, col_props in source_cols_dict.items():
             # Check if column exists in the target table
+
+            if ignore_cols and col_name in ignore_cols:
+                continue
+
             try:
                 assert col_name in target_cols_dict
             except AssertionError as ex:
                 ex.args += ('Error', f'{col_name} column not found in target table {table_to_check}')
                 raise
 
             # Check if column type is expected in the target table, if mapper function provided
-            if colum_type_mapper_fn:
+            if column_type_mapper_fn:
                 try:
                     target_col = target_cols_dict[col_name]
-                    exp_col_type = colum_type_mapper_fn(col_props['type'], col_props['type_extra'])\
+                    exp_col_type = column_type_mapper_fn(col_props['type'], col_props['type_extra'])\
                         .replace(' NULL', '').lower()
                     act_col_type = target_col['type'].lower()
                     assert act_col_type == exp_col_type
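For illustration, a hedged usage sketch of the updated helper: the signature and the {'blob_col'} ignore set come from this diff, while the stand-in query runners and type mapper are placeholders for the ones the e2e tests pass in (see the test changes that follow).

from helpers import assertions  # tests/end_to_end/helpers/assertions.py; exact import path depends on the test setup

def run_query_source(query):
    """Placeholder for the MariaDB query runner used by the e2e tests."""
    ...

def run_query_target(query):
    """Placeholder for the target DWH query runner used by the e2e tests."""
    ...

def source_to_target_type(col_type, col_type_extra):
    """Placeholder for the tap-to-target type mapper (e.g. mysql_to_postgres.tap_type_to_target_type)."""
    ...

# Every source column must exist in the target with the mapped type,
# except blob_col, which the tap does not replicate.
assertions.assert_all_columns_exist(run_query_source, run_query_target,
                                    column_type_mapper_fn=source_to_target_type,
                                    ignore_cols={'blob_col'})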
12 changes: 10 additions & 2 deletions tests/end_to_end/test_target_postgres.py
@@ -75,6 +75,14 @@ def test_replicate_mariadb_to_pg(self, tap_mariadb_id=TAP_MARIADB_ID):
         # 2. Make changes in MariaDB source database
         # LOG_BASED
         self.run_query_tap_mysql('UPDATE weight_unit SET isactive = 0 WHERE weight_unit_id IN (2, 3, 4)')
+        self.run_query_tap_mysql('ALTER table weight_unit add column bool_col bool;')
+        self.run_query_tap_mysql('INSERT into weight_unit(weight_unit_name, isActive, original_date_created, bool_col) '
+                                 'values (\'Oz\', false, \'2020-07-23 10:00:00\', true);')
+        self.run_query_tap_mysql('ALTER table weight_unit add column blob_col blob;')
+        self.run_query_tap_mysql('INSERT into weight_unit(weight_unit_name, isActive, original_date_created, blob_col) '
+                                 'values (\'Oz\', false, \'2020-07-23 10:00:00\', \'blob content\');')
+        self.run_query_tap_mysql('ALTER table weight_unit change column bool_col is_imperial bool;')
+        self.run_query_tap_mysql('UPDATE weight_unit SET is_imperial = false WHERE weight_unit_name like \'%oz%\'')
 
         # INCREMENTAL
         self.run_query_tap_mysql('INSERT INTO address(isactive, street_number, date_created, date_updated,'
@@ -89,7 +97,7 @@ def test_replicate_mariadb_to_pg(self, tap_mariadb_id=TAP_MARIADB_ID):
         assertions.assert_run_tap_success(tap_mariadb_id, TARGET_ID, ['fastsync', 'singer'])
         assertions.assert_row_counts_equal(self.run_query_tap_mysql, self.run_query_target_postgres)
         assertions.assert_all_columns_exist(self.run_query_tap_mysql, self.run_query_target_postgres,
-                                            mysql_to_postgres.tap_type_to_target_type)
+                                            mysql_to_postgres.tap_type_to_target_type, {'blob_col'})
 
     @pytest.mark.dependency(depends=['import_config'])
     def test_resync_mariadb_to_pg(self, tap_mariadb_id=TAP_MARIADB_ID):
@@ -105,7 +113,7 @@ def test_resync_mariadb_to_pg(self, tap_mariadb_id=TAP_MARIADB_ID):
     def test_replicate_mariadb_to_pg_with_custom_buffer_size(self):
         """Replicate data from MariaDB to Postgres DWH with custom buffer size
         Same tests cases as test_replicate_mariadb_to_pg but using another tap with custom stream buffer size"""
-        self.test_replicate_mariadb_to_pg(tap_mariadb_id=TAP_MARIADB_BUFFERED_STREAM_ID)
+        self.test_resync_mariadb_to_pg(tap_mariadb_id=TAP_MARIADB_BUFFERED_STREAM_ID)
 
     @pytest.mark.dependency(depends=['import_config'])
     def test_replicate_pg_to_pg(self):
6 changes: 3 additions & 3 deletions tests/end_to_end/test_target_snowflake.py
@@ -88,7 +88,7 @@ def test_replicate_mariadb_to_sf(self, tap_mariadb_id=TAP_MARIADB_ID):
         assertions.assert_run_tap_success(tap_mariadb_id, TARGET_ID, ['fastsync', 'singer'])
         assertions.assert_row_counts_equal(self.run_query_tap_mysql, self.run_query_target_snowflake)
         assertions.assert_all_columns_exist(self.run_query_tap_mysql, self.e2e.run_query_target_snowflake,
-                                            mysql_to_snowflake.tap_type_to_target_type)
+                                            mysql_to_snowflake.tap_type_to_target_type, {'blob_col'})
 
     @pytest.mark.dependency(depends=['import_config'])
     def test_resync_mariadb_to_sf(self, tap_mariadb_id=TAP_MARIADB_ID):
@@ -100,9 +100,9 @@ def test_resync_mariadb_to_sf(self, tap_mariadb_id=TAP_MARIADB_ID):
 
     # pylint: disable=invalid-name
     @pytest.mark.dependency(depends=['import_config'])
-    def test_replicate_mariadb_to_pg_with_custom_buffer_size(self):
+    def test_replicate_mariadb_to_sf_with_custom_buffer_size(self):
         """Replicate data from MariaDB to Snowflake with custom buffer size
-        Same tests cases as test_replicate_mariadb_to_pg but using another tap with custom stream buffer size"""
+        Same tests cases as test_replicate_mariadb_to_sf but using another tap with custom stream buffer size"""
         self.test_replicate_mariadb_to_sf(tap_mariadb_id=TAP_MARIADB_BUFFERED_STREAM_ID)
 
     @pytest.mark.dependency(depends=['import_config'])