From 9e658d11c230c31fc5b62199b87943f039054de4 Mon Sep 17 00:00:00 2001 From: Chris White Date: Mon, 1 Oct 2018 10:14:01 -0700 Subject: [PATCH 1/3] I cant believe how easy that was (to implement map for the local executor) --- src/prefect/engine/executors/local.py | 14 ++++++ tests/core/test_task_map.py | 66 ++++++++++++++++++++------- 2 files changed, 63 insertions(+), 17 deletions(-) diff --git a/src/prefect/engine/executors/local.py b/src/prefect/engine/executors/local.py index 3097c5932189..f41379f2d987 100644 --- a/src/prefect/engine/executors/local.py +++ b/src/prefect/engine/executors/local.py @@ -1,6 +1,9 @@ # Licensed under LICENSE.md; also available at https://www.prefect.io/licenses/alpha-eula +from typing import Any, Callable, Iterable + from prefect.engine.executors.base import Executor +from prefect.utilities.executors import dict_to_list class LocalExecutor(Executor): @@ -9,6 +12,17 @@ class LocalExecutor(Executor): the local thread. To be used mainly for debugging purposes. """ + def map( + self, fn: Callable, *args: Any, upstream_states=None, **kwargs: Any + ) -> Iterable[Any]: + + states = dict_to_list(upstream_states) + results = [] + for elem in states: + results.append(self.submit(fn, *args, upstream_states=elem, **kwargs)) + + return results + def submit(self, fn, *args, **kwargs): """ Submit a function to the executor for execution. Returns the result of the computation. diff --git a/tests/core/test_task_map.py b/tests/core/test_task_map.py index a7961eb0d0ce..931f3fcaf6c1 100644 --- a/tests/core/test_task_map.py +++ b/tests/core/test_task_map.py @@ -48,7 +48,9 @@ def test_calling_map_with_bind_returns_self(): assert res is a -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_map_spawns_new_tasks(executor): ll = ListTask() a = AddTask() @@ -64,7 +66,9 @@ def test_map_spawns_new_tasks(executor): assert [r.result for r in slist] == [2, 3, 4] -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_map_over_parameters(executor): a = AddTask() @@ -80,7 +84,9 @@ def test_map_over_parameters(executor): assert [r.result for r in slist] == [2, 3, 4] -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_map_internally_returns_a_list(executor): ll = ListTask() ii = IdTask() @@ -93,7 +99,9 @@ def test_map_internally_returns_a_list(executor): assert s.result[res].result == [2, 3, 4] -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_map_composition(executor): ll = ListTask() a = AddTask() @@ -109,7 +117,9 @@ def test_map_composition(executor): assert [r.result for r in slist] == [3, 4, 5] -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_multiple_map_arguments(executor): ll = ListTask() a = AddTask() @@ -125,7 +135,9 @@ def test_multiple_map_arguments(executor): assert [r.result for r in slist] == [2, 4, 6] -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_map_failures_dont_leak_out(executor): ii = IdTask() ll = ListTask() @@ -143,7 +155,9 @@ def test_map_failures_dont_leak_out(executor): assert [r.result for r in slist] == [None, 1, 0.5] -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_map_can_handle_fixed_kwargs(executor): ll = ListTask() a = AddTask() @@ -159,7 +173,9 @@ def test_map_can_handle_fixed_kwargs(executor): assert [r.result for r in slist] == [6, 7, 8] -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_map_can_handle_nonkeyed_upstreams(executor): ll = ListTask() @@ -174,7 +190,9 @@ def test_map_can_handle_nonkeyed_upstreams(executor): assert [r.result for r in slist] == [[1, 2, 3] for _ in range(3)] -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_map_can_handle_nonkeyed_mapped_upstreams(executor): ii = IdTask() ll = ListTask() @@ -191,7 +209,9 @@ def test_map_can_handle_nonkeyed_mapped_upstreams(executor): assert [r.result for r in slist] == [[1, 2, 3] for _ in range(3)] -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_map_can_handle_nonkeyed_nonmapped_upstreams_and_mapped_args(executor): ii = IdTask() ll = ListTask() @@ -207,7 +227,9 @@ def test_map_can_handle_nonkeyed_nonmapped_upstreams_and_mapped_args(executor): assert [r.result for r in slist] == [[1 + i, 2 + i, 3 + i] for i in range(3)] -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_map_tracks_non_mapped_upstream_tasks(executor): div = DivTask() @@ -230,7 +252,9 @@ def register(x): ) -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_map_allows_for_retries(executor): ii = IdTask() ll = ListTask() @@ -259,7 +283,9 @@ def test_map_allows_for_retries(executor): assert [s.result for s in new] == [100, 1.0, 0.5] -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_map_can_handle_nonkeyed_mapped_upstreams_and_mapped_args(executor): ii = IdTask() ll = ListTask() @@ -276,7 +302,7 @@ def test_map_can_handle_nonkeyed_mapped_upstreams_and_mapped_args(executor): assert [r.result for r in slist] == [[1 + i, 2 + i, 3 + i] for i in range(3)] -@pytest.mark.parametrize("executor", ["mproc", "mthread"], indirect=True) +@pytest.mark.parametrize("executor", ["local", "mproc", "mthread"], indirect=True) def test_map_behaves_like_zip_with_differing_length_results(executor): "Tests that map stops combining elements after the smallest list is exhausted." @@ -318,7 +344,9 @@ def ll(n): ) -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_map_works_with_retries_and_cached_states(executor): """ This test isn't meant to test the correct way of handling caching for mapped @@ -389,7 +417,9 @@ def rec(s): state = f.run(executor=executor) -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_task_map_can_be_passed_to_upstream_with_and_without_map(executor): @prefect.task def ll(): @@ -415,7 +445,9 @@ def append_four(l): assert [s.result for s in state.result[again]] == [3, 4, 5] -@pytest.mark.parametrize("executor", ["sync", "mproc", "mthread"], indirect=True) +@pytest.mark.parametrize( + "executor", ["local", "sync", "mproc", "mthread"], indirect=True +) def test_task_map_doesnt_assume_purity_of_functions(executor): @prefect.task def ll(): From 5f8124630bc384f33fad6d4dc10189588fc171df Mon Sep 17 00:00:00 2001 From: Chris White Date: Mon, 1 Oct 2018 10:15:07 -0700 Subject: [PATCH 2/3] Add entry to changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c4c18d078f1..47396b3760f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ - added new `flow.replace()` method for swapping out tasks within flows - [#230](https://github.com/PrefectHQ/prefect/pull/230) - add `debug` kwarg to `DaskExecutor` for optionally silencing dask logs - [#209](https://github.com/PrefectHQ/prefect/issues/209) - update `BokehRunner` for visualizing mapped tasks - [#220](https://github.com/PrefectHQ/prefect/issues/220) +- implement `map` functionality for the `LocalExecutor` - [#233](https://github.com/PrefectHQ/prefect/issues/233) ### Fixes From eb5cd8dd58eefe41a73338da7bce3e1b1a658e94 Mon Sep 17 00:00:00 2001 From: Chris White Date: Mon, 1 Oct 2018 10:28:06 -0700 Subject: [PATCH 3/3] Update executor doc as well --- src/prefect/engine/executors/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/prefect/engine/executors/__init__.py b/src/prefect/engine/executors/__init__.py index 798e5b5acf83..4dbe7b0e05fb 100644 --- a/src/prefect/engine/executors/__init__.py +++ b/src/prefect/engine/executors/__init__.py @@ -20,7 +20,6 @@ - `LocalExecutor`: the no frills, straightforward executor - great for simple debugging; tasks are executed immediately upon being called by `executor.submit()`. - Note that the `map` feature is currently _not_ supported with this executor. - `SynchronousExecutor`: an executor that runs on `dask` primitives with the synchronous dask scheduler; currently the default executor - `DaskExecutor`: the most feature-rich of the executors, this executor runs