TableChainFollowupJobCreationFailedException when loading Arrow table to ClickHouse #2248

Open
turtleDev opened this issue Jan 29, 2025 · 2 comments
Labels: question (Further information is requested)

turtleDev commented Jan 29, 2025

dlt version

1.5.0

Describe the problem

When trying to load data from an Arrow source into a ClickHouse destination using the "merge" write disposition, dlt raises a dlt.load.exceptions.TableChainFollowupJobCreationFailedException.

This only occurs with the arrow/pandas source and ClickHouse destination combination; other source/destination combinations work without issue.

Here's the full stack trace:

2025-01-29 20:41:08,965|[WARNING]|240986|139845918012096|dlt|__init__.py|_check_duplicate_cursor_threshold:584|Large number of records (1000) sharing the same value of cursor field 'date'. This can happen if the cursor field has a low resolution (e.g., only stores dates without times), causing many records to share the same cursor value. Consider using a cursor column with higher resolution to reduce the deduplication state size.
2025-01-29 20:41:08,974|[WARNING]|240986|139845918012096|dlt|extractors.py|_compute_table:445|In resource: arrow_mmap, when merging arrow schema with dlt schema, several column hints were different. dlt schema hints were kept and arrow schema and data were unmodified. It is up to destination to coerce the differences when loading. Change log level to INFO for more details.
Traceback (most recent call last):
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 76, in from_table_chain
    for stmt in cls.generate_sql(table_chain, sql_client, params)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 173, in generate_sql
    merge_sql = cls.gen_merge_sql(table_chain, sql_client)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 569, in gen_merge_sql
    cls._get_row_key_col(table_chain, sql_client, root_table)
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 464, in _get_row_key_col
    raise MergeDispositionException(
dlt.destinations.exceptions.MergeDispositionException: Merge sql job for dataset name `public`.`public`, staging dataset name `public`.`public_staging` COULD NOT BE GENERATED. Merge will not be performed. Data for the following tables (['testdata']) is loaded to staging dataset. You may need to write your own materialization. The reason is:
No `row_key`, `unique`, or single primary key column (e.g. `_dlt_id`) in table `testdata`.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 349, in create_followup_jobs
    if follow_up_jobs := client.create_table_chain_completed_followup_jobs(
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/job_client_impl.py", line 270, in create_table_chain_completed_followup_jobs
    jobs.extend(self._create_merge_followup_jobs(table_chain))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/impl/clickhouse/clickhouse.py", line 223, in _create_merge_followup_jobs
    return [ClickHouseMergeJob.from_table_chain(table_chain, self.sql_client)]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 81, in from_table_chain
    raise SqlJobCreationException(e, table_chain) from e
dlt.destinations.sql_jobs.SqlJobCreationException: Could not create SQLFollowupJob with exception Merge sql job for dataset name `public`.`public`, staging dataset name `public`.`public_staging` COULD NOT BE GENERATED. Merge will not be performed. Data for the following tables (['testdata']) is loaded to staging dataset. You may need to write your own materialization. The reason is:
No `row_key`, `unique`, or single primary key column (e.g. `_dlt_id`) in table `testdata`.. Table chain: - columns:
    id:
      name: id
      nullable: true
      data_type: bigint
    value:
      name: value
      nullable: true
      data_type: double
    category:
      name: category
      nullable: true
      data_type: text
    nested:
      name: nested
      nullable: true
      data_type: json
    date:
      name: date
      nullable: false
      data_type: text
      merge_key: true
      incremental: true
  name: testdata
  write_disposition: merge
  resource: arrow_mmap
  x-normalizer:
    seen-data: true


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 608, in load
    runner.run_pool(load_step.config, load_step)
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 91, in run_pool
    while _run_func():
          ^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 84, in _run_func
    run_metrics = run_f.run(cast(TExecutor, pool))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 639, in run
    self.load_single_package(load_id, schema)
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 572, in load_single_package
    running_jobs, finalized_jobs, new_pending_exception = self.complete_jobs(
                                                          ^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 447, in complete_jobs
    self.create_followup_jobs(load_id, state, job, schema)
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 354, in create_followup_jobs
    raise TableChainFollowupJobCreationFailedException(
dlt.load.exceptions.TableChainFollowupJobCreationFailedException: Failed creating table chain followup jobs for table chain with root table testdata.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/arrow-to-clickhouse.py", line 75, in <module>
    load_info = pipeline.run(
                ^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 226, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 275, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 747, in run
    return self.load(destination, dataset_name, credentials=credentials)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 226, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 166, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 275, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 615, in load
    raise PipelineStepFailed(
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage load when processing package 1738163468.926135 with exception:

<class 'dlt.load.exceptions.TableChainFollowupJobCreationFailedException'>
Failed creating table chain followup jobs for table chain with root table testdata.

Expected behavior

I expected the data from the Arrow source to load into ClickHouse successfully.

Steps to reproduce

Here's a self-contained script that reproduces the issue:

import random
import dlt
import dlt.extract
from dlt.destinations.impl.clickhouse.configuration import (
    ClickHouseCredentials   
)
import pandas as pd
import numpy as np
import pyarrow as pa

"""
clickhouse setup

$ docker run -p 8123:8123 -p 9000:9000 clickhouse/clickhouse-server:24.12
$ docker exec -i ${CONTAINER_ID} /usr/bin/env clickhouse-client <<'_EOF'
CREATE DATABASE IF NOT EXISTS "public";
CREATE USER IF NOT EXISTS dlt IDENTIFIED WITH sha256_password BY 'Dlt*12345789234567';
GRANT CREATE, ALTER, SELECT, DELETE, DROP, TRUNCATE, OPTIMIZE, SHOW, INSERT, dictGet ON public.* TO dlt;
GRANT CREATE, ALTER, SELECT, DELETE, DROP, TRUNCATE, OPTIMIZE, SHOW, INSERT, dictGet ON default.* TO dlt;
GRANT SELECT ON INFORMATION_SCHEMA.COLUMNS TO dlt;
GRANT CREATE TEMPORARY TABLE, S3 ON *.* TO dlt;
_EOF
"""

@dlt.resource(
    name="arrow_mmap",
    merge_key="date", 
    columns={'date': {'merge_key': True, 'name': 'date'}}
)
def arrow_mmap(
    incremental = dlt.sources.incremental(
        "date",
        initial_value=None,
        end_value=None,
        range_end="closed",
        range_start="closed",
    )
):

    row_count = 1000
    df = pd.DataFrame(
        {
            "id": range(row_count),
            "value": np.random.rand(row_count),
            "category": np.random.choice(["A", "B", "C"], size=row_count),
            "nested": [{"a": 1, "b": 2, "c": {"d": 3}}] * row_count,
            "date": ["2024-11-05"] * row_count,
        }
    )
    yield pa.Table.from_pandas(df)


source = arrow_mmap()

credentials = ClickHouseCredentials(
    {
        "host": "localhost",
        "port": "9000",
        "username": "dlt",
        "password": "Dlt*12345789234567",
        "database": "public",
        "http_port": 8123,
        "secure": 0,
    }
)

dest = dlt.destinations.clickhouse(credentials=credentials)

pipeline = dlt.pipeline(
    "".join(random.choices("abcdefghijklmop123456", k=5)),
    destination=dest,
    dataset_name="public",
)

load_info = pipeline.run(
    source,
    table_name="testdata",
    write_disposition="merge",
)

Here's another example that works fine, using the duckdb destination:

import random
import dlt
import dlt.extract
import pandas as pd
import numpy as np
import pyarrow as pa

@dlt.resource(
    name="arrow_mmap",
    merge_key="date", 
    columns={'date': {'merge_key': True, 'name': 'date'}}
)
def arrow_mmap(
    incremental = dlt.sources.incremental(
        "date",
        initial_value=None,
        end_value=None,
        range_end="closed",
        range_start="closed",
    )
):

    row_count = 1000
    df = pd.DataFrame(
        {
            "id": range(row_count),
            "value": np.random.rand(row_count),
            "category": np.random.choice(["A", "B", "C"], size=row_count),
            "nested": [{"a": 1, "b": 2, "c": {"d": 3}}] * row_count,
            "date": ["2024-11-05"] * row_count,
        }
    )
    yield pa.Table.from_pandas(df)


source = arrow_mmap()

dest = dlt.destinations.duckdb("duckdb:///out.db")

pipeline = dlt.pipeline(
    "".join(random.choices("abcdefghijklmop123456", k=5)),
    destination=dest,
    dataset_name="dlt",
)

load_info = pipeline.run(
    source,
    table_name="testdata",
    write_disposition="merge",
)

Operating system

Linux

Runtime environment

Local

Python version

3.11

dlt data source

arrow
https://dlthub.com/docs/dlt-ecosystem/verified-sources/arrow-pandas

dlt destination

Clickhouse

Additional information

clickhouse version: 24.12

rudolfix moved this from Todo to In Progress in dlt core library on Feb 1, 2025
rudolfix self-assigned this on Feb 1, 2025
rudolfix added the question (Further information is requested) label on Feb 1, 2025

rudolfix (Collaborator) commented Feb 1, 2025

When you load arrow/pandas frames, dlt will not add any columns to them, including _dlt_id, unless explicitly requested. So you need to enable _dlt_id or set e.g. a primary_key on your resource. It looks to me that the id column is a good candidate.
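
A minimal sketch of that suggestion, assuming the id column is unique in the source data: declaring it as the resource's primary_key gives the merge job a single key column to work with (the rest of the repro script stays the same):

import dlt
import numpy as np
import pandas as pd
import pyarrow as pa

@dlt.resource(
    name="arrow_mmap",
    primary_key="id",  # assumption: "id" is unique, so it can serve as the merge job's key column
    merge_key="date",
)
def arrow_mmap():
    # Same synthetic data as in the repro script, trimmed to the relevant columns.
    row_count = 1000
    df = pd.DataFrame(
        {
            "id": range(row_count),
            "value": np.random.rand(row_count),
            "category": np.random.choice(["A", "B", "C"], size=row_count),
            "date": ["2024-11-05"] * row_count,
        }
    )
    yield pa.Table.from_pandas(df)

With a single primary key column declared, gen_merge_sql should be able to pick a key column instead of raising MergeDispositionException.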

turtleDev (Author) commented Feb 1, 2025

How do we enable _dlt_id for a dataframe?

Also, this behaviour seems inconsistent: according to the docs, setting a merge_key should be enough.

I'd be happy to contribute a fix if you can point me in the right direction.
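
For reference, here is a minimal sketch of one way _dlt_id can reportedly be enabled for arrow/pandas data, based on the parquet normalizer option described in the arrow-pandas docs linked above. The exact config key and its environment-variable form are assumptions and should be verified against those docs:

import os

# Assumption: the arrow/parquet normalizer reads this option from config.toml
# ([normalize.parquet_normalizer] add_dlt_id = true) or from the equivalent
# environment variable below, and then adds a _dlt_id column to arrow tables
# during normalization. Set it before running the pipeline.
os.environ["NORMALIZE__PARQUET_NORMALIZER__ADD_DLT_ID"] = "true"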
