Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When normalizing a load package using pyarrow, the schema seems to be ignored #2229

Open
neuromantik33 opened this issue Jan 21, 2025 · 6 comments
Assignees
Labels
bug Something isn't working

Comments

@neuromantik33
Copy link
Contributor

dlt version

1.4.1

Describe the problem

At rare occasions, my dlt pipeline fails when trying to add the _dlt_load_id to a pyarrow table. This is the exception I'm getting

The above exception was caused by the following exception:
pyarrow.lib.ArrowInvalid: Schema at index 1 was different: 
id: int32 not null
user_id: int32
begin_at: timestamp[us, tz=UTC]
created_at: timestamp[us, tz=UTC] not null
updated_at: timestamp[us, tz=UTC] not null
scale_id: int32
team_id: int32
comment: string
old_feedback: string
feedback_rating: int32
final_mark: int32
truant_id: int32
flag_id: int32
token: string
ip: string
internship_id: int32
filled_at: timestamp[us, tz=UTC]
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null
vs
id: int32 not null
user_id: int32 not null
begin_at: timestamp[us, tz=UTC] not null
created_at: timestamp[us, tz=UTC] not null
updated_at: timestamp[us, tz=UTC] not null
scale_id: int32 not null
team_id: int32 not null
comment: string
old_feedback: string
feedback_rating: int32
final_mark: int32
truant_id: int32
flag_id: int32 not null
token: string
ip: string
internship_id: int32
filled_at: timestamp[us, tz=UTC]
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null

