-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathrevision_utils.py
274 lines (213 loc) · 12.1 KB
/
revision_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
"""Utility functions for revision management."""
__copyright__ = 'Copyright (c) 2019-2024, Utrecht University'
__license__ = 'GPLv3, see LICENSE'
import datetime
import hashlib
import os
from typing import List, Tuple
from revision_strategies import get_revision_strategy, RevisionStrategy
from util import constants, log, pathutil, rule
def revision_eligible(max_size: int, data_obj_exists: bool, size: int, path: str, groups: List, revision_store_exists: bool) -> Tuple[bool, str]:
    """Decide whether a revision may be created for the given data object.

    Error conditions yield a non-empty message. Conditions that merely make
    the data object ineligible (too big, outside the research space, name on
    the blocklist) yield an empty message.

    :param max_size:              Max size that file can be to create a revision (in bytes)
    :param data_obj_exists:       Whether the data object exists
    :param size:                  Size of the data object
    :param path:                  Path to the given data object (used for checks and messages)
    :param groups:                List of groups retrieved for this data object
    :param revision_store_exists: Whether revision store for this group exists

    :returns: 2-tuple of (whether a revision should be created,
              message describing the error condition or an empty string)
    """
    # Error conditions: each one is reported back to the caller with a message.
    if not data_obj_exists:
        return False, "Data object <{}> was not found or path was collection".format(path)

    owner_count = len(groups)
    if owner_count == 0:
        return False, "Cannot find owner of data object <{}>. It may have been removed. Skipping.".format(path)
    if owner_count > 1:
        return False, "Cannot find unique owner of data object <{}>. Skipping.".format(path)

    if not revision_store_exists:
        return False, "Revision store collection does not exist for data object <{}>".format(path)

    _space, zone, _group, _subpath = pathutil.info(path)

    # From here on, ineligibility is not an error, so the message stays empty.
    # Data objects larger than the maximum size do not get a revision.
    if int(size) > max_size:
        return False, ""

    # Revisions are only made in the research space, and never for
    # data objects whose name is on the blocklist.
    research_prefix = "/{}/home/{}".format(zone, constants.IIGROUPPREFIX)
    eligible = path.startswith(research_prefix) and pathutil.basename(path) not in constants.UUBLOCKLIST
    return eligible, ""
def calculate_end_of_calendar_day() -> int:
    """Calculate the unix timestamp for the end of the current day (Same as start of next day).

    The timestamp is computed for midnight in the local time zone of the
    machine running this code.

    :returns: End of calendar day - Timestamp of the end of the current day
    """
    # Get the date of tomorrow.
    tomorrow = datetime.date.today() + datetime.timedelta(days=1)

    # Midnight at the start of tomorrow, as a naive (local time) datetime.
    start_of_tomorrow = datetime.datetime.combine(tomorrow, datetime.time.min)

    # Convert to a unix timestamp. This avoids strftime("%s"), which is a
    # platform-specific extension that is not available everywhere.
    return int(start_of_tomorrow.timestamp())
def get_revision_store_path(zone: str, trailing_slash: bool = False) -> str:
    """Build the logical path of the revision store in a zone.

    :param zone:           zone name
    :param trailing_slash: Add a trailing slash (default: False)

    :returns: Logical path of revision store
    """
    components = ["/" + zone, constants.UUREVISIONCOLLECTION.lstrip(os.path.sep)]

    if trailing_slash:
        # Joining with an empty last component makes the result end in a separator.
        components.append('')

    return os.path.join(*components)
def get_deletion_candidates(ctx: 'rule.Context',
                            revision_strategy: 'RevisionStrategy',
                            revisions: List,
                            initial_upper_time_bound: int,
                            original_exists: bool,
                            verbose: bool) -> List:
    """Get revision data objects for a particular versioned data object that should be deleted, as per
       a given revision strategy.

    Buckets are consecutive time windows going back in time from the initial upper time bound.
    Each bucket is indexed as: [0] length of the time window, [1] maximum number of revisions
    to keep in the bucket, [2] index in the bucket at which removal starts (a negative value
    counts from the end of the bucket and removes in reverse direction).

    Revisions with a modification time after the initial upper time bound are in no bucket
    and are never returned as candidates.

    :param ctx:                       Combined type of a callback and rei struct
    :param revision_strategy:         Revision strategy object
    :param revisions:                 List of revisions for a particular data object. Each revision is represented by a 3-tuple
                                      (revision ID, modification time in epoch time, original path)
    :param initial_upper_time_bound:  Initial upper time bound (epoch time) for first bucket
    :param original_exists:           Boolean value that indicates whether the original versioned data object still exists
    :param verbose:                   Whether to print additional information for troubleshooting (boolean)

    :returns:                         List of candidates for deletion based on the specified revision strategy
    """
    # Without the original, every revision is a candidate.
    if not original_exists:
        if verbose:
            for revision in revisions:
                log.write(ctx, 'Scheduling revision <{}> for removal. Original no longer exists.'.format(
                    revision[2]))
        return [revision[0] for revision in revisions]

    buckets = revision_strategy.get_buckets()
    deletion_candidates = []

    # Sort revisions by bucket. Bucket i covers (lower_bound, upper_bound], and
    # the lower bound of a bucket is the upper bound of the next (older) one.
    # Result: per bucket, the list of revision IDs in that bucket:
    # [[data_ids0],[data_ids1]]
    bucket_revisions = []
    lower_bound = initial_upper_time_bound
    for bucket in buckets:
        upper_bound = lower_bound
        lower_bound = upper_bound - bucket[0]
        bucket_revisions.append([revision[0] for revision in revisions
                                 if lower_bound < revision[1] <= upper_bound])

    revision_found_in_bucket = any(bucket_revisions)

    # Get revisions that predate all buckets. The comparison is inclusive, so
    # that a revision exactly on the lower bound of the oldest bucket (which
    # that bucket excludes) is not silently skipped.
    non_bucket_revisions = [revision[0] for revision in revisions if revision[1] <= lower_bound]

    # Per bucket find the revision candidates for deletion
    for bucket, rev_list in zip(buckets, bucket_revisions):
        max_bucket_size = bucket[1]
        bucket_start_index = bucket[2]

        # Only the surplus above the maximum bucket size is removed
        # (the range is empty if the bucket is not over its maximum).
        nr_to_be_removed = len(rev_list) - max_bucket_size
        for count in range(nr_to_be_removed):
            if bucket_start_index >= 0:
                # Remove forwards, starting at the start index.
                index = bucket_start_index + count
            else:
                # Negative start index: remove backwards from the end of the bucket.
                index = len(rev_list) + bucket_start_index - count
            if verbose:
                log.write(ctx, 'Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index),
                                                                                             str(bucket)))
            deletion_candidates.append(rev_list[index])

    # If there are revisions in any bucket, remove all revisions before defined buckets. If there are
    # no revisions in buckets, remove all revisions before defined buckets except the first one in the list.
    first_index_to_remove = 0 if revision_found_in_bucket else 1
    for index in range(first_index_to_remove, len(non_bucket_revisions)):
        if verbose:
            log.write(ctx, 'Scheduling revision <{}> (older than buckets) for removal.'.format(str(index)))
        deletion_candidates.append(non_bucket_revisions[index])

    return deletion_candidates
