Skip to content

Commit

Permalink
[AP-832] Defined partial sync (#1054)
Browse files Browse the repository at this point in the history
* implementing defined partial sync
  • Loading branch information
amofakhar authored Feb 2, 2023
1 parent 3e256b7 commit 22f75d5
Show file tree
Hide file tree
Showing 23 changed files with 1,218 additions and 395 deletions.
7 changes: 7 additions & 0 deletions docs/connectors/taps/mysql.rst
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ Example YAML for ``tap-mysql``:
- table_name: "table_two"
replication_method: "LOG_BASED" # Important! Log based must be enabled in MySQL
- table_name: "table_three"
replication_method: "LOG_BASED"
sync_start_from: # Optional, applies for then first sync and fast sync
column: "column_name" # column name to be picked for partial sync with incremental or timestamp value
value: "start_value" # The first sync always starts from column >= value
drop_target_table: true # Optional, drops target table before syncing. default value is false
# You can add as many schemas as you need...
# Uncomment this if you want replicate tables from multiple schemas
#- source_schema: "another_schema_in_mysql"
Expand Down
7 changes: 7 additions & 0 deletions docs/connectors/taps/postgres.rst
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ Example YAML for ``tap-postgres``:
- table_name: "table_two"
replication_method: "LOG_BASED" # Important! Log based must be enabled in MySQL
- table_name: "table_three"
replication_method: "LOG_BASED"
sync_start_from: # Optional, applies for then first sync and fast sync
column: "column_name" # column name to be picked for partial sync with inremental or timestamp value
value: "start_value" # The first sync always starts from column >= value
drop_target_table: true # Optional, drops target table before syncing. default value is false
# You can add as many schemas as you need...
# Uncomment this if you want replicate tables from multiple schemas
#- source_schema: "another_schema_in_postgres"
Expand Down
6 changes: 6 additions & 0 deletions docs/user_guide/resync.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ add the ``--tables`` argument:
list of table names using the ``<schema_name>.<table_name>`` format. Schema and
table names have to be the names in the source database.

.. warning::

Based on the tap setting, tables can be fully synced or partial synced if they are defined
as partial synced.
Currently this option is available only for :ref:`tap-mysql` and :ref:`tap-postgres` to Snowflake.

2. **Partial resync**

If you want to partial resync a table from a specific tap then use the ``partial_sync_table`` command
Expand Down
8 changes: 5 additions & 3 deletions pipelinewise/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def build_singer_command(


# pylint: disable=too-many-arguments
def build_fastsync_partial_command(
def build_partialsync_command(
tap: TapParams,
target: TargetParams,
transform: TransformParams,
Expand All @@ -364,7 +364,8 @@ def build_fastsync_partial_command(
table: str,
column: str,
start_value: str,
end_value: str = None
end_value: str = None,
drop_target_table: str = None

):
"""Builds a command that starts a partial sync"""
Expand All @@ -387,7 +388,8 @@ def build_fastsync_partial_command(
f'--table "{table}"',
f'--column "{column}"',
f'--start_value "{start_value}"',
f'--end_value "{end_value}"' if end_value else None
f'--end_value "{end_value}"' if end_value else None,
f'--drop_target_table {drop_target_table}' if drop_target_table else None
],
)
)
Expand Down
3 changes: 3 additions & 0 deletions pipelinewise/cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ def generate_selection(cls, tap: Dict) -> List[Dict]:
schema_name = schema.get('source_schema')
for table in schema.get('tables', []):
table_name = table.get('table_name')
sync_start_from = table.get('sync_start_from')
replication_method = table.get(
'replication_method', utils.get_tap_default_replication_method(tap)
)
Expand All @@ -340,6 +341,8 @@ def generate_selection(cls, tap: Dict) -> List[Dict]:
# Add replication_key only if replication_method is INCREMENTAL
'replication_key': table.get('replication_key')
if replication_method == 'INCREMENTAL' else None,
'sync_start_from': sync_start_from
if sync_start_from else None
}
)
)
Expand Down
30 changes: 30 additions & 0 deletions pipelinewise/cli/multiprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import multiprocessing
import traceback


class Process(multiprocessing.Process):
"""
This is an extension of Process to let catching raised exceptions inside the
process.
"""
def __init__(self, *args, **kwargs):
multiprocessing.Process.__init__(self, *args, **kwargs)
self._pconn, self._cconn = multiprocessing.Pipe()
self._exception = None

def run(self):
try:
multiprocessing.Process.run(self)
self._cconn.send(None)
except Exception as exp:
t_b = traceback.format_exc()
self._cconn.send((exp, t_b))

@property
def exception(self):
"""
Returning exception of the process
"""
if self._pconn.poll():
self._exception = self._pconn.recv()
return self._exception
Loading

0 comments on commit 22f75d5

Please sign in to comment.