
fail to "vectorize" velocity_from_position #68

Open
selipot opened this issue Jan 7, 2023 · 22 comments
Open

fail to "vectorize" velocity_from_position #68

selipot opened this issue Jan 7, 2023 · 22 comments
Assignees
Labels
bug Something isn't working question Further information is requested

Comments

@selipot
Member

selipot commented Jan 7, 2023

I am attempting to apply velocity_from_position to xarray.DataArrays of lon, lat, and time. I have been following a tutorial for a similar situation. With the following ds Dataset:

ds.info()
xarray.Dataset {
dimensions:
	trajectory = 593297 ;
	obs = 1440 ;

variables:
	float64 time(trajectory, obs) ;
	float32 lat(trajectory, obs) ;
	float32 lon(trajectory, obs) ;
	int32 obs(obs) ;
	int64 trajectory(trajectory) ;
}

I can easily do:

u,v = velocity_from_position(ds.lon.isel(trajectory=0),ds.lat.isel(trajectory=0),ds.time.isel(trajectory=0))

or

u2,v2 = xr.apply_ufunc(
    velocity_from_position,
    ds.lon.isel(trajectory=0),
    ds.lat.isel(trajectory=0),
    ds.time.isel(trajectory=0),
    input_core_dims=[["obs"], ["obs"], ["obs"]],
    output_core_dims=[["obs"], ["obs"]],
    dask="allowed",
)

but the following fails:

u2,v2 = xr.apply_ufunc(
    velocity_from_position,  # first the function
    ds.lon.isel(trajectory=slice(0,10)),
    ds.lat.isel(trajectory=slice(0,10)),
    ds.time.isel(trajectory=slice(0,10)),
    input_core_dims=[["obs"], ["obs"], ["obs"]],
    output_core_dims=[["obs"],["obs"]],
    dask="allowed",
    vectorize=True,
)

and the bottom line of the error is

File ~/miniconda3/envs/research/lib/python3.10/site-packages/clouddrift/analysis.py:65, in velocity_from_position(x, y, time, coord_system, difference_scheme)
     57 # Compute dx, dy, and dt
     58 if difference_scheme == "forward":
     59 
     60     # All values except the ending boundary value are computed using the
   (...)
     63 
     64     # Time
---> 65     dt[:-1] = np.diff(time)
     66     dt[-1] = dt[-2]
     68     # Space

ValueError: could not broadcast input array from shape (10,1439) into shape (9,1440)

So yes, I see the error, but I don't understand whether the fix is to call apply_ufunc differently or to make velocity_from_position more flexible.

@milancurcic
Member

It should work if you apply the function to one trajectory at a time. I think the function could be made to correctly handle positions and times as 2-d arrays.
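For reference, a minimal per-trajectory loop (a sketch only, assuming the ds from the issue description and plain NumPy outputs) could look like:

import numpy as np

# preallocate one row of output per trajectory, then loop (hypothetical sketch)
u = np.empty((ds.sizes["trajectory"], ds.sizes["obs"]))
v = np.empty_like(u)
for i in range(ds.sizes["trajectory"]):
    u[i], v[i] = velocity_from_position(ds.lon[i], ds.lat[i], ds.time[i])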

@selipot
Member Author

selipot commented Jan 7, 2023

Indeed, it does work as shown above, but it should be able to be fed to xarray.apply_ufunc. So I do not know if it is an issue with the function or with my code. Maybe @philippemiron has some insight?

@milancurcic
Member

I haven't used apply_ufunc before, but your code may very well be correct. velocity_from_position definitely does not work on 2-d arrays in its current form. We could make it so; we just need to discuss and decide whether there's a good use case for that, considering that Lagrangian data will be ragged arrays most of the time.

@selipot
Member Author

selipot commented Jan 7, 2023

I am not sure that's the solution. The example linked above examines the case of a function that takes 1-d arrays as input, like our function.

@milancurcic
Member

     64     # Time
---> 65     dt[:-1] = np.diff(time)
     66     dt[-1] = dt[-2]
     68     # Space

ValueError: could not broadcast input array from shape (10,1439) into shape (9,1440)

The issue here is that np.diff() differentiates along the last (fastest-varying) axis, whereas the slice syntax (i.e. dt[:-1]) slices along the first (slowest-varying) axis. In the case of 1-d array inputs, the first and last axes are the same, but in the case of 2-d array inputs we get a shape mismatch.
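
A minimal sketch of a fix for the 2-d case (assuming time is a numeric array with the obs axis last) is to difference and slice along the same axis:

# sketch only: difference and slice along the last axis so that
# 1-d and 2-d inputs are handled consistently
dt = np.empty_like(time)
dt[..., :-1] = np.diff(time, axis=-1)
dt[..., -1] = dt[..., -2]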

@selipot
Member Author

selipot commented Jan 8, 2023

To be more useful and widely applicable, the function should take an argument indicating the axis along which to take the time derivative, so that it works on arrays of any dimension. In addition, it should also be able to handle ragged arrays of positions.
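
Something along these lines (a hypothetical sketch, not the current implementation) could handle the forward-difference step for an arbitrary axis:

import numpy as np

def forward_difference(a: np.ndarray, axis: int = -1) -> np.ndarray:
    # move the differencing axis last, difference there, then move it back
    a = np.moveaxis(a, axis, -1)
    d = np.empty_like(a)
    d[..., :-1] = np.diff(a, axis=-1)
    d[..., -1] = d[..., -2]   # repeat the last difference at the boundary
    return np.moveaxis(d, -1, axis)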

@philippemiron
Contributor

It's been a few months since I played with apply_ufunc. If I remember correctly from the EarthCube Notebook, the functions are applied by chunk; I'm not sure it would work as expected using slice...

I believe the easiest would be to use Awkward Array and perform a simple loop:

# with lon, lat, time as ak.Array; Awkward Arrays are immutable, so collect
# per-trajectory results in lists and build the output arrays at the end
results = [velocity_from_position(lon[i], lat[i], time[i]) for i in range(len(lon))]
u = ak.Array([r[0] for r in results])
v = ak.Array([r[1] for r in results])

@selipot
Member Author

selipot commented Jan 9, 2023

I'll try that. But will it be fast and use dask/parallel computing? As an example, I have 5.5M 60-day-long hourly trajectories ... so perhaps I should chunk per trajectory? In general, we need the analysis functions of clouddrift to be "vectorizable" in a seamless way for users of xarray and awkward.

@milancurcic
Member

I haven't used Dask, but this looks like it should do the job: https://examples.dask.org/applications/embarrassingly-parallel.html. On its own, I don't think Philippe's for-loop snippet will run in parallel.
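
Following that example, the loop could be wrapped in dask.delayed, roughly like this (an untested sketch, assuming lon, lat, and time are list-like collections of per-trajectory arrays):

import dask

# build one delayed task per trajectory, then compute them all at once
tasks = [
    dask.delayed(velocity_from_position)(lon[i], lat[i], time[i])
    for i in range(len(lon))
]
results = dask.compute(*tasks)   # tuple of (u, v) pairs, one per trajectory
u = [r[0] for r in results]
v = [r[1] for r in results]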

@selipot
Member Author

selipot commented Jan 10, 2023

More developments: I found that the following works if the input DataArrays are not chunked:

u,v = xr.apply_ufunc(
    velocity_from_position,  # first the function
    ds.lon.isel(trajectory=slice(0,10000)),  # now arguments in the order expected by 'velocity_from_position'
    ds.lat.isel(trajectory=slice(0,10000)),
    ds.time.isel(trajectory=slice(0,10000)),
    #input_core_dims=[[], ['time']],
    input_core_dims=[["obs"],["obs"], ["obs"]],
    output_core_dims=[["obs"], ["obs"]],
    vectorize=True,
)

but if the DataArrays are chunked, then we need the following (note the dask="allowed" option and the .load() call):

u2,v2 = xr.apply_ufunc(
    velocity_from_position,  # first the function
    dsc.lon.isel(trajectory=slice(0,10000)).load(),  # now arguments in the order expected by 'velocity_from_position'
    dsc.lat.isel(trajectory=slice(0,10000)).load(),
    dsc.time.isel(trajectory=slice(0,10000)).load(),
    #input_core_dims=[[], ['time']],
    input_core_dims=[["obs"],["obs"], ["obs"]],
    output_core_dims=[["obs"], ["obs"]],
    dask="allowed",
    vectorize=True,
)

Does anyone understand why the .load() call is needed?

Note that the above applies to a subset of my example dataset, 10000 out of 593297 trajectories. If I try to apply this to the entire dataset, the first option succeeds on my desktop, but the chunked option fails with a local cluster, which I find odd ...
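
One possibility that might avoid the .load() calls (untested here, and assuming the obs dimension is not split across chunks, since core dimensions cannot be chunked) is to let xarray handle the dask chunks directly with dask="parallelized", which also requires output_dtypes:

u3, v3 = xr.apply_ufunc(
    velocity_from_position,
    dsc.lon, dsc.lat, dsc.time,
    input_core_dims=[["obs"], ["obs"], ["obs"]],
    output_core_dims=[["obs"], ["obs"]],
    dask="parallelized",                 # apply the function chunk by chunk
    output_dtypes=[np.float64, np.float64],
    vectorize=True,
)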

@selipot
Member Author

selipot commented Jan 12, 2023

Note that the chunked case above works only if the size of the chunks in the trajectory dimension is 1, that is, if the function is fed 1-d array arguments, I think. So the way forward appears to be to modify our function to handle n-d arrays.

@milancurcic
Member

This is now implemented in the main branch. Give it a try. And I will try running directly with Dask (without the Xarray wrapper) and see if that produces the expected result.

@milancurcic
Member

FWIW, I ran velocity_from_position on inputs of shape (600, 1000) (traj, obs) using dask.delayed on my 6-core laptop. The result is the same (and correct) as if I run the function without Dask, which is a good outcome. I don't get any speed up, though, and actually the Dask variant is slightly slower. It's possible that the inputs are too small to yield any benefit from parallelism.

@selipot
Member Author

selipot commented Jan 13, 2023

@milancurcic Can you share that test code please?

@philippemiron
Contributor

philippemiron commented Jan 13, 2023

I think it should be more efficient to use dask.bag instead of delayed due to the high number of items. See "Avoid too many tasks" in the Dask best practices.
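
A rough dask.bag version (an untested sketch, assuming 2-d arrays lon, lat, and time with one trajectory per row) could look like:

import dask.bag as db
import numpy as np

# group trajectories into a modest number of partitions so there aren't too many tasks
bag = db.from_sequence(range(len(lon)), npartitions=32)
results = bag.map(lambda i: velocity_from_position(lon[i], lat[i], time[i])).compute()
u = np.stack([r[0] for r in results])
v = np.stack([r[1] for r in results])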

@milancurcic
Member

Here's my original snippet.

from clouddrift.analysis import velocity_from_position
import dask
import dask.distributed  # needed for dask.distributed.Client below
import numpy as np

num_obs = 1000
num_traj = 6000

lon = np.reshape(np.tile(np.linspace(-180, 180, num_obs), num_traj), (num_traj, num_obs))

lat = np.zeros((lon.shape))
for n in range(num_traj):
    lat[n] = n / num_traj * 60 # from 0 to 60N

time = np.reshape(np.tile(np.linspace(0, 1e7, num_obs), num_traj), (num_traj, num_obs))

client = dask.distributed.Client(threads_per_worker=6, n_workers=1)
velocity_from_position_parallel = dask.delayed(velocity_from_position)
res = velocity_from_position_parallel(lon, lat, time)
u, v = res.compute()

Too many tasks is not the problem here; actually the opposite. I make only one function call with dask.delayed, and that doesn't seem to be what it's made for: after reading more about dask.delayed, it seems to be meant for running multiple function calls concurrently (sigh). Despite this, the dask.delayed variant of velocity_from_position runs twice as fast on my laptop vs. the serial execution of the function if I increase num_traj from 600 to 6000. I don't understand why that is.

I'll experiment with other dask idioms as well as dask+xarray.

@philippemiron
Contributor

I initially thought you were calling the function num_traj times. I think the use case for dask would be when the number of trajectories is too high to be processed all at once.

So something like this could replace the calculation part:

n_chunks = 10
chunk_size = int(np.ceil(num_traj / n_chunks))  # ceil so the last chunk picks up any remainder

delayed_results = []
for i in range(n_chunks):
    sl = slice(i * chunk_size, min(num_traj, (i + 1) * chunk_size))
    res = dask.delayed(velocity_from_position)(lon[sl], lat[sl], time[sl])
    delayed_results.append(res)

# dask.compute returns a tuple; its first element is the list of (u, v) pairs
(results,) = dask.compute(delayed_results)

I don't have a super large dataset to test this with, but maybe @selipot could give it a try?

@milancurcic
Member

You're right. I naively assumed that Dask would chunk the data for me under the hood.

@philippemiron
Contributor

philippemiron commented Mar 25, 2023

With apply_ragged this should be possible, right? cc @selipot

@selipot
Member Author

selipot commented May 17, 2023

Yes, I think this issue is old; it predates apply_ragged. But I don't fully understand how apply_ragged is parallelized. No chunks or dask involved?

@philippemiron
Contributor

philippemiron commented May 17, 2023

The function is executed on a different thread for each trajectory and the results are reassembled afterwards. See this section:

# parallel execution
with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    res = executor.map(lambda x: func(*x, **kwargs), iter)
# concatenate the outputs
res = [item if isinstance(item, Iterable) else [item] for item in res]

max_workers is None by default, in which case concurrent.futures sets it to min(32, os.cpu_count() + 4) (see the concurrent.futures documentation). But you can also set the parameter yourself:

def apply_ragged(
    func: callable,
    arrays: list[np.ndarray],
    rowsize: list[int],
    *args: tuple,
    max_workers: int = None,
    **kwargs: dict,
)
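
For completeness, a usage sketch (assuming lon, lat, and time are 1-d ragged arrays, rowsize holds the number of observations per trajectory, and apply_ragged returns one concatenated array per output of the wrapped function):

# apply velocity_from_position trajectory by trajectory over the ragged arrays
u, v = apply_ragged(velocity_from_position, [lon, lat, time], rowsize)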

@milancurcic
Member

We can benchmark, but my understanding is that the concurrency (different from parallelism) used in apply_ragged will only sometimes speed things up. Functions that do slow stuff like I/O, downloading data (and perhaps even plotting) are likely to run faster. CPU-bound stuff (numerical calculations) is likely to run similarly or slower.

I think this issue intended to ask how to run things truly in parallel, i.e. distribute the computation on different CPUs (whether via Dask, MPI, multiprocessing, or otherwise). I don't think we have a solution yet, but getting it to run with Dask is probably the lowest-hanging fruit among the possible approaches.

(and vectorization is yet a different concept from concurrency and parallelization; it's about putting multiple array elements into a long register and running one operation on all of them.)

@kevinsantana11 added the bug and question labels on Apr 11, 2024