From 99b84e4ac501efcf946cec66ba59157e661a1e84 Mon Sep 17 00:00:00 2001
From: Ravi Kumar Pilla
Date: Tue, 19 Dec 2023 03:45:18 -0600
Subject: [PATCH] Remove dataset factory discovery (#1688)

Dataset factory pattern discovery was introduced to discover datasets
(mostly tracking datasets used in experiment tracking) before the Kedro-Viz
data repositories are populated.

During this discovery, datasets that users do not have access to either
time out or raise exceptions, which causes Kedro-Viz to time out or fail.

This PR removes the dataset factory pattern discovery implementation as a
temporary fix. As a result, dataset factory patterns cannot be used for
experiment tracking.
---
 package/kedro_viz/data_access/managers.py   |  33 +----
 package/kedro_viz/server.py                 |   2 +-
 .../test_api/test_graphql/test_queries.py   |  27 +---
 .../tests/test_data_access/test_managers.py | 118 +++++------------
 package/tests/test_server.py                |   8 +-
 5 files changed, 42 insertions(+), 146 deletions(-)

diff --git a/package/kedro_viz/data_access/managers.py b/package/kedro_viz/data_access/managers.py
index cadb05a737..8835e4fe05 100644
--- a/package/kedro_viz/data_access/managers.py
+++ b/package/kedro_viz/data_access/managers.py
@@ -69,40 +69,17 @@ def set_db_session(self, db_session_class: sessionmaker):
         """Set db session on repositories that need it."""
         self.runs.set_db_session(db_session_class)
 
-    def resolve_dataset_factory_patterns(
-        self, catalog: DataCatalog, pipelines: Dict[str, KedroPipeline]
-    ):
-        """Resolve dataset factory patterns in data catalog by matching
-        them against the datasets in the pipelines.
-        """
-        for pipeline in pipelines.values():
-            if hasattr(pipeline, "data_sets"):
-                # Support for Kedro 0.18.x
-                datasets = pipeline.data_sets()
-            else:
-                datasets = pipeline.datasets()
-
-            for dataset_name in datasets:
-                try:
-                    catalog.exists(dataset_name)
-                # pylint: disable=broad-except
-                except Exception as exc:  # pragma: no cover
-                    logger.warning(
-                        "'%s' does not exist. Full exception: %s: %s",
-                        dataset_name,
-                        type(exc).__name__,
-                        exc,
-                    )
-
-    def add_catalog(self, catalog: DataCatalog, pipelines: Dict[str, KedroPipeline]):
+    def add_catalog(self, catalog: DataCatalog):
         """Resolve dataset factory patterns, add the catalog to the CatalogRepository
         and relevant tracking datasets to TrackingDatasetRepository.
 
         Args:
             catalog: The DataCatalog instance to add.
-            pipelines: A dictionary which holds project pipelines
         """
-        self.resolve_dataset_factory_patterns(catalog, pipelines)
+
+        # TODO: Implement dataset factory pattern discovery for
+        # experiment tracking datasets.
+
         self.catalog.set_catalog(catalog)
 
         for dataset_name, dataset in self.catalog.as_dict().items():
diff --git a/package/kedro_viz/server.py b/package/kedro_viz/server.py
index 54a5dd7cc2..65d0c0d551 100644
--- a/package/kedro_viz/server.py
+++ b/package/kedro_viz/server.py
@@ -38,7 +38,7 @@ def populate_data(
     session_class = make_db_session_factory(session_store.location)
     data_access_manager.set_db_session(session_class)
 
-    data_access_manager.add_catalog(catalog, pipelines)
+    data_access_manager.add_catalog(catalog)
 
     # add dataset stats before adding pipelines as the data nodes
     # need stats information and they are created during add_pipelines
diff --git a/package/tests/test_api/test_graphql/test_queries.py b/package/tests/test_api/test_graphql/test_queries.py
index 16cfd36ae4..6367ebb7e5 100644
--- a/package/tests/test_api/test_graphql/test_queries.py
+++ b/package/tests/test_api/test_graphql/test_queries.py
@@ -68,11 +68,8 @@ def test_run_tracking_data_query(
         client,
         example_tracking_catalog,
         data_access_manager_with_runs,
-        example_pipelines,
     ):
-        data_access_manager_with_runs.add_catalog(
-            example_tracking_catalog, example_pipelines
-        )
+        data_access_manager_with_runs.add_catalog(example_tracking_catalog)
         example_run_id = example_run_ids[0]
 
         response = client.post(
@@ -173,15 +170,9 @@ def test_run_tracking_data_query(
         assert response.json() == expected_response
 
     def test_metrics_data(
-        self,
-        client,
-        example_tracking_catalog,
-        data_access_manager_with_runs,
-        example_pipelines,
+        self, client, example_tracking_catalog, data_access_manager_with_runs
     ):
-        data_access_manager_with_runs.add_catalog(
-            example_tracking_catalog, example_pipelines
-        )
+        data_access_manager_with_runs.add_catalog(example_tracking_catalog)
 
         response = client.post(
             "/graphql",
@@ -295,11 +286,8 @@ def test_graphql_run_tracking_data(
         data_access_manager_with_runs,
         show_diff,
         expected_response,
-        example_pipelines,
     ):
-        data_access_manager_with_runs.add_catalog(
-            example_multiple_run_tracking_catalog, example_pipelines
-        )
+        data_access_manager_with_runs.add_catalog(example_multiple_run_tracking_catalog)
 
         response = client.post(
             "/graphql",
@@ -355,11 +343,9 @@ def test_graphql_run_tracking_data_at_least_one_empty_run(
         data_access_manager_with_runs,
         show_diff,
         expected_response,
-        example_pipelines,
     ):
         data_access_manager_with_runs.add_catalog(
-            example_multiple_run_tracking_catalog_at_least_one_empty_run,
-            example_pipelines,
+            example_multiple_run_tracking_catalog_at_least_one_empty_run
         )
 
         response = client.post(
@@ -393,10 +379,9 @@ def test_graphql_run_tracking_data_all_empty_runs(
         data_access_manager_with_runs,
         show_diff,
         expected_response,
-        example_pipelines,
     ):
         data_access_manager_with_runs.add_catalog(
-            example_multiple_run_tracking_catalog_all_empty_runs, example_pipelines
+            example_multiple_run_tracking_catalog_all_empty_runs
         )
 
         response = client.post(
diff --git a/package/tests/test_data_access/test_managers.py b/package/tests/test_data_access/test_managers.py
index c81f9819f8..ce08fa56e1 100644
--- a/package/tests/test_data_access/test_managers.py
+++ b/package/tests/test_data_access/test_managers.py
@@ -9,7 +9,6 @@
 
 from kedro_viz.constants import DEFAULT_REGISTERED_PIPELINE_ID, ROOT_MODULAR_PIPELINE_ID
 from kedro_viz.data_access.managers import DataAccessManager
-from kedro_viz.data_access.repositories.catalog import CatalogRepository
 from kedro_viz.models.flowchart import (
     DataNode,
     GraphEdge,
@@ -25,14 +24,10 @@ def identity(x):
 
 
 class TestAddCatalog:
-    def test_add_catalog(
-        self,
-        data_access_manager: DataAccessManager,
-        example_pipelines: Dict[str, Pipeline],
-    ):
+    def test_add_catalog(self, data_access_manager: DataAccessManager):
         dataset = CSVDataset(filepath="dataset.csv")
         catalog = DataCatalog(datasets={"dataset": dataset})
-        data_access_manager.add_catalog(catalog, example_pipelines)
+        data_access_manager.add_catalog(catalog)
 
         assert data_access_manager.catalog.get_catalog() is catalog
@@ -70,11 +65,7 @@ def test_add_node_with_modular_pipeline(
             "uk.data_science.modular_pipeline",
         ]
 
-    def test_add_node_input(
-        self,
-        data_access_manager: DataAccessManager,
-        example_pipelines: Dict[str, Pipeline],
-    ):
+    def test_add_node_input(self, data_access_manager: DataAccessManager):
         dataset = CSVDataset(filepath="dataset.csv")
         dataset_name = "x"
         registered_pipeline_id = "my_pipeline"
@@ -89,7 +80,7 @@ def test_add_node_input(
         catalog = DataCatalog(
             datasets={dataset_name: dataset},
         )
-        data_access_manager.add_catalog(catalog, example_pipelines)
+        data_access_manager.add_catalog(catalog)
         data_access_manager.add_dataset(registered_pipeline_id, dataset_name)
         data_node = data_access_manager.add_node_input(
             registered_pipeline_id, dataset_name, task_node
@@ -113,15 +104,11 @@
             }
         }
 
-    def test_add_parameters_as_node_input(
-        self,
-        data_access_manager: DataAccessManager,
-        example_pipelines: Dict[str, Pipeline],
-    ):
+    def test_add_parameters_as_node_input(self, data_access_manager: DataAccessManager):
         parameters = {"train_test_split": 0.1, "num_epochs": 1000}
         catalog = DataCatalog()
         catalog.add_feed_dict({"parameters": parameters})
-        data_access_manager.add_catalog(catalog, example_pipelines)
+        data_access_manager.add_catalog(catalog)
         registered_pipeline_id = "my_pipeline"
         kedro_node = node(identity, inputs="parameters", outputs="output")
         task_node = data_access_manager.add_node(registered_pipeline_id, kedro_node)
@@ -132,13 +119,11 @@
         assert task_node.parameters == parameters
 
     def test_add_single_parameter_as_node_input(
-        self,
-        data_access_manager: DataAccessManager,
-        example_pipelines: Dict[str, Pipeline],
+        self, data_access_manager: DataAccessManager
     ):
         catalog = DataCatalog()
         catalog.add_feed_dict({"params:train_test_split": 0.1})
-        data_access_manager.add_catalog(catalog, example_pipelines)
+        data_access_manager.add_catalog(catalog)
         registered_pipeline_id = "my_pipeline"
         kedro_node = node(identity, inputs="params:train_test_split", outputs="output")
         task_node = data_access_manager.add_node(registered_pipeline_id, kedro_node)
@@ -151,12 +136,11 @@
     def test_parameters_yaml_namespace_not_added_to_modular_pipelines(
         self,
         data_access_manager: DataAccessManager,
-        example_pipelines: Dict[str, Pipeline],
     ):
         parameter_name = "params:uk.data_science.train_test_split.ratio"
         catalog = DataCatalog()
         catalog.add_feed_dict({parameter_name: 0.1})
-        data_access_manager.add_catalog(catalog, example_pipelines)
+        data_access_manager.add_catalog(catalog)
         registered_pipeline_id = "my_pipeline"
         kedro_node = node(
             identity,
@@ -176,11 +160,7 @@
         # make sure parameters YAML namespace not accidentally added to the modular pipeline tree
         assert "uk.data_science.train_test_split" not in modular_pipelines_tree
 
-    def test_add_node_output(
-        self,
-        data_access_manager: DataAccessManager,
-        example_pipelines: Dict[str, Pipeline],
-    ):
+    def test_add_node_output(self, data_access_manager: DataAccessManager):
         dataset = CSVDataset(filepath="dataset.csv")
         registered_pipeline_id = "my_pipeline"
         dataset_name = "x"
@@ -195,7 +175,7 @@ def test_add_node_output(
         catalog = DataCatalog(
             datasets={dataset_name: dataset},
         )
-        data_access_manager.add_catalog(catalog, example_pipelines)
+        data_access_manager.add_catalog(catalog)
         data_access_manager.add_dataset(registered_pipeline_id, dataset_name)
         data_node = data_access_manager.add_node_output(
             registered_pipeline_id, dataset_name, task_node
@@ -220,15 +200,11 @@
 
 
 class TestAddDataset:
-    def test_add_dataset(
-        self,
-        data_access_manager: DataAccessManager,
-        example_pipelines: Dict[str, Pipeline],
-    ):
+    def test_add_dataset(self, data_access_manager: DataAccessManager):
         dataset = CSVDataset(filepath="dataset.csv")
         dataset_name = "x"
         catalog = DataCatalog(datasets={dataset_name: dataset})
-        data_access_manager.add_catalog(catalog, example_pipelines)
+        data_access_manager.add_catalog(catalog)
         data_access_manager.add_dataset("my_pipeline", dataset_name)
 
         # dataset should be added as a graph node
@@ -241,12 +217,10 @@
         assert not graph_node.modular_pipelines
 
     def test_add_memory_dataset_when_dataset_not_in_catalog(
-        self,
-        data_access_manager: DataAccessManager,
-        example_pipelines: Dict[str, Pipeline],
+        self, data_access_manager: DataAccessManager
     ):
         catalog = DataCatalog()
-        data_access_manager.add_catalog(catalog, example_pipelines)
+        data_access_manager.add_catalog(catalog)
         data_access_manager.add_dataset("my_pipeline", "memory_dataset")
         # dataset should be added as a graph node
         nodes_list = data_access_manager.nodes.as_list()
@@ -256,16 +230,14 @@
         assert isinstance(graph_node.kedro_obj, MemoryDataset)
 
     def test_add_dataset_with_modular_pipeline(
-        self,
-        data_access_manager: DataAccessManager,
-        example_pipelines: Dict[str, Pipeline],
+        self, data_access_manager: DataAccessManager
     ):
         dataset = CSVDataset(filepath="dataset.csv")
         dataset_name = "uk.data_science.x"
         catalog = DataCatalog(
             datasets={dataset_name: dataset},
         )
-        data_access_manager.add_catalog(catalog, example_pipelines)
+        data_access_manager.add_catalog(catalog)
         data_access_manager.add_dataset("my_pipeline", dataset_name)
         nodes_list = data_access_manager.nodes.as_list()
         graph_node: DataNode = nodes_list[0]
@@ -274,16 +246,12 @@
             "uk.data_science",
         ]
 
-    def test_add_all_parameters(
-        self,
-        data_access_manager: DataAccessManager,
-        example_pipelines: Dict[str, Pipeline],
-    ):
+    def test_add_all_parameters(self, data_access_manager: DataAccessManager):
         catalog = DataCatalog()
         catalog.add_feed_dict(
             {"parameters": {"train_test_split": 0.1, "num_epochs": 1000}}
         )
-        data_access_manager.add_catalog(catalog, example_pipelines)
+        data_access_manager.add_catalog(catalog)
         data_access_manager.add_dataset("my_pipeline", "parameters")
 
         nodes_list = data_access_manager.nodes.as_list()
@@ -296,14 +264,10 @@
             "num_epochs": 1000,
         }
 
-    def test_add_single_parameter(
-        self,
-        data_access_manager: DataAccessManager,
-        example_pipelines: Dict[str, Pipeline],
-    ):
+    def test_add_single_parameter(self, data_access_manager: DataAccessManager):
         catalog = DataCatalog()
         catalog.add_feed_dict({"params:train_test_split": 0.1})
-        data_access_manager.add_catalog(catalog, example_pipelines)
+        data_access_manager.add_catalog(catalog)
         data_access_manager.add_dataset("my_pipeline", "params:train_test_split")
         nodes_list = data_access_manager.nodes.as_list()
         assert len(nodes_list) == 1
@@ -313,13 +277,11 @@
         assert graph_node.parameter_value == 0.1
 
     def test_add_dataset_with_params_prefix(
-        self,
-        data_access_manager: DataAccessManager,
-        example_pipelines: Dict[str, Pipeline],
+        self, data_access_manager: DataAccessManager
     ):
         catalog = DataCatalog()
         catalog.add_feed_dict({"params_train_test_split": 0.1})
-        data_access_manager.add_catalog(catalog, example_pipelines)
+        data_access_manager.add_catalog(catalog)
         data_access_manager.add_dataset("my_pipeline", "params_train_test_split")
         nodes_list = data_access_manager.nodes.as_list()
         assert len(nodes_list) == 1
@@ -335,7 +297,7 @@ def test_add_pipelines(
         example_pipelines: Dict[str, Pipeline],
         example_catalog: DataCatalog,
     ):
-        data_access_manager.add_catalog(example_catalog, example_pipelines)
+        data_access_manager.add_catalog(example_catalog)
         data_access_manager.add_pipelines(example_pipelines)
 
         assert [p.id for p in data_access_manager.registered_pipelines.as_list()] == [
@@ -381,9 +343,7 @@ def test_add_pipelines_with_transcoded_data(
         example_transcoded_pipelines: Dict[str, Pipeline],
         example_transcoded_catalog: DataCatalog,
     ):
-        data_access_manager.add_catalog(
-            example_transcoded_catalog, example_transcoded_pipelines
-        )
+        data_access_manager.add_catalog(example_transcoded_catalog)
         data_access_manager.add_pipelines(example_transcoded_pipelines)
         assert any(
             isinstance(node, TranscodedDataNode)
@@ -405,7 +365,7 @@ def test_different_reigstered_pipelines_having_modular_pipeline_with_same_name(
             ),
         }
 
-        data_access_manager.add_catalog(DataCatalog(), registered_pipelines)
+        data_access_manager.add_catalog(DataCatalog())
         data_access_manager.add_pipelines(registered_pipelines)
         modular_pipeline_tree = (
             data_access_manager.create_modular_pipelines_tree_for_registered_pipeline(
@@ -420,7 +380,7 @@ def test_get_default_selected_pipelines_without_default(
         example_pipelines: Dict[str, Pipeline],
         example_catalog: DataCatalog,
     ):
-        data_access_manager.add_catalog(example_catalog, example_pipelines)
+        data_access_manager.add_catalog(example_catalog)
         del example_pipelines[DEFAULT_REGISTERED_PIPELINE_ID]
         data_access_manager.add_pipelines(example_pipelines)
         assert not data_access_manager.registered_pipelines.get_pipeline_by_id(
@@ -475,7 +435,7 @@ def test_add_pipelines_with_circular_modular_pipelines(
         registered_pipelines = {
             "__default__": internal + external,
         }
-        data_access_manager.add_catalog(DataCatalog(), registered_pipelines)
+        data_access_manager.add_catalog(DataCatalog())
         data_access_manager.add_pipelines(registered_pipelines)
         data_access_manager.create_modular_pipelines_tree_for_registered_pipeline(
             DEFAULT_REGISTERED_PIPELINE_ID
@@ -494,25 +454,3 @@
             digraph.add_edge(edge.source, edge.target)
         with pytest.raises(nx.NetworkXNoCycle):
             nx.find_cycle(digraph)
-
-
-class TestResolveDatasetFactoryPatterns:
-    def test_resolve_dataset_factory_patterns(
-        self,
-        example_catalog,
-        pipeline_with_datasets_mock,
-        pipeline_with_data_sets_mock,
-        data_access_manager: DataAccessManager,
-    ):
-        pipelines = {
-            "pipeline1": pipeline_with_datasets_mock,
-            "pipeline2": pipeline_with_data_sets_mock,
-        }
-        new_catalog = CatalogRepository()
-        new_catalog.set_catalog(example_catalog)
-
-        assert "model_inputs#csv" not in new_catalog.as_dict().keys()
-
-        data_access_manager.resolve_dataset_factory_patterns(example_catalog, pipelines)
-
-        assert "model_inputs#csv" in new_catalog.as_dict().keys()
diff --git a/package/tests/test_server.py b/package/tests/test_server.py
index 4dc25b0e84..2e5f03740e 100644
--- a/package/tests/test_server.py
+++ b/package/tests/test_server.py
@@ -70,9 +70,7 @@ def test_run_server_from_project(
         example_pipelines,
     ):
         run_server()
-        patched_data_access_manager.add_catalog.assert_called_once_with(
-            example_catalog, example_pipelines
-        )
+        patched_data_access_manager.add_catalog.assert_called_once_with(example_catalog)
         patched_data_access_manager.add_pipelines.assert_called_once_with(
             example_pipelines
         )
@@ -95,9 +93,7 @@ def test_run_server_from_project_with_sqlite_store(
     ):
         run_server()
         # assert that when running server, data are added correctly to the data access manager
-        patched_data_access_manager.add_catalog.assert_called_once_with(
-            example_catalog, example_pipelines
-        )
+        patched_data_access_manager.add_catalog.assert_called_once_with(example_catalog)
         patched_data_access_manager.add_pipelines.assert_called_once_with(
            example_pipelines
        )
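
Note - not part of the patch: with the discovery removed, factory-pattern catalog entries are no longer materialised before Kedro-Viz reads the catalog, so they are not picked up for experiment tracking. A project that depended on that behaviour can approximate it in its own code before launching Kedro-Viz. The sketch below merely mirrors the helper deleted above; `catalog` and `pipelines` are assumed to come from the project's own context, and this is not an API Kedro-Viz provides after this change.

import logging
from typing import Dict

from kedro.io import DataCatalog
from kedro.pipeline import Pipeline

logger = logging.getLogger(__name__)


def resolve_dataset_factory_patterns(
    catalog: DataCatalog, pipelines: Dict[str, Pipeline]
) -> None:
    """Touch every pipeline dataset so matching factory-pattern entries are
    registered in the catalog (mirrors the helper removed by this patch)."""
    for pipeline in pipelines.values():
        # Kedro 0.18.x exposes `data_sets()`; later versions renamed it to `datasets()`.
        names = (
            pipeline.data_sets() if hasattr(pipeline, "data_sets") else pipeline.datasets()
        )
        for dataset_name in names:
            try:
                # `exists()` forces the catalog to resolve the entry, turning a
                # matching factory pattern into a concrete dataset definition.
                catalog.exists(dataset_name)
            except Exception as exc:  # datasets the user cannot access may raise or hang
                logger.warning("'%s' could not be resolved: %s", dataset_name, exc)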