Skip to content

Commit

Permalink
fix: Use a SQLAlchemy to generate an insert statement (#2843)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon authored Jan 28, 2025
1 parent c81fa53 commit a4f663e
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 30 deletions.
6 changes: 5 additions & 1 deletion singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,10 @@ def create_engine(self) -> Engine:
pool_pre_ping=True,
)

@deprecated(
"This method is deprecated. Use or override `FullyQualifiedName` instead.",
category=SingerSDKDeprecationWarning,
)
def quote(self, name: str) -> str:
"""Quote a name if it needs quoting, using '.' as a name-part delimiter.
Expand All @@ -853,7 +857,7 @@ def quote(self, name: str) -> str:
Returns:
str: The quoted name.
"""
return ".".join(
return ".".join( # pragma: no cover
[
self._dialect.identifier_preparer.quote(name_part)
for name_part in name.split(".")
Expand Down
43 changes: 28 additions & 15 deletions singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

import re
import typing as t
import warnings
from collections import defaultdict
from copy import copy
from textwrap import dedent

import sqlalchemy as sa
from sqlalchemy.sql import quoted_name
from sqlalchemy.sql import insert
from sqlalchemy.sql.expression import bindparam

from singer_sdk.connectors import SQLConnector
Expand Down Expand Up @@ -282,19 +282,26 @@ def generate_insert_statement(
Returns:
An insert statement.
"""
property_names = list(self.conform_schema(schema)["properties"].keys())
column_identifiers = [
self.connector.quote(quoted_name(name, quote=True))
for name in property_names
]
statement = dedent(
f"""\
INSERT INTO {full_table_name}
({", ".join(column_identifiers)})
VALUES ({", ".join([f":{name}" for name in property_names])})
""",
conformed_schema = self.conform_schema(schema)
property_names = list(conformed_schema["properties"])

_, schema_name, table_name = self.connector.parse_full_table_name(
full_table_name
)
return statement.rstrip()

table = sa.Table(
table_name,
sa.MetaData(),
*[
sa.Column(
name, sa.String
) # Assuming all columns are of type String for simplicity # noqa: E501
for name in property_names
],
schema=schema_name,
)

return insert(table)

def bulk_insert_records(
self,
Expand All @@ -321,7 +328,13 @@ def bulk_insert_records(
full_table_name,
schema,
)
if isinstance(insert_sql, str):
if isinstance(insert_sql, str): # pragma: no cover
warnings.warn(
"Generating a SQL insert statement as a string is deprecated. "
"Please return an SQLAlchemy Executable object instead.",
DeprecationWarning,
stacklevel=2,
)
insert_sql = sa.text(insert_sql)

conformed_records = [self.conform_record(record) for record in records]
Expand Down
16 changes: 9 additions & 7 deletions tests/core/sinks/test_sql_sink.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from __future__ import annotations

import typing as t
from textwrap import dedent

import pytest
from sqlalchemy.sql import Insert

from samples.sample_duckdb import DuckDBConnector
from singer_sdk.sinks.sql import SQLSink
Expand Down Expand Up @@ -55,10 +55,12 @@ def sink(self, target: DuckDBTarget, schema: dict) -> DuckDBSink:

def test_generate_insert_statement(self, sink: DuckDBSink, schema: dict):
"""Test that the insert statement is generated correctly."""
expected = dedent(
"""\
INSERT INTO foo
(id, col_ts, "table")
VALUES (:id, :col_ts, :table)"""
stmt = sink.generate_insert_statement("foo", schema=schema)
assert isinstance(stmt, Insert)
assert stmt.table.name == "foo"
assert stmt.table.columns.keys() == ["id", "col_ts", "table"]

# Rendered SQL should look like:
assert str(stmt) == (
'INSERT INTO foo (id, col_ts, "table") VALUES (:id, :col_ts, :table)'
)
assert sink.generate_insert_statement("foo", schema=schema) == expected
9 changes: 2 additions & 7 deletions tests/samples/test_target_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,12 +646,7 @@ def test_record_with_missing_properties(
},
},
[],
dedent(
"""\
INSERT INTO test_stream
(id, name, "table")
VALUES (:id, :name, :table)""",
),
'INSERT INTO test_stream (id, name, "table") VALUES (:id, :name, :table)',
),
],
ids=[
Expand All @@ -676,7 +671,7 @@ def test_sqlite_generate_insert_statement(
sink.full_table_name,
sink.schema,
)
assert dml == expected_dml
assert str(dml) == expected_dml


def test_hostile_to_sqlite(
Expand Down

0 comments on commit a4f663e

Please sign in to comment.