YDA-5810 fix rev cleanup handling removed data
Update revision logic to remove revisions of versioned data objects that
no longer exist in the revision cleanup process.
stsnel committed Jul 16, 2024
1 parent eb00481 commit dc2dfca
Showing 3 changed files with 148 additions and 26 deletions.
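For orientation, here is a sketch (not part of the commit itself) of how the functions touched by this change fit together; the new helper functions and the two new arguments are marked:

```python
# Scan phase (rule_revisions_cleanup_scan in revisions.py):
#   revision_data        = revision_cleanup_scan_revision_objects(ctx, revisions_list)
#   original_exists_dict = get_original_exists_dict(ctx, revision_data)        # new helper
#   prefiltered          = revision_cleanup_prefilter(ctx, revision_data,
#                                                     revision_strategy_name,
#                                                     original_exists_dict,    # new argument
#                                                     verbose)
#
# Process phase (rule_revisions_cleanup_process in revisions.py), per versioned data object:
#   original_exists = versioned_data_object_exists(ctx, revisions[0][2])       # new helper
#   candidates      = get_deletion_candidates(ctx, revision_strategy, revisions,
#                                             end_of_calendar_day,
#                                             original_exists,                 # new argument
#                                             verbose)
```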
40 changes: 34 additions & 6 deletions revision_utils.py
@@ -82,7 +82,7 @@ def get_revision_store_path(zone, trailing_slash=False):
return os.path.join("/" + zone, constants.UUREVISIONCOLLECTION.lstrip(os.path.sep))


def get_deletion_candidates(ctx, revision_strategy, revisions, initial_upper_time_bound, verbose):
def get_deletion_candidates(ctx, revision_strategy, revisions, initial_upper_time_bound, original_exists, verbose):
"""Get revision data objects for a particular versioned data object that should be deleted, as per
a given revision strategy.
@@ -91,10 +91,19 @@ def get_deletion_candidates(ctx, revision_strategy, revisions, initial_upper_tim
:param revisions: List of revisions for a particular data object. Each revision is represented by a 3-tuple
(revision ID, modification time in epoch time, original path)
:param initial_upper_time_bound: Initial upper time bound for first bucket
:param original_exists: Boolean value that indicates whether the original versioned data object still exists
:param verbose: Whether to print additional information for troubleshooting (boolean)
:returns: List of candidates for deletion based on the specified revision strategy
"""

if not original_exists:
if verbose:
for revision in revisions:
log.write(ctx, 'Scheduling revision <{}> for removal. Original no longer exists.'.format(
revision[2]))
return [revision[0] for revision in revisions]

buckets = revision_strategy.get_buckets()
deletion_candidates = []

@@ -173,9 +182,9 @@ def get_deletion_candidates(ctx, revision_strategy, revisions, initial_upper_tim
return deletion_candidates


def revision_cleanup_prefilter(ctx, revisions_list, revision_strategy_name, verbose):
def revision_cleanup_prefilter(ctx, revisions_list, revision_strategy_name, original_exists_dict, verbose):
"""Filters out revisioned data objects from a list if we can easily determine that they don't meet criteria for being removed,
for example if the number of revisions is at most one, and the minimum bucket size is at least one.
for example if the number of revisions of an existing versioned data object is at most one.
This prefilter is performed in the scan phase. A full check of the remaining versioned data objects will be performed in the
processing phase.
@@ -190,6 +199,8 @@ def revision_cleanup_prefilter(ctx, revisions_list, revision_strategy_name, verb
:param revision_strategy_name: Select a revision strategy based on a string ('A', 'B', 'Simple'). See
https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisions.md
for an explanation.
:param original_exists_dict: Dictionary where keys are paths of versioned data objects. Values are booleans that indicate whether
the original data object still exists
:param verbose: Whether to print verbose information for troubleshooting (boolean)
:returns: List of versioned data objects, after prefiltered versioned data objects / revisions have been
@@ -198,10 +209,27 @@ def revision_cleanup_prefilter(ctx, revisions_list, revision_strategy_name, verb
path)
"""
minimum_bucket_size = get_revision_strategy(revision_strategy_name).get_minimum_bucket_size()
results = []
for object in revisions_list:
if len(object) > 0:
if original_exists_dict[object[0][2]]:
# If the versioned data object still exists, we can
# remove it in the prefilter stage if it has only a single
# revision, assuming that the size of the smallest bucket is
# at least 1.
if len(object) > min(minimum_bucket_size, 1):
results.append(object)
else:
# Revisions of versioned data objects that do not exist
# anymore should never be removed in the prefilter stage,
# since revisions should always be removed if their versioned
# data object no longer exists.
results.append(object)

if verbose:
log.write(ctx, "Removing following revisioned data objects in prefiltering for cleanup: "
+ str([object for object in revisions_list if len(object) <= minimum_bucket_size]))
return [object for object in revisions_list if len(object) > min(minimum_bucket_size, 1)]
log.write(ctx, "Remaining revisions for cleanup after prefiltering: " + str(results))

return results


def get_resc(row):
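For reference, a minimal usage sketch of the updated prefilter, mirroring the unit tests further down (they pass None for ctx with verbose disabled; the `revision_utils` import path is assumed from the file name):

```python
from revision_utils import revision_cleanup_prefilter

# One versioned data object with a single revision: (data_id, timestamp, path).
revisions_list = [[(1, 123, "/foo/bar/baz")]]

# Original still exists: a lone revision is dropped by the prefilter under strategy B.
assert revision_cleanup_prefilter(None, revisions_list, "B",
                                  {"/foo/bar/baz": True}, False) == []

# Original has been removed: never prefilter, so the processing phase can
# schedule all of its revisions for deletion.
assert revision_cleanup_prefilter(None, revisions_list, "B",
                                  {"/foo/bar/baz": False}, False) == revisions_list
```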
59 changes: 57 additions & 2 deletions revisions.py
@@ -840,7 +840,8 @@ def rule_revisions_cleanup_scan(ctx, revision_strategy_name, verbose_flag):
log.write(ctx, "Scanning revisions: " + str(revisions_list))

revision_data = revision_cleanup_scan_revision_objects(ctx, revisions_list)
prefiltered_revision_data = revision_cleanup_prefilter(ctx, revision_data, revision_strategy_name, verbose)
original_exists_dict = get_original_exists_dict(ctx, revision_data)
prefiltered_revision_data = revision_cleanup_prefilter(ctx, revision_data, revision_strategy_name, original_exists_dict, verbose)
output_data_size = len(prefiltered_revision_data)
if output_data_size > 0:
if verbose:
@@ -854,6 +855,59 @@ def rule_revisions_cleanup_scan(ctx, revision_strategy_name, verbose_flag):
return 'Revision store cleanup scan job completed'


def get_original_exists_dict(ctx, revision_data):
"""Returns a dictionary that indicates which original data objects of revision data still exist
:param ctx: Combined type of a callback and rei struct
:param revision_data: List of lists of revision tuples in (data_id, timestamp, revision_path) format
:returns: Dictionary in which the keys are revision paths. The values are booleans that indicate whether
the versioned data object of the revision still exists. If the revision data object does not
have AVUs that refer to the versioned data object, assume it still exists.
"""
result = dict()
for data_object_data in revision_data:
for (data_id, timestamp, revision_path) in data_object_data:

try:
result[revision_path] = versioned_data_object_exists(ctx, revision_path)
except KeyError:
# If we can't determine the original path, we assume the original data object
# still exists, so that it is not automatically cleaned up by the revision cleanup job.
# An error is logged by versioned_data_object_exists
result[revision_path] = True

return result


def versioned_data_object_exists(ctx, revision_path):
"""Checks whether the version data object of a revision still exists
:param ctx: Combined type of a callback and rei struct
:param revision_path: Logical path of revision data object
:returns: boolean value that indicates whether the versioned data object still
exists.
:raises KeyError: If revision data object does not have revision AVUs
that point to versioned data object.
"""

revision_avus = avu.of_data(ctx, revision_path)
avu_dict = {a: v for (a, v, u) in revision_avus}

try:
original_path = os.path.join(avu_dict["org_original_coll_name"],
avu_dict["org_original_data_name"])
return data_object.exists(ctx, original_path)
except KeyError:
# If we can't determine the original path, we assume the original data object
# still exists, so that it is not automatically cleaned up by the revision cleanup job.
log.write(ctx, "Error: could not find original data object for revision " + revision_path
+ " because revision does not have expected revision AVUs.")
raise


@rule.make(inputs=[0, 1, 2], outputs=[3])
def rule_revisions_cleanup_process(ctx, revision_strategy_name, endOfCalendarDay, verbose_flag):
"""Applies the selected revision strategy to a batch of spooled revision data
@@ -896,7 +950,8 @@ def rule_revisions_cleanup_process(ctx, revision_strategy_name, endOfCalendarDay
if verbose:
log.write(ctx, 'Processing revisions {} ...'.format(str(revisions)))
# Process the original path conform the bucket settings
candidates = get_deletion_candidates(ctx, revision_strategy, revisions, end_of_calendar_day, verbose)
original_exists = versioned_data_object_exists(ctx, revisions[0][2]) if len(revisions) > 0 else False
candidates = get_deletion_candidates(ctx, revision_strategy, revisions, end_of_calendar_day, original_exists, verbose)
num_candidates += len(candidates)

# Create lookup table for revision paths if needed
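The key step in versioned_data_object_exists above is reconstructing the original path from two AVUs on the revision data object. A standalone sketch of just that mapping, with made-up sample values (the AVU names org_original_coll_name and org_original_data_name are the ones used in the diff):

```python
import os

# AVUs of a revision data object as (attribute, value, unit) tuples; values are illustrative.
revision_avus = [
    ("org_original_coll_name", "/tempZone/home/research-foo", ""),
    ("org_original_data_name", "bar.txt", ""),
]
avu_dict = {a: v for (a, v, u) in revision_avus}

# Path of the versioned data object that this revision was made from.
original_path = os.path.join(avu_dict["org_original_coll_name"],
                             avu_dict["org_original_data_name"])
assert original_path == "/tempZone/home/research-foo/bar.txt"

# versioned_data_object_exists then checks this path with data_object.exists(ctx, original_path).
# A missing AVU raises KeyError, which get_original_exists_dict treats as "assume the original
# still exists", so revisions are never removed on the basis of incomplete metadata.
```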
75 changes: 57 additions & 18 deletions unit-tests/test_revisions.py
@@ -64,51 +64,81 @@ def test_revision_strategy(self):
# Tests total length of all buckets in seconds; equivalent to roughly 29 weeks.
self.assertEqual(strategy.get_total_bucket_timespan(), 17755200)

def test_revision_cleanup_prefilter(self):
def test_revision_cleanup_prefilter_empty(self):
empty_input = []
empty_output = revision_cleanup_prefilter(None, empty_input, "B", False)
empty_output = revision_cleanup_prefilter(None, empty_input, "B", dict(), False)
self.assertEqual(empty_output, [])

def test_revision_cleanup_prefilter_single_exists(self):
single_input = [[(1, 123, "/foo/bar/baz")]]
single_output = revision_cleanup_prefilter(None, single_input, "B", False)
exists_dict = {"/foo/bar/baz": True}
single_output = revision_cleanup_prefilter(None, single_input, "B", exists_dict, False)
self.assertEqual(single_output, []) # Does not exceed min. bucket size for strategy B

def test_revision_cleanup_prefilter_single_doesnotexist(self):
single_input = [[(1, 123, "/foo/bar/baz")]]
exists_dict = {"/foo/bar/baz": False}
single_output = revision_cleanup_prefilter(None, single_input, "B", exists_dict, False)
# Do not prefilter if versioned data object no longer exists
self.assertEqual(single_output, single_input)

def test_revision_cleanup_prefilter_two_exists(self):
two_input = [[(1, 123, "/foo/bar/baz"), (2, 234, "/foo/bar/baz")]]
two_output = revision_cleanup_prefilter(None, two_input, "B", False)
exists_dict = {"/foo/bar/baz": True}
two_output = revision_cleanup_prefilter(None, two_input, "B", exists_dict, False)
# Does not exceed min. bucket size for strategy B
# But more than 1 revision (so cannot prefilter, because
# revisions could be outside defined buckets)
self.assertEqual(two_output, two_input)

def test_revision_cleanup_prefilter_two_doesnotexist(self):
exists_dict = {"/foo/bar/baz": False}
two_input = [[(1, 123, "/foo/bar/baz"), (2, 234, "/foo/bar/baz")]]
two_output = revision_cleanup_prefilter(None, two_input, "B", exists_dict, False)
# Do not prefilter if versioned data object no longer exists
self.assertEqual(two_output, two_input)

def test_revision_cleanup_prefilter_three(self):
three_input = [[(1, 123, "/foo/bar/baz"), (2, 234, "/foo/bar/baz"), (3, 345, "/foo/bar/baz")]]
three_output = revision_cleanup_prefilter(None, three_input, "B", False)
exists_dict = {"/foo/bar/baz": True}
three_output = revision_cleanup_prefilter(None, three_input, "B", exists_dict, False)
self.assertEqual(three_output, three_input) # Exceeds min. bucket size for strategy B

def test_revision_deletion_candidates_empty(self):
dummy_time = 1000000000
revision_strategy = get_revision_strategy("B")
revisions = []
output = get_deletion_candidates(None, revision_strategy, revisions, dummy_time, False)
output = get_deletion_candidates(None, revision_strategy, revisions, dummy_time, True, False)
self.assertEqual(output, [])

def test_revision_deletion_candidates_1_bucket_no_exceed(self):
def test_revision_deletion_candidates_1_bucket_no_exceed_exists(self):
dummy_time = 1000000000
revision_strategy = get_revision_strategy("B")
revisions = [(1, dummy_time - 60, "/foo/bar/baz")]
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
self.assertEqual(output, [])

def test_revision_deletion_candidates_1_bucket_no_exceed_doesnotexist(self):
dummy_time = 1000000000
revision_strategy = get_revision_strategy("B")
revisions = [(1, dummy_time - 60, "/foo/bar/baz")]
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False, False)
self.assertEqual(output, [1])

def test_revision_deletion_candidates_2_bucket_no_exceed(self):
dummy_time = 1000000000
revision_strategy = get_revision_strategy("B")
revisions = [(1, dummy_time - 60, "/foo/bar/baz"),
(2, dummy_time - 120, "/foo/bar/baz")]
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
self.assertEqual(output, [])

def test_revision_deletion_candidates_2_multi_bucket_no_exceed(self):
dummy_time = 1000000000
revision_strategy = get_revision_strategy("B")
revisions = [(1, dummy_time - 60, "/foo/bar/baz"),
(2, dummy_time - 13 * 3600, "/foo/bar/baz")]
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
self.assertEqual(output, [])

def test_revision_deletion_candidates_4_multi_bucket_no_exceed(self):
@@ -118,18 +148,27 @@ def test_revision_deletion_candidates_4_multi_bucket_no_exceed(self):
(2, dummy_time - 120, "/foo/bar/baz"),
(3, dummy_time - 3600 * 16, "/foo/bar/baz"),
(4, dummy_time - 3600 * 17, "/foo/bar/baz")]
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
self.assertEqual(output, [])

def test_revision_deletion_candidates_3_bucket_exceed(self):
def test_revision_deletion_candidates_3_bucket_exceed_exists(self):
dummy_time = 1000000000
revision_strategy = get_revision_strategy("B")
revisions = [(1, dummy_time - 60, "/foo/bar/baz"),
(2, dummy_time - 120, "/foo/bar/baz"),
(3, dummy_time - 180, "/foo/bar/baz")]
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
self.assertEqual(output, [2])

def test_revision_deletion_candidates_3_bucket_exceed_doesnotexist(self):
dummy_time = 1000000000
revision_strategy = get_revision_strategy("B")
revisions = [(1, dummy_time - 60, "/foo/bar/baz"),
(2, dummy_time - 120, "/foo/bar/baz"),
(3, dummy_time - 180, "/foo/bar/baz")]
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False, False)
self.assertEqual(output, [1, 2, 3])

def test_revision_deletion_candidates_6_buckets_exceed(self):
dummy_time = 1000000000
revision_strategy = get_revision_strategy("B")
Expand All @@ -139,22 +178,22 @@ def test_revision_deletion_candidates_6_buckets_exceed(self):
(4, dummy_time - 3600 * 16 - 60, "/foo/bar/baz"),
(5, dummy_time - 3600 * 16 - 120, "/foo/bar/baz"),
(6, dummy_time - 3600 * 16 - 180, "/foo/bar/baz")]
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
self.assertEqual(output, [2, 5])

def test_revision_deletion_1_before_buckets(self):
dummy_time = 1000000000
revision_strategy = get_revision_strategy("B")
revisions = [(1, dummy_time - 365 * 24 * 3600, "/foo/bar/baz")]
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
self.assertEqual(output, [])

def test_revision_deletion_1_bucket_1_before(self):
dummy_time = 1000000000
revision_strategy = get_revision_strategy("B")
revisions = [(1, dummy_time - 60, "/foo/bar/baz"),
(2, dummy_time - 365 * 24 * 3600, "/foo/bar/baz")]
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
self.assertEqual(output, [2])

def test_revision_deletion_1_bucket_2_before(self):
@@ -163,7 +202,7 @@ def test_revision_deletion_1_bucket_2_before(self):
revisions = [(1, dummy_time - 60, "/foo/bar/baz"),
(2, dummy_time - 365 * 24 * 3600 - 60, "/foo/bar/baz"),
(3, dummy_time - 365 * 24 * 3600 - 90, "/foo/bar/baz")]
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
self.assertEqual(output, [2, 3])

def test_revision_deletion_3_before_buckets(self):
@@ -172,5 +211,5 @@ def test_revision_deletion_3_before_buckets(self):
revisions = [(1, dummy_time - 365 * 24 * 3600 - 60, "/foo/bar/baz"),
(2, dummy_time - 365 * 24 * 3600 - 120, "/foo/bar/baz"),
(3, dummy_time - 365 * 24 * 3600 - 180, "/foo/bar/baz")]
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
self.assertEqual(output, [2, 3])
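A possible additional test in the same style, not part of this commit, that exercises the prefilter and the deletion-candidate logic together for a removed original:

```python
def test_revision_removed_original_end_to_end(self):
    # Hypothetical test: revisions of a removed versioned data object pass the
    # prefilter untouched and are then all scheduled for deletion.
    dummy_time = 1000000000
    revisions = [(1, dummy_time - 60, "/foo/bar/baz"),
                 (2, dummy_time - 3600 * 16, "/foo/bar/baz")]
    exists_dict = {"/foo/bar/baz": False}
    prefiltered = revision_cleanup_prefilter(None, [revisions], "B", exists_dict, False)
    self.assertEqual(prefiltered, [revisions])
    output = get_deletion_candidates(None, get_revision_strategy("B"), revisions,
                                     dummy_time, False, False)
    self.assertEqual(output, [1, 2])
```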
