Skip to content

Commit

Permalink
Add new function to specify namespace for KedroSession and `kedro run` (#2306)
Browse files Browse the repository at this point in the history

* Add new namespace options for KedroSession and `kedro run`

Signed-off-by: Nok Chan <[email protected]>

* Change the shortcut from `--ns` to `-ns` to be consistent

Signed-off-by: Nok Chan <[email protected]>

* Fix lint

Signed-off-by: Nok Chan <[email protected]>

* Fix broken tests and update new test for namespace argument

Signed-off-by: Nok Chan <[email protected]>

* update docs and release notes

Signed-off-by: Nok <[email protected]>

* Fix tests

Signed-off-by: Nok <[email protected]>

* Fix test for incorrect merge

Signed-off-by: Nok <[email protected]>

---------

Signed-off-by: Nok Chan <[email protected]>
Signed-off-by: Nok <[email protected]>
  • Loading branch information
noklam authored Feb 20, 2023
1 parent d52fd50 commit 116ddd0
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 2 deletions.
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* Save node outputs after every `yield` before proceeding with next chunk.
* Fixed incorrect parsing of Azure Data Lake Storage Gen2 URIs used in datasets.
* Added support for loading credentials from environment variables using `OmegaConfigLoader`.
* Added new `--namespace` flag to `kedro run` to enable filtering by node namespace.
* Added a new argument `node` for all four dataset hooks.
* Added the `kedro run` flags `--nodes`, `--tags`, and `--load-versions` to replace `--node`, `--tag`, and `--load-version`.

Expand Down
1 change: 1 addition & 0 deletions docs/source/development/commands_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ the names of relevant nodes, datasets, envs, etc. in your project.
| [DEPRECATED] `kedro run --load-version=<dataset_name>:YYYY-MM-DDThh.mm.ss.sssZ` | Specify a particular dataset version (timestamp) for loading. <br /> Multiple instances allowed. <br /> NOTE: This flag will be deprecated in `Kedro 0.19.0`. Use the following flag `--load-versions` instead. |
| `kedro run --load-versions=<dataset_name>:YYYY-MM-DDThh.mm.ss.sssZ` | Specify particular dataset versions (timestamp) for loading. |
| `kedro run --pipeline=<pipeline_name>` | Run the whole pipeline by its name |
| `kedro run --namespace=<namespace>` | Run only nodes with the specified namespace |
| `kedro run --config=<config_file_name>.yml` | Specify all command line options in a named YAML configuration file |
| `kedro run --conf-source=<path_to_config_directory>` | Specify a new source directory for configuration files |
| `kedro run --params=<param_key1>:<value1>,<param_key2>:<value2>` | Does a parametrised kedro run with `{"param_key1": "value1", "param_key2": 2}`. These will take precedence over parameters defined in the `conf` directory. Additionally, dot (`.`) syntax can be used to address nested keys like `parent.child:value` |
Expand Down
1 change: 1 addition & 0 deletions docs/source/nodes_and_pipelines/modular_pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ final_pipeline = (
* Visualising the `final_pipeline` highlights how namespaces become 'super nodes' which encapsulate the wrapped pipeline.
* This example demonstrates how we can reuse the same `cook_pipeline` with slightly different arguments.
* Namespaces can also be arbitrarily nested with the `.` character.
* `kedro run --namespace=<namespace>` can be used to run only the nodes within a specific namespace.

```{note}
Parameter references (`params:` and `parameters`) will not be namespaced.
Expand Down
6 changes: 5 additions & 1 deletion kedro/framework/cli/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
override the loaded ones."""
PIPELINE_ARG_HELP = """Name of the registered pipeline to run.
If not set, the '__default__' pipeline is run."""
NAMESPACE_ARG_HELP = """Name of the node namespace to run."""
PARAMS_ARG_HELP = """Specify extra parameters that you want to pass
to the context initialiser. Items must be separated by comma, keys - by colon or equals sign,
example: param1=value1,param2=value2. Each parameter is split by the first comma,
Expand Down Expand Up @@ -392,6 +393,7 @@ def activate_nbstripout(
callback=_split_load_versions,
)
@click.option("--pipeline", "-p", type=str, default=None, help=PIPELINE_ARG_HELP)
@click.option("--namespace", "-ns", type=str, default=None, help=NAMESPACE_ARG_HELP)
@click.option(
"--config",
"-c",
Expand All @@ -411,7 +413,7 @@ def activate_nbstripout(
help=PARAMS_ARG_HELP,
callback=_split_params,
)
# pylint: disable=too-many-arguments,unused-argument, too-many-locals
# pylint: disable=too-many-arguments,unused-argument,too-many-locals
def run(
tag,
tags,
Expand All @@ -430,6 +432,7 @@ def run(
config,
conf_source,
params,
namespace,
):
"""Run the pipeline."""

Expand Down Expand Up @@ -459,4 +462,5 @@ def run(
to_outputs=to_outputs,
load_versions=load_version,
pipeline_name=pipeline,
namespace=namespace,
)
4 changes: 4 additions & 0 deletions kedro/framework/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
from_inputs: Iterable[str] = None,
to_outputs: Iterable[str] = None,
load_versions: Dict[str, str] = None,
namespace: str = None,
) -> Dict[str, Any]:
"""Runs the pipeline with a specified runner.
Expand All @@ -336,6 +337,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
used as an end point of the new ``Pipeline``.
load_versions: An optional flag to specify a particular dataset
version timestamp to load.
namespace: The namespace of the nodes that are being run.
Raises:
ValueError: If the named or `__default__` pipeline is not
defined by `register_pipelines`.
Expand Down Expand Up @@ -382,6 +384,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
node_names=node_names,
from_inputs=from_inputs,
to_outputs=to_outputs,
node_namespace=namespace,
)

record_data = {
Expand All @@ -398,6 +401,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
"load_versions": load_versions,
"extra_params": extra_params,
"pipeline_name": pipeline_name,
"namespace": namespace,
"runner": getattr(runner, "__name__", str(runner)),
}

Expand Down
18 changes: 17 additions & 1 deletion tests/framework/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ def test_run_successfully(
to_outputs=[],
load_versions={},
pipeline_name=None,
namespace=None,
)

runner = fake_session.run.call_args_list[0][1]["runner"]
Expand Down Expand Up @@ -512,6 +513,7 @@ def test_run_specific_nodes(
to_outputs=[],
load_versions={},
pipeline_name=None,
namespace=None,
)

runner = fake_session.run.call_args_list[0][1]["runner"]
Expand Down Expand Up @@ -551,6 +553,7 @@ def test_run_with_tags(
to_outputs=[],
load_versions={},
pipeline_name=None,
namespace=None,
)

runner = fake_session.run.call_args_list[0][1]["runner"]
Expand All @@ -562,9 +565,12 @@ def test_run_with_pipeline_filters(
):
from_nodes = ["--from-nodes", "splitting_data"]
to_nodes = ["--to-nodes", "training_model"]
namespace = ["--namespace", "fake_namespace"]
tags = ["--tags", "de"]
result = CliRunner().invoke(
fake_project_cli, ["run", *from_nodes, *to_nodes, *tags], obj=fake_metadata
fake_project_cli,
["run", *from_nodes, *to_nodes, *tags, *namespace],
obj=fake_metadata,
)
assert not result.exit_code

Expand All @@ -578,6 +584,7 @@ def test_run_with_pipeline_filters(
to_outputs=[],
load_versions={},
pipeline_name=None,
namespace="fake_namespace",
)

runner = fake_session.run.call_args_list[0][1]["runner"]
Expand All @@ -601,6 +608,7 @@ def test_run_successfully_parallel(
to_outputs=[],
load_versions={},
pipeline_name=None,
namespace=None,
)

runner = fake_session.run.call_args_list[0][1]["runner"]
Expand Down Expand Up @@ -640,6 +648,7 @@ def test_run_with_config(
to_outputs=[],
load_versions={},
pipeline_name="pipeline1",
namespace=None,
)

@mark.parametrize(
Expand Down Expand Up @@ -683,6 +692,7 @@ def test_run_with_params_in_config(
to_outputs=[],
load_versions={},
pipeline_name="pipeline1",
namespace=None,
)
mock_session_create.assert_called_once_with(
env=mocker.ANY, conf_source=None, extra_params=expected
Expand Down Expand Up @@ -782,6 +792,7 @@ def test_reformat_load_versions(
to_outputs=[],
load_versions={ds: t},
pipeline_name=None,
namespace=None,
)

@mark.parametrize(
Expand Down Expand Up @@ -821,6 +832,7 @@ def test_split_load_versions(
to_outputs=[],
load_versions=lv_dict,
pipeline_name=None,
namespace=None,
)

def test_fail_reformat_load_versions(self, fake_project_cli, fake_metadata):
Expand Down Expand Up @@ -896,6 +908,7 @@ def test_safe_split_option_arguments(
to_outputs=[],
load_versions={},
pipeline_name=None,
namespace=None,
)

def test_run_with_alternative_conf_source(self, fake_project_cli, fake_metadata):
Expand Down Expand Up @@ -950,6 +963,7 @@ def test_both_node_flags(
to_outputs=[],
load_versions={},
pipeline_name=None,
namespace=None,
)

def test_both_tag_flags(
Expand Down Expand Up @@ -978,6 +992,7 @@ def test_both_tag_flags(
to_outputs=[],
load_versions={},
pipeline_name=None,
namespace=None,
)

def test_both_load_version_flags(
Expand Down Expand Up @@ -1006,4 +1021,5 @@ def test_both_load_version_flags(
to_outputs=[],
load_versions=lv_dict,
pipeline_name=None,
namespace=None,
)
4 changes: 4 additions & 0 deletions tests/framework/session/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ def test_run(
"load_versions": None,
"extra_params": {},
"pipeline_name": fake_pipeline_name,
"namespace": None,
"runner": mock_runner.__name__,
}

Expand Down Expand Up @@ -692,6 +693,7 @@ def test_run_multiple_times( # pylint: disable=too-many-locals
"load_versions": None,
"extra_params": {},
"pipeline_name": fake_pipeline_name,
"namespace": None,
"runner": mock_runner.__name__,
}

Expand Down Expand Up @@ -779,6 +781,7 @@ def test_run_exception( # pylint: disable=too-many-locals
"load_versions": None,
"extra_params": {},
"pipeline_name": fake_pipeline_name,
"namespace": None,
"runner": mock_runner.__name__,
}

Expand Down Expand Up @@ -846,6 +849,7 @@ def test_run_broken_pipeline_multiple_times( # pylint: disable=too-many-locals
"load_versions": None,
"extra_params": {},
"pipeline_name": fake_pipeline_name,
"namespace": None,
"runner": broken_runner.__name__,
}

Expand Down

0 comments on commit 116ddd0

Please sign in to comment.