diff --git a/revision_utils.py b/revision_utils.py
index d77901ebb..57646712b 100644
--- a/revision_utils.py
+++ b/revision_utils.py
@@ -82,7 +82,7 @@ def get_revision_store_path(zone, trailing_slash=False):
     return os.path.join("/" + zone, constants.UUREVISIONCOLLECTION.lstrip(os.path.sep))
 
 
-def get_deletion_candidates(ctx, revision_strategy, revisions, initial_upper_time_bound, verbose):
+def get_deletion_candidates(ctx, revision_strategy, revisions, initial_upper_time_bound, original_exists, verbose):
     """Get revision data objects for a particular versioned data object that should be deleted, as per
        a given revision strategy.
 
@@ -91,10 +91,19 @@ def get_deletion_candidates(ctx, revision_strategy, revisions, initial_upper_tim
     :param revisions: List of revisions for a particular data object. Each revision is represented by a 3-tuple
                       (revision ID, modification time in epoch time, original path)
     :param initial_upper_time_bound: Initial upper time bound for first bucket
+    :param original_exists: Boolean value that indicates whether the original versioned data object still exists
     :param verbose: Whether to print additional information for troubleshooting (boolean)
 
     :returns: List of candidates for deletion based on the specified revision strategy
     """
+
+    if not original_exists:
+        if verbose:
+            for revision in revisions:
+                log.write(ctx, 'Scheduling revision <{}> for removal. Original no longer exists.'.format(
+                          revision[2]))
+        return [revision[0] for revision in revisions]
+
     buckets = revision_strategy.get_buckets()
     deletion_candidates = []
 
