The meta passed to map_partitions is not used during graph optimization #571

Open · chuyuanliu opened this issue Feb 19, 2025 · 6 comments

@chuyuanliu

When map_partitions is called with a known meta provided, the function is still evaluated with typetracers if optimize_graph is turned on. The example below prints "test_func called with typetracer" once.

import awkward as ak
import dask_awkward as dak


def test_func(array):
    if ak.backend(array) == "typetracer":
        print("test_func called with typetracer")
    else:
        print("test_func called with array")
    return array


array = ak.Array({"test": [[1, 2, 3], [4, 5], [6, 7, 8]]})
test = dak.from_awkward(array, npartitions=1)
meta = ak.Array(array.layout.to_typetracer(forget_length=True))
result = test.map_partitions(test_func, meta=meta)
result.compute(optimize_graph=True)

Is this behavior expected? Would it be possible to store a copy of meta somewhere and return it during optimization? This would be useful when some operations inside the function do not accept typetracers as arguments but the structure of the final returned array is known.

@martindurant (Collaborator)

@pfackeldey, please point to your declarative mapper function, perhaps with an example for this specific case. We need docs around this to be rock solid!

@chuyuanliu: the only way to automatically know what columns are used within a given function is to run it. Furthermore, if the output columns are not derived from the very same buffers coming in, we can no longer trace what is required. In practice, that would mean we cannot optimise away loading any column passed to an opaque function with map_partitions. A common pattern in the past has been to wrap a function and check for typetracers (as you hint), but @pfackeldey's contribution makes this much nicer.
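A minimal sketch of that wrapping pattern, assuming the expected output structure can be built by hand; the make_meta helper here is hypothetical, standing in for whatever produces the known output typetracer in a real analysis:

import awkward as ak


def wrap_untraceable(func, make_meta):
    # make_meta is a hypothetical user-supplied callable that builds the
    # known output structure (a typetracer) from the input.
    def wrapper(array, *args, **kwargs):
        if ak.backend(array) == "typetracer":
            # During optimization: skip the untraceable body and report
            # the known output structure instead.
            return make_meta(array)
        # During execution: run the real function on concrete data.
        return func(array, *args, **kwargs)

    return wrapper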

@agoose77 (Collaborator)

@chuyuanliu if you want to touch all inputs to the function (i.e. mark all data buffers as required), then there is dak.length_zero_array_or_identity, which should enable the function to be typetraced by replacing all tracers with length-zero NumPy-backed arrays.
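For illustration, a hedged sketch of that approach applied to the example from the issue, assuming the function is importable as named above; note that it marks every input buffer as required, so no columns are optimised away for this function:

import awkward as ak
import dask_awkward as dak


def untraceable_func(array):
    # Replace a typetracer with a length-zero concrete array; a real array
    # passes through unchanged.
    array = dak.length_zero_array_or_identity(array)
    # Operations that reject tracers can now run, e.g. conversion to NumPy.
    flat = ak.to_numpy(ak.flatten(array.test))
    return array


result = test.map_partitions(untraceable_func)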

@pfackeldey (Collaborator) commented Feb 19, 2025

I think you're referring to #551. This was an attempt to make map_partitions more flexible. It supports a needs argument which lets you specify the needed columns and thus skip the typetracing step.

In general, though, I'd rather have #565 instead of this declarative map (I'm considering closing #551). The new dak.manual.optimize_columns lets you optimize the IO projection ahead of time and thus skips typetracing afterwards. If you don't want IO optimization at all, you can also disable it via config: https://dask-awkward.readthedocs.io/en/stable/how-to/configuration.html#optimization-specific-table (probably also as an argument to .compute?)
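For reference, a minimal sketch of disabling the IO (column) optimization for a single compute via dask's config, assuming the awkward.optimization.enabled key from the configuration table linked above:

import dask

# Scope the setting to one compute call; outside the context manager the
# usual column optimization applies again.
with dask.config.set({"awkward.optimization.enabled": False}):
    result.compute()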

@martindurant (Collaborator)

> probably also as an argument to .compute?

optimize=False turns off all algorithms; you can't turn off only dask-awkward's contribution this way.

@chuyuanliu (Author)

Thanks for all the comments.
@pfackeldey If I understand correctly, #565 usually still requires at least one run with the typetracer in order to dump the set of necessary columns with dak.inspect.report_necessary_columns, since the column dependencies in realistic analyses are too complicated to determine by hand. If that's the case, the untraceable functions still need to be bypassed somehow.

For migrating old code, #551 sounds like exactly what I need, but there is a case that is probably not covered by the current static needs argument: the required columns depend on an additional argument, e.g.

def test_func(array: ak.Array, cond: str):
    match cond:
        case "cond1":
            # some untraceable operations with "col1" and "col2"
            ...
        case "cond2":
            # some untraceable operations with "col1" and "col3"
            ...

With a static needs, this would require listing all three columns, even though only two are actually accessed. Since cond in the example is not a delayed object, the actual columns could be decided during a typetracer run (if one is possible). To remove the redundancy, it would be helpful to let needs accept a callable with exactly the same signature as the decorated function, returning either a mapping from argument names to iterables of columns or a typetracer. For the example above, this could be:

def mock_func(array: ak.Array, cond: str):
    # array here can be a typetracer
    match cond:
        case "cond1":
            return {"array": ["col1", "col2"]}  # or a typetracer derived from array
        case "cond2":
            return {"array": ["col1", "col3"]}  # or a typetracer derived from array
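Hypothetical usage, if needs were extended this way (this is the requested feature, not an existing dask-awkward API):

# mock_func receives the same arguments as test_func and reports the columns
# that this particular call would access.
result = test.map_partitions(test_func, "cond1", needs=mock_func, meta=meta)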

For new code written from scratch to work with dask-awkward, wrapping the "atomic" untraceable functions with dak.length_zero_array_or_identity may be the better practice, given that most of the errors come from to_numpy being called on the typetracer.

@pfackeldey (Collaborator) commented Feb 20, 2025

Hi @chuyuanliu,

> @pfackeldey If I understand correctly, #565 usually still requires at least one run with the typetracer in order to dump the set of necessary columns with dak.inspect.report_necessary_columns, since the column dependencies in realistic analyses are too complicated to determine by hand. If that's the case, the untraceable functions still need to be bypassed somehow.

Yes, that is true. This mechanism allows you to run the typetracer to infer all needed columns, and then add by hand the missing ones for the other if-branches. This may not be a perfect workflow, but it is the price we're paying for the 'sharp bits' of a tracing mechanism.

The problematic if-branches are (afaik) the ones that depend on the current partition number (which, I think, a user never has access to during the execution of a single partition), the ones that depend on global variables that differ between trace and execution time, and the ones that depend on the numeric values of columns.

We're currently in the process of adding lazy (or virtual) arrays to awkward-array. My plan is to add them to uproot.dask afterwards (we should do that for dak.from_parquet as well). The idea here is that all columns inferred by typetracing will be loaded ahead of time, while the remaining ones stay lazily loadable at execution time if needed (e.g. by an untraced if-branch). This will solve this issue automatically, at the minimal inefficiency cost of additional read calls.

My current understanding of the output mocking is that if you do the "manual" column projection/optimization and then provide a meta to map_partitions, there won't be any tracing, because all input layers are marked as 'already-optimized/non-projectable'. That way, no typetracer is created in the first place, and the provided meta is used as the output instead of executing the mapped function.
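A hedged sketch of that workflow, pieced together from the descriptions above; the exact signature of dak.manual.optimize_columns (and the shape of what report_necessary_columns returns) is an assumption here, not verified against #565:

import dask_awkward as dak

# One typetracer pass to dump the necessary columns (see the discussion above).
necessary = dak.inspect.report_necessary_columns(result)

# Apply the projection ahead of time; the input layers are then marked as
# 'already-optimized/non-projectable'.
projected = dak.manual.optimize_columns(test, necessary)

# With the inputs already projected and a known meta supplied, no typetracer
# is created and the provided meta is used directly.
out = projected.map_partitions(test_func, meta=meta)
out.compute(optimize_graph=True)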
