diff --git a/target_postgres/connector.py b/target_postgres/connector.py index b50e1cae..127048e6 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -118,13 +118,22 @@ def prepare_table( # type: ignore[override] table = meta.tables[ full_table_name ] # So we don't mess up the casing of the Table reference + + columns = self.get_table_columns( + schema_name=schema_name, table_name=table_name, connection=connection + ) + for property_name, property_def in schema["properties"].items(): + column_object = None + if property_name in columns: + column_object = columns[property_name] self.prepare_column( schema_name=cast(str, schema_name), table=table, column_name=property_name, sql_type=self.to_sql_type(property_def), connection=connection, + column_object=column_object, ) return meta.tables[full_table_name] @@ -366,6 +375,7 @@ def prepare_column( # type: ignore[override] column_name: str, sql_type: sqlalchemy.types.TypeEngine, connection: sqlalchemy.engine.Connection, + column_object: sqlalchemy.Column | None = None, ) -> None: """Adapt target table to provided schema if possible. @@ -376,7 +386,11 @@ def prepare_column( # type: ignore[override] sql_type: the SQLAlchemy type. connection: the database connection. """ - if not self.column_exists(table.fullname, column_name, connection=connection): + column_exists = column_object is not None or self.column_exists( + table.fullname, column_name, connection=connection + ) + + if not column_exists: self._create_empty_column( # We should migrate every function to use sqlalchemy.Table # instead of having to know what the function wants @@ -394,6 +408,7 @@ def prepare_column( # type: ignore[override] column_name=column_name, sql_type=sql_type, connection=connection, + column_object=column_object, ) def _create_empty_column( # type: ignore[override] @@ -466,6 +481,7 @@ def _adapt_column_type( # type: ignore[override] column_name: str, sql_type: sqlalchemy.types.TypeEngine, connection: sqlalchemy.engine.Connection, + column_object: sqlalchemy.Column | None, ) -> None: """Adapt table column type to support the new JSON schema type. @@ -477,12 +493,16 @@ def _adapt_column_type( # type: ignore[override] Raises: NotImplementedError: if altering columns is not supported. """ - current_type: sqlalchemy.types.TypeEngine = self._get_column_type( - schema_name=schema_name, - table_name=table_name, - column_name=column_name, - connection=connection, - ) + current_type: sqlalchemy.types.TypeEngine + if column_object is not None: + current_type = t.cast(sqlalchemy.types.TypeEngine, column_object.type) + else: + current_type = self._get_column_type( + schema_name=schema_name, + table_name=table_name, + column_name=column_name, + connection=connection, + ) # remove collation if present and save it current_type_collation = self.remove_collation(current_type)