Skip to content

Commit

Permalink
feat: Switch activate version logic to use sqlalchemy (#225)
Browse files Browse the repository at this point in the history
Closes #176
  • Loading branch information
sebastianswms authored Nov 27, 2023
1 parent a105fc6 commit a60bfe8
Showing 1 changed file with 34 additions and 16 deletions.
50 changes: 34 additions & 16 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,15 +343,23 @@ def activate_version(self, new_version: int) -> None:
connection=connection,
)

metadata = MetaData()
target_table = Table(
self.table_name,
metadata,
autoload_with=connection.engine,
schema=self.schema_name,
)

self.logger.info("Hard delete: %s", self.config.get("hard_delete"))
if self.config["hard_delete"] is True:
connection.execute(
sqlalchemy.text(
f'DELETE FROM "{self.schema_name}"."{self.table_name}" '
f"WHERE {self.version_column_name} <= {new_version} "
f"OR {self.version_column_name} IS NULL"
delete_stmt = sqlalchemy.delete(target_table).where(
sqlalchemy.or_(
target_table.c[self.version_column_name].is_(None),
target_table.c[self.version_column_name] <= new_version,
)
)
connection.execute(delete_stmt)
return

if not self.connector.column_exists(
Expand All @@ -366,15 +374,25 @@ def activate_version(self, new_version: int) -> None:
connection=connection,
)
# Need to deal with the case where data doesn't exist for the version column
query = sqlalchemy.text(
f'UPDATE "{self.schema_name}"."{self.table_name}"\n'
f"SET {self.soft_delete_column_name} = :deletedate \n"
f"WHERE {self.version_column_name} < :version "
f"OR {self.version_column_name} IS NULL \n"
f" AND {self.soft_delete_column_name} IS NULL\n"
)
query = query.bindparams(
bindparam("deletedate", value=deleted_at, type_=datetime_type),
bindparam("version", value=new_version, type_=integer_type),
update_stmt = (
update(target_table)
.values(
{
target_table.c[self.soft_delete_column_name]: bindparam(
"deletedate"
)
}
)
.where(
sqlalchemy.and_(
sqlalchemy.or_(
target_table.c[self.version_column_name]
< bindparam("version"),
target_table.c[self.version_column_name].is_(None),
),
target_table.c[self.soft_delete_column_name].is_(None),
)
)
)
connection.execute(query)
bind_params = {"deletedate": deleted_at, "version": new_version}
connection.execute(update_stmt, bind_params)

0 comments on commit a60bfe8

Please sign in to comment.