
Commit

feat: added additional fields config to publisher (#1898)
* feat: added optional published_by publishing metadata to nodes

Signed-off-by: Allison Suarez Miranda <[email protected]>

* bumped to 6.9.0

Signed-off-by: Allison Suarez Miranda <[email protected]>

* set default value for published_by to None

Signed-off-by: Allison Suarez Miranda <[email protected]>

* added quotes around published_by value

Signed-off-by: Allison Suarez Miranda <[email protected]>

* made publisher accept a dict of fields through config

Signed-off-by: Allison Suarez Miranda <[email protected]>

* autopep

Signed-off-by: Allison Suarez Miranda <[email protected]>

* lint

Signed-off-by: Allison Suarez Miranda <[email protected]>

* r in wrong place

Signed-off-by: Allison Suarez Miranda <[email protected]>

* more linting

Signed-off-by: Allison Suarez Miranda <[email protected]>

* rerun checks

* flake

Signed-off-by: Allison Suarez Miranda <[email protected]>

* isort

Signed-off-by: Allison Suarez Miranda <[email protected]>
allisonsuarez authored Jun 23, 2022
1 parent a2a082a commit 75b858e
Showing 2 changed files with 46 additions and 19 deletions.
49 changes: 36 additions & 13 deletions databuilder/databuilder/publisher/neo4j_csv_publisher.py
@@ -8,7 +8,9 @@
from io import open
from os import listdir
from os.path import isfile, join
-from typing import List, Set
+from typing import (
+    Dict, List, Set,
+)

import neo4j
import pandas
@@ -49,21 +51,27 @@

NEO4J_USER = 'neo4j_user'
NEO4J_PASSWORD = 'neo4j_password'
+# NEO4J_ENCRYPTED is a boolean indicating whether to use SSL/TLS when connecting
NEO4J_ENCRYPTED = 'neo4j_encrypted'
-"""NEO4J_ENCRYPTED is a boolean indicating whether to use SSL/TLS when connecting."""
+# NEO4J_VALIDATE_SSL is a boolean indicating whether to validate the server's SSL/TLS
+# cert against system CAs
NEO4J_VALIDATE_SSL = 'neo4j_validate_ssl'
-"""NEO4J_VALIDATE_SSL is a boolean indicating whether to validate the server's SSL/TLS cert against system CAs."""


# This will be used to provide a unique tag to the node and relationship
JOB_PUBLISH_TAG = 'job_publish_tag'

+# any additional fields that should be added to nodes and rels through config
+ADDITIONAL_FIELDS = 'additional_fields'

# Neo4j property name for published tag
PUBLISHED_TAG_PROPERTY_NAME = 'published_tag'

# Neo4j property name for last updated timestamp
LAST_UPDATED_EPOCH_MS = 'publisher_last_updated_epoch_ms'

-# A boolean flag to indicate if publisher_metadata (e.g. published_tag, publisher_last_updated_epoch_ms)
+# A boolean flag to indicate if publisher_metadata (e.g. published_tag,
+# publisher_last_updated_epoch_ms)
# will be included as properties of the Neo4j nodes
ADD_PUBLISHER_METADATA = 'add_publisher_metadata'

@@ -103,6 +111,7 @@
NEO4J_MAX_CONN_LIFE_TIME_SEC: 50,
NEO4J_ENCRYPTED: True,
NEO4J_VALIDATE_SSL: False,
+ADDITIONAL_FIELDS: {},
ADD_PUBLISHER_METADATA: True,
RELATION_PREPROCESSOR: NoopRelationPreprocessor()})

@@ -116,7 +125,8 @@
class Neo4jCsvPublisher(Publisher):
"""
A Publisher takes two folders for input and publishes to Neo4j.
-One folder will contain CSV file(s) for Node where the other folder will contain CSV file(s) for Relationship.
+One folder will contain CSV file(s) for Node, while the other folder will contain CSV
+file(s) for Relationship.
Neo4j follows the labeled property graph model; more information about this is in:
https://neo4j.com/docs/developer-manual/current/introduction/graphdb-concepts/
@@ -156,13 +166,16 @@ def init(self, conf: ConfigTree) -> None:
self.deadlock_node_labels = set(conf.get_list(NEO4J_DEADLOCK_NODE_LABELS, default=[]))
self.labels: Set[str] = set()
self.publish_tag: str = conf.get_string(JOB_PUBLISH_TAG)
+self.additional_fields: Dict = conf.get(ADDITIONAL_FIELDS)
self.add_publisher_metadata: bool = conf.get_bool(ADD_PUBLISHER_METADATA)
if self.add_publisher_metadata and not self.publish_tag:
raise Exception(f'{JOB_PUBLISH_TAG} should not be empty')

self._relation_preprocessor = conf.get(RELATION_PREPROCESSOR)

-LOGGER.info('Publishing Node csv files %s, and Relation CSV files %s', self._node_files, self._relation_files)
+LOGGER.info('Publishing Node csv files %s, and Relation CSV files %s',
+            self._node_files,
+            self._relation_files)

def _list_files(self, conf: ConfigTree, path_key: str) -> List[str]:
"""
@@ -230,7 +243,8 @@ def _create_indices(self, node_file: str) -> None:
LOGGER.info('Creating indices. (Existing indices will be ignored)')

