
Schema parsing/resolution is done serially or something (slower than resolving it concurrently with a python threadpool) #21034

Open · 2 tasks done
kszlim opened this issue Jan 31, 2025 · 2 comments
Labels: bug (Something isn't working), needs triage (Awaiting prioritization by a maintainer), python (Related to Python Polars)

Comments

kszlim (Contributor) commented Jan 31, 2025

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import os
import time
import polars as pl
from concurrent.futures import ThreadPoolExecutor

os.environ["POLARS_FORCE_ASYNC"] = "1"

# Directory to store the Parquet files.
DATA_DIR = "data"

def generate_dataset(num_files: int = 200, rows: int = 4000, cols: int = 10000) -> None:
    """
    Generate a single DataFrame with the specified number of rows and columns,
    then write it to `num_files` Parquet files in the DATA_DIR folder.
    
    Each column "colX" will contain the numbers [X, X+1, ..., X+rows-1].
    """
    os.makedirs(DATA_DIR, exist_ok=True)
    print(f"Generating a DataFrame with {rows} rows and {cols} columns...")
    df = pl.DataFrame({f"col{i}": list(range(i, i + rows)) for i in range(cols)})
    
    print(f"Writing {num_files} Parquet files to '{DATA_DIR}'...")
    for i in range(num_files):
        file_path = os.path.join(DATA_DIR, f"file_{i}.parquet")
        if os.path.exists(file_path):
            continue
        df.write_parquet(file_path)
    print("Dataset generation complete.\n")

def sequential_scan(pq_paths: list) -> float:
    """
    Sequentially scans each Parquet file, concatenates the lazy frames,
    resolves the combined schema, prints the time taken, and returns the
    elapsed seconds.
    """
    start = time.time()
    lazy_frames = []
    for path in pq_paths:
        lazy_frames.append(pl.scan_parquet(path))
    result = pl.concat(lazy_frames).collect_schema()
    elapsed = time.time() - start
    print(f"Sequential scan took {elapsed:.2f} seconds.")
    return elapsed

def concurrent_scan(pq_paths: list) -> float:
    """
    Uses ThreadPoolExecutor to scan the Parquet files concurrently.
    For each file, we scan it lazily and call collect_schema() (which preloads
    the schema) before concatenating and resolving the combined schema.
    Prints the time taken and returns the elapsed seconds.
    """
    def get_lazy_frame(path: str) -> pl.LazyFrame:
        ldf = pl.scan_parquet(path)
        ldf.collect_schema()
        return ldf

    start = time.time()
    with ThreadPoolExecutor() as pool:
        lazy_frames = list(pool.map(get_lazy_frame, pq_paths))
    result = pl.concat(lazy_frames).collect_schema()
    elapsed = time.time() - start
    print(f"Concurrent scan took {elapsed:.2f} seconds.")
    return elapsed

if __name__ == '__main__':
    # Step 1. Generate the dataset (200 Parquet files, each a 4000x10000 DataFrame).
    generate_dataset(num_files=200)

    pq_paths = [os.path.join(DATA_DIR, f"file_{i}.parquet") for i in range(200)]
    
    print("Running sequential scan...")
    result_seq = sequential_scan(pq_paths)
    
    print("\nRunning concurrent scan...")
    result_conc = concurrent_scan(pq_paths)

Log output

Generating a DataFrame with 4000 rows and 10000 columns...
Writing 200 Parquet files to 'data'...
Dataset generation complete.

Running sequential scan...
Sequential scan took 4.72 seconds.

Running concurrent scan...
Concurrent scan took 1.86 seconds.

Issue description

Resolving the schema purely within Polars (the sequential scan above) takes much longer than preloading each file's schema concurrently from a Python thread pool.

This difference is dramatically larger against files hosted in the cloud, where I'm seeing a 7-20x speed difference.

Expected behavior

Resolving the schema in pure Polars should be at least as fast as the Python thread-pool workaround.

Installed versions

--------Version info---------
Polars:              1.21.0
Index type:          UInt32
Platform:            Linux-5.10.230-202.885.x86_64-x86_64-with-glibc2.26
Python:              3.11.7 (main, Dec  5 2023, 22:00:36) [GCC 7.3.1 20180712 (Red Hat 7.3.1-17)]
LTS CPU:             False

----Optional dependencies----
Azure CLI            <not installed>
adbc_driver_manager  <not installed>
altair               <not installed>
azure.identity       <not installed>
boto3                1.36.6
cloudpickle          3.1.1
connectorx           <not installed>
deltalake            <not installed>
fastexcel            <not installed>
fsspec               2024.12.0
gevent               <not installed>
google.auth          <not installed>
great_tables         0.16.1
matplotlib           <not installed>
numpy                2.2.2
openpyxl             <not installed>
pandas               2.2.3
pyarrow              18.1.0
pydantic             <not installed>
pyiceberg            <not installed>
sqlalchemy           <not installed>
torch                <not installed>
xlsx2csv             <not installed>
xlsxwriter           <not installed>
kszlim added the bug, needs triage, and python labels on Jan 31, 2025
kszlim (Contributor, Author) commented Jan 31, 2025

Another interesting thing I've noticed is that I'm much more likely to get the following error:

Traceback (most recent call last):
  File "/path/to/project/src/my_module/data/script.py", line 126, in <module>
    print(ldf.collect())
          ^^^^^^^^^^^^^
  File "/path/to/lib/python3.11/site-packages/polars/lazyframe/frame.py", line 2056, in collect
    return wrap_df(ldf.collect(callback))
                   ^^^^^^^^^^^^^^^^^^^^^
polars.exceptions.ComputeError: Object at location redacted.parquet not found: Client error with status 404 Not Found: No Body

Resolved plan until failure:

	---> FAILED HERE RESOLVING 'group_by' <---
 WITH_COLUMNS:
 [String(redacted_str).alias("some_id")]
   SELECT [col("my_column_0"), col("my_column_1"), col("my_column_2"), col("my_column_3"), col("my_column_4"),] FROM
    Parquet SCAN [s3://redacted/data.parquet]
    PROJECT */9799 COLUMNS

This happens when doing the concat without the concurrent collect_schema (i.e. without the Python thread pool); with the concurrent collect_schema I don't seem to encounter the issue.

I also notice that with the concurrent collect_schema I hit a peak of around 500 MB/s of network throughput, whereas the pure Polars solution (no Python threading + collect_schema) peaks at about 30 MB/s.

kszlim (Contributor, Author) commented Feb 3, 2025

The problem might be here:
https://github.com/pola-rs/polars/blob/main/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs#L367

It looks like this could be done concurrently (at least the calls that parse each file's Parquet metadata), but it is currently done in a serial, blocking manner.

I'm not 100% sure, but maybe a par_iter would work here, although that might put async IO work on the CPU thread pool?
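To illustrate the idea (this is not Polars' actual code, just a minimal Rust sketch): read_parquet_schema below is a hypothetical stand-in for whatever parses one file's footer, and the only change between the two functions is swapping the serial iterator for rayon's par_iter so the per-file work can overlap. If the underlying reads are async, joining the futures instead would avoid putting IO work on the CPU thread pool.

use rayon::prelude::*;

// Hypothetical stand-in for parsing one file's Parquet footer/metadata.
fn read_parquet_schema(path: &str) -> std::io::Result<String> {
    Ok(format!("schema-of-{path}"))
}

// What the issue suspects is happening today: one file after another.
fn resolve_schemas_serial(paths: &[String]) -> std::io::Result<Vec<String>> {
    paths.iter().map(|p| read_parquet_schema(p)).collect()
}

// The suggested shape: resolve per-file metadata in parallel with rayon.
fn resolve_schemas_parallel(paths: &[String]) -> std::io::Result<Vec<String>> {
    paths.par_iter().map(|p| read_parquet_schema(p)).collect()
}

fn main() -> std::io::Result<()> {
    let paths: Vec<String> = (0..200).map(|i| format!("data/file_{i}.parquet")).collect();
    let serial = resolve_schemas_serial(&paths)?;
    let parallel = resolve_schemas_parallel(&paths)?;
    println!("resolved {} schemas serially, {} in parallel", serial.len(), parallel.len());
    Ok(())
}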
