diff --git a/metadata/metadata_service/proxy/neo4j_proxy.py b/metadata/metadata_service/proxy/neo4j_proxy.py index 161cbfedd6..17882e1563 100644 --- a/metadata/metadata_service/proxy/neo4j_proxy.py +++ b/metadata/metadata_service/proxy/neo4j_proxy.py @@ -27,7 +27,7 @@ from beaker.cache import CacheManager from beaker.util import parse_cache_config_options from flask import current_app, has_app_context -from neo4j import GraphDatabase, Record # noqa: F401 +from neo4j import GraphDatabase, Record, Transaction # noqa: F401 from neo4j.api import (SECURITY_TYPE_SECURE, SECURITY_TYPE_SELF_SIGNED_CERTIFICATE, parse_neo4j_uri) from neo4j.exceptions import ClientError @@ -56,6 +56,16 @@ LOGGER = logging.getLogger(__name__) +def execute_statement(tx: Transaction, stmt: str, params: dict = None) -> List[Record]: + """ + Executes statement against Neo4j. If execution fails, it rollsback and raises exception. + """ + LOGGER.debug('Executing statement: %s with params %s', stmt, params) + + result = tx.run(stmt, parameters=params) + return [record for record in result] + + def get_single_record(records_list: List[Record]) -> Record: """ Helper method to get single item from _execute_cypher_query return when only one item is expected. @@ -551,14 +561,16 @@ def _safe_get(self, dct, *keys): def _execute_cypher_query(self, *, statement: str, param_dict: Dict[str, Any]) -> List[Record]: + """ + Execute Cypher queries using managed read transactions + """ if LOGGER.isEnabledFor(logging.DEBUG): LOGGER.debug('Executing Cypher query: {statement} with params {params}: '.format(statement=statement, params=param_dict)) start = time.time() try: with self._driver.session(database=self._database_name) as session: - result = session.run(query=statement, **param_dict) - return [record for record in result] + return session.read_transaction(execute_statement, statement, param_dict) finally: # TODO: Add support on statsd diff --git a/metadata/setup.py b/metadata/setup.py index 4849902581..717fb6eeb0 100644 --- a/metadata/setup.py +++ b/metadata/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -__version__ = '3.12.0' +__version__ = '3.12.1' requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt') with open(requirements_path) as requirements_file: