diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 58a6316..b39f41c 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -17,7 +17,7 @@ import simplejson import sqlalchemy as sa from singer_sdk import SQLConnector -from singer_sdk import typing as th +from singer_sdk.connectors.sql import JSONSchemaToSQL from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, BYTEA, JSONB, UUID from sqlalchemy.engine import URL from sqlalchemy.engine.url import make_url @@ -214,6 +214,43 @@ def clone_table( new_table.create(bind=connection) return new_table + def _handle_array_type(self, jsonschema: dict) -> ARRAY | JSONB: + """Handle array type.""" + items = jsonschema.get("items") + # Case 1: items is a string + if isinstance(items, str): + return ARRAY(self.to_sql_type({"type": items})) + + # Case 2: items are more complex + if isinstance(items, dict): + # Case 2.1: items are variants + if "type" not in items: + return ARRAY(JSONB()) + + items_type = items["type"] + + # Case 2.2: items are a single type + if isinstance(items_type, str): + return ARRAY(self.to_sql_type({"type": items_type})) + + # Case 2.3: items are a list of types + if isinstance(items_type, list): + return ARRAY(self.to_sql_type({"type": items_type})) + + # Case 3: tuples + return ARRAY(JSONB()) if isinstance(items, list) else JSONB() + + @cached_property + def jsonschema_to_sql(self) -> JSONSchemaToSQL: + """Return a JSONSchemaToSQL instance with custom type handling.""" + to_sql = JSONSchemaToSQL() + to_sql.register_type_handler("integer", BIGINT) + to_sql.register_type_handler("object", JSONB) + to_sql.register_type_handler("array", self._handle_array_type) + to_sql.register_format_handler("date-time", TIMESTAMP) + to_sql.register_format_handler("uuid", UUID) + return to_sql + def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: """Return a JSON Schema representation of the provided type. @@ -270,7 +307,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: return PostgresConnector.pick_best_sql_type(sql_type_array=sql_type_array) - def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911 + def pick_individual_type(self, jsonschema_type: dict): """Select the correct sql type assuming jsonschema_type has only a single type. Args: @@ -281,46 +318,25 @@ def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911 """ if "null" in jsonschema_type["type"]: return None - if "integer" in jsonschema_type["type"]: - return BIGINT() - if "object" in jsonschema_type["type"]: - return JSONB() - if "array" in jsonschema_type["type"]: - items = jsonschema_type.get("items") - # Case 1: items is a string - if isinstance(items, str): - return ARRAY(self.to_sql_type({"type": items})) - - # Case 2: items are more complex - if isinstance(items, dict): - # Case 2.1: items are variants - if "type" not in items: - return ARRAY(JSONB()) - - items_type = items["type"] - - # Case 2.2: items are a single type - if isinstance(items_type, str): - return ARRAY(self.to_sql_type({"type": items_type})) - - # Case 2.3: items are a list of types - if isinstance(items_type, list): - return ARRAY(self.to_sql_type({"type": items_type})) - - # Case 3: tuples - return ARRAY(JSONB()) if isinstance(items, list) else JSONB() + # if "integer" in jsonschema_type["type"]: + # return BIGINT() + # if "object" in jsonschema_type["type"]: + # return JSONB() + # if "array" in jsonschema_type["type"]: + # return self._handle_array_type(jsonschema_type) # string formats - if jsonschema_type.get("format") == "date-time": - return TIMESTAMP() - if jsonschema_type.get("format") == "uuid": - return UUID() + # if jsonschema_type.get("format") == "date-time": + # return TIMESTAMP() + # if jsonschema_type.get("format") == "uuid": + # return UUID() if ( self.interpret_content_encoding and jsonschema_type.get("contentEncoding") == "base16" ): return HexByteString() - individual_type = th.to_sql_type(jsonschema_type) + + individual_type = self.jsonschema_to_sql.to_sql_type(jsonschema_type) return TEXT() if isinstance(individual_type, VARCHAR) else individual_type @staticmethod