Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Flowchart bug with datasets within a nested modular pipeline. #1863

Closed
wants to merge 6 commits into from
Closed
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions package/kedro_viz/data_access/repositories/modular_pipelines.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""`kedro_viz.data_access.repositories.modular_pipelines`
defines repository to centralise access to modular pipelines data."""
from typing import Dict, Optional, Union
from typing import Dict, List, Optional, Union

from kedro_viz.constants import ROOT_MODULAR_PIPELINE_ID
from kedro_viz.models.flowchart import (
Expand All @@ -14,6 +14,28 @@
)


def _check_is_internal_input_output(
modular_pipeline_id: str, input_node_modular_pipeline: List
) -> Optional[str]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The definition is a bit confusing to me, can we have -

def _is_internal_node(
    modular_pipeline_id: str, internal_modular_pipelines: List
) -> Optional[str]:

"""Extracts the ID of the outer modular pipeline containing a nested modular pipeline.

Args:
modular_pipeline_id: The ID of the nested modular pipeline.

Returns:
The ID of the outer modular pipeline containing the nested modular pipeline, if available.

Example:
>>> _get_outer_modular_pipeline_id("namespace_modular_pipeline.modular_pipeline")
'namespace_modular_pipeline'
"""
all_modular_pipeline_ids = modular_pipeline_id.split(".")
for pipeline_id in all_modular_pipeline_ids:
if pipeline_id is not None and pipeline_id in input_node_modular_pipeline:
return True
return False


class ModularPipelinesRepository:
"""Repository for the set of modular pipelines in a registered pipeline.
Internally, the repository models the set of modular pipelines as a tree using child-references.
Expand Down Expand Up @@ -122,7 +144,10 @@ def add_input(
f"Attempt to add a non-data node as input to modular pipeline {modular_pipeline_id}"
)

is_internal_input = modular_pipeline_id in input_node.modular_pipelines
is_internal_input = _check_is_internal_input_output(
modular_pipeline_id, input_node.modular_pipelines
)

if is_internal_input:
self.tree[modular_pipeline_id].internal_inputs.add(input_node.id)
else:
Expand Down Expand Up @@ -154,8 +179,10 @@ def add_output(self, modular_pipeline_id: str, output_node: GraphNode):
raise ValueError(
f"Attempt to add a non-data node as input to modular pipeline {modular_pipeline_id}"
)
is_internal_output = _check_is_internal_input_output(
modular_pipeline_id, output_node.modular_pipelines
)

is_internal_output = modular_pipeline_id in output_node.modular_pipelines
if is_internal_output:
self.tree[modular_pipeline_id].internal_outputs.add(output_node.id)
else:
Expand Down