fix exclude_fields in sqlite
eloyfelix committed Jan 17, 2025
1 parent 793f0ea commit c20446c
Showing 2 changed files with 47 additions and 25 deletions.
70 changes: 46 additions & 24 deletions cbl_migrator/migrator.py
@@ -66,14 +66,18 @@ def fill_table(o_eng_conn, d_eng_conn, table, chunk_size):
     Skips if destination already has the same row count.
     Makes partial reads/writes depending on PK presence.
     """
-    logger.info(f"Migrating {table.name} table")
+    logger.info(f"Starting migration of table '{table.name}'")
     d_eng = create_engine(d_eng_conn)
     o_eng = create_engine(o_eng_conn)
 
     # Adjust identifier length if necessary
     if d_eng.name == "mysql":
         o_eng.dialect.max_identifier_length = 64
     if o_eng.dialect.max_identifier_length > d_eng.dialect.max_identifier_length:
+        logger.warning(
+            f"Adjusting identifier length from {o_eng.dialect.max_identifier_length} "
+            f"to {d_eng.dialect.max_identifier_length}"
+        )
         o_eng.dialect.max_identifier_length = d_eng.dialect.max_identifier_length
 
     pks = [c for c in table.primary_key.columns]
@@ -91,13 +95,17 @@ def fill_table(o_eng_conn, d_eng_conn, table, chunk_size):
             raise
 
     if count == d_count:
-        logger.info(f"{table.name} already matches origin row count. Skipping.")
+        logger.info(
+            f"Table '{table.name}' already matches origin row count ({count} rows). Skipping."
+        )
         return True
     elif count != d_count and d_count != 0:
+        logger.info(f"Resuming migration of '{table.name}' from last ID")
         q = select(pk).order_by(pk.desc()).limit(1)
         with d_eng.connect() as conn:
             last_id = conn.scalar(q)
     else:
+        logger.info(f"Starting fresh migration of '{table.name}' ({count} rows)")
         last_id = None
 
     # Multi or single PK copy
@@ -107,7 +115,7 @@ def fill_table(o_eng_conn, d_eng_conn, table, chunk_size):
         offset = d_count if last_id else 0
         chunked_copy_multi_pk(table, pks, count, offset, chunk_size, o_eng, d_eng)
 
-    logger.info(f"{table.name} table filled")
+    logger.info(f"Successfully completed migration of table '{table.name}'")
     return True
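For readers skimming the hunks above: fill_table compares origin and destination row counts to decide whether to skip the table, resume from the last copied primary key, or start from scratch. A minimal standalone sketch of that decision, assuming SQLAlchemy 1.4+ and a hypothetical single-column-PK table and engines (not the library's own helpers):

```python
# Illustrative sketch of the skip/resume/fresh decision shown in fill_table.
from sqlalchemy import MetaData, Table, create_engine, func, select

o_eng = create_engine("sqlite:///origin.db")       # hypothetical origin
d_eng = create_engine("sqlite:///destination.db")  # hypothetical destination

metadata = MetaData()
table = Table("example", metadata, autoload_with=o_eng)  # hypothetical table
pk = list(table.primary_key.columns)[0]

with o_eng.connect() as conn:
    count = conn.scalar(select(func.count()).select_from(table))
with d_eng.connect() as conn:
    d_count = conn.scalar(select(func.count()).select_from(table))

if count == d_count:
    last_id = None   # destination already complete: nothing to copy
elif d_count != 0:
    # partial copy present: resume after the highest PK already written
    with d_eng.connect() as conn:
        last_id = conn.scalar(select(pk).order_by(pk.desc()).limit(1))
else:
    last_id = None   # empty destination: copy from the beginning
```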


@@ -139,7 +147,6 @@ def __init__(
         self.d_eng_conn = d_conn_string
         self.n_cores = n_workers
         self.exclude_fields = {f.split(".")[0]: f.split(".")[1] for f in exclude_fields}
-        print(self.exclude_fields)
 
         o_eng = create_engine(self.o_eng_conn)
         metadata = MetaData()
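The exclude_fields option takes entries of the form "table.column"; the comprehension above splits each entry into a table key and a column value. A quick illustration with made-up names (note that, because the table name is the dictionary key, a later entry for the same table overwrites an earlier one):

```python
# Hypothetical entries; the real option expects "table.column" strings.
exclude_fields = ["compounds.internal_notes", "assays.curation_flag"]
parsed = {f.split(".")[0]: f.split(".")[1] for f in exclude_fields}
print(parsed)  # {'compounds': 'internal_notes', 'assays': 'curation_flag'}
```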
@@ -196,26 +203,36 @@ def __copy_schema(self):
                     if isinstance(cons, PrimaryKeyConstraint)
                 ]
                 if d_eng.name == "sqlite":
-                    # Retain all constraints for SQLite
+                    # Retain all constraints for SQLite except those involving excluded fields
+                    excluded_fields = self.exclude_fields.get(table_name, [])
+
+                    # Unique constraints
                     uks = insp.get_unique_constraints(table_name)
                     for uk in uks:
-                        uk_cols = [
-                            c for c in table._columns if c.name in uk["column_names"]
-                        ]
-                        keep_constraints.append(UniqueConstraint(*uk_cols, name=uk["name"]))
+                        if not any(col in excluded_fields for col in uk["column_names"]):
+                            uk_cols = [
+                                c for c in table._columns if c.name in uk["column_names"]
+                            ]
+                            keep_constraints.append(UniqueConstraint(*uk_cols, name=uk["name"]))
+
+                    # Foreign key constraints
                     for fk in [
                         cons
                         for cons in table.constraints
                         if isinstance(cons, ForeignKeyConstraint)
                     ]:
-                        keep_constraints.append(fk)
+                        if not any(col.name in excluded_fields for col in fk.columns):
+                            keep_constraints.append(fk)
+
+                    # Check constraints
                     for cc in [
                         cons
                         for cons in table.constraints
                         if isinstance(cons, CheckConstraint)
                     ]:
-                        cc.sqltext = TextClause(str(cc.sqltext).replace('"', ""))
-                        keep_constraints.append(cc)
+                        if not any(col in str(cc.sqltext) for col in excluded_fields):
+                            cc.sqltext = TextClause(str(cc.sqltext).replace('"', ""))
+                            keep_constraints.append(cc)
                     table.constraints = set(keep_constraints)
                     table.indexes = set()
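The core of this fix: for an SQLite destination, a unique, foreign key, or check constraint is kept only when none of its columns are among the excluded fields for that table. The sketch below shows the same filtering pattern on a reflected SQLAlchemy table; the connection string, table, and excluded column are hypothetical, and this is an illustration rather than the migrator's exact code path:

```python
# Illustrative constraint filter for an excluded column, SQLAlchemy 1.4+ style.
from sqlalchemy import (
    CheckConstraint,
    ForeignKeyConstraint,
    MetaData,
    PrimaryKeyConstraint,
    Table,
    UniqueConstraint,
    create_engine,
    inspect,
)
from sqlalchemy.sql.elements import TextClause

o_eng = create_engine("sqlite:///origin.db")              # hypothetical origin
insp = inspect(o_eng)
metadata = MetaData()
table = Table("example", metadata, autoload_with=o_eng)   # hypothetical table
excluded_fields = ["internal_notes"]                      # hypothetical exclusion

# Always keep the primary key.
keep = [c for c in table.constraints if isinstance(c, PrimaryKeyConstraint)]

# Unique constraints: skip any that touch an excluded column.
for uk in insp.get_unique_constraints("example"):
    if not any(col in excluded_fields for col in uk["column_names"]):
        cols = [c for c in table.columns if c.name in uk["column_names"]]
        keep.append(UniqueConstraint(*cols, name=uk["name"]))

# Foreign keys: same rule, columns taken from the constraint object.
for fk in (c for c in table.constraints if isinstance(c, ForeignKeyConstraint)):
    if not any(col.name in excluded_fields for col in fk.columns):
        keep.append(fk)

# Check constraints: a plain text match against the constraint body.
for cc in (c for c in table.constraints if isinstance(c, CheckConstraint)):
    if not any(col in str(cc.sqltext) for col in excluded_fields):
        cc.sqltext = TextClause(str(cc.sqltext).replace('"', ""))
        keep.append(cc)

table.constraints = set(keep)
table.indexes = set()  # indexes are handled separately, as in the migrator
```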

@@ -380,6 +397,12 @@ def migrate(
             copy_indexes (bool): Migrate indexes to destination.
             chunk_size (int): Batch size for chunked copying.
         """
+        logger.info(
+            f"Starting database migration with settings: schema={copy_schema}, "
+            f"data={copy_data}, constraints={copy_constraints}, "
+            f"indexes={copy_indexes}, chunk_size={chunk_size}"
+        )
+
         o_eng = create_engine(self.o_eng_conn)
         d_eng = create_engine(self.d_eng_conn)
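For context, a call that exercises the flags described in the docstring might look like the sketch below. The DbMigrator class name and the constructor argument order are assumptions inferred from the attributes set in __init__, not a verified signature; the migrate() keyword arguments come from the docstring in this diff:

```python
# Hypothetical usage sketch; treat the constructor signature as an assumption.
from cbl_migrator import DbMigrator

mig = DbMigrator(
    "oracle://user:pwd@host:1521/service",      # origin connection string
    "sqlite:///local.db",                       # destination connection string
    exclude_fields=["big_table.blob_column"],   # hypothetical "table.column" entry
    n_workers=4,
)

mig.migrate(
    copy_schema=True,
    copy_data=True,
    copy_constraints=True,
    copy_indexes=True,
    chunk_size=1000,
)
```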

@@ -397,9 +420,9 @@ def migrate(
             raise Exception("Origin SQLite database doesn't exist or is too small")
 
         if copy_schema:
-            logger.info("Copying schema")
+            logger.info("Starting schema copy")
             self.__copy_schema()
-            logger.info("Schema copied")
+            logger.info("Schema copy completed successfully")
 
         # Fill tables with data
         if copy_data:
@@ -415,6 +438,8 @@ def migrate(
 
             processes = 1 if d_eng.name == "sqlite" else self.n_cores
 
+            logger.info(f"Starting data migration using {processes} processes")
+
             with cf.ProcessPoolExecutor(max_workers=processes) as exe:
                 futures = {
                     exe.submit(
@@ -434,20 +459,17 @@
         # Validate row counts
        all_migrated = not copy_data or self.validate_migration()
 
-        # Copy constraints and indexes if all tables migrated
         if all_migrated:
-            logger.info("All tables successfully migrated")
+            logger.info("Row count validation successful")
             if copy_constraints and d_eng.name != "sqlite":
-                logger.info("Copying constraints")
+                logger.info("Starting constraint migration")
                 self.__copy_constraints()
-                logger.info("Constraints created")
+                logger.info("Constraint migration completed")
             if copy_indexes:
-                logger.info("Copying indexes")
+                logger.info("Starting index migration")
                 self.__copy_indexes()
-                logger.info("Indexes created")
-                logger.info("Migration completed")
+                logger.info("Index migration completed")
+            logger.info("Database migration completed successfully")
         else:
-            logger.error(
-                "Table migration validation failed; constraints and indexes not migrated."
-            )
+            logger.error("Migration failed: row count validation unsuccessful")
         return all_migrated
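The constraint and index steps above are gated on per-table row counts matching between origin and destination. A minimal sketch of that kind of check, independent of the library's own validate_migration implementation and using hypothetical engines:

```python
# Illustrative row-count comparison between origin and destination.
from sqlalchemy import MetaData, create_engine, func, select

o_eng = create_engine("sqlite:///origin.db")       # hypothetical origin
d_eng = create_engine("sqlite:///destination.db")  # hypothetical destination

metadata = MetaData()
metadata.reflect(bind=o_eng)

all_migrated = True
for table in metadata.sorted_tables:
    with o_eng.connect() as oc, d_eng.connect() as dc:
        o_count = oc.scalar(select(func.count()).select_from(table))
        d_count = dc.scalar(select(func.count()).select_from(table))
    if o_count != d_count:
        all_migrated = False
        print(f"{table.name}: origin={o_count}, destination={d_count}")
```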
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "cbl_migrator"
-version = "0.3.9"
+version = "0.4.0"
 description = "Migrates Oracle dbs to PostgreSQL, MySQL and SQLite"
 readme = "README.md"
 license = { text = "MIT" }