def revision_cleanup_prefilter(ctx: 'rule.Context',
                               revisions_list: List,
                               revision_strategy_name: str,
                               original_exists_dict: dict,
                               verbose: bool) -> List:
    """Filters out revisioned data objects from a list if we can easily determine that they don't meet criteria for being removed,
       for example if the number of revisions of an existing versioned data object is at most one.

       This prefilter is performed in the scan phase. A full check of the remaining versioned data objects will be performed in the
       processing phase.

       The purpose of this function is to filter out revisions that obviously don't need further processing, so as to make the cleanup
       process more efficient.

       :param ctx:                    Combined type of a callback and rei struct
       :param revisions_list:         List of versioned data objects. Each versioned data object is represented as a list of revisions,
                                      with each revision represented as a 3-tuple (revision ID, modification time in epoch time, original
                                      path)
       :param revision_strategy_name: Select a revision strategy based on a string ('A', 'B', 'Simple'). See
                                      https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisions.md
                                      for an explanation.
       :param original_exists_dict:   Dictionary where keys are paths of versioned data objects. Values are booleans that indicate whether
                                      the original data object still exists
       :param verbose:                Whether to print verbose information for troubleshooting (boolean)

       :returns:                      List of versioned data objects, after prefiltered versioned data objects / revisions have been
                                      removed. Each versioned data object is represented as a list of revisions,
                                      with each revision represented as a 3-tuple (revision ID, modification time in epoch time, original
                                      path)
    """
    minimum_bucket_size = get_revision_strategy(revision_strategy_name).get_minimum_bucket_size()

    # If the versioned data object still exists, we can remove it in the
    # prefilter stage if it has only a single revision, assuming that the size
    # of the smallest bucket is at least 1. If the smallest bucket has size 0,
    # even a single revision has to be kept for further processing.
    max_prefiltered_length = min(minimum_bucket_size, 1)

    results = []
    for revisions in revisions_list:
        # Versioned data objects without any revisions need no processing.
        if len(revisions) == 0:
            continue

        # All revisions in the list belong to the same original, so the
        # original path is taken from the first revision.
        original_path = revisions[0][2]

        # Revisions of versioned data objects that do not exist
        # anymore should never be removed in the prefilter stage,
        # since revisions should always be removed if their versioned
        # data object no longer exists.
        if not original_exists_dict[original_path] or len(revisions) > max_prefiltered_length:
            results.append(revisions)

    if verbose:
        log.write(ctx, "Remaining revisions for cleanup after prefiltering: " + str(results))

    return results
def get_resc(row: List) -> str:
    """Extract the resc part from the metadata of a data object (for revision job).

    The fourth element of the row is either "<resc>,<balance id>" or, for
    revision metadata created in v1.8 or earlier, just the resc.

    :param row: metadata for the data object

    :returns: resc
    """
    value = row[3]
    parts = value.split(',')

    # Anything other than exactly two comma-separated parts is treated as the
    # old format (backwards compatibility), and returned unchanged.
    return parts[0] if len(parts) == 2 else value
def get_balance_id(row: List, path: str) -> int:
    """Determine the balance id of a data object from its metadata (for revision job).

    :param row:  metadata for the data object
    :param path: path to the data object

    :returns: Balance id
    """
    parts = row[3].split(',')

    if len(parts) != 2:
        # Backwards compatibility with revision metadata created in v1.8 or earlier:
        # derive a balance id in the range 1..64 from the MD5 hash of the path.
        # This will determine whether this dataobject will be taken into account
        # in this job/range or another that is running parallel.
        path_hash = int.from_bytes(hashlib.md5(path.encode('utf-8')).digest(), 'big')
        return path_hash % 64 + 1

    # Current format: "<resc>,<balance id>".
    return int(parts[1])