Skip to content

Commit

Permalink
fix: Use managed read transactions for queries in metadata service (#…
Browse files Browse the repository at this point in the history
…2119)

* Use managed read transactions for queries in metadata service

Signed-off-by: Kristen Armes <[email protected]>

* Return same type as before

Signed-off-by: Kristen Armes <[email protected]>

---------

Signed-off-by: Kristen Armes <[email protected]>
  • Loading branch information
kristenarmes authored Mar 22, 2023
1 parent 34b7eef commit 61ef5d8
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
18 changes: 15 additions & 3 deletions metadata/metadata_service/proxy/neo4j_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from beaker.cache import CacheManager
from beaker.util import parse_cache_config_options
from flask import current_app, has_app_context
from neo4j import GraphDatabase, Record # noqa: F401
from neo4j import GraphDatabase, Record, Transaction # noqa: F401
from neo4j.api import (SECURITY_TYPE_SECURE,
SECURITY_TYPE_SELF_SIGNED_CERTIFICATE, parse_neo4j_uri)
from neo4j.exceptions import ClientError
Expand Down Expand Up @@ -56,6 +56,16 @@
LOGGER = logging.getLogger(__name__)


def execute_statement(tx: Transaction, stmt: str, params: dict = None) -> List[Record]:
"""
Executes statement against Neo4j. If execution fails, it rollsback and raises exception.
"""
LOGGER.debug('Executing statement: %s with params %s', stmt, params)

result = tx.run(stmt, parameters=params)
return [record for record in result]


def get_single_record(records_list: List[Record]) -> Record:
"""
Helper method to get single item from _execute_cypher_query return when only one item is expected.
Expand Down Expand Up @@ -551,14 +561,16 @@ def _safe_get(self, dct, *keys):
def _execute_cypher_query(self, *,
statement: str,
param_dict: Dict[str, Any]) -> List[Record]:
"""
Execute Cypher queries using managed read transactions
"""
if LOGGER.isEnabledFor(logging.DEBUG):
LOGGER.debug('Executing Cypher query: {statement} with params {params}: '.format(statement=statement,
params=param_dict))
start = time.time()
try:
with self._driver.session(database=self._database_name) as session:
result = session.run(query=statement, **param_dict)
return [record for record in result]
return session.read_transaction(execute_statement, statement, param_dict)

finally:
# TODO: Add support on statsd
Expand Down
2 changes: 1 addition & 1 deletion metadata/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

__version__ = '3.12.0'
__version__ = '3.12.1'

requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
with open(requirements_path) as requirements_file:
Expand Down

0 comments on commit 61ef5d8

Please sign in to comment.