From 8c3ceb7f1b274daa8de9e842789beb21dfeae6d7 Mon Sep 17 00:00:00 2001 From: Ben Dye Date: Fri, 11 Aug 2023 12:11:07 -0400 Subject: [PATCH] Refactor Neo4jProxy table owners query for easier customization (#2182) * refactor _exec_owners_query and get_table Signed-off-by: Ben Dye * remove prior owners logic and references Signed-off-by: Ben Dye * WIP Signed-off-by: Ben Dye * update tests Signed-off-by: Ben Dye * Add comment explaining testing-mock logic Signed-off-by: Ben Dye * Cleanups Signed-off-by: Ben Dye * Bump version Signed-off-by: Ben Dye --------- Signed-off-by: Ben Dye --- .../metadata_service/proxy/neo4j_proxy.py | 39 ++++++++++++------- metadata/setup.py | 2 +- metadata/tests/unit/proxy/test_neo4j_proxy.py | 20 ++++++---- 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/metadata/metadata_service/proxy/neo4j_proxy.py b/metadata/metadata_service/proxy/neo4j_proxy.py index 17882e1563..abfad1be7c 100644 --- a/metadata/metadata_service/proxy/neo4j_proxy.py +++ b/metadata/metadata_service/proxy/neo4j_proxy.py @@ -159,8 +159,9 @@ def get_table(self, *, table_uri: str) -> Table: cols, last_neo4j_record = self._exec_col_query(table_uri) readers = self._exec_usage_query(table_uri) + owners = self._exec_owners_query(table_uri) - wmk_results, table_writer, table_apps, timestamp_value, owners, tags, source, \ + wmk_results, table_writer, table_apps, timestamp_value, tags, source, \ badges, prog_descs, resource_reports = self._exec_table_query(table_uri) joins, filters = self._exec_table_query_query(table_uri) @@ -340,14 +341,34 @@ def _exec_usage_query(self, table_uri: str) -> List[Reader]: return readers + @timer_with_counter + def _exec_owners_query(self, table_uri: str) -> List[User]: + # Return Value: List[User] + owners_query = textwrap.dedent(""" + MATCH (owner:User)<-[:OWNER]-(tbl:Table {key: $tbl_key}) + RETURN collect(distinct owner) as owner_records + """) + owners_neo4j_records = self._execute_cypher_query(statement=owners_query, + param_dict={'tbl_key': table_uri}) + + owners_neo4j_records = get_single_record(owners_neo4j_records) + + owners = [] # type: List[User] + for owner_neo4j_record in owners_neo4j_records.get('owner_records', []): + owner_data = self._get_user_details(user_id=owner_neo4j_record['email']) + owner = self._build_user_from_record(record=owner_data) + owners.append(owner) + + return owners + @timer_with_counter def _exec_table_query(self, table_uri: str) -> Tuple: """ Queries one Cypher record with watermark list, Application, - ,timestamp, owner records and tag records. + ,timestamp, and tag records. """ - # Return Value: (Watermark Results, Table Writer, Last Updated Timestamp, owner records, tag records) + # Return Value: (Watermark Results, Table Writer, Last Updated Timestamp, tag records) table_level_query = textwrap.dedent("""\ MATCH (tbl:Table {key: $tbl_key}) @@ -355,7 +376,6 @@ def _exec_table_query(self, table_uri: str) -> Tuple: OPTIONAL MATCH (app_producer:Application)-[:GENERATES]->(tbl) OPTIONAL MATCH (app_consumer:Application)-[:CONSUMES]->(tbl) OPTIONAL MATCH (tbl)-[:LAST_UPDATED_AT]->(t:Timestamp) - OPTIONAL MATCH (owner:User)<-[:OWNER]-(tbl) OPTIONAL MATCH (tbl)-[:TAGGED_BY]->(tag:Tag{tag_type: $tag_normal_type}) OPTIONAL MATCH (tbl)-[:HAS_BADGE]->(badge:Badge) OPTIONAL MATCH (tbl)-[:SOURCE]->(src:Source) @@ -365,7 +385,6 @@ def _exec_table_query(self, table_uri: str) -> Tuple: collect(distinct app_producer) as producing_apps, collect(distinct app_consumer) as consuming_apps, t.last_updated_timestamp as last_updated_timestamp, - collect(distinct owner) as owner_records, collect(distinct tag) as tag_records, collect(distinct badge) as badge_records, src, @@ -405,12 +424,6 @@ def _exec_table_query(self, table_uri: str) -> Tuple: timestamp_value = table_records['last_updated_timestamp'] - owner_record = [] - - for owner in table_records.get('owner_records', []): - owner_data = self._get_user_details(user_id=owner['email']) - owner_record.append(self._build_user_from_record(record=owner_data)) - src = None if table_records['src']: @@ -423,7 +436,7 @@ def _exec_table_query(self, table_uri: str) -> Tuple: resource_reports = self._extract_resource_reports_from_query(table_records.get('resource_reports', [])) - return wmk_results, table_writer, table_apps, timestamp_value, owner_record,\ + return wmk_results, table_writer, table_apps, timestamp_value,\ tags, src, badges, prog_descriptions, resource_reports @timer_with_counter @@ -434,7 +447,7 @@ def _exec_table_query_query(self, table_uri: str) -> Tuple: on the table. """ - # Return Value: (Watermark Results, Table Writer, Last Updated Timestamp, owner records, tag records) + # Return Value: (Watermark Results, Table Writer, Last Updated Timestamp, tag records) table_query_level_query = textwrap.dedent(""" MATCH (tbl:Table {key: $tbl_key}) OPTIONAL MATCH (tbl)-[:COLUMN]->(col:Column)-[COLUMN_JOINS_WITH]->(j:Join) diff --git a/metadata/setup.py b/metadata/setup.py index 717fb6eeb0..e2776310f6 100644 --- a/metadata/setup.py +++ b/metadata/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -__version__ = '3.12.1' +__version__ = '3.12.2' requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt') with open(requirements_path) as requirements_file: diff --git a/metadata/tests/unit/proxy/test_neo4j_proxy.py b/metadata/tests/unit/proxy/test_neo4j_proxy.py index 6144b096c4..d7f6207f5d 100644 --- a/metadata/tests/unit/proxy/test_neo4j_proxy.py +++ b/metadata/tests/unit/proxy/test_neo4j_proxy.py @@ -171,13 +171,6 @@ def setUp(self) -> None: } ], 'last_updated_timestamp': 1, - 'owner_records': [ - { - 'key': 'tester@example.com', - 'email': 'tester@example.com', - 'updated_at': 0, - } - ], 'tag_records': [ { 'key': 'test', @@ -236,6 +229,14 @@ def setUp(self) -> None: ] }] + owners_results = [{'owner_records': [ + { + 'key': 'tester@example.com', + 'email': 'tester@example.com', + 'updated_at': 0, + } + ], }] + last_updated_timestamp = '01' self.col_usage_return_value = [ @@ -250,6 +251,8 @@ def setUp(self) -> None: self.table_common_usage = table_common_usage + self.owners_return_value = owners_results + self.col_bar_id_1_expected_type_metadata = self._get_col_bar_id_1_expected_type_metadata() self.col_bar_id_2_expected_type_metadata = self._get_col_bar_id_2_expected_type_metadata() @@ -355,9 +358,11 @@ def test_health_neo4j(self) -> None: def test_get_table(self) -> None: with patch.object(GraphDatabase, 'driver'), patch.object(Neo4jProxy, '_execute_cypher_query') as mock_execute: + # mock database return values such that we match ordering of queries executed in Neo4jProxy.get_table mock_execute.side_effect = [ self.col_usage_return_value, [], + self.owners_return_value, self.table_level_return_value, self.table_common_usage, [] @@ -445,6 +450,7 @@ def test_get_table_view_only(self) -> None: mock_execute.side_effect = [ col_usage_return_value, [], + self.owners_return_value, self.table_level_return_value, self.table_common_usage, []