From 116ddd015e81d2a6930a0dfbe83e630e526634f4 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 20 Feb 2023 16:56:14 +0800 Subject: [PATCH] Add new function to specify namespace for `KedroSession` and `kedro run` (#2306) * Add new namespace options for KedroSession and `kedro run` Signed-off-by: Nok Chan * Change the shortcut from `--ns` to `-ns` to be consistent Signed-off-by: Nok Chan * Fix lint Signed-off-by: Nok Chan * Fix broken tests and update new test for namespace argument Signed-off-by: Nok Chan * update docs and release notes Signed-off-by: Nok * Fix tests Signed-off-by: Nok * Fix test for incorrect merge Signed-off-by: Nok --------- Signed-off-by: Nok Chan Signed-off-by: Nok --- RELEASE.md | 1 + docs/source/development/commands_reference.md | 1 + .../nodes_and_pipelines/modular_pipelines.md | 1 + kedro/framework/cli/project.py | 6 +++++- kedro/framework/session/session.py | 4 ++++ tests/framework/cli/test_cli.py | 18 +++++++++++++++++- tests/framework/session/test_session.py | 4 ++++ 7 files changed, 33 insertions(+), 2 deletions(-) diff --git a/RELEASE.md b/RELEASE.md index f800f3c086..4429027e83 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -19,6 +19,7 @@ * Save node outputs after every `yield` before proceeding with next chunk. * Fixed incorrect parsing of Azure Data Lake Storage Gen2 URIs used in datasets. * Added support for loading credentials from environment variables using `OmegaConfigLoader`. +* Added new `--namespace` flag to `kedro run` to enable filtering by node namespace. * Added a new argument `node` for all four dataset hooks. * Added the `kedro run` flags `--nodes`, `--tags`, and `--load-versions` to replace `--node`, `--tag`, and `--load-version`. diff --git a/docs/source/development/commands_reference.md b/docs/source/development/commands_reference.md index ee17e2205c..bc2ab7233a 100644 --- a/docs/source/development/commands_reference.md +++ b/docs/source/development/commands_reference.md @@ -339,6 +339,7 @@ the names of relevant nodes, datasets, envs, etc. in your project. | [DEPRECATED] `kedro run --load-version=:YYYY-MM-DDThh.mm.ss.sssZ` | Specify a particular dataset version (timestamp) for loading.
Multiple instances allowed.
NOTE: This flag will be deprecated in `Kedro 0.19.0`. Use the following flag `--load-versions` instead. | | `kedro run --load-versions=:YYYY-MM-DDThh.mm.ss.sssZ` | Specify particular dataset versions (timestamp) for loading. | | `kedro run --pipeline=` | Run the whole pipeline by its name | +| `kedro run --namespace=` | Run only nodes with the specified namespace | | `kedro run --config=.yml` | Specify all command line options in a named YAML configuration file | | `kedro run --conf-source=` | Specify a new source directory for configuration files | | `kedro run --params=:,:` | Does a parametrised kedro run with `{"param_key1": "value1", "param_key2": 2}`. These will take precedence over parameters defined in the `conf` directory. Additionally, dot (`.`) syntax can be used to address nested keys like `parent.child:value` | diff --git a/docs/source/nodes_and_pipelines/modular_pipelines.md b/docs/source/nodes_and_pipelines/modular_pipelines.md index 77212393b8..7ee1d3a75c 100644 --- a/docs/source/nodes_and_pipelines/modular_pipelines.md +++ b/docs/source/nodes_and_pipelines/modular_pipelines.md @@ -239,6 +239,7 @@ final_pipeline = ( * Visualising the `final_pipeline` highlights how namespaces become 'super nodes' which encapsulate the wrapped pipeline. * This example demonstrates how we can reuse the same `cook_pipeline` with slightly different arguments. * Namespaces can also be arbitrarily nested with the `.` character. +* `kedro run --namespace=` could be used to only run nodes with a specific namespace. ```{note} Parameter references (`params:` and `parameters`) will not be namespaced. diff --git a/kedro/framework/cli/project.py b/kedro/framework/cli/project.py index ab93978620..912a8b5a90 100644 --- a/kedro/framework/cli/project.py +++ b/kedro/framework/cli/project.py @@ -55,6 +55,7 @@ override the loaded ones.""" PIPELINE_ARG_HELP = """Name of the registered pipeline to run. If not set, the '__default__' pipeline is run.""" +NAMESPACE_ARG_HELP = """Name of the node namespace to run.""" PARAMS_ARG_HELP = """Specify extra parameters that you want to pass to the context initialiser. Items must be separated by comma, keys - by colon or equals sign, example: param1=value1,param2=value2. Each parameter is split by the first comma, @@ -392,6 +393,7 @@ def activate_nbstripout( callback=_split_load_versions, ) @click.option("--pipeline", "-p", type=str, default=None, help=PIPELINE_ARG_HELP) +@click.option("--namespace", "-ns", type=str, default=None, help=NAMESPACE_ARG_HELP) @click.option( "--config", "-c", @@ -411,7 +413,7 @@ def activate_nbstripout( help=PARAMS_ARG_HELP, callback=_split_params, ) -# pylint: disable=too-many-arguments,unused-argument, too-many-locals +# pylint: disable=too-many-arguments,unused-argument,too-many-locals def run( tag, tags, @@ -430,6 +432,7 @@ def run( config, conf_source, params, + namespace, ): """Run the pipeline.""" @@ -459,4 +462,5 @@ def run( to_outputs=to_outputs, load_versions=load_version, pipeline_name=pipeline, + namespace=namespace, ) diff --git a/kedro/framework/session/session.py b/kedro/framework/session/session.py index a435ffd582..50fa264548 100644 --- a/kedro/framework/session/session.py +++ b/kedro/framework/session/session.py @@ -313,6 +313,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals from_inputs: Iterable[str] = None, to_outputs: Iterable[str] = None, load_versions: Dict[str, str] = None, + namespace: str = None, ) -> Dict[str, Any]: """Runs the pipeline with a specified runner. @@ -336,6 +337,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals used as an end point of the new ``Pipeline``. load_versions: An optional flag to specify a particular dataset version timestamp to load. + namespace: The namespace of the nodes that is being run. Raises: ValueError: If the named or `__default__` pipeline is not defined by `register_pipelines`. @@ -382,6 +384,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals node_names=node_names, from_inputs=from_inputs, to_outputs=to_outputs, + node_namespace=namespace, ) record_data = { @@ -398,6 +401,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals "load_versions": load_versions, "extra_params": extra_params, "pipeline_name": pipeline_name, + "namespace": namespace, "runner": getattr(runner, "__name__", str(runner)), } diff --git a/tests/framework/cli/test_cli.py b/tests/framework/cli/test_cli.py index 062f21b813..0edd7427cb 100644 --- a/tests/framework/cli/test_cli.py +++ b/tests/framework/cli/test_cli.py @@ -473,6 +473,7 @@ def test_run_successfully( to_outputs=[], load_versions={}, pipeline_name=None, + namespace=None, ) runner = fake_session.run.call_args_list[0][1]["runner"] @@ -512,6 +513,7 @@ def test_run_specific_nodes( to_outputs=[], load_versions={}, pipeline_name=None, + namespace=None, ) runner = fake_session.run.call_args_list[0][1]["runner"] @@ -551,6 +553,7 @@ def test_run_with_tags( to_outputs=[], load_versions={}, pipeline_name=None, + namespace=None, ) runner = fake_session.run.call_args_list[0][1]["runner"] @@ -562,9 +565,12 @@ def test_run_with_pipeline_filters( ): from_nodes = ["--from-nodes", "splitting_data"] to_nodes = ["--to-nodes", "training_model"] + namespace = ["--namespace", "fake_namespace"] tags = ["--tags", "de"] result = CliRunner().invoke( - fake_project_cli, ["run", *from_nodes, *to_nodes, *tags], obj=fake_metadata + fake_project_cli, + ["run", *from_nodes, *to_nodes, *tags, *namespace], + obj=fake_metadata, ) assert not result.exit_code @@ -578,6 +584,7 @@ def test_run_with_pipeline_filters( to_outputs=[], load_versions={}, pipeline_name=None, + namespace="fake_namespace", ) runner = fake_session.run.call_args_list[0][1]["runner"] @@ -601,6 +608,7 @@ def test_run_successfully_parallel( to_outputs=[], load_versions={}, pipeline_name=None, + namespace=None, ) runner = fake_session.run.call_args_list[0][1]["runner"] @@ -640,6 +648,7 @@ def test_run_with_config( to_outputs=[], load_versions={}, pipeline_name="pipeline1", + namespace=None, ) @mark.parametrize( @@ -683,6 +692,7 @@ def test_run_with_params_in_config( to_outputs=[], load_versions={}, pipeline_name="pipeline1", + namespace=None, ) mock_session_create.assert_called_once_with( env=mocker.ANY, conf_source=None, extra_params=expected @@ -782,6 +792,7 @@ def test_reformat_load_versions( to_outputs=[], load_versions={ds: t}, pipeline_name=None, + namespace=None, ) @mark.parametrize( @@ -821,6 +832,7 @@ def test_split_load_versions( to_outputs=[], load_versions=lv_dict, pipeline_name=None, + namespace=None, ) def test_fail_reformat_load_versions(self, fake_project_cli, fake_metadata): @@ -896,6 +908,7 @@ def test_safe_split_option_arguments( to_outputs=[], load_versions={}, pipeline_name=None, + namespace=None, ) def test_run_with_alternative_conf_source(self, fake_project_cli, fake_metadata): @@ -950,6 +963,7 @@ def test_both_node_flags( to_outputs=[], load_versions={}, pipeline_name=None, + namespace=None, ) def test_both_tag_flags( @@ -978,6 +992,7 @@ def test_both_tag_flags( to_outputs=[], load_versions={}, pipeline_name=None, + namespace=None, ) def test_both_load_version_flags( @@ -1006,4 +1021,5 @@ def test_both_load_version_flags( to_outputs=[], load_versions=lv_dict, pipeline_name=None, + namespace=None, ) diff --git a/tests/framework/session/test_session.py b/tests/framework/session/test_session.py index 9f6d623150..1cccce0426 100644 --- a/tests/framework/session/test_session.py +++ b/tests/framework/session/test_session.py @@ -623,6 +623,7 @@ def test_run( "load_versions": None, "extra_params": {}, "pipeline_name": fake_pipeline_name, + "namespace": None, "runner": mock_runner.__name__, } @@ -692,6 +693,7 @@ def test_run_multiple_times( # pylint: disable=too-many-locals "load_versions": None, "extra_params": {}, "pipeline_name": fake_pipeline_name, + "namespace": None, "runner": mock_runner.__name__, } @@ -779,6 +781,7 @@ def test_run_exception( # pylint: disable=too-many-locals "load_versions": None, "extra_params": {}, "pipeline_name": fake_pipeline_name, + "namespace": None, "runner": mock_runner.__name__, } @@ -846,6 +849,7 @@ def test_run_broken_pipeline_multiple_times( # pylint: disable=too-many-locals "load_versions": None, "extra_params": {}, "pipeline_name": fake_pipeline_name, + "namespace": None, "runner": broken_runner.__name__, }