diff --git a/target_snowflake/connector.py b/target_snowflake/connector.py index 3b0b772..12a48cb 100644 --- a/target_snowflake/connector.py +++ b/target_snowflake/connector.py @@ -181,11 +181,15 @@ def prepare_column( if '"' in formatter.format_collation(column_name): column_name = column_name.upper() - super().prepare_column( - full_table_name, - column_name, - sql_type, - ) + try: + super().prepare_column( + full_table_name, + column_name, + sql_type, + ) + except Exception as e: + self.logger.error(f"Error preparing column for {full_table_name=} {column_name=}") + raise e @staticmethod def get_column_rename_ddl( @@ -558,3 +562,30 @@ def get_initialize_script(role, user, password, warehouse, database): commit; """ + + def _adapt_column_type( + self, + full_table_name: str, + column_name: str, + sql_type: sqlalchemy.types.TypeEngine, + ) -> None: + """Adapt table column type to support the new JSON schema type. + + Args: + full_table_name: The target table name. + column_name: The target column name. + sql_type: The new SQLAlchemy type. + + Raises: + NotImplementedError: if altering columns is not supported. + """ + + try: + super()._adapt_column_type(full_table_name, column_name, sql_type) + except Exception as e: + current_type: sqlalchemy.types.TypeEngine = self._get_column_type( + full_table_name, + column_name, + ) + self.logger.error(f"Error adapting column type for {full_table_name=} {column_name=}, {current_type=} {sql_type=} (new sql type)") + raise e \ No newline at end of file diff --git a/target_snowflake/sinks.py b/target_snowflake/sinks.py index cc4a2e4..8db7294 100644 --- a/target_snowflake/sinks.py +++ b/target_snowflake/sinks.py @@ -77,12 +77,17 @@ def setup(self) -> None: object_type="schema" ), ) - self.connector.prepare_table( - full_table_name=self.full_table_name, - schema=self.conform_schema(self.schema), - primary_keys=self.key_properties, - as_temp_table=False, - ) + try: + self.connector.prepare_table( + full_table_name=self.full_table_name, + schema=self.conform_schema(self.schema), + primary_keys=self.key_properties, + as_temp_table=False, + ) + except Exception as e: + self.logger.error(f"Error creating {self.full_table_name=} {self.conform_schema(self.schema)=}") + raise e + def conform_name( self,