Inherit tags from nodes for modular pipelines #1878

Merged: 21 commits, May 7, 2024
3 changes: 3 additions & 0 deletions package/kedro_viz/data_access/managers.py
@@ -199,6 +199,9 @@ def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline):
modular_pipelines.add_output(
current_modular_pipeline_id, output_node
)
# add tags
if current_modular_pipeline_id is not None:
modular_pipelines.add_tags(current_modular_pipeline_id, task_node.tags)

def add_node(self, registered_pipeline_id: str, node: KedroNode) -> TaskNode:
"""Add a Kedro node as a TaskNode to the NodesRepository
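The diff above wires tag inheritance into add_pipeline: while iterating over a pipeline's task nodes, each node's tags are unioned into its enclosing modular pipeline. A minimal self-contained sketch of that mechanic (the ModularPipeline dataclass and tree dict here are simplified stand-ins, not the actual kedro-viz types):

```python
from dataclasses import dataclass, field


@dataclass
class ModularPipeline:
    # Simplified stand-in for kedro-viz's modular pipeline node.
    id: str
    tags: set = field(default_factory=set)


tree = {"uk.data_processing": ModularPipeline("uk.data_processing")}


def add_tags(modular_pipeline_id: str, node_tags: set) -> None:
    # Mirrors ModularPipelinesRepository.add_tags: union the node's tags
    # into the pipeline's tag set, but only if the pipeline is in the tree.
    if modular_pipeline_id in tree:
        tree[modular_pipeline_id].tags |= node_tags


add_tags("uk.data_processing", {"split"})
add_tags("uk.data_processing", {"train"})
add_tags("missing.pipeline", {"ignored"})  # silently skipped, as in the guard above
print(sorted(tree["uk.data_processing"].tags))  # ['split', 'train']
```

The `if current_modular_pipeline_id is not None` check in the diff plays the same role as the tree-membership guard here: nodes outside any namespace contribute no tags.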
22 changes: 22 additions & 0 deletions package/kedro_viz/data_access/repositories/modular_pipelines.py
@@ -1,5 +1,6 @@
"""`kedro_viz.data_access.repositories.modular_pipelines`
defines repository to centralise access to modular pipelines data."""

from typing import Dict, Optional, Union

from kedro_viz.constants import ROOT_MODULAR_PIPELINE_ID
@@ -161,6 +162,27 @@ def add_output(self, modular_pipeline_id: str, output_node: GraphNode):
else:
self.tree[modular_pipeline_id].external_outputs.add(output_node.id)

def add_tags(self, modular_pipeline_id: str, node_tags: set):
Contributor:
I was going through the modular pipelines code and found that we update the modular pipeline's pipelines set with node.pipelines. Maybe we can update the tags at that point too.

For example, at line 239 in this file, inside the extract_from_node function, we could have something like modular_pipeline.tags.update(node.tags). In that case we would not need this function, and would also not need the call at line 203 in managers.py. What do you think? @rashidakanchwala

Contributor:

That's how I believe @jitu5 approached it previously.
He had some code in the extract_from_node function, which I found very confusing because the function was originally defined to extract namespaces, but now it was also gathering tags.
I think it's important to have clean functions that each handle one task, rather than one function doing too many things.

I'm also personally confused by the extract_from_node function being called three times here - link to GitHub code.
This is exactly the code refactor/simplification that Ivan mentioned to us the other day.

Contributor:

I agree with having clean functions. If you look at our add_pipeline function, it does all of the tasks below:

  1. Populates the RegisteredPipelinesRepository
  2. Populates the ModularPipelinesRepository
  3. Populates the GraphNodesRepository
  4. Apart from the above, it also creates the TaskNode and DataNode and assigns inputs/outputs to the created nodes

Now, if we also update the tags of the created modular pipeline there, we are adding a bit of logic that belongs only to modular pipelines to an add_pipeline function that is already complex.

Yes, extract_from_node also does more than just extract the namespace; it does the tasks below, but everything there belongs to the modular pipeline:

  1. Creates the modular pipeline
  2. Updates its pipelines set with the node's pipelines
  3. Adds the node as a child of the created modular pipeline

We should probably rename it to something like add_modular_pipeline. The reason I feel the modular pipeline's tags should be updated in that place is that this is where we create the modular pipeline from the node's namespace and update its required attributes/properties. It is fine to have add_tags as a separate function that updates the tags set (not recursively, though, as @jitu5 did initially) and call it from extract_from_node.

Also, I feel having clean functions is also about grouping tasks that belong to a particular area (in this case, modular pipelines). We did this when adding resolve_dataset_factory_patterns, which resolves dataset patterns in the catalog. Since that belongs to the catalog, we added it inside add_catalog. Again, add_catalog does more than just add the catalog: it also adds tracking datasets (another function to refactor 🗡️).

Thank you

"""
Add tags to a modular pipeline.

Args:
modular_pipeline_id: ID of the modular pipeline to add the tags to.
node_tags: The tags to add to the modular pipeline.

Example:
>>> modular_pipelines = ModularPipelinesRepository()
>>> node_tags = {"tag1", "tag2"}
>>> modular_pipelines.add_tags("data_science", node_tags)
>>> data_science_pipeline = modular_pipelines.get_or_create_modular_pipeline(
... "data_science"
... )
>>> assert "tag1" in data_science_pipeline.tags
>>> assert "tag2" in data_science_pipeline.tags
"""
if modular_pipeline_id in self.tree:
self.tree[modular_pipeline_id].tags |= node_tags

def add_child(self, modular_pipeline_id: str, child: ModularPipelineChild):
"""Add a child to a modular pipeline.
Args:
Expand Down
2 changes: 2 additions & 0 deletions package/kedro_viz/services/modular_pipelines.py
@@ -1,6 +1,7 @@
"""`kedro_viz.services.modular_pipelines` defines modular pipelines-related business logic.
The service layer consist of pure functions operating on domain models.
"""

from typing import Dict

from kedro_viz.constants import ROOT_MODULAR_PIPELINE_ID
@@ -78,6 +79,7 @@ def expand_tree(
)

expanded_tree[parent_id].pipelines.update(modular_pipeline_node.pipelines)
expanded_tree[parent_id].tags.update(modular_pipeline_node.tags)
expanded_tree[parent_id].children.add(
ModularPipelineChild(
id=f"{parent_id}.{chunks[i]}",
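The one-line change above makes expand_tree propagate tags to ancestor pipelines, which is why the "uk" pipeline in the test expectations ends up with both "split" and "train". A minimal sketch of that propagation (a standalone helper, not the actual expand_tree signature):

```python
def propagate_tags(namespace: str, tags: set, tree: dict) -> None:
    # A nested namespace like "uk.data_science" is split into chunks, and
    # every ancestor id ("uk", "uk.data_science") accumulates the tags.
    chunks = namespace.split(".")
    for i in range(len(chunks)):
        parent_id = ".".join(chunks[: i + 1])
        tree.setdefault(parent_id, set()).update(tags)


tree: dict = {}
propagate_tags("uk.data_processing", {"split"}, tree)
propagate_tags("uk.data_science", {"train"}, tree)
print(sorted(tree["uk"]))  # ['split', 'train']
```

Each leaf keeps only its own tags, while shared ancestors collect the union, matching the expected API responses in the test diff below.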
10 changes: 5 additions & 5 deletions package/tests/test_api/test_rest/test_responses.py
@@ -184,7 +184,7 @@ def assert_example_data(response_data):
{
"id": "uk.data_processing",
"name": "uk.data_processing",
"tags": [],
"tags": ["split"],
"pipelines": ["__default__"],
"type": "modularPipeline",
"modular_pipelines": None,
@@ -195,7 +195,7 @@
{
"id": "uk.data_science",
"name": "uk.data_science",
"tags": [],
"tags": ["train"],
"pipelines": ["__default__"],
"type": "modularPipeline",
"modular_pipelines": None,
@@ -206,7 +206,7 @@
{
"id": "uk",
"name": "uk",
"tags": [],
"tags": ["split", "train"],
"pipelines": ["__default__"],
"type": "modularPipeline",
"modular_pipelines": None,
@@ -731,7 +731,7 @@ def test_get_pipeline(self, client):
{
"id": "uk",
"name": "uk",
"tags": [],
"tags": ["train"],
"pipelines": ["data_science"],
"type": "modularPipeline",
"modular_pipelines": None,
@@ -742,7 +742,7 @@
{
"id": "uk.data_science",
"name": "uk.data_science",
"tags": [],
"tags": ["train"],
"pipelines": ["data_science"],
"type": "modularPipeline",
"modular_pipelines": None,
@@ -83,3 +83,14 @@ def test_add_output_should_raise_if_adding_non_data_node(self, identity):
modular_pipelines = ModularPipelinesRepository()
with pytest.raises(ValueError):
modular_pipelines.add_output("data_science", task_node)

def test_add_tags(self):
modular_pipelines = ModularPipelinesRepository()
node_tags = {"tag1", "tag2"}
modular_pipelines.get_or_create_modular_pipeline("data_science")
modular_pipelines.add_tags("data_science", node_tags)
data_science_pipeline = modular_pipelines.get_or_create_modular_pipeline(
"data_science"
)
assert "tag1" in data_science_pipeline.tags
assert "tag2" in data_science_pipeline.tags