Stack Trace:
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 471, in extract
    self._extract_source(
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 1239, in _extract_source
    load_id = extract.extract(
              ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 416, in extract
    self._extract_single_source(
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 330, in _extract_single_source
    with self.manage_writers(load_id, source):
  File "/usr/local/lib/python3.12/contextlib.py", line 144, in __exit__
    next(self.gen)
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 368, in manage_writers
    self.extract_storage.close_writers(load_id)
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/storage.py", line 78, in close_writers
    storage.close_writers(load_id, skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/storages/data_item_storage.py", line 85, in close_writers
    writer.close(skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 176, in close
    self._flush_and_close_file(skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 260, in _flush_and_close_file
    self._flush_items(allow_empty_file)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 250, in _flush_items
    self._writer.write_data(self._buffered_items)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/writers.py", line 471, in write_data
    table = concat_batches_and_tables_in_order(items)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/common/libs/pyarrow.py", line 572, in concat_batches_and_tables_in_order
    return pyarrow.concat_tables(tables, promote_options="none")
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/table.pxi", line 6106, in pyarrow.lib.concat_tables
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status

However when looking at the schema.json in the load package at ./normalize/c85674d3774aa0bd/1737392430.155199/schema.json. This is what I see:

$ cat ./normalize/c85674d3774aa0bd/1737392430.155199/schema.json | jq .tables.scale_teams 
{
  "columns": {
    "id": {
      "name": "id",
      "nullable": false,
      "data_type": "bigint",
      "precision": 32
    },
    "user_id": {
      "name": "user_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "begin_at": {
      "name": "begin_at",
      "nullable": true,
      "data_type": "timestamp",
      "precision": 6
    },
    "created_at": {
      "name": "created_at",
      "nullable": false,
      "data_type": "timestamp",
      "precision": 6
    },
    "updated_at": {
      "name": "updated_at",
      "nullable": false,
      "data_type": "timestamp",
      "precision": 6
    },
    "scale_id": {
      "name": "scale_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "team_id": {
      "name": "team_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "comment": {
      "name": "comment",
      "nullable": true,
      "data_type": "text"
    },
    "old_feedback": {
      "name": "old_feedback",
      "nullable": true,
      "data_type": "text"
    },
    "feedback_rating": {
      "name": "feedback_rating",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "final_mark": {
      "name": "final_mark",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "truant_id": {
      "name": "truant_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "flag_id": {
      "name": "flag_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "token": {
      "name": "token",
      "nullable": true,
      "data_type": "text"
    },
    "ip": {
      "name": "ip",
      "nullable": true,
      "data_type": "text"
    },
    "internship_id": {
      "name": "internship_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "filled_at": {
      "name": "filled_at",
      "nullable": true,
      "data_type": "timestamp",
      "precision": 6
    },
    "_pg_lsn": {
      "name": "_pg_lsn",
      "nullable": true,
      "data_type": "bigint"
    },
    "_pg_deleted_ts": {
      "name": "_pg_deleted_ts",
      "nullable": true,
      "data_type": "timestamp",
      "precision": 6
    },
    "_pg_commit_ts": {
      "name": "_pg_commit_ts",
      "nullable": true,
      "data_type": "timestamp",
      "precision": 6,
      "x-bigquery-partition": true
    },
    "_pg_tx_id": {
      "name": "_pg_tx_id",
      "nullable": true,
      "data_type": "bigint",
      "precision": 32
    },
    "_dlt_load_id": {
      "name": "_dlt_load_id",
      "data_type": "text",
      "nullable": false
    }
  },
  "write_disposition": "append",
  "file_format": "parquet",
  "name": "scale_teams",
  "resource": "scale_teams",
  "x-normalizer": {
    "seen-data": true
  }
}

Here is the diff between the 2 schemas

2,3c2,3
< user_id: int32
< begin_at: timestamp[us, tz=UTC]
---
> user_id: int32 not null
> begin_at: timestamp[us, tz=UTC] not null
6,7c6,7
< scale_id: int32
< team_id: int32
---
> scale_id: int32 not null
> team_id: int32 not null
13c13
< flag_id: int32
---
> flag_id: int32 not null

Expected behavior

I would expect that during normalization that the schema be respected. However I'm not sure this is an issue with pyarrow or dlt's uses of it.

Steps to reproduce

This is difficult to reproduce since this is using a custom source. I'll see if I can work out an example.

Operating system

Linux

Runtime environment

Kubernetes

Python version

3.11

dlt data source

pg_legacy_replication :)

dlt destination

Google BigQuery

Other deployment details

Do failed normalized jobs never get loaded when the pipeline is run later in the future? This is a bit problematic for me since I'm using a custom source which reads from a WAL and commits offsets after the load phase.

Additional information

No response

@rudolfix
Copy link
Collaborator

@neuromantik33 from the exception I see that the problem is
user_id: int32 -> user_id: int32 not null

so nullability has changed. this gives me some kind of idea what to check

@rudolfix rudolfix self-assigned this Jan 27, 2025
@rudolfix rudolfix added the bug Something isn't working label Jan 27, 2025
@rudolfix rudolfix moved this from Todo to In Progress in dlt core library Jan 27, 2025
@neuromantik33
Copy link
Contributor Author

Maybe you should hold off on this bug for the moment. I just looked at the parquet file and it is empty which I don't understand how this edge case was reached so I'm also investigating further on my end. One thing that I find troubling however is that jobs that fail during the normalization phase are not retried later by dlt, so I end up have these missing load packages:

root@data-k8s-worker-v8-3-1:/tmp/.dlt/pipelines/intra_db_log# find normalize/ -type f -exec ls -lh {} \;
-rwxrwxrw- 1 nobody nogroup 0 Jan 20 18:00 normalize/c85674d3774aa0bd/1737392430.155199/new_jobs/scale_teams.6cc4f44b86.0.parquet
-rwxrwxrw- 1 nobody nogroup 18K Jan 20 18:00 normalize/c85674d3774aa0bd/1737392430.155199/schema.json
-rwxrwxrw- 1 nobody nogroup 160 Jan 20 18:00 normalize/c85674d3774aa0bd/1737392430.155199/load_package_state.json
-rwxrwxrw- 1 nobody nogroup 0 Jan 22 20:00 normalize/ff3d93f51cd69fc9/1737572428.71939/new_jobs/scale_teams.897250a472.0.parquet
-rwxrwxrw- 1 nobody nogroup 18K Jan 22 20:00 normalize/ff3d93f51cd69fc9/1737572428.71939/schema.json
-rwxrwxrw- 1 nobody nogroup 160 Jan 22 20:00 normalize/ff3d93f51cd69fc9/1737572428.71939/load_package_state.json
-rwxrwxrw- 1 nobody nogroup 13K Jan 26 02:19 normalize/45f192e49cbcfbb8/1737854377.3847513/schema.json
-rwxrwxrw- 1 nobody nogroup 160 Jan 26 02:19 normalize/45f192e49cbcfbb8/1737854377.3847513/load_package_state.json
-rwxrwxrw- 1 nobody nogroup 5 Dec 27 19:00 normalize/.version
-rwxrwxrw- 1 nobody nogroup 8.6K Jan  8 01:00 normalize/a85fb134476632ba/1736294435.0182338/schema.json
-rwxrwxrw- 1 nobody nogroup 160 Jan  8 01:00 normalize/a85fb134476632ba/1736294435.0182338/load_package_state.json
-rwxrwxrw- 1 nobody nogroup 0 Jan 21 11:00 normalize/a382ccf15525f9a2/1737453621.1105795/new_jobs/scale_teams.3549f58746.0.parquet
-rwxrwxrw- 1 nobody nogroup 18K Jan 21 11:00 normalize/a382ccf15525f9a2/1737453621.1105795/new_jobs/teams_users.600f99629e.0.parquet
-rwxrwxrw- 1 nobody nogroup 211K Jan 21 11:00 normalize/a382ccf15525f9a2/1737453621.1105795/new_jobs/teams.b1c45ade5a.0.parquet
-rwxrwxrw- 1 nobody nogroup 18K Jan 21 11:00 normalize/a382ccf15525f9a2/1737453621.1105795/schema.json
-rwxrwxrw- 1 nobody nogroup 160 Jan 21 11:00 normalize/a382ccf15525f9a2/1737453621.1105795/load_package_state.json

@neuromantik33
Copy link
Contributor Author

I'm going to close this since this is obviously a side effect of something happening upstream. Thanks for investigating however 🙏

@github-project-automation github-project-automation bot moved this from In Progress to Done in dlt core library Jan 27, 2025
@neuromantik33 neuromantik33 reopened this Jan 28, 2025
@neuromantik33
Copy link
Contributor Author

So it happened again this morning and I'm including all of my logs and pipeline state. There are the logs of the dlt pipeline running under dagster

$ k logs dagster-run-154869b6-f1b7-4502-81e2-63e71aa996bb-42rfm 
Defaulted container "dagster" out of: dagster, init-storage (init)
2025-01-28 10:00:21,796|[INFO]|1|139968031730560|dlt|__init__.py|google_spreadsheet:115|Processing range student_submissions!A1:AB1000 with name student_submissions
2025-01-28 10:00:22 +0000 - dagster - DEBUG - __ASSET_JOB - 154869b6-f1b7-4502-81e2-63e71aa996bb - 1 - RUN_START - Started execution of run for "__ASSET_JOB".
2025-01-28 10:00:22 +0000 - dagster - DEBUG - __ASSET_JOB - 154869b6-f1b7-4502-81e2-63e71aa996bb - 1 - ENGINE_EVENT - Executing steps using multiprocess executor: parent process (pid: 1)
2025-01-28 10:00:22 +0000 - dagster - DEBUG - __ASSET_JOB - 154869b6-f1b7-4502-81e2-63e71aa996bb - 1 - intra_db_log - STEP_WORKER_STARTING - Launching subprocess for "intra_db_log".
2025-01-28 10:00:24 +0000 - dagster - DEBUG - __ASSET_JOB - 154869b6-f1b7-4502-81e2-63e71aa996bb - 16 - intra_db_log - STEP_WORKER_STARTED - Executing step "intra_db_log" in subprocess.
2025-01-28 10:00:29,572|[INFO]|16|140601976155008|dlt|__init__.py|google_spreadsheet:115|Processing range student_submissions!A1:AB1000 with name student_submissions
2025-01-28 10:00:29 +0000 - dagster - DEBUG - __ASSET_JOB - 154869b6-f1b7-4502-81e2-63e71aa996bb - 16 - intra_db_log - RESOURCE_INIT_STARTED - Starting initialization of resources [dlt, io_manager].
2025-01-28 10:00:29 +0000 - dagster - DEBUG - __ASSET_JOB - 154869b6-f1b7-4502-81e2-63e71aa996bb - 16 - intra_db_log - RESOURCE_INIT_SUCCESS - Finished initialization of resources [dlt, io_manager].
2025-01-28 10:00:30 +0000 - dagster - DEBUG - __ASSET_JOB - 154869b6-f1b7-4502-81e2-63e71aa996bb - 16 - LOGS_CAPTURED - Started capturing logs in process (pid: 16).
2025-01-28 10:00:30 +0000 - dagster - DEBUG - __ASSET_JOB - 154869b6-f1b7-4502-81e2-63e71aa996bb - 16 - intra_db_log - STEP_START - Started execution of step "intra_db_log".

  Telemetry:

  As an open-source project, we collect usage statistics to inform development priorities. For more
  information, read https://docs.dagster.io/getting-started/telemetry.

  We will not see or store any data that is processed by your code.

  To opt-out, add the following to $DAGSTER_HOME/dagster.yaml, creating that file if necessary:

    telemetry:
      enabled: false


  Welcome to Dagster!

  If you have any questions or would like to engage with the Dagster team, please join us on Slack
  (https://bit.ly/39dvSsF).

2025-01-28 10:00:31,263|[INFO]|16|140601976155008|dlt|helpers.py|create_replication_slot:140|Replication slot 'dlt_replication_raw_intra_v2_db' cannot be created because it already exists
2025-01-28 10:00:32,248|[INFO]|16|140601976155008|dlt|pipeline.py|_restore_state_from_destination:1551|The state was restored from the destination bigquery(dlt.destinations.bigquery):raw_intra_v2_db
2025-01-28 10:00:36,551|[INFO]|16|140601976155008|dlt|helpers.py|flush_batch:406|Flushing 2019 events for table 'teams'
2025-01-28 10:00:36,552|[INFO]|16|140601976155008|dlt|helpers.py|flush_batch:406|Flushing 915 events for table 'scale_teams'
2025-01-28 10:00:36,552|[INFO]|16|140601976155008|dlt|helpers.py|flush_batch:406|Flushing 110 events for table 'experiences'
2025-01-28 10:00:36,570|[INFO]|16|140601976155008|dlt|helpers.py|flush_batch:406|Flushing 1787 events for table 'locations'
2025-01-28 10:00:36,571|[INFO]|16|140601976155008|dlt|helpers.py|flush_batch:406|Flushing 166 events for table 'teams_users'
2025-01-28 10:00:36,581|[INFO]|16|140601976155008|dlt|helpers.py|flush_batch:406|Flushing 3 events for table 'campus'
2025-01-28 10:00:39,501|[INFO]|16|140601976155008|dlt|helpers.py|flush_batch:406|Flushing 1768 events for table 'locations'
2025-01-28 10:00:39,501|[INFO]|16|140601976155008|dlt|helpers.py|flush_batch:406|Flushing 1728 events for table 'teams'
2025-01-28 10:00:39,501|[INFO]|16|140601976155008|dlt|helpers.py|flush_batch:406|Flushing 1049 events for table 'scale_teams'
2025-01-28 10:00:39,501|[INFO]|16|140601976155008|dlt|helpers.py|flush_batch:406|Flushing 159 events for table 'teams_users'
2025-01-28 10:00:39,540|[INFO]|16|140601976155008|dlt|helpers.py|flush_batch:406|Flushing 135 events for table 'experiences'
2025-01-28 10:00:39,553|[INFO]|16|140601976155008|dlt|helpers.py|flush_batch:406|Flushing 9 events for table 'campus'
2025-01-28 10:00:39 +0000 - dagster - ERROR - __ASSET_JOB - 154869b6-f1b7-4502-81e2-63e71aa996bb - 16 - intra_db_log - STEP_FAILURE - Execution of step "intra_db_log" failed.

dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "intra_db_log"::

dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage extract when processing package 1738058432.5505114 with exception:

<class 'pyarrow.lib.ArrowInvalid'>
Schema at index 1 was different: 
id: int32 not null
user_id: int32
begin_at: timestamp[us, tz=UTC]
created_at: timestamp[us, tz=UTC] not null
updated_at: timestamp[us, tz=UTC] not null
scale_id: int32
team_id: int32
comment: string
old_feedback: string
feedback_rating: int32
final_mark: int32
truant_id: int32
flag_id: int32
token: string
ip: string
internship_id: int32
filled_at: timestamp[us, tz=UTC]
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null
vs
id: int32 not null
user_id: int32 not null
begin_at: timestamp[us, tz=UTC] not null
created_at: timestamp[us, tz=UTC] not null
updated_at: timestamp[us, tz=UTC] not null
scale_id: int32 not null
team_id: int32 not null
comment: string
old_feedback: string
feedback_rating: int32
final_mark: int32
truant_id: int32
flag_id: int32 not null
token: string
ip: string
internship_id: int32
filled_at: timestamp[us, tz=UTC]
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null

Stack Trace:
  File "/usr/local/lib/python3.12/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/usr/local/lib/python3.12/site-packages/dagster/_utils/__init__.py", line 490, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/opt/dagster/app/intra/assets/dlt.py", line 68, in raw_intra_log_tables
    yield from dlt.run(context, dlt_source=dlt_repl_wal, dlt_pipeline=dlt_pipeline)
  File "/usr/local/lib/python3.12/site-packages/dagster_embedded_elt/dlt/dlt_event_iterator.py", line 76, in __next__
    return next(self._inner_iterator)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster_embedded_elt/dlt/resource.py", line 286, in _run
    load_info = dlt_pipeline.run(dlt_source, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 226, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 275, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 735, in run
    self.extract(
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 226, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 180, in _wrap
    rv = f(self, *args, **kwargs)
         ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 166, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 275, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 492, in extract
    raise PipelineStepFailed(

The above exception was caused by the following exception:
pyarrow.lib.ArrowInvalid: Schema at index 1 was different: 
id: int32 not null
user_id: int32
begin_at: timestamp[us, tz=UTC]
created_at: timestamp[us, tz=UTC] not null
updated_at: timestamp[us, tz=UTC] not null
scale_id: int32
team_id: int32
comment: string
old_feedback: string
feedback_rating: int32
final_mark: int32
truant_id: int32
flag_id: int32
token: string
ip: string
internship_id: int32
filled_at: timestamp[us, tz=UTC]
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null
vs
id: int32 not null
user_id: int32 not null
begin_at: timestamp[us, tz=UTC] not null
created_at: timestamp[us, tz=UTC] not null
updated_at: timestamp[us, tz=UTC] not null
scale_id: int32 not null
team_id: int32 not null
comment: string
old_feedback: string
feedback_rating: int32
final_mark: int32
truant_id: int32
flag_id: int32 not null
token: string
ip: string
internship_id: int32
filled_at: timestamp[us, tz=UTC]
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null

Stack Trace:
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 471, in extract
    self._extract_source(
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 1239, in _extract_source
    load_id = extract.extract(
              ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 416, in extract
    self._extract_single_source(
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 330, in _extract_single_source
    with self.manage_writers(load_id, source):
  File "/usr/local/lib/python3.12/contextlib.py", line 144, in __exit__
    next(self.gen)
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 368, in manage_writers
    self.extract_storage.close_writers(load_id)
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/storage.py", line 78, in close_writers
    storage.close_writers(load_id, skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/storages/data_item_storage.py", line 85, in close_writers
    writer.close(skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 176, in close
    self._flush_and_close_file(skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 260, in _flush_and_close_file
    self._flush_items(allow_empty_file)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 250, in _flush_items
    self._writer.write_data(self._buffered_items)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/writers.py", line 471, in write_data
    table = concat_batches_and_tables_in_order(items)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/common/libs/pyarrow.py", line 572, in concat_batches_and_tables_in_order
    return pyarrow.concat_tables(tables, promote_options="none")
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/table.pxi", line 6106, in pyarrow.lib.concat_tables
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status

------------------------------- Extract intra_db -------------------------------
Resources: 0/1 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 337.10 MB (46.10%) | CPU usage: 0.00%

------------------------------- Extract intra_db -------------------------------
Resources: 0/1 (0.0%) | Time: 4.02s | Rate: 0.00/s
teams: 2019  | Time: 0.00s | Rate: 313640732.44/s
Memory usage: 319.53 MB (46.00%) | CPU usage: 0.00%

------------------------------- Extract intra_db -------------------------------
Resources: 0/1 (0.0%) | Time: 4.03s | Rate: 0.00/s
teams: 2019  | Time: 0.01s | Rate: 184366.01/s
scale_teams: 915  | Time: 0.00s | Rate: 239861760.00/s
Memory usage: 320.57 MB (46.00%) | CPU usage: 0.00%

------------------------------- Extract intra_db -------------------------------
Resources: 0/1 (0.0%) | Time: 4.03s | Rate: 0.00/s
teams: 2019  | Time: 0.02s | Rate: 134366.27/s
scale_teams: 915  | Time: 0.00s | Rate: 224327.11/s
teams_users: 166  | Time: 0.00s | Rate: 46416964.27/s
Memory usage: 320.82 MB (46.00%) | CPU usage: 0.00%

------------------------------- Extract intra_db -------------------------------
Resources: 0/1 (0.0%) | Time: 6.95s | Rate: 0.00/s
teams: 2019  | Time: 2.93s | Rate: 687.93/s
scale_teams: 915  | Time: 2.92s | Rate: 312.94/s
teams_users: 325  | Time: 2.92s | Rate: 111.31/s
Memory usage: 326.01 MB (46.50%) | CPU usage: 0.00%

------------------------------- Extract intra_db -------------------------------
Resources: 0/1 (0.0%) | Time: 6.95s | Rate: 0.00/s
teams: 2019  | Time: 2.94s | Rate: 687.09/s
scale_teams: 915  | Time: 2.93s | Rate: 312.55/s
teams_users: 325  | Time: 2.92s | Rate: 111.17/s
experiences: 110  | Time: 0.00s | Rate: 24282812.63/s
Memory usage: 326.01 MB (46.50%) | CPU usage: 0.00%

------------------------------- Extract intra_db -------------------------------
Resources: 0/1 (0.0%) | Time: 6.97s | Rate: 0.00/s
teams: 2019  | Time: 2.95s | Rate: 684.29/s
scale_teams: 915  | Time: 2.94s | Rate: 311.27/s
teams_users: 325  | Time: 2.94s | Rate: 110.71/s
experiences: 110  | Time: 0.01s | Rate: 9148.06/s
locations: 1787  | Time: 0.00s | Rate: 416401180.44/s
Memory usage: 326.52 MB (46.50%) | CPU usage: 0.00%

------------------------------- Extract intra_db -------------------------------
Resources: 0/1 (0.0%) | Time: 7.01s | Rate: 0.00/s
teams: 3747  | Time: 3.00s | Rate: 1249.59/s
scale_teams: 1964  | Time: 2.99s | Rate: 657.37/s
teams_users: 325  | Time: 2.98s | Rate: 108.93/s
experiences: 110  | Time: 0.06s | Rate: 1830.11/s
locations: 3555  | Time: 0.05s | Rate: 73930.52/s
campus: 3  | Time: 0.00s | Rate: 740171.29/s
Memory usage: 328.07 MB (46.50%) | CPU usage: 0.00%

------------------------------- Extract intra_db -------------------------------
Resources: 1/1 (100.0%) | Time: 7.04s | Rate: 0.14/s
teams: 3747  | Time: 3.03s | Rate: 1237.46/s
scale_teams: 1964  | Time: 3.02s | Rate: 650.97/s
teams_users: 325  | Time: 3.01s | Rate: 107.87/s
experiences: 245  | Time: 0.09s | Rate: 2737.48/s
locations: 3555  | Time: 0.08s | Rate: 45883.75/s
campus: 12  | Time: 0.03s | Rate: 408.21/s
Memory usage: 332.48 MB (46.50%) | CPU usage: 0.00%

2025-01-28 10:00:40 +0000 - dagster - DEBUG - __ASSET_JOB - 154869b6-f1b7-4502-81e2-63e71aa996bb - 1 - ENGINE_EVENT - Multiprocess executor: parent process exiting after 17.84s (pid: 1)
2025-01-28 10:00:41 +0000 - dagster - ERROR - __ASSET_JOB - 154869b6-f1b7-4502-81e2-63e71aa996bb - 1 - RUN_FAILURE - Execution of run for "__ASSET_JOB" failed. Steps failed: ['intra_db_log'].

FYI it is the scale_teams table that constantly poses problem. I added a guardrail to only emit arrow tables if there was any data and looking at the logs it seems like it is the case.

$ k logs dagster-run-154869b6-f1b7-4502-81e2-63e71aa996bb-42rfm | grep scale_teams
Defaulted container "dagster" out of: dagster, init-storage (init)
2025-01-28 10:00:36,552|[INFO]|16|140601976155008|dlt|helpers.py|flush_batch:406|Flushing 915 events for table 'scale_teams'
2025-01-28 10:00:39,501|[INFO]|16|140601976155008|dlt|helpers.py|flush_batch:406|Flushing 1049 events for table 'scale_teams'
scale_teams: 915  | Time: 0.00s | Rate: 239861760.00/s
scale_teams: 915  | Time: 0.00s | Rate: 224327.11/s
scale_teams: 915  | Time: 2.92s | Rate: 312.94/s
scale_teams: 915  | Time: 2.93s | Rate: 312.55/s
scale_teams: 915  | Time: 2.94s | Rate: 311.27/s
scale_teams: 1964  | Time: 2.99s | Rate: 657.37/s
scale_teams: 1964  | Time: 3.02s | Rate: 650.97/s

What makes this bug more troubling is that all of this occurs when closing the parquet file, where all flushing and arrow concatentation is taking place, resulting in a 0 length parquet file as you can see here

root@data-k8s-worker-v8-3-1:/tmp/.dlt/pipelines/intra_db_log# find -type f | grep 1738058432.5505114 | xargs ls -lh
-rw-r--r-- 1 nobody nogroup  160 Jan 28 11:00 ./normalize/a05f2b52f799613c/1738058432.5505114/load_package_state.json
-rw-r--r-- 1 nobody nogroup    0 Jan 28 11:00 ./normalize/a05f2b52f799613c/1738058432.5505114/new_jobs/scale_teams.d119c6e408.0.parquet
-rw-r--r-- 1 nobody nogroup 249K Jan 28 11:00 ./normalize/a05f2b52f799613c/1738058432.5505114/new_jobs/teams.4ac2e32c46.0.parquet
-rw-r--r-- 1 nobody nogroup  17K Jan 28 11:00 ./normalize/a05f2b52f799613c/1738058432.5505114/schema.json

This is directly taken from the k8s node which is temporarily storing all dlt state to handle retrying failed job. I've also set the flag to delete completed jobs (ie. LOAD__DELETE_COMPLETED_JOBS: "true") which I don't think works correctly since my job never completed the normalization step
https://dlthub.com/docs/running-in-production/running#data-left-behind

@neuromantik33
Copy link
Contributor Author

So it happened again with another table, this time I really have no idea how this can possibly happen. There must be a bug in how I build my arrow tables however the fact that the parquet file gets wipes makes it difficult to investigate and try to correct. Here are my new logs. Here are the failed parquet load packages (which I was hoping would get retried automatically but they aren't.

/tmp/.dlt/pipelines# find -type f | grep 1739779237.1482742 | xargs ls -lh
-rwxrwxrw- 1 nobody nogroup  160 Feb 17 09:00 ./intra_db_log/normalize/598009729e80ce74/1739779237.1482742/load_package_state.json
-rwxrwxrw- 1 nobody nogroup    0 Feb 17 09:00 ./intra_db_log/normalize/598009729e80ce74/1739779237.1482742/new_jobs/experiences.baea96fd7c.0.parquet
-rwxrwxrw- 1 nobody nogroup  71K Feb 17 09:00 ./intra_db_log/normalize/598009729e80ce74/1739779237.1482742/new_jobs/locations.d2d011f36a.0.parquet
-rwxrwxrw- 1 nobody nogroup  73K Feb 17 09:00 ./intra_db_log/normalize/598009729e80ce74/1739779237.1482742/new_jobs/scale_teams.4926c2cab8.0.parquet
-rwxrwxrw- 1 nobody nogroup 158K Feb 17 09:00 ./intra_db_log/normalize/598009729e80ce74/1739779237.1482742/new_jobs/slots.d6856f9413.0.parquet
-rwxrwxrw- 1 nobody nogroup 138K Feb 17 09:00 ./intra_db_log/normalize/598009729e80ce74/1739779237.1482742/new_jobs/teams.2957c444ee.0.parquet
-rwxrwxrw- 1 nobody nogroup  13K Feb 17 09:00 ./intra_db_log/normalize/598009729e80ce74/1739779237.1482742/new_jobs/teams_users.f6a27385c6.0.parquet
-rwxrwxrw- 1 nobody nogroup 433K Feb 17 09:00 ./intra_db_log/normalize/598009729e80ce74/1739779237.1482742/new_jobs/versions.6501df078a.0.parquet
-rwxrwxrw- 1 nobody nogroup  22K Feb 17 09:00 ./intra_db_log/normalize/598009729e80ce74/1739779237.1482742/schema.json

And here is the log of the failed job

2025-02-17 08:00:35,147|[INFO]|16|140478234573696|dlt|helpers.py|create_replication_slot:140|Replication slot 'dlt_replication_raw_intra_v2_db' cannot be created because it already exists
2025-02-17 08:00:36,945|[INFO]|16|140478234573696|dlt|pipeline.py|_restore_state_from_destination:1550|The state was restored from the destination bigquery(dlt.destinations.bigquery):raw_intra_v2_db
2025-02-17 08:00:39,089|[WARNING]|16|140478234573696|dlt|schema_types.py|_to_dlt_column_type:123|No type found for type_id '114' and modifier 'json'
2025-02-17 08:00:39,089|[WARNING]|16|140478234573696|dlt|schema_types.py|_to_dlt_column_type:123|No type found for type_id '114' and modifier 'json'
2025-02-17 08:00:39,816|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 508 events for table 'locations'
2025-02-17 08:00:39,817|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 913 events for table 'scale_teams'
2025-02-17 08:00:39,817|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 1660 events for table 'versions'
2025-02-17 08:00:39,817|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 749 events for table 'teams'
2025-02-17 08:00:39,829|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 1097 events for table 'slots'
2025-02-17 08:00:39,830|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 53 events for table 'teams_users'
2025-02-17 08:00:39,830|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 20 events for table 'experiences'
2025-02-17 08:00:39,974|[WARNING]|16|140478234573696|dlt|schema_types.py|_to_dlt_column_type:123|No type found for type_id '114' and modifier 'json'
2025-02-17 08:00:39,974|[WARNING]|16|140478234573696|dlt|schema_types.py|_to_dlt_column_type:123|No type found for type_id '114' and modifier 'json'
2025-02-17 08:00:40,761|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 620 events for table 'teams'
2025-02-17 08:00:40,766|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 1050 events for table 'versions'
2025-02-17 08:00:40,788|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 337 events for table 'scale_teams'
2025-02-17 08:00:40,801|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 65 events for table 'teams_users'
2025-02-17 08:00:40,804|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 719 events for table 'locations'
2025-02-17 08:00:40,805|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 2200 events for table 'slots'
2025-02-17 08:00:40,806|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 29 events for table 'experiences'
2025-02-17 08:00:40,807|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 3 events for table 'project_sessions'
2025-02-17 08:00:40,811|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 2 events for table 'projects'
2025-02-17 08:00:40,811|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 5 events for table 'campus'
2025-02-17 08:00:41,077|[WARNING]|16|140478234573696|dlt|schema_types.py|_to_dlt_column_type:123|No type found for type_id '114' and modifier 'json'
2025-02-17 08:00:41,077|[WARNING]|16|140478234573696|dlt|schema_types.py|_to_dlt_column_type:123|No type found for type_id '114' and modifier 'json'
2025-02-17 08:00:41,678|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 889 events for table 'versions'
2025-02-17 08:00:41,679|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 26 events for table 'campus'
2025-02-17 08:00:41,686|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 789 events for table 'locations'
2025-02-17 08:00:41,691|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 554 events for table 'teams'
2025-02-17 08:00:41,698|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 45 events for table 'teams_users'
2025-02-17 08:00:41,701|[WARNING]|16|140478234573696|dlt|pyarrow.py|row_tuples_to_arrow:686|Field objectives was reflected as JSON type and needs to be serialized back to string to be placed in arrow table. This will slow data extraction down. You should cast JSON field to STRING in your database system ie. by creating and extracting an SQL VIEW that selects with cast.
2025-02-17 08:00:41,709|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 2289 events for table 'slots'
2025-02-17 08:00:41,710|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 357 events for table 'scale_teams'
2025-02-17 08:00:41,710|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 45 events for table 'experiences'
2025-02-17 08:00:41,710|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 4 events for table 'project_sessions'
2025-02-17 08:00:41,710|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 4 events for table 'projects'
2025-02-17 08:00:42,103|[WARNING]|16|140478234573696|dlt|schema_types.py|_to_dlt_column_type:123|No type found for type_id '114' and modifier 'json'
2025-02-17 08:00:42,103|[WARNING]|16|140478234573696|dlt|schema_types.py|_to_dlt_column_type:123|No type found for type_id '114' and modifier 'json'
2025-02-17 08:00:42,172|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 6 events for table 'teams'
2025-02-17 08:00:42,181|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 35 events for table 'locations'
2025-02-17 08:00:42,181|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 30 events for table 'versions'
2025-02-17 08:00:42,182|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 3 events for table 'campus'
2025-02-17 08:00:42,186|[WARNING]|16|140478234573696|dlt|pyarrow.py|row_tuples_to_arrow:686|Field objectives was reflected as JSON type and needs to be serialized back to string to be placed in arrow table. This will slow data extraction down. You should cast JSON field to STRING in your database system ie. by creating and extracting an SQL VIEW that selects with cast.
2025-02-17 08:00:42,192|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 3 events for table 'project_sessions'
2025-02-17 08:00:42,193|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 3 events for table 'projects'
2025-02-17 08:00:42,193|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 100 events for table 'slots'
2025-02-17 08:00:42,193|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 9 events for table 'experiences'
2025-02-17 08:00:42,195|[INFO]|16|140478234573696|dlt|helpers.py|flush_batch:410|Flushing 5 events for table 'scale_teams'
2025-02-17 08:00:42,198|[WARNING]|16|140478234573696|dlt|pyarrow.py|row_tuples_to_arrow:686|Field objectives was reflected as JSON type and needs to be serialized back to string to be placed in arrow table. This will slow data extraction down. You should cast JSON field to STRING in your database system ie. by creating and extracting an SQL VIEW that selects with cast.
2025-02-17 08:00:42 +0000 - dagster - ERROR - __ASSET_JOB - dca1157f-da3f-4dcc-a7a4-2806383233e6 - 16 - intra_db_log - STEP_FAILURE - Execution of step "intra_db_log" failed.

dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "intra_db_log"::

dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage extract when processing package 1739779237.1482742 with exception:

<class 'pyarrow.lib.ArrowInvalid'>
Schema at index 3 was different: 
id: int32 not null
user_id: int32
skill_id: int32
experiancable_id: int32
experiancable_type: string
experience: int32
created_at: timestamp[us, tz=UTC]
cursus_id: int32
is_bonus: bool
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null
vs
id: int32
user_id: int32
skill_id: int32
experiancable_id: int32
experiancable_type: string
experience: int32
created_at: timestamp[us, tz=UTC]
cursus_id: int32
is_bonus: bool
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null

Stack Trace:
  File "/usr/local/lib/python3.12/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/usr/local/lib/python3.12/site-packages/dagster/_utils/__init__.py", line 490, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/opt/dagster/app/intra/assets/dlt.py", line 82, in raw_intra_log_tables
    yield from merge_dlt_results(
  File "/opt/dagster/app/intra_dlt/utils.py", line 82, in merge_dlt_results
    for result in chain(*dlt_results):
  File "/usr/local/lib/python3.12/site-packages/dagster_embedded_elt/dlt/dlt_event_iterator.py", line 76, in __next__
    return next(self._inner_iterator)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster_embedded_elt/dlt/resource.py", line 286, in _run
    load_info = dlt_pipeline.run(dlt_source, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 225, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 274, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 734, in run
    self.extract(
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 225, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 179, in _wrap
    rv = f(self, *args, **kwargs)
         ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 165, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 274, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 491, in extract
    raise PipelineStepFailed(

The above exception was caused by the following exception:
pyarrow.lib.ArrowInvalid: Schema at index 3 was different: 
id: int32 not null
user_id: int32
skill_id: int32
experiancable_id: int32
experiancable_type: string
experience: int32
created_at: timestamp[us, tz=UTC]
cursus_id: int32
is_bonus: bool
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null
vs
id: int32
user_id: int32
skill_id: int32
experiancable_id: int32
experiancable_type: string
experience: int32
created_at: timestamp[us, tz=UTC]
cursus_id: int32
is_bonus: bool
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null

Stack Trace:
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 470, in extract
    self._extract_source(
  File "/usr/local/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 1238, in _extract_source
    load_id = extract.extract(
              ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 435, in extract
    self._extract_single_source(
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 349, in _extract_single_source
    with self.manage_writers(load_id, source):
  File "/usr/local/lib/python3.12/contextlib.py", line 144, in __exit__
    next(self.gen)
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/extract.py", line 387, in manage_writers
    self.extract_storage.close_writers(load_id)
  File "/usr/local/lib/python3.12/site-packages/dlt/extract/storage.py", line 78, in close_writers
    storage.close_writers(load_id, skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/storages/data_item_storage.py", line 85, in close_writers
    writer.close(skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 176, in close
    self._flush_and_close_file(skip_flush=skip_flush)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 260, in _flush_and_close_file
    self._flush_items(allow_empty_file)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/buffered.py", line 250, in _flush_items
    self._writer.write_data(self._buffered_items)
  File "/usr/local/lib/python3.12/site-packages/dlt/common/data_writers/writers.py", line 471, in write_data
    table = concat_batches_and_tables_in_order(items)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dlt/common/libs/pyarrow.py", line 634, in concat_batches_and_tables_in_order
    return pyarrow.concat_tables(tables, promote_options="none")
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/table.pxi", line 6106, in pyarrow.lib.concat_tables
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status

I can try to correct it myself but I welcome any pointers on how to approach this problem.

neuromantik33 pushed a commit to neuromantik33/verified-sources that referenced this issue Feb 19, 2025
- No longer inferring schemas if the first row_msg in a batch is a DELETE operation
- Instead uses sqlalchemy to reflect the schema
- In pg9.6, pg_current_xlog_location wasn't reliable which would cause the message consumer to hang until new data was flushed to WAL
- Doesn't fix but was the cause for dlt-hub/dlt#2229 (was able to reproduce in the added test case)
- Some minor refactoring
@neuromantik33
Copy link
Contributor Author

Just adding another occurrence for reference purposes. I'm progressively squashing out the bugs causing this, but I must say, pyarrow is a very cruel mistress—no schemas divergences are allowed whatsoever! 😬

<class 'pyarrow.lib.ArrowInvalid'>
Schema at index 3 was different: 
id: int64 not null
begin_at: timestamp[us]
end_at: timestamp[us]
user_id: int64
created_at: timestamp[us]
scale_team_id: int64
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null
vs
id: int64 not null
begin_at: timestamp[us, tz=UTC]
end_at: timestamp[us, tz=UTC]
user_id: int64
created_at: timestamp[us, tz=UTC]
scale_team_id: int64
_pg_lsn: int64
_pg_deleted_ts: timestamp[us, tz=UTC]
_pg_commit_ts: timestamp[us, tz=UTC]
_pg_tx_id: int32
_dlt_load_id: dictionary<values=string, indices=int8, ordered=0> not null

I think I can reproduce the issue in a test case after seeing it enough times.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
Status: Done
Development

No branches or pull requests

2 participants