
sql_table() query_adapter_callback function with custom extraction query fails with KeyError during pipeline.extract() (pyarrow backend) #2374

Open
acaruso7 opened this issue Mar 5, 2025 · 1 comment
Labels: bug (Something isn't working), question (Further information is requested)

acaruso7 commented Mar 5, 2025

dlt version

1.5.0

Describe the problem

https://dlthub-community.slack.com/archives/C04DQA7JJN6/p1740596179916109

I am using the sql_table() source with the pyarrow backend to extract data from a MySQL database with a custom extraction query. The custom extraction query is passed to query_adapter_callback as a string and selects a subset of columns from the source table, as well as some additional derived columns (example below).

Upon calling pipeline.extract(), a dlt.extract.exceptions.ResourceExtractionError is thrown. The stack trace indicates a KeyError for one of the table columns that is not defined in the custom extraction query's result set.

More details, a full reproducible example, and the stack trace are below.

Note: I am following along with the example in the documentation here

Expected behavior

query_adapter_callback with a custom extraction query should extract only the columns defined in the query into the pyarrow table. A KeyError should not occur.

Steps to reproduce

Full reproducible example using a public MySQL database and the duckdb destination:

import functools

import dlt
from dlt.sources.sql_database import sql_table

import sqlalchemy as sa
from sqlalchemy import create_engine, engine


mysql_engine = create_engine(
    engine.URL.create(
        drivername="mysql+mysqldb",
        host="mysql-rfam-public.ebi.ac.uk",
        port=4497,
        database="Rfam",
        username="rfamro",
        password="",
    )
)

pipeline = dlt.pipeline(
    pipeline_name="test",
    destination="duckdb",
)


def query_adapter_callback(
    query, table, incremental=None, engine=None, custom_extract_query_override=None
) -> sa.sql.elements.TextClause:
    # Use the raw SQL override when provided, otherwise fall back to the default query
    if custom_extract_query_override:
        t_query = sa.text(custom_extract_query_override)
    else:
        t_query = sa.text(query)

    return t_query


custom_extract_query_override = """
select
      author_id
    , name
    , UPPER(name) as my_custom_column
    -- other columns deliberately excluded
from author;
"""

table_source = sql_table(
    credentials=mysql_engine,
    table="author",
    backend="pyarrow",
    query_adapter_callback=functools.partial(
        query_adapter_callback,
        custom_extract_query_override=custom_extract_query_override,
    ),
)

pipeline.extract(table_source)

Full stack trace:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/dlt/extract/pipe_iterator.py", line 277, in _get_source_item
    pipe_item = next(gen)
  File "/usr/local/lib/python3.10/site-packages/dlt/sources/sql_database/helpers.py", line 301, in table_rows
    yield from loader.load_rows(backend_kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/sources/sql_database/helpers.py", line 178, in load_rows
    yield from self._load_rows(query, backend_kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/sources/sql_database/helpers.py", line 200, in _load_rows
    yield row_tuples_to_arrow(
  File "/usr/local/lib/python3.10/site-packages/dlt/common/configuration/inject.py", line 247, in _wrap
    return f(*bound_args.args, **bound_args.kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/sources/sql_database/arrow_helpers.py", line 22, in row_tuples_to_arrow
    return _row_tuples_to_arrow(
  File "/usr/local/lib/python3.10/site-packages/dlt/common/libs/pyarrow.py", line 615, in row_tuples_to_arrow
    columnar_known_types = {
  File "/usr/local/lib/python3.10/site-packages/dlt/common/libs/pyarrow.py", line 616, in <dictcomp>
    col["name"]: columnar[col["name"]]
KeyError: 'initials'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 471, in extract
    self._extract_source(
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 1239, in _extract_source
    load_id = extract.extract(
  File "/usr/local/lib/python3.10/site-packages/dlt/extract/extract.py", line 421, in extract
    self._extract_single_source(
  File "/usr/local/lib/python3.10/site-packages/dlt/extract/extract.py", line 344, in _extract_single_source
    for pipe_item in pipes:
  File "/usr/local/lib/python3.10/site-packages/dlt/extract/pipe_iterator.py", line 162, in __next__
    pipe_item = self._get_source_item()
  File "/usr/local/lib/python3.10/site-packages/dlt/extract/pipe_iterator.py", line 307, in _get_source_item
    raise ResourceExtractionError(pipe.name, gen, str(ex), "generator") from ex
dlt.extract.exceptions.ResourceExtractionError: In processing pipe author: extraction of resource author in generator table_rows caused an exception: 'initials'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/opt/prefect/flows/subflows/repro_dlt_query_adapter_issue.py", line 57, in <module>
    pipeline.extract(table_source)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 226, in _wrap
    step_info = f(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 180, in _wrap
    rv = f(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 166, in _wrap
    return f(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 275, in _wrap
    return f(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 492, in extract
    raise PipelineStepFailed(
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage extract when processing package 1741133041.594346 with exception:

<class 'dlt.extract.exceptions.ResourceExtractionError'>
In processing pipe author: extraction of resource author in generator table_rows caused an exception: 'initials'

I dug into the internals of the arrow_helpers.row_tuples_to_arrow function a bit and noticed some odd behavior.

The value of the columns variable is:

{
    'author_id': {'name': 'author_id', 'nullable': False, 'data_type': 'bigint'},
    'name': {'name': 'name', 'nullable': False, 'data_type': 'text'},
    'last_name': {'name': 'last_name', 'nullable': True, 'data_type': 'text'},
    'initials': {'name': 'initials', 'nullable': True, 'data_type': 'text'},
    'orcid': {'name': 'orcid', 'nullable': True, 'data_type': 'text'},
    'synonyms': {'name': 'synonyms', 'nullable': True, 'data_type': 'text'},
    'my_custom_column': {'name': 'my_custom_column'},
}

which is not what I would expect. This output contains all columns from the source table, as well as my additional derived column, my_custom_column.

The value of len(rows[0]) is 3, which is what I would expect: 3 columns in the result set, which aligns with my custom extraction query.

The value of columnar.keys() is dict_keys(['author_id', 'name', 'last_name']), which is not what I would expect. last_name is not included in the result set of my custom query.

It seems something may be going wrong with this operation, where columns are zipped with rows:

columnar = {
col: dat.ravel() for col, dat in zip(columns, np.vsplit(pivoted_rows, len(pivoted_rows)))
}

This mismatch then leads to the KeyError later, when the full-schema column initials is looked up in columnar but is not present.
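
To make the mismatch concrete, here is a minimal standalone sketch (not dlt internals, with made-up row values) of what happens when a schema dict covering the full table is zipped against the narrower result set:

import numpy as np

# Schema still names every source-table column, but the query returned only 3 values per row
columns = ["author_id", "name", "last_name", "initials", "orcid", "synonyms", "my_custom_column"]
rows = [(1, "A. Bateman", "A. BATEMAN"), (2, "S. Eddy", "S. EDDY")]  # author_id, name, my_custom_column

pivoted_rows = np.array(rows, dtype="object").T  # shape: (3 result columns, 2 rows)

# zip() stops at the shorter iterable, so the 3 data columns are paired with the
# first 3 schema names: my_custom_column's data ends up keyed as 'last_name',
# and 'initials' gets no entry at all -- hence the later KeyError
columnar = {
    col: dat.ravel() for col, dat in zip(columns, np.vsplit(pivoted_rows, len(pivoted_rows)))
}
print(columnar.keys())  # dict_keys(['author_id', 'name', 'last_name'])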

Operating system

Linux

Runtime environment

Docker, Docker Compose

Python version

3.10

dlt data source

sql_table() with pyarrow backend

dlt destination

No response

Other deployment details

No response

Additional information

Separately from the issue described here, I wonder whether I am going about this the right way. Is this the preferred approach for extracting custom column sets (including derived columns) from source tables?

An additional requirement I should mention for my own use case is that the custom extraction query must be passed to the query_adapter_callback as a raw SQL string; it cannot be built using native sqlalchemy syntax as I am re-using the same query elsewhere.

@acaruso7 acaruso7 changed the title sql_table() query_adapter_callback function with custom extraction query fails with KeyError (pyarrow backend) sql_table() query_adapter_callback function with custom extraction query fails with KeyError during pipeline.extract() (pyarrow backend) Mar 5, 2025
@rudolfix rudolfix added the bug Something isn't working label Mar 9, 2025
@rudolfix rudolfix self-assigned this Mar 9, 2025
@rudolfix rudolfix added the question Further information is requested label Mar 9, 2025
@rudolfix rudolfix moved this from Todo to In Progress in dlt core library Mar 9, 2025
rudolfix (Collaborator) commented Mar 9, 2025

@acaruso7 are you able to check the newest version of dlt? We added a handler for your case:

yield row_tuples_to_arrow(
    partition,
    columns=_add_missing_columns(self.columns, columns),
    tz=backend_kwargs.get("tz", "UTC"),
)

_add_missing_columns will add the missing columns according to your cursor. Then we'll try to derive the data types from the data (using pyarrow).
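
As a small standalone illustration of that last step (not the dlt internals): when a column such as my_custom_column arrives with no declared data_type, pyarrow can infer an Arrow type directly from the values.

import pyarrow as pa

# Infer an Arrow type from the values themselves when no type hint is available
inferred = pa.array(["A. BATEMAN", "S. EDDY"])
print(inferred.type)  # string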

The correct way of implementing this is to use a table_adapter and add the data types explicitly. For example:

import sqlalchemy as sa
from sqlalchemy import Table


def _add_change_tracking_columns(table: Table) -> None:
    required_columns = [
        ("_dlt_sys_change_version", sa.BigInteger, {"nullable": True}),
        ("_dlt_deleted", sa.Text, {"default": None, "nullable": True}),
    ]

    for col_name, col_type, col_kwargs in required_columns:
        if col_name not in table.c:
            table.append_column(
                sa.Column(col_name, col_type, **col_kwargs)  # type: ignore[arg-type]
            )

This defines two additional columns that are computed columns added by the query adapter.
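
Applied to this issue, a rough sketch (building on the reproduction above; the table_adapter_callback parameter name and the sa.Text type for the derived column are assumptions on my part) could look like:

import functools

import sqlalchemy as sa
from sqlalchemy import Table

from dlt.sources.sql_database import sql_table


def add_derived_columns(table: Table) -> None:
    # Declare the derived column from the custom query up front so its data type
    # is known before extraction (sa.Text assumed for UPPER(name))
    if "my_custom_column" not in table.c:
        table.append_column(sa.Column("my_custom_column", sa.Text, nullable=True))


table_source = sql_table(
    credentials=mysql_engine,  # engine from the reproduction above
    table="author",
    backend="pyarrow",
    table_adapter_callback=add_derived_columns,  # assumed parameter name
    query_adapter_callback=functools.partial(
        query_adapter_callback,  # callback from the reproduction above
        custom_extract_query_override=custom_extract_query_override,
    ),
)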