with open(node_file, 'r', encoding='utf8') as node_csv:
-for node_record in pandas.read_csv(node_csv, na_filter=False).to_dict(orient='records'):
+for node_record in pandas.read_csv(node_csv,
+                                   na_filter=False).to_dict(orient='records'):
label = node_record[NODE_LABEL_KEY]
if label not in self.labels:
self._try_create_index(label)
@@ -240,9 +254,10 @@ def _create_indices(self, node_file: str) -> None:

def _publish_node(self, node_file: str, tx: Transaction) -> Transaction:
"""
-Iterate over the csv records of a file, each csv record transform to Merge statement and will be executed.
-All nodes should have a unique key, and this method will try to create unique index on the LABEL when it sees
-first time within a job scope.
+Iterate over the csv records of a file; each csv record is transformed into a MERGE
+statement and executed.
+All nodes should have a unique key, and this method will try to create a unique index
+on the LABEL when it sees it for the first time within a job scope.
Example of Cypher query executed by this method:
MERGE (col_test_id1:Column {key: 'presto://gold.test_schema1/test_table1/test_id1'})
ON CREATE SET col_test_id1.name = 'test_id1',
@@ -257,7 +272,8 @@ def _publish_node(self, node_file: str, tx: Transaction) -> Transaction:
"""

with open(node_file, 'r', encoding='utf8') as node_csv:
-for node_record in pandas.read_csv(node_csv, na_filter=False).to_dict(orient="records"):
+for node_record in pandas.read_csv(node_csv,
+                                   na_filter=False).to_dict(orient="records"):
stmt = self.create_node_merge_statement(node_record=node_record)
params = self._create_props_param(node_record)
tx = self._execute_statement(stmt, tx, params)
@@ -312,7 +328,8 @@ def _publish_relation(self, relation_file: str, tx: Transaction) -> Transaction:

count = 0
with open(relation_file, 'r', encoding='utf8') as relation_csv:
-for rel_record in pandas.read_csv(relation_csv, na_filter=False).to_dict(orient="records"):
+for rel_record in pandas.read_csv(relation_csv,
+                                  na_filter=False).to_dict(orient="records"):
# TODO not sure if deadlock on badge node arises in preprocessing or not
stmt, params = self._relation_preprocessor.preprocess_cypher(
start_label=rel_record[RELATION_START_LABEL],
@@ -396,7 +413,8 @@ def _create_props_body(self,
identifier.key1 = 'val1', identifier.key2 = 'val2', identifier.key3 = val3
:param record_dict: A dict representing a CSV row
-:param excludes: set of excluded columns that does not need to be in properties (e.g: KEY, LABEL ...)
+:param excludes: set of excluded columns that do not need to be in properties
+    (e.g. KEY, LABEL ...)
:param identifier: identifier that will be used in the CYPHER query, as shown in the example above
:return: Properties body for Cypher statement
"""
@@ -414,6 +432,11 @@
props.append(f"{identifier}.{PUBLISHED_TAG_PROPERTY_NAME} = '{self.publish_tag}'")
props.append(f"{identifier}.{LAST_UPDATED_EPOCH_MS} = timestamp()")

+# add additional metadata fields from config
+for k, v in self.additional_fields.items():
+    val = v if isinstance(v, (int, float)) else f"'{v}'"
+    props.append(f"{identifier}.{k} = {val}")

return ', '.join(props)

def _execute_statement(self,
16 changes: 10 additions & 6 deletions databuilder/setup.py
@@ -1,17 +1,20 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import os

from setuptools import find_packages, setup

-__version__ = '6.8.0'
+__version__ = '6.9.0'

-requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
-with open(requirements_path) as requirements_file:
+requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+                                 'requirements.txt')
+with open(requirements_path, 'r') as requirements_file:
requirements = requirements_file.readlines()

-requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements-dev.txt')
-with open(requirements_path) as requirements_file:
+requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+                                 'requirements-dev.txt')
+with open(requirements_path, 'r') as requirements_file:
requirements_dev = requirements_file.readlines()

kafka = ['confluent-kafka==1.0.0']
@@ -93,7 +96,8 @@
]

all_deps = requirements + requirements_dev + kafka + cassandra + glue + snowflake + athena + \
-bigquery + jsonpath + db2 + dremio + druid + spark + feast + neptune + rds + atlas + salesforce + oracle + teradata
+bigquery + jsonpath + db2 + dremio + druid + spark + feast + neptune + rds \
+    + atlas + salesforce + oracle + teradata

setup(
name='amundsen-databuilder',
