Skip to content

Commit

Permalink
feat: Adding ability to include conditions along with node and relati…
Browse files Browse the repository at this point in the history
…on types in the neo4j staleness removal task (#1464)

* Adding ability to include conditions along with node and relation types in the neo4j staleness removal task

Signed-off-by: Kristen Armes <[email protected]>

* Addressing PR comments

Signed-off-by: Kristen Armes <[email protected]>

* Fixing up query templates

Signed-off-by: Kristen Armes <[email protected]>

* Fixing type issue in test

Signed-off-by: Kristen Armes <[email protected]>

* Fixing formatting requirements

Signed-off-by: Kristen Armes <[email protected]>
  • Loading branch information
kristenarmes authored Sep 3, 2021
1 parent 45d96dd commit 1b25881
Show file tree
Hide file tree
Showing 3 changed files with 330 additions and 165 deletions.
26 changes: 26 additions & 0 deletions databuilder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1865,6 +1865,32 @@ You can think this approach as TTL based eviction. This is particularly useful w

Above configuration is trying to delete stale usage relation (READ, READ_BY), by deleting READ or READ_BY relation that has not been published past 3 days. If number of elements to be removed is more than 10% per type, this task will be aborted without executing any deletion.

#### Using node and relation conditions to remove stale data
You may want to remove stale nodes and relations that meet certain conditions rather than all of a given type. To do this, you can specify the inputs to be a list of **TargetWithCondition** objects that each define a target type and a condition. Only stale nodes or relations of that type and that meet the condition will be removed when using this type of input.

Node conditions can make use of the predefined variable `target` which represents the node. Relation conditions can include the variables `target`, `start_node`, and `end_node` where `target` represents the relation and `start_node`/`end_node` represent the nodes on either side of the target relation. For some examples of conditions see below.

from databuilder.task.neo4j_staleness_removal_task import TargetWithCondition

task = Neo4jStalenessRemovalTask()
job_config_dict = {
'job.identifier': 'remove_stale_data_job',
'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint,
'task.remove_stale_data.neo4j_user': neo4j_user,
'task.remove_stale_data.neo4j_password': neo4j_password,
'task.remove_stale_data.staleness_max_pct': 10,
'task.remove_stale_data.target_nodes': [TargetWithCondition('Table', '(target)-[:COLUMN]->(:Column)'), # All Table nodes that have a directional COLUMN relation to a Column node
TargetWithCondition('Column', '(target)-[]-(:Table) AND target.name=\'column_name\'')], # All Column nodes named 'column_name' that have some relation to a Table node
'task.remove_stale_data.target_relations': [TargetWithCondition('COLUMN', '(start_node:Table)-[target]->(end_node:Column)'), # All COLUMN relations that connect from a Table node to a Column node
TargetWithCondition('COLUMN', '(start_node:Column)-[target]-(end_node)')], # All COLUMN relations that connect any direction between a Column node and another node
'task.remove_stale_data.milliseconds_to_expire': 86400000 * 3
}
job_config = ConfigFactory.from_dict(job_config_dict)
job = DefaultJob(conf=job_config, task=task)
job.launch()

You can include multiple inputs of the same type with different conditions as seen in the **target_relations** list above. Attribute checks can also be added as shown in the **target_nodes** list.

#### Dry run
Deletion is always scary and it's better to perform dryrun before put this into action. You can use Dry run to see what sort of Cypher query will be executed.

Expand Down
202 changes: 116 additions & 86 deletions databuilder/databuilder/task/neo4j_staleness_removal_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import textwrap
import time
from typing import (
Any, Dict, Iterable,
Any, Dict, Iterable, Union,
)

import neo4j
Expand Down Expand Up @@ -54,6 +54,12 @@
MARKER_VAR_NAME = 'marker'


class TargetWithCondition:
def __init__(self, target_type: str, condition: str) -> None:
self.target_type = target_type
self.condition = condition


class Neo4jStalenessRemovalTask(Task):
"""
A Specific task that is to remove stale nodes and relations in Neo4j.
Expand All @@ -64,6 +70,31 @@ class Neo4jStalenessRemovalTask(Task):
"""

delete_stale_nodes_statement = textwrap.dedent("""
MATCH (target:{{type}})
WHERE {staleness_condition}{{extra_condition}}
WITH target LIMIT $batch_size
DETACH DELETE (target)
RETURN count(*) as count
""")
delete_stale_relations_statement = textwrap.dedent("""
MATCH (start_node)-[target:{{type}}]-(end_node)
WHERE {staleness_condition}{{extra_condition}}
WITH target LIMIT $batch_size
DELETE target
RETURN count(*) as count
""")
validate_node_staleness_statement = textwrap.dedent("""
MATCH (target:{{type}})
WHERE {staleness_condition}{{extra_condition}}
RETURN count(*) as count
""")
validate_relation_staleness_statement = textwrap.dedent("""
MATCH (start_node)-[target:{{type}}]-(end_node)
WHERE {staleness_condition}{{extra_condition}}
RETURN count(*) as count
""")

def __init__(self) -> None:
pass

Expand Down Expand Up @@ -123,14 +154,8 @@ def validate(self) -> None:
self._validate_relation_staleness_pct()

def _delete_stale_nodes(self) -> None:
statement = textwrap.dedent("""
MATCH (n:{{type}})
WHERE {}
WITH n LIMIT $batch_size
DETACH DELETE (n)
RETURN COUNT(*) as count;
""")
self._batch_delete(statement=self._decorate_staleness(statement), targets=self.target_nodes)
self._batch_delete(statement=self._decorate_staleness(self.delete_stale_nodes_statement),
targets=self.target_nodes)

def _decorate_staleness(self,
statement: str
Expand All @@ -141,27 +166,21 @@ def _decorate_staleness(self,
:return:
"""
if self.ms_to_expire:
return statement.format(textwrap.dedent(f"""
n.publisher_last_updated_epoch_ms < (timestamp() - ${MARKER_VAR_NAME})
OR NOT EXISTS(n.publisher_last_updated_epoch_ms)"""))
return statement.format(staleness_condition=textwrap.dedent(f"""\
(target.publisher_last_updated_epoch_ms < (timestamp() - ${MARKER_VAR_NAME})
OR NOT EXISTS(target.publisher_last_updated_epoch_ms))"""))

return statement.format(textwrap.dedent(f"""
n.published_tag <> ${MARKER_VAR_NAME}
OR NOT EXISTS(n.published_tag)"""))
return statement.format(staleness_condition=textwrap.dedent(f"""\
(target.published_tag <> ${MARKER_VAR_NAME}
OR NOT EXISTS(target.published_tag))"""))

def _delete_stale_relations(self) -> None:
statement = textwrap.dedent("""
MATCH ()-[n:{{type}}]-()
WHERE {}
WITH n LIMIT $batch_size
DELETE n
RETURN count(*) as count;
""")
self._batch_delete(statement=self._decorate_staleness(statement), targets=self.target_relations)
self._batch_delete(statement=self._decorate_staleness(self.delete_stale_relations_statement),
targets=self.target_relations)

def _batch_delete(self,
statement: str,
targets: Iterable[str]
targets: Union[Iterable[str], Iterable[TargetWithCondition]]
) -> None:
"""
Performing huge amount of deletion could degrade Neo4j performance. Therefore, it's taking batch deletion here.
Expand All @@ -170,10 +189,18 @@ def _batch_delete(self,
:return:
"""
for t in targets:
LOGGER.info('Deleting stale data of %s with batch size %i', t, self.batch_size)
if isinstance(t, TargetWithCondition):
target_type = t.target_type
extra_condition = ' AND ' + t.condition
else:
target_type = t
extra_condition = ''

LOGGER.info('Deleting stale data of %s with batch size %i', target_type, self.batch_size)
total_count = 0
while True:
results = self._execute_cypher_query(statement=statement.format(type=t),
results = self._execute_cypher_query(statement=statement.format(type=target_type,
extra_condition=extra_condition),
param_dict={'batch_size': self.batch_size,
MARKER_VAR_NAME: self.marker},
dry_run=self.dry_run)
Expand All @@ -182,75 +209,78 @@ def _batch_delete(self,
total_count = total_count + count
if count == 0:
break
LOGGER.info('Deleted %i stale data of %s', total_count, t)
LOGGER.info('Deleted %i stale data of %s', total_count, target_type)

def _validate_staleness_pct(self,
total_records: Iterable[Dict[str, Any]],
stale_records: Iterable[Dict[str, Any]],
types: Iterable[str]
total_record_count: int,
stale_record_count: int,
target_type: str
) -> None:
total_count_dict = {record['type']: int(record['count']) for record in total_records}

for record in stale_records:
type_str = record['type']
if type_str not in types:
continue

stale_count = record['count']
if stale_count == 0:
continue
if total_record_count == 0 or stale_record_count == 0:
return

node_count = total_count_dict[type_str]
stale_pct = stale_count * 100 / node_count
stale_pct = stale_record_count * 100 / total_record_count

threshold = self.staleness_pct_dict.get(type_str, self.staleness_pct)
if stale_pct >= threshold:
raise Exception(f'Staleness percentage of {type_str} is {stale_pct} %. '
f'Stopping due to over threshold {threshold} %')
threshold = self.staleness_pct_dict.get(target_type, self.staleness_pct)
if stale_pct >= threshold:
raise Exception(f'Staleness percentage of {target_type} is {stale_pct} %. '
f'Stopping due to over threshold {threshold} %')

def _validate_node_staleness_pct(self) -> None:
total_nodes_statement = textwrap.dedent("""
MATCH (n)
WITH DISTINCT labels(n) as node, count(*) as count
RETURN head(node) as type, count
""")

stale_nodes_statement = textwrap.dedent("""
MATCH (n)
WHERE {}
WITH DISTINCT labels(n) as node, count(*) as count
RETURN head(node) as type, count
""")

stale_nodes_statement = textwrap.dedent(self._decorate_staleness(stale_nodes_statement))

total_records = self._execute_cypher_query(statement=total_nodes_statement)
stale_records = self._execute_cypher_query(statement=stale_nodes_statement,
param_dict={MARKER_VAR_NAME: self.marker})
self._validate_staleness_pct(total_records=total_records,
stale_records=stale_records,
types=self.target_nodes)
total_nodes_statement = textwrap.dedent(
self.validate_node_staleness_statement.format(staleness_condition='true'))
stale_nodes_statement = textwrap.dedent(
self._decorate_staleness(self.validate_node_staleness_statement))

for t in self.target_nodes:
if isinstance(t, TargetWithCondition):
target_type = t.target_type
extra_condition = ' AND ' + t.condition
else:
target_type = t
extra_condition = ''

total_records = self._execute_cypher_query(
statement=total_nodes_statement.format(type=target_type,
extra_condition=extra_condition))
stale_records = self._execute_cypher_query(
statement=stale_nodes_statement.format(type=target_type,
extra_condition=extra_condition),
param_dict={MARKER_VAR_NAME: self.marker})

total_record_value = next(iter(total_records), None)
stale_record_value = next(iter(stale_records), None)
self._validate_staleness_pct(total_record_count=total_record_value['count'] if total_record_value else 0,
stale_record_count=stale_record_value['count'] if stale_record_value else 0,
target_type=target_type)

def _validate_relation_staleness_pct(self) -> None:
total_relations_statement = textwrap.dedent("""
MATCH ()-[r]-()
RETURN type(r) as type, count(*) as count;
""")

stale_relations_statement = textwrap.dedent("""
MATCH ()-[n]-()
WHERE {}
RETURN type(n) as type, count(*) as count
""")

stale_relations_statement = textwrap.dedent(self._decorate_staleness(stale_relations_statement))

total_records = self._execute_cypher_query(statement=total_relations_statement)
stale_records = self._execute_cypher_query(statement=stale_relations_statement,
param_dict={MARKER_VAR_NAME: self.marker})
self._validate_staleness_pct(total_records=total_records,
stale_records=stale_records,
types=self.target_relations)
total_relations_statement = textwrap.dedent(
self.validate_relation_staleness_statement.format(staleness_condition='true'))
stale_relations_statement = textwrap.dedent(
self._decorate_staleness(self.validate_relation_staleness_statement))

for t in self.target_relations:
if isinstance(t, TargetWithCondition):
target_type = t.target_type
extra_condition = ' AND ' + t.condition
else:
target_type = t
extra_condition = ''

total_records = self._execute_cypher_query(
statement=total_relations_statement.format(type=target_type,
extra_condition=extra_condition))
stale_records = self._execute_cypher_query(
statement=stale_relations_statement.format(type=target_type,
extra_condition=extra_condition),
param_dict={MARKER_VAR_NAME: self.marker})

total_record_value = next(iter(total_records), None)
stale_record_value = next(iter(stale_records), None)
self._validate_staleness_pct(total_record_count=total_record_value['count'] if total_record_value else 0,
stale_record_count=stale_record_value['count'] if stale_record_value else 0,
target_type=target_type)

def _execute_cypher_query(self,
statement: str,
Expand Down
Loading

0 comments on commit 1b25881

Please sign in to comment.