
Partition pruning does not work when values in predicate do not match column datatype #3278

Open
ldacey opened this issue Feb 28, 2025 · 4 comments
Labels
bug Something isn't working


ldacey commented Feb 28, 2025

Environment

Delta-rs version:
0.25.2

Binding:

Environment:

  • Cloud provider: Google/Azure
  • OS: Mac
  • Other:

Bug

What happened:
When I upgraded to 0.25, many of my delta tables began to fail due to pod OOM errors despite being given more and more memory. Setting streamed_exec=False fixed this for me.

It seems like partition pruning is not happening - tables with hundreds or thousands of partitions are being fully scanned even when the merge predicates are specific and point directly to a single partition. For example, one of my sources is a single date of data (76k rows) which is merged into a table partitioned by month_id and date_id. This should be (and used to be) a quick merge since the predicate we use is explicit. With streamed_exec=True it fails, though, and debugging shows all 2600+ files are scanned.

What you expected to happen:
Predicates which filter out partitions should be respected.

How to reproduce it:

import os
import shutil
import pyarrow as pa
from deltalake import DeltaTable

table_path1 = "test_delta_not_streamed"
table_path2 = "test_delta_streamed"

for path in [table_path1, table_path2]:
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path)

schema = pa.schema([
    pa.field("month_id", pa.int32()),
    pa.field("date_id", pa.int32()),
    pa.field("unique_row_hash", pa.string()),
    pa.field("col1", pa.string())
])

dt1 = DeltaTable.create(
    table_uri=table_path1,
    schema=schema,
    partition_by=["month_id", "date_id"]
)

dt2 = DeltaTable.create(
    table_uri=table_path2,
    schema=schema,
    partition_by=["month_id", "date_id"]
)

partitions = [
    (202501, 20250101),
    (202502, 20250201),
    (202502, 20250226),
    (202503, 20250301),
]

rows = []
for month_id, date_id in partitions:
    rows.append([month_id, date_id, f"hash_{month_id}_{date_id}", "value"])

source_table = pa.Table.from_arrays(
    [
        pa.array([row[0] for row in rows], type=pa.int32()),
        pa.array([row[1] for row in rows], type=pa.int32()),
        pa.array([row[2] for row in rows], type=pa.string()),
        pa.array([row[3] for row in rows], type=pa.string()),
    ],
    names=["month_id", "date_id", "unique_row_hash", "col1"]
)

for dt in [dt1, dt2]:
    dt.merge(
        source=source_table,
        predicate="s.unique_row_hash = t.unique_row_hash",
        source_alias="s",
        target_alias="t"
    ).when_not_matched_insert_all().execute()

print(f"Table 1 has {len(dt1.files())} files")
print(f"Table 2 has {len(dt2.files())} files")


source_table_new = pa.Table.from_arrays(
    [
        pa.array([202502], type=pa.int32()),
        pa.array([20250226], type=pa.int32()),
        pa.array(["new_hash"], type=pa.string()),
        pa.array(["new_value"], type=pa.string()),
    ],
    names=["month_id", "date_id", "unique_row_hash", "col1"]
)

merge_predicate = "(s.unique_row_hash = t.unique_row_hash) AND (s.month_id = t.month_id AND t.month_id = 202502 AND s.date_id = t.date_id AND t.date_id = 20250226)"
print(f"Using merge predicate: {merge_predicate}")

print("\nTesting with streamed_exec=False:")
result1 = dt1.merge(
    source=source_table_new,
    predicate=merge_predicate,
    source_alias="s",
    target_alias="t",
    streamed_exec=False
).when_not_matched_insert_all().execute()

print(f"streamed_exec=False results: {result1}")

print("\nTesting with streamed_exec=True:")
result2 = dt2.merge(
    source=source_table_new,
    predicate=merge_predicate,
    source_alias="s",
    target_alias="t",
    streamed_exec=True
).when_not_matched_insert_all().execute()
print(f"streamed_exec=True results: {result2}\n")


print(f"Files scanned (not streamed): {result1['num_target_files_scanned']}")
print(f"Files skipped (not streamed): {result1['num_target_files_skipped_during_scan']}")
print(f"Files scanned (streamed): {result2['num_target_files_scanned']}")
print(f"Files skipped (streamed): {result2['num_target_files_skipped_during_scan']}")

More details:
The results show that when streamed_exec=True the merge is not skipping all files.

Results:

Table 1 has 4 files
Table 2 has 4 files
Using merge predicate: (s.unique_row_hash = t.unique_row_hash) AND (s.month_id = t.month_id AND t.month_id = 202502 AND s.date_id = t.date_id AND t.date_id = 20250226)

Testing with streamed_exec=False:
streamed_exec=False results: {'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 1, 'num_target_files_scanned': 0, 'num_target_files_skipped_during_scan': 4, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 10, 'scan_time_ms': 0, 'rewrite_time_ms': 0}

Testing with streamed_exec=True:
streamed_exec=True results: {'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 1, 'num_target_files_scanned': 1, 'num_target_files_skipped_during_scan': 3, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 10, 'scan_time_ms': 0, 'rewrite_time_ms': 1}

Files scanned (not streamed): 0
Files skipped (not streamed): 4
Files scanned (streamed): 1
Files skipped (streamed): 3

FYI - I noticed that quoting the partition values in the predicate made the situation worse.

month_id and date_id are integers in the data, but my custom merge function adds partition filters to the predicate and quotes every value:

    predicates = [
        f"s.{key} = t.{key} AND t.{key} = '{value}'"
        for key, _, value in partition_filters
    ]

So my real-world usage is impacted by this. I was under the impression that we always need to use strings with delta-rs. Maybe that only applies to partition filters, like [("month_id", "=", "202502")], and not to predicates?

# note that the date/month are now quoted as strings
Using merge predicate: (s.unique_row_hash = t.unique_row_hash) AND (s.month_id = t.month_id AND t.month_id = '202502' AND s.date_id = t.date_id AND t.date_id = '20250226')

Testing with streamed_exec=False:
streamed_exec=False results: {'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 1, 'num_target_files_scanned': 0, 'num_target_files_skipped_during_scan': 4, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 11, 'scan_time_ms': 0, 'rewrite_time_ms': 0}

Testing with streamed_exec=True:
streamed_exec=True results: {'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 1, 'num_target_files_scanned': 4, 'num_target_files_skipped_during_scan': 0, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 10, 'scan_time_ms': 0, 'rewrite_time_ms': 1}

Files scanned (not streamed): 0
Files skipped (not streamed): 4
Files scanned (streamed): 4
Files skipped (streamed): 0
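A type-aware way to build these predicate fragments (a minimal sketch; `column_types` and the `partition_filters` shape are hypothetical stand-ins for my setup, not delta-rs API) would quote a value only when the partition column is a string type, so integer partitions stay unquoted:

```python
# Sketch: build merge-predicate fragments that quote values only for
# string-typed columns. `column_types` and `partition_filters` are
# hypothetical stand-ins for my setup.
column_types = {"month_id": "int32", "date_id": "int32", "region": "string"}
partition_filters = [
    ("month_id", "=", 202502),
    ("date_id", "=", 20250226),
    ("region", "=", "emea"),
]

def format_value(column: str, value) -> str:
    # Quote only string-typed columns; render everything else as a bare literal.
    if column_types.get(column) == "string":
        return f"'{value}'"
    return str(value)

predicates = [
    f"s.{key} = t.{key} AND t.{key} = {format_value(key, value)}"
    for key, _, value in partition_filters
]
merge_predicate = " AND ".join(predicates)
print(merge_predicate)
```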
@ldacey ldacey added the bug Something isn't working label Feb 28, 2025
@ion-elgreco (Collaborator)

Thanks for the repro, I'll look at it :)


ldacey commented Feb 28, 2025

Yep - the main issue I am facing is actually JUST the quotes I added around my predicate values. If I remove those quotes, this works great (I was able to use the Rust engine to merge my big, wide table successfully for the first time ever).

@ion-elgreco ion-elgreco changed the title Partition pruning does not work during merge when streamed_exec=True Partition pruning does not work when values in predicate do not match column type Feb 28, 2025
@ion-elgreco (Collaborator)

Indeed - streamed exec itself works fine. The issue (or potentially not-yet-supported functionality) is that DataFusion's pruning_predicate doesn't coerce the expression values to the column's datatype while pruning.
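The missing coercion can be illustrated with a small sketch (pure Python, not DataFusion's actual code): partition stats hold an int32 value, so the string literal '202502' can never equal it, and the conservative fallback is to keep (scan) the file.

```python
# Sketch (not DataFusion's actual code) of why an uncoerced literal defeats
# pruning: a str literal never equals an int partition value, so without a
# coercion step the pruner can never prove a file is safe to skip.
def can_prune(partition_value, literal, column_type=int):
    """Return True if the file can be safely skipped for `t.col = literal`."""
    try:
        literal = column_type(literal)  # the coercion step that is missing
    except (TypeError, ValueError):
        return False  # cannot decide -> keep (scan) the file
    return partition_value != literal  # different partition value -> skip file

print(can_prune(202501, "202502"))  # True: other partition, safe to skip
print(can_prune(202502, "202502"))  # False: matching partition must be scanned
```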

@ion-elgreco ion-elgreco changed the title Partition pruning does not work when values in predicate do not match column type Partition pruning does not work when values in predicate do not match column datatype Feb 28, 2025

ion-elgreco commented Feb 28, 2025

Updated repro:

import os
import shutil
import pyarrow as pa
from deltalake import DeltaTable

table_path1 = "data/test_delta_incorrect_predicate"
table_path2 = "data/test_delta_correct_predicate"

for path in [table_path1, table_path2]:
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path)

schema = pa.schema([
    pa.field("month_id", pa.int32()),
    pa.field("date_id", pa.int32()),
    pa.field("unique_row_hash", pa.string()),
    pa.field("col1", pa.string())
])

dt1 = DeltaTable.create(
    table_uri=table_path1,
    schema=schema,
    partition_by=["month_id", "date_id"]
)

dt2 = DeltaTable.create(
    table_uri=table_path2,
    schema=schema,
    partition_by=["month_id", "date_id"]
)

partitions = [
    (202501, 20250101),
    (202502, 20250201),
    (202502, 20250226),
    (202503, 20250301),
]

rows = []
for month_id, date_id in partitions:
    rows.append([month_id, date_id, f"hash_{month_id}_{date_id}", "value"])

source_table = pa.Table.from_arrays(
    [
        pa.array([row[0] for row in rows], type=pa.int32()),
        pa.array([row[1] for row in rows], type=pa.int32()),
        pa.array([row[2] for row in rows], type=pa.string()),
        pa.array([row[3] for row in rows], type=pa.string()),
    ],
    names=["month_id", "date_id", "unique_row_hash", "col1"]
)

for dt in [dt1, dt2]:
    dt.merge(
        source=source_table,
        predicate="s.unique_row_hash = t.unique_row_hash",
        source_alias="s",
        target_alias="t"
    ).when_not_matched_insert_all().execute()

print(f"Table 1 has {len(dt1.files())} files")
print(f"Table 2 has {len(dt2.files())} files")


source_table_new = pa.Table.from_arrays(
    [
        pa.array([202502], type=pa.int32()),
        pa.array([20250226], type=pa.int32()),
        pa.array(["new_hash"], type=pa.string()),
        pa.array(["new_value"], type=pa.string()),
    ],
    names=["month_id", "date_id", "unique_row_hash", "col1"]
)

correct_merge_predicate = "(s.unique_row_hash = t.unique_row_hash) AND (s.month_id = t.month_id AND t.month_id = 202502 AND s.date_id = t.date_id AND t.date_id = 20250226)"
incorrect_merge_predicate = "(s.unique_row_hash = t.unique_row_hash) AND (s.month_id = t.month_id AND t.month_id = '202502' AND s.date_id = t.date_id AND t.date_id = '20250226')"

result1 = dt1.merge(
    source=source_table_new,
    predicate=incorrect_merge_predicate,
    source_alias="s",
    target_alias="t",
).when_not_matched_insert_all().execute()

result2 = dt2.merge(
    source=source_table_new,
    predicate=correct_merge_predicate,
    source_alias="s",
    target_alias="t",
).when_not_matched_insert_all().execute()


print(f"Files scanned (incorrect predicate): {result1['num_target_files_scanned']}")
print(f"Files skipped (incorrect predicate): {result1['num_target_files_skipped_during_scan']}")
print(f"Files scanned (correct predicate): {result2['num_target_files_scanned']}")
print(f"Files skipped (correct predicate): {result2['num_target_files_skipped_during_scan']}")

Results:

Table 1 has 4 files
Table 2 has 4 files
Files scanned (incorrect predicate): 0
Files skipped (incorrect predicate): 4
Files scanned (correct predicate): 1
Files skipped (correct predicate): 3