@@ -173,9 +182,9 @@ def get_deletion_candidates(ctx, revision_strategy, revisions, initial_upper_tim
     return deletion_candidates
 
 
-def revision_cleanup_prefilter(ctx, revisions_list, revision_strategy_name, verbose):
+def revision_cleanup_prefilter(ctx, revisions_list, revision_strategy_name, original_exists_dict, verbose):
     """Filters out revisioned data objects from a list if we can easily determine that they don't meet criteria for being removed,
-       for example if the number of revisions is at most one, and the minimum bucket size is at least one.
+       for example if the number of revisions of an existing versioned data object is at most one.
 
        This prefilter is performed in the scan phase. A full check of the remaining versioned data objects
        will be performed in the processing phase.
@@ -190,6 +199,8 @@ def revision_cleanup_prefilter(ctx, revisions_list, revision_strategy_name, verb
     :param revision_strategy_name: Select a revision strategy based on a string ('A', 'B', 'Simple'). See
                                    https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisions.md
                                    for an explanation.
+    :param original_exists_dict: Dictionary where keys are revision paths. Values are booleans that indicate whether
+                                 the versioned data object of the revision still exists
     :param verbose: Whether to print verbose information for troubleshooting (boolean)
 
     :returns: List of versioned data objects, after prefiltered versioned data objects / revisions have been
@@ -198,10 +209,27 @@ def revision_cleanup_prefilter(ctx, revisions_list, revision_strategy_name, verb
               path)
     """
     minimum_bucket_size = get_revision_strategy(revision_strategy_name).get_minimum_bucket_size()
+    results = []
+    for object in revisions_list:
+        if len(object) > 0:
+            if original_exists_dict[object[0][2]]:
+                # If the versioned data object still exists, we can
+                # remove it in the prefilter stage if it has only a single
+                # revision, assuming that the size of the smallest bucket is
+                # at least 1.
+                if len(object) > min(minimum_bucket_size, 1):
+                    results.append(object)
+            else:
+                # Revisions of versioned data objects that do not exist
+                # anymore should never be removed in the prefilter stage,
+                # since revisions should always be removed if their versioned
+                # data object no longer exists.
+                results.append(object)
+
     if verbose:
-        log.write(ctx, "Removing following revisioned data objects in prefiltering for cleanup: "
-                  + str([object for object in revisions_list if len(object) <= minimum_bucket_size]))
-    return [object for object in revisions_list if len(object) > min(minimum_bucket_size, 1)]
+        log.write(ctx, "Remaining revisions for cleanup after prefiltering: " + str(results))
+
+    return results
 
 
 def get_resc(row):
diff --git a/revisions.py b/revisions.py
index 856dee243..321d5307b 100644
--- a/revisions.py
+++ b/revisions.py
@@ -840,7 +840,8 @@ def rule_revisions_cleanup_scan(ctx, revision_strategy_name, verbose_flag):
         log.write(ctx, "Scanning revisions: " + str(revisions_list))
 
     revision_data = revision_cleanup_scan_revision_objects(ctx, revisions_list)
-    prefiltered_revision_data = revision_cleanup_prefilter(ctx, revision_data, revision_strategy_name, verbose)
+    original_exists_dict = get_original_exists_dict(ctx, revision_data)
+    prefiltered_revision_data = revision_cleanup_prefilter(ctx, revision_data, revision_strategy_name, original_exists_dict, verbose)
     output_data_size = len(prefiltered_revision_data)
     if output_data_size > 0:
         if verbose:
@@ -854,6 +855,59 @@ def rule_revisions_cleanup_scan(ctx, revision_strategy_name, verbose_flag):
     return 'Revision store cleanup scan job completed'
 
 
+def get_original_exists_dict(ctx, revision_data):
+    """Returns a dictionary that indicates which original data objects of revision data still exist
+
+    :param ctx: Combined type of a callback and rei struct
+    :param revision_data: List of lists of revision tuples in (data_id, timestamp, revision_path) format
+
+    :returns: dictionary, in which the keys are revision paths. The values are booleans, and indicate whether
+              the versioned data object of the revision still exists. If the revision data object does not
+              have AVUs that refer to the versioned data object, assume it still exists.
+    """
+    result = dict()
+    for data_object_data in revision_data:
+        for (data_id, timestamp, revision_path) in data_object_data:
+
+            try:
+                result[revision_path] = versioned_data_object_exists(ctx, revision_path)
+            except KeyError:
+                # If we can't determine the original path, we assume the original data object
+                # still exists, so that it is not automatically cleaned up by the revision cleanup job.
+                # An error is logged by versioned_data_object_exists
+                result[revision_path] = True
+
+    return result
+
+
+def versioned_data_object_exists(ctx, revision_path):
+    """Checks whether the versioned data object of a revision still exists
+
+    :param ctx: Combined type of a callback and rei struct
+    :param revision_path: Logical path of revision data object
+
+    :returns: boolean value that indicates whether the versioned data object still
+              exists.
+
+    :raises KeyError: If revision data object does not have revision AVUs
+                      that point to versioned data object.
+    """
+
+    revision_avus = avu.of_data(ctx, revision_path)
+    avu_dict = {a: v for (a, v, u) in revision_avus}
+
+    try:
+        original_path = os.path.join(avu_dict["org_original_coll_name"],
+                                     avu_dict["org_original_data_name"])
+        return data_object.exists(ctx, original_path)
+    except KeyError:
+        # Without the expected revision AVUs the original path cannot be determined.
+        # Log the problem and re-raise, so that the caller decides how to handle it.
+        log.write(ctx, "Error: could not find original data object for revision " + revision_path
+                  + " because revision does not have expected revision AVUs.")
+        raise
+
+
 @rule.make(inputs=[0, 1, 2], outputs=[3])
 def rule_revisions_cleanup_process(ctx, revision_strategy_name, endOfCalendarDay, verbose_flag):
     """Applies the selected revision strategy to a batch of spooled revision data
@@ -896,7 +950,8 @@ def rule_revisions_cleanup_process(ctx, revision_strategy_name, endOfCalendarDay
         if verbose:
             log.write(ctx, 'Processing revisions {} ...'.format(str(revisions)))
         # Process the original path conform the bucket settings
-        candidates = get_deletion_candidates(ctx, revision_strategy, revisions, end_of_calendar_day, verbose)
+        original_exists = versioned_data_object_exists(ctx, revisions[0][2]) if len(revisions) > 0 else False
+        candidates = get_deletion_candidates(ctx, revision_strategy, revisions, end_of_calendar_day, original_exists, verbose)
         num_candidates += len(candidates)
 
         # Create lookup table for revision paths if needed
diff --git a/unit-tests/test_revisions.py b/unit-tests/test_revisions.py
index f718e955b..522aab1cb 100644
--- a/unit-tests/test_revisions.py
+++ b/unit-tests/test_revisions.py
@@ -64,43 +64,73 @@ def test_revision_strategy(self):
         # Tests total length of all buckets in seconds; equivalent to roughly 29 weeks.
         self.assertEqual(strategy.get_total_bucket_timespan(), 17755200)
 
-    def test_revision_cleanup_prefilter(self):
+    def test_revision_cleanup_prefilter_empty(self):
         empty_input = []
-        empty_output = revision_cleanup_prefilter(None, empty_input, "B", False)
+        empty_output = revision_cleanup_prefilter(None, empty_input, "B", dict(), False)
         self.assertEqual(empty_output, [])
+
+    def test_revision_cleanup_prefilter_single_exists(self):
         single_input = [[(1, 123, "/foo/bar/baz")]]
-        single_output = revision_cleanup_prefilter(None, single_input, "B", False)
+        exists_dict = {"/foo/bar/baz": True}
+        single_output = revision_cleanup_prefilter(None, single_input, "B", exists_dict, False)
         self.assertEqual(single_output, [])  # Does not exceed min. bucket size for strategy B
+
+    def test_revision_cleanup_prefilter_single_doesnotexist(self):
+        single_input = [[(1, 123, "/foo/bar/baz")]]
+        exists_dict = {"/foo/bar/baz": False}
+        single_output = revision_cleanup_prefilter(None, single_input, "B", exists_dict, False)
+        # Do not prefilter if versioned data object no longer exists
+        self.assertEqual(single_output, single_input)
+
+    def test_revision_cleanup_prefilter_two_exists(self):
         two_input = [[(1, 123, "/foo/bar/baz"), (2, 234, "/foo/bar/baz")]]
-        two_output = revision_cleanup_prefilter(None, two_input, "B", False)
+        exists_dict = {"/foo/bar/baz": True}
+        two_output = revision_cleanup_prefilter(None, two_input, "B", exists_dict, False)
         # Does not exceed min. bucket size for strategy B
         # But more than 1 revision (so cannot prefilter, because
         # revisions could be outside defined buckets)
         self.assertEqual(two_output, two_input)
+
+    def test_revision_cleanup_prefilter_two_doesnotexist(self):
+        exists_dict = {"/foo/bar/baz": False}
+        two_input = [[(1, 123, "/foo/bar/baz"), (2, 234, "/foo/bar/baz")]]
+        two_output = revision_cleanup_prefilter(None, two_input, "B", exists_dict, False)
+        # Do not prefilter if versioned data object no longer exists
+        self.assertEqual(two_output, two_input)
+
+    def test_revision_cleanup_prefilter_three(self):
         three_input = [[(1, 123, "/foo/bar/baz"), (2, 234, "/foo/bar/baz"), (3, 345, "/foo/bar/baz")]]
-        three_output = revision_cleanup_prefilter(None, three_input, "B", False)
+        exists_dict = {"/foo/bar/baz": True}
+        three_output = revision_cleanup_prefilter(None, three_input, "B", exists_dict, False)
         self.assertEqual(three_output, three_input)  # Exceeds min. bucket size for strategy B
 
     def test_revision_deletion_candidates_empty(self):
         dummy_time = 1000000000
         revision_strategy = get_revision_strategy("B")
         revisions = []
-        output = get_deletion_candidates(None, revision_strategy, revisions, dummy_time, False)
+        output = get_deletion_candidates(None, revision_strategy, revisions, dummy_time, True, False)
         self.assertEqual(output, [])
 
-    def test_revision_deletion_candidates_1_bucket_no_exceed(self):
+    def test_revision_deletion_candidates_1_bucket_no_exceed_exists(self):
         dummy_time = 1000000000
         revision_strategy = get_revision_strategy("B")
         revisions = [(1, dummy_time - 60, "/foo/bar/baz")]
-        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
+        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
         self.assertEqual(output, [])
 
+    def test_revision_deletion_candidates_1_bucket_no_exceed_doesnotexist(self):
+        dummy_time = 1000000000
+        revision_strategy = get_revision_strategy("B")
+        revisions = [(1, dummy_time - 60, "/foo/bar/baz")]
+        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False, False)
+        self.assertEqual(output, [1])
+
     def test_revision_deletion_candidates_2_bucket_no_exceed(self):
         dummy_time = 1000000000
         revision_strategy = get_revision_strategy("B")
         revisions = [(1, dummy_time - 60, "/foo/bar/baz"),
                      (2, dummy_time - 120, "/foo/bar/baz")]
-        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
+        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
         self.assertEqual(output, [])
 
     def test_revision_deletion_candidates_2_multi_bucket_no_exceed(self):
@@ -108,7 +138,7 @@ def test_revision_deletion_candidates_2_multi_bucket_no_exceed(self):
         revision_strategy = get_revision_strategy("B")
         revisions = [(1, dummy_time - 60, "/foo/bar/baz"),
                      (2, dummy_time - 13 * 3600, "/foo/bar/baz")]
-        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
+        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
         self.assertEqual(output, [])
 
     def test_revision_deletion_candidates_4_multi_bucket_no_exceed(self):
@@ -118,18 +148,27 @@ def test_revision_deletion_candidates_4_multi_bucket_no_exceed(self):
                      (2, dummy_time - 120, "/foo/bar/baz"),
                      (3, dummy_time - 3600 * 16, "/foo/bar/baz"),
                      (4, dummy_time - 3600 * 17, "/foo/bar/baz")]
-        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
+        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
         self.assertEqual(output, [])
 
-    def test_revision_deletion_candidates_3_bucket_exceed(self):
+    def test_revision_deletion_candidates_3_bucket_exceed_exists(self):
         dummy_time = 1000000000
         revision_strategy = get_revision_strategy("B")
         revisions = [(1, dummy_time - 60, "/foo/bar/baz"),
                      (2, dummy_time - 120, "/foo/bar/baz"),
                      (3, dummy_time - 180, "/foo/bar/baz")]
-        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
+        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
         self.assertEqual(output, [2])
 
+    def test_revision_deletion_candidates_3_bucket_exceed_doesnotexist(self):
+        dummy_time = 1000000000
+        revision_strategy = get_revision_strategy("B")
+        revisions = [(1, dummy_time - 60, "/foo/bar/baz"),
+                     (2, dummy_time - 120, "/foo/bar/baz"),
+                     (3, dummy_time - 180, "/foo/bar/baz")]
+        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False, False)
+        self.assertEqual(output, [1, 2, 3])
+
     def test_revision_deletion_candidates_6_buckets_exceed(self):
         dummy_time = 1000000000
         revision_strategy = get_revision_strategy("B")
@@ -139,14 +178,14 @@ def test_revision_deletion_candidates_6_buckets_exceed(self):
                      (4, dummy_time - 3600 * 16 - 60, "/foo/bar/baz"),
                      (5, dummy_time - 3600 * 16 - 120, "/foo/bar/baz"),
                      (6, dummy_time - 3600 * 16 - 180, "/foo/bar/baz")]
-        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
+        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
         self.assertEqual(output, [2, 5])
 
     def test_revision_deletion_1_before_buckets(self):
         dummy_time = 1000000000
         revision_strategy = get_revision_strategy("B")
         revisions = [(1, dummy_time - 365 * 24 * 3600, "/foo/bar/baz")]
-        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
+        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
         self.assertEqual(output, [])
 
     def test_revision_deletion_1_bucket_1_before(self):
@@ -154,7 +193,7 @@ def test_revision_deletion_1_bucket_1_before(self):
         revision_strategy = get_revision_strategy("B")
         revisions = [(1, dummy_time - 60, "/foo/bar/baz"),
                      (2, dummy_time - 365 * 24 * 3600, "/foo/bar/baz")]
-        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
+        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
         self.assertEqual(output, [2])
 
     def test_revision_deletion_1_bucket_2_before(self):
@@ -163,7 +202,7 @@ def test_revision_deletion_1_bucket_2_before(self):
         revisions = [(1, dummy_time - 60, "/foo/bar/baz"),
                      (2, dummy_time - 365 * 24 * 3600 - 60, "/foo/bar/baz"),
                      (3, dummy_time - 365 * 24 * 3600 - 90, "/foo/bar/baz")]
-        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
+        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
         self.assertEqual(output, [2, 3])
 
     def test_revision_deletion_3_before_buckets(self):
@@ -172,5 +211,5 @@ def test_revision_deletion_3_before_buckets(self):
         revisions = [(1, dummy_time - 365 * 24 * 3600 - 60, "/foo/bar/baz"),
                      (2, dummy_time - 365 * 24 * 3600 - 120, "/foo/bar/baz"),
                      (3, dummy_time - 365 * 24 * 3600 - 180, "/foo/bar/baz")]
-        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
+        output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, True, False)
         self.assertEqual(output, [2, 3])