
Evaluate efficiency in querying nested fields in Parquet of different packages #37

kpto opened this issue Jan 24, 2025 · 2 comments

kpto commented Jan 24, 2025

While working on issue #36, I quickly came across the question of how flexible the API should be in querying nested fields. If a user only wants the information from a nested column, should my API be able to query just that nested column and return it as a separate dataset, or should the user always query the surface columns and unfold the nested columns themselves? This leads to another question: is Parquet even designed to be queried in this fashion, and can a package leverage such a design? If not, there is no point in providing a flexible API. Even if it is, at our current stage I tend to keep our design simple unless we absolutely need the flexibility. Still, I want to understand more about Parquet as I think it is important for later development, so I conducted a study anyway, and this issue serves as a record of my findings. It also serves as a notebook on how to query nested fields.

Parquet allows individual nested columns to be queried:

A quick search revealed that Parquet is designed to do so. Indeed, a Parquet file only stores leaf columns, so for the following schema,

{
    "id": str,
    "struct": {
        "nested_arr": [
            {
                "field_a": str,
                "field_b": str
            },
            ...
        ]
    }
}

there are just 3 columns in the Parquet file: id, struct.nested_arr.field_a and struct.nested_arr.field_b. Two additional pieces of information stored for each value of a nested column, the repetition level and the definition level, are used to reconstruct the hierarchy. The internet has a lot of explanations of this; here is one referenced by the official Parquet documentation: https://github.com/julienledem/redelm/wiki/The-striping-and-assembly-algorithms-from-the-Dremel-paper
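
This leaf-only layout can be seen directly in a file's footer. Below is a minimal sketch using pyarrow (the file name example.parquet is a placeholder; the exact leaf paths, e.g. the list/element wrappers around nested_arr, depend on how the writer encodes lists):

import pyarrow.parquet as pq

pf = pq.ParquetFile("example.parquet")
print(pf.schema_arrow)  # the logical, nested Arrow schema
print(pf.schema)        # the physical Parquet schema: one entry per leaf column,
                        # with its required/optional/repeated annotations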

Knowing this, it is in theory possible to query just field_b. If a user only needs that field and is querying over cloud storage, this would be beneficial, as a lot of data could be skipped. The next question is whether existing packages are implemented to leverage this design.

Summary

In short: yes for pyarrow and polars, but only when the schema is simple, and, surprisingly, what looks like a perfect yes for duckdb. A reminder though: their internal implementations are very different, so my interpretation could be incorrect.

Due to pyspark's separate computing server design, its behaviour is unstable and debugging is difficult, so I excluded it after a few tries.

Benchmark

Fake data generation

A large fake Parquet file with the following schema is generated.

{
    "id": str,
    "struct": {
        "struct_arr": [
            {
                "arr_a": str[],
                "arr_b": str[]
            },
            ...
        ],
        "str": str
    }
}

The following SQL is used to generate the Parquet using duckdb.

COPY (
    SELECT
        CAST(gen_random_uuid() AS STRING) AS id,
        {
            struct_arr: [
                {
                    arr_a: [CAST(gen_random_uuid() AS STRING)],
                    arr_b: [CAST(gen_random_uuid() AS STRING) FOR i IN range(1000000)]
                }
            ],
            str: CAST(gen_random_uuid() AS STRING)
        } AS struct,
    FROM range(10)
)
TO 'fake-large.parquet'

The nested field arr_b under the struct array struct_arr is very long, so scanning it should significantly slow down the process. A perfectly implemented Parquet query engine should be able to skip it unless we ask for it. The script produces a file of around 350MB. I tried a larger file at first but quickly found that it put more stress on memory, and the frequent memory management rendered the tests incomparable; a smaller but repeated test does a better job.
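
As a sanity check on the generated file, the size of each leaf column can be read from the Parquet footer. This is a sketch using pyarrow's metadata API; the arr_b leaf should account for nearly all of the ~350MB:

import pyarrow.parquet as pq
from collections import defaultdict

md = pq.ParquetFile("fake-large.parquet").metadata
sizes = defaultdict(int)
for rg in range(md.num_row_groups):
    for col in range(md.num_columns):
        chunk = md.row_group(rg).column(col)
        sizes[chunk.path_in_schema] += chunk.total_compressed_size

for path, size in sizes.items():
    print(f"{path}: {size / 1e6:.1f} MB")  # the arr_b leaf should dwarf the others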

Tests

A test against a package consists of the following queries; to minimise overhead, streaming is not used here:
full: scanning the whole file as is
id: the easiest one, just querying a surface column, should be lightning fast
struct: the surface column that contains the long nested array field, providing a ground reference
struct.str: the easier kind of nested field query, a field directly under a struct
struct.struct_arr: the nested field that contains the long nested array, another reference
struct.struct_arr.arr_a: the hardest one, a short array under a struct under an array; in theory this query should be fast
struct.struct_arr.arr_b: the long array at the same level as arr_a, another reference

timeit is used to time each query; every query is run 3 times and the minimum is taken.

from timeit import repeat
from typing import Callable

def run(functions: list[Callable]):
    # Time each query function with timeit.repeat (number=1 per run) and print the fastest run.
    for func in functions:
        print(f"{func.__name__}: {min(repeat(func, number=1))}")

polars

import polars as pl
from base import run

def query(projection):
    q = pl.scan_parquet("fake-large.parquet")
    if projection is not None:
        q = q.select(projection)
    q.collect()

def full():
    query(None)

def id():
    query(["id"])

def struct():
    query(["struct"])

def struct_str():
    query(pl.col("struct").struct.field("str"))

def struct_arr():
    query(pl.col("struct").struct.field("struct_arr"))

def struct_arr_arr_a():
    query(pl.col("struct").struct.field("struct_arr").explode().struct.field("arr_a"))

def struct_arr_arr_b():
    query(pl.col("struct").struct.field("struct_arr").explode().struct.field("arr_b"))

run([full, id, struct, struct_str, struct_arr, struct_arr_arr_a, struct_arr_arr_b])
full: 0.6195595001336187
id: 0.0003410999197512865
struct: 0.6696877002250403
struct_str: 0.6695178998634219
struct_arr: 0.6743880999274552
struct_arr_arr_a: 0.7422495998907834
struct_arr_arr_b: 0.7367976000532508

Querying id is lightning fast compared to the full query, showing that polars can project surface columns. Yet there is no significant difference between any of the nested-field queries, so it seems polars cannot query individual nested columns and instead pulls the whole surface column at once.
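
One way to double-check this interpretation is to print the optimised query plan and look at what the Parquet scan node projects. A sketch (the plan text varies between polars versions):

import polars as pl

q = (
    pl.scan_parquet("fake-large.parquet")
      .select(pl.col("struct").struct.field("str"))
)
# The projection list of the Parquet scan node shows what is actually read from
# the file; the timings above suggest it is effectively the whole "struct" column.
print(q.explain())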

pyarrow

from pyarrow import dataset as ds
import pyarrow as pa
import pyarrow.compute as pc
from base import run

def query(projection):
    dataset = ds.dataset("fake-large.parquet")
    if projection is not None:
        dataset.to_table(columns=projection)
    dataset.to_table()

def full():
    query(None)

def id():
    query(["id"])

def struct():
    query(["struct"])

def struct_str():
    query(["struct.str"])

def struct_arr():
    query(["struct.struct_arr"])

def struct_arr_arr_a():
    nav = pc.field("struct", "struct_arr").cast(pa.list_(pa.struct([pa.field("arr_a", pa.list_(pa.string()))])))
    query({ "col": nav })

def struct_arr_arr_b():
    nav = pc.field("struct", "struct_arr").cast(pa.list_(pa.struct([pa.field("arr_b", pa.list_(pa.string()))])))
    query({ "col": nav })

run([full, id, struct, struct_str, struct_arr, struct_arr_arr_a, struct_arr_arr_b])
full: 0.6816763000097126
id: 0.6916280998848379
struct: 1.3991429000161588
struct_str: 1.3926803001668304
struct_arr: 1.3659655000083148
struct_arr_arr_a: 1.366409600013867
struct_arr_arr_b: 1.3437673998996615

Strangely, querying everything and querying just the id are equally fast; I don't know how to explain this. pyarrow also seems unable to query individual nested columns. I cannot be sure that type casting is the right way to query nested columns under an array, but it is the only method I could find.

duckdb

import duckdb
from base import run

def query(sql):
    duckdb.sql(sql).fetchall()

def full():
    sql = "SELECT * FROM read_parquet('fake-large.parquet')"
    query(sql)

def id():
    sql = "SELECT id FROM read_parquet('fake-large.parquet')"
    query(sql)

def struct():
    sql = "SELECT struct FROM read_parquet('fake-large.parquet')"
    query(sql)

def struct_str():
    sql = "SELECT struct.str FROM read_parquet('fake-large.parquet')"
    query(sql)

def struct_arr():
    sql = "SELECT struct.struct_arr FROM read_parquet('fake-large.parquet')"
    query(sql)

def struct_arr_arr_a():
    sql = "SELECT arr_a FROM (SELECT unnest(struct.struct_arr, max_depth := 2) FROM read_parquet('fake-large.parquet'))"
    query(sql)

def struct_arr_arr_b():
    sql = "SELECT arr_b FROM (SELECT unnest(struct.struct_arr, max_depth := 2) FROM read_parquet('fake-large.parquet'))"
    query(sql)

run([full, id, struct, struct_str, struct_arr, struct_arr_arr_a, struct_arr_arr_b])
full: 4.941312199924141
id: 0.0006832999642938375
struct: 4.80762069998309
struct_str: 1.1204313000198454
struct_arr: 4.696077500004321
struct_arr_arr_a: 1.11207429994829
struct_arr_arr_b: 4.66957230004482

This is a surprising one: while duckdb is the slowest overall, the timings of the different nested-column queries do suggest that duckdb is capable of querying individual nested columns. Since struct_arr_arr_a always has only one element, the column lengths of struct.str and arr_a in the Parquet file should be roughly the same, and the query times agree with this. full, struct, struct_arr and struct_arr_arr_b all have similar query times because all of them involve scanning the longest column. All in all the result is sensible and suggests that, surprisingly, duckdb has the best implementation in terms of leveraging the Parquet design, even though it is designed to be a generic query engine over many formats.
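
The pushdown can also be seen in duckdb's query plan. A sketch (the exact plan layout depends on the duckdb version):

import duckdb

# The Parquet scan operator in the plan lists its projections; comparing the
# plans of struct.str and struct (or arr_a and arr_b) shows what gets skipped.
print(duckdb.sql(
    "EXPLAIN SELECT struct.str FROM read_parquet('fake-large.parquet')"
))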

Remarks

Even if a package were perfect, providing a fully flexible API that reflects its capability is perhaps over-optimisation, at least for now. In the future this may become beneficial if querying over cloud storage becomes common. This study nevertheless gave me a better understanding of the Parquet format and of the existing data science packages.

Since polars still struggles to stream, it remains out of consideration. It took me a while to figure out how to deal with pyarrow's high memory usage: it has an aggressive caching policy by default, which can be turned off by supplying fragment_scan_options=ds.ParquetFragmentScanOptions(pre_buffer=False). duckdb is slower, but their attention to detail continues to impress me. pyarrow is more like a low-level Parquet file inspector than a query engine; its API is harder to use, but it is the most feature-complete in terms of Parquet support. pyspark often crashes right away with no debugging information available due to its separate computing design: when the computing server gives up, the Python client gets a connection-closed exception rather than the actual cause, which is unhelpful. Its high maintenance cost keeps it out of consideration. pyarrow and duckdb are currently my top candidates.
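
For reference, this is how that option is passed in; a minimal sketch of passing it to a dataset read (the projected column here is arbitrary):

import pyarrow.dataset as ds

dataset = ds.dataset("fake-large.parquet")
table = dataset.to_table(
    columns=["id"],  # arbitrary projection for illustration
    fragment_scan_options=ds.ParquetFragmentScanOptions(pre_buffer=False),
)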

This issue is a subtask of the new API design #36 and will be closed once that is finished.

@kpto kpto added enhancement New feature or request refactoring This issue is a refactoring labels Jan 24, 2025
@kpto kpto added this to the Version v0.4.0 milestone Jan 24, 2025
@kpto kpto added this to OTAR3088 Jan 24, 2025
@slobentanzer

@kpto nice, very informative!

pyarrow is more like a low-level Parquet file inspector than a query engine

What does this mean, practically? Isn't pyarrow the programmatic Python implementation of Arrow (so it should also be suited for querying)?


kpto commented Jan 24, 2025

@slobentanzer Sorry for confusing you. I mean that pyarrow is more focused on low-level, comprehensive inspection and manipulation of a Parquet file than on treating it as a high-level data container, though it still ships a lot of compute functionality, which makes it useful for data processing. pyarrow can inspect the file structure of a Parquet file, including all metadata and even individual row groups, and you can also construct a new Parquet file from the ground up. polars users often need pyarrow for these functionalities.
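
To illustrate (a sketch; tiny.parquet and its columns are made up for the example):

import pyarrow as pa
import pyarrow.parquet as pq

# Inspecting an existing file down to the row-group level:
pf = pq.ParquetFile("fake-large.parquet")
print(pf.metadata)               # file-level metadata: rows, row groups, created-by, ...
print(pf.metadata.row_group(0))  # column-chunk sizes and statistics of one row group

# Constructing a new Parquet file from the ground up, with explicit row groups:
table = pa.table({"id": ["a", "b", "c"], "value": [1, 2, 3]})
pq.write_table(table, "tiny.parquet", row_group_size=2, compression="zstd")
print(pq.ParquetFile("tiny.parquet").metadata.num_row_groups)  # 2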

That being said, I just found that duckdb actually offers a similarly comprehensive API for Parquet files, amazing.
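
For example (a sketch, assuming the built-in Parquet metadata functions are the API meant here):

import duckdb

# Leaf-level schema, similar to what pyarrow's ParquetFile.schema shows:
print(duckdb.sql("SELECT * FROM parquet_schema('fake-large.parquet')"))

# Per-row-group, per-column-chunk metadata such as compressed sizes:
print(duckdb.sql("SELECT * FROM parquet_metadata('fake-large.parquet')"))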
