Skip to content

Commit

Permalink
Merge pull request #238 from PrefectHQ/more-map
Browse files Browse the repository at this point in the history
Map for LocalExecutor
  • Loading branch information
cicdw authored Oct 1, 2018
2 parents aba6cf0 + 8fb9a28 commit 3d78398
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- Add `debug` kwarg to `DaskExecutor` for optionally silencing dask logs - [#209](https://github.com/PrefectHQ/prefect/issues/209)
- Update `BokehRunner` for visualizing mapped tasks - [#220](https://github.com/PrefectHQ/prefect/issues/220)
- Env var configuration settings are typed - [#204](https://github.com/PrefectHQ/prefect/pull/204)
- Implement `map` functionality for the `LocalExecutor` - [#233](https://github.com/PrefectHQ/prefect/issues/233)

### Fixes

Expand Down
1 change: 1 addition & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ sidebarDepth: 1
- Update `BokehRunner` for visualizing mapped tasks
- Env var configuration settings are typed
- Support for user configuration files
- Implement `map` functionality for the `LocalExecutor`

### Fixes

Expand Down
1 change: 0 additions & 1 deletion src/prefect/engine/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
- `LocalExecutor`: the no frills, straightforward executor - great for simple
debugging; tasks are executed immediately upon being called by `executor.submit()`.
Note that the `map` feature is currently _not_ supported with this executor.
- `SynchronousExecutor`: an executor that runs on `dask` primitives with the
synchronous dask scheduler; currently the default executor
- `DaskExecutor`: the most feature-rich of the executors, this executor runs
Expand Down
14 changes: 14 additions & 0 deletions src/prefect/engine/executors/local.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Licensed under LICENSE.md; also available at https://www.prefect.io/licenses/alpha-eula

from typing import Any, Callable, Iterable

from prefect.engine.executors.base import Executor
from prefect.utilities.executors import dict_to_list


class LocalExecutor(Executor):
Expand All @@ -9,6 +12,17 @@ class LocalExecutor(Executor):
the local thread. To be used mainly for debugging purposes.
"""

def map(
self, fn: Callable, *args: Any, upstream_states=None, **kwargs: Any
) -> Iterable[Any]:

states = dict_to_list(upstream_states)
results = []
for elem in states:
results.append(self.submit(fn, *args, upstream_states=elem, **kwargs))

return results

def submit(self, fn, *args, **kwargs):
"""
Submit a function to the executor for execution. Returns the result of the computation.
Expand Down
66 changes: 49 additions & 17 deletions tests/core/test_task_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ def test_calling_map_with_bind_returns_self():
assert res is a


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_map_spawns_new_tasks(executor):
ll = ListTask()
a = AddTask()
Expand All @@ -64,7 +66,9 @@ def test_map_spawns_new_tasks(executor):
assert [r.result for r in slist] == [2, 3, 4]


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_map_over_parameters(executor):
a = AddTask()

Expand All @@ -80,7 +84,9 @@ def test_map_over_parameters(executor):
assert [r.result for r in slist] == [2, 3, 4]


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_map_internally_returns_a_list(executor):
ll = ListTask()
ii = IdTask()
Expand All @@ -93,7 +99,9 @@ def test_map_internally_returns_a_list(executor):
assert s.result[res].result == [2, 3, 4]


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_map_composition(executor):
ll = ListTask()
a = AddTask()
Expand All @@ -109,7 +117,9 @@ def test_map_composition(executor):
assert [r.result for r in slist] == [3, 4, 5]


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_multiple_map_arguments(executor):
ll = ListTask()
a = AddTask()
Expand All @@ -125,7 +135,9 @@ def test_multiple_map_arguments(executor):
assert [r.result for r in slist] == [2, 4, 6]


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_map_failures_dont_leak_out(executor):
ii = IdTask()
ll = ListTask()
Expand All @@ -143,7 +155,9 @@ def test_map_failures_dont_leak_out(executor):
assert [r.result for r in slist] == [None, 1, 0.5]


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_map_can_handle_fixed_kwargs(executor):
ll = ListTask()
a = AddTask()
Expand All @@ -159,7 +173,9 @@ def test_map_can_handle_fixed_kwargs(executor):
assert [r.result for r in slist] == [6, 7, 8]


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_map_can_handle_nonkeyed_upstreams(executor):
ll = ListTask()

Expand All @@ -174,7 +190,9 @@ def test_map_can_handle_nonkeyed_upstreams(executor):
assert [r.result for r in slist] == [[1, 2, 3] for _ in range(3)]


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_map_can_handle_nonkeyed_mapped_upstreams(executor):
ii = IdTask()
ll = ListTask()
Expand All @@ -191,7 +209,9 @@ def test_map_can_handle_nonkeyed_mapped_upstreams(executor):
assert [r.result for r in slist] == [[1, 2, 3] for _ in range(3)]


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_map_can_handle_nonkeyed_nonmapped_upstreams_and_mapped_args(executor):
ii = IdTask()
ll = ListTask()
Expand All @@ -207,7 +227,9 @@ def test_map_can_handle_nonkeyed_nonmapped_upstreams_and_mapped_args(executor):
assert [r.result for r in slist] == [[1 + i, 2 + i, 3 + i] for i in range(3)]


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_map_tracks_non_mapped_upstream_tasks(executor):
div = DivTask()

Expand All @@ -230,7 +252,9 @@ def register(x):
)


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_map_allows_for_retries(executor):
ii = IdTask()
ll = ListTask()
Expand Down Expand Up @@ -259,7 +283,9 @@ def test_map_allows_for_retries(executor):
assert [s.result for s in new] == [100, 1.0, 0.5]


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_map_can_handle_nonkeyed_mapped_upstreams_and_mapped_args(executor):
ii = IdTask()
ll = ListTask()
Expand All @@ -276,7 +302,7 @@ def test_map_can_handle_nonkeyed_mapped_upstreams_and_mapped_args(executor):
assert [r.result for r in slist] == [[1 + i, 2 + i, 3 + i] for i in range(3)]


@pytest.mark.parametrize("executor", ["mproc", "mthread"], indirect=True)
@pytest.mark.parametrize("executor", ["local", "mproc", "mthread"], indirect=True)
def test_map_behaves_like_zip_with_differing_length_results(executor):
"Tests that map stops combining elements after the smallest list is exhausted."

Expand Down Expand Up @@ -318,7 +344,9 @@ def ll(n):
)


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_map_works_with_retries_and_cached_states(executor):
"""
This test isn't meant to test the correct way of handling caching for mapped
Expand Down Expand Up @@ -389,7 +417,9 @@ def rec(s):
state = f.run(executor=executor)


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_task_map_can_be_passed_to_upstream_with_and_without_map(executor):
@prefect.task
def ll():
Expand All @@ -415,7 +445,9 @@ def append_four(l):
assert [s.result for s in state.result[again]] == [3, 4, 5]


@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True)
@pytest.mark.parametrize(
"executor", ["local", "sync", "mproc", "mthread"], indirect=True
)
def test_task_map_doesnt_assume_purity_of_functions(executor):
@prefect.task
def ll():
Expand Down

0 comments on commit 3d78398

Please sign in to comment.