#!/usr/bin/env python
"""
Takes arguments from the command line and scans each dataset to extract its characteristics.
Outputs the characteristics to JSON files.
Can be re-run after errors are fixed; only the datasets that previously failed will be scanned again.
"""
import json
import collections
import os
import glob
import argparse
import SETTINGS
from lib import options, utils
from lib.character import extract_character
def _get_arg_parser():
"""
Parses arguments given at the command line
:return: Namespace object built from attributes parsed from command line.
"""
parser = argparse.ArgumentParser()
project_options = options.known_projects
location_options = options.locations
parser.add_argument(
"project",
nargs=1,
type=str,
choices=project_options,
help=f'Project ID, must be one of: {project_options}'
)
parser.add_argument(
"-d",
"--dataset-ids",
nargs=1,
type=str,
default=None,
required=False,
help='List of comma-separated dataset identifiers'
)
parser.add_argument(
"-p",
"--paths",
nargs=1,
type=str,
default=None,
required=False,
help='List of comma-separated directories to search'
)
parser.add_argument(
"-f",
"--facets",
nargs=1,
type=str,
default=None,
required=False,
help='Set of facets to use, formatted as: x=hello,y=2,z=bye'
)
parser.add_argument(
"-e",
"--exclude",
nargs=1,
type=str,
default=None,
required=False,
help='Regular expressions for excluding paths from being scanned'
)
    parser.add_argument(
        "-m",
        "--mode",
        nargs=1,
        type=str,
        # Wrap the default in a list so that `args.mode[0]` still works when the
        # flag is omitted (nargs=1 only yields a list when the flag is given).
        default=["quick"],
        required=False,
        help='Scanning mode: can be either quick or full. A full scan returns '
             'max and min values while a quick scan excludes them. Defaults to quick.'
    )
parser.add_argument(
"-l",
"--location",
nargs=1,
type=str,
default="ceda",
required=True,
choices=location_options,
help=f'Location of scan, must be one of: {location_options}'
)
return parser
def _to_list(item):
    if not item:
        return item
    return item[0].split(',')


def _to_dict(item):
    if not item:
        return item
    return dict(_.split('=') for _ in item[0].split(','))
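# For illustration, each option above uses nargs=1, so argparse hands these helpers
# a single-element list, e.g.:
#
#   _to_list(['ds1,ds2,ds3'])        -> ['ds1', 'ds2', 'ds3']
#   _to_dict(['x=hello,y=2,z=bye'])  -> {'x': 'hello', 'y': '2', 'z': 'bye'}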
def parse_args():
    """
    Parse the command-line arguments and unpack them into plain Python values.
    :return: tuple of (project, ds_ids, paths, facets, exclude, mode, location)
    """
parser = _get_arg_parser()
args = parser.parse_args()
project = args.project[0]
ds_ids = _to_list(args.dataset_ids)
paths = _to_list(args.paths)
facets = _to_dict(args.facets)
exclude = _to_list(args.exclude)
mode = args.mode[0]
location = args.location[0]
return project, ds_ids, paths, facets, exclude, mode, location
def to_json(character, output_path):
"""
Outputs the extracted characteristics to a JSON file.
If the characteristics can't be output an error file is produced.
:param character: (dict) The extracted characteristics.
:param output_path: (string) The file path at which the JSON file is produced.
:return : None
"""
# Output to JSON file
with open(output_path, 'w') as writer:
json.dump(character, writer, indent=4, sort_keys=True)
def _get_ds_paths_from_paths(paths, project):
"""
Return an OrderedDict of {<ds_id>: <ds_path>} found under the paths provided
as `paths` (a sequence of directory/file paths).
:param paths: (sequence) directory/file paths
:param project: top-level project, e.g. "cmip5", "cmip6" or "cordex" (case-insensitive)
:return: OrderedDict of {<ds_id>: <ds_path>}
"""
base_dir = options.project_base_dirs[project]
# Check paths first
bad_paths = []
for pth in paths:
if not pth.startswith(base_dir):
bad_paths.append(pth)
if bad_paths:
raise Exception(f'Invalid paths provided: {bad_paths}')
ds_paths = collections.OrderedDict()
for pth in paths:
print(f'[INFO] Searching for datasets under: {pth}')
facet_order = options.facet_rules[project]
facets_in_path = pth.replace(base_dir, '').strip('/').split('/')
facets = {}
for i, facet_name in enumerate(facet_order):
if len(facets_in_path) <= i:
break
facets[facet_name] = facets_in_path[i]
# Fix facet version if not set
if not facets.get('version'):
facets['version'] = 'latest'
facets_as_path = '/'.join([facets.get(_, '*') for _ in facet_order])
# Remove anything matching "files"
if '/files' in facets_as_path:
continue
        # TODO: This repeats the code in get_dataset_paths() below. Suggest we create a
        # module/class to manage mapping the different args to the ds_paths dictionary, later.
pattern = os.path.join(base_dir, facets_as_path)
print(f'[INFO] Finding dataset paths for pattern: {pattern}')
for ds_path in glob.glob(pattern):
dsid = utils.switch_ds(project, ds_path)
ds_paths[dsid] = ds_path
return ds_paths
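# For illustration, if the facet order for a project were
# ['activity', 'variable', 'version'] (hypothetical names; the real order comes from
# options.facet_rules), a path <base_dir>/CMIP/tas would yield the facets
# {'activity': 'CMIP', 'variable': 'tas', 'version': 'latest'} and hence the glob
# pattern <base_dir>/CMIP/tas/latest.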
def get_dataset_paths(project, ds_ids=None, paths=None, facets=None, exclude=None):
"""
Converts the input arguments into an Ordered Dictionary of {DSID: directory} items.
:param project: top-level project, e.g. "cmip5", "cmip6" or "cordex" (case-insensitive)
:param ds_ids: sequence of dataset identifiers (DSIDs), OR None.
:param paths: sequence of file paths to scan for NetCDF files under, OR None.
:param facets: dictionary of facet values to limit the search, OR None.
    :param exclude: list of regular expressions to exclude in file paths, OR None
                    (accepted but not currently applied).
:return: An Ordered Dictionary of {dsid: directory}
"""
base_dir = options.project_base_dirs[project]
ds_paths = collections.OrderedDict()
# If ds_ids is defined then ignore all other arguments and use this list
if ds_ids:
for dsid in ds_ids:
if not dsid: continue
ds_path = utils.switch_ds(project, dsid)
ds_paths[dsid] = ds_path
# Else use facets if they exist
elif facets:
facet_order = options.facet_rules[project]
facets_as_path = '/'.join([facets.get(_, '*') for _ in facet_order])
pattern = os.path.join(base_dir, facets_as_path)
print(f'[INFO] Finding dataset paths for pattern: {pattern}')
for ds_path in glob.glob(pattern):
dsid = utils.switch_ds(project, ds_path)
ds_paths[dsid] = ds_path
elif paths:
ds_paths = _get_ds_paths_from_paths(paths, project)
else:
raise NotImplementedError('Code currently breaks if not using "ds_ids" argument.')
return ds_paths
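# For illustration, calling get_dataset_paths(project, facets={'variable': 'tas'})
# with the same hypothetical facet order as above would glob for
# <base_dir>/*/tas/* and map every matching directory back to a DSID via
# utils.switch_ds().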
def scan_datasets(project, mode, location, ds_ids=None, paths=None, facets=None, exclude=None):
"""
Loops over ESGF data sets and scans them for character.
Scans multiple ESGF Datasets found for a given `project` based on a combination of:
- ds_ids: sequence of dataset identifiers (DSIDs)
- paths: sequence of file paths to scan for NetCDF files under
- facets: dictionary of facet values to limit the search
- exclude: list of regular expressions to exclude in file paths
The scanned datasets are characterised and the output is written to a JSON file
if no errors occurred.
Keeps track of whether the job was successful or not.
Produces error files if an error occurs, otherwise produces a success file.
:param project: top-level project, e.g. "cmip5", "cmip6" or "cordex" (case-insensitive)
:param ds_ids: sequence of dataset identifiers (DSIDs), OR None.
:param paths: sequence of file paths to scan for NetCDF files under, OR None.
:param facets: dictionary of facet values to limit the search, OR None.
:param exclude: list of regular expressions to exclude in file paths, OR None.
:param mode: Scanning mode: can be either quick or full. A full scan returns
max and min values while a quick scan excludes them. Default is quick.
:return: Dictionary of {"success": list of DSIDs that were successfully scanned,
"failed": list of DSIDs that failed to scan}
"""
# Keep track of failures
count = 0
failure_count = 0
    # Resolve the arguments into a mapping of {DSID: dataset path}
ds_paths = get_dataset_paths(project, ds_ids=ds_ids, paths=paths, facets=facets, exclude=exclude)
for ds_id, ds_path in ds_paths.items():
scanner = scan_dataset(project, ds_id, ds_path, mode, location)
count += 1
if scanner is False:
failure_count += 1
    # Guard against division by zero when no datasets were found
    percentage_failed = (failure_count / float(count)) * 100 if count else 0.0
    print(f'[INFO] COMPLETED. Total count: {count}'
          f', Failure count = {failure_count}. Percentage failed'
          f' = {percentage_failed:.2f}%')
def _get_output_paths(project, ds_id):
"""
Return a dictionary of output paths to write JSON output, success and failure files to.
Make each parent directory of not already there.
:param project: top-level project.
:param ds_id: Dataset Identifier (DSID)
:return: dictionary of output paths with keys:
'success', 'json', 'no_files_error', 'extract_error', 'write_error', 'batch'
"""
    grouped_ds_id = utils.get_grouped_ds_id(ds_id)
    # The SETTINGS path templates are filled from the local variables here,
    # i.e. `project`, `ds_id` and `grouped_ds_id`.
    paths = {
'json': SETTINGS.JSON_OUTPUT_PATH.format(**vars()),
'no_files_error': SETTINGS.NO_FILES_PATH.format(**vars()),
'extract_error': SETTINGS.EXTRACT_ERROR_PATH.format(**vars()),
'write_error': SETTINGS.WRITE_ERROR_PATH.format(**vars()),
'batch': SETTINGS.BATCH_OUTPUT_PATH.format(**vars())
}
# Make directories if not already there
for pth in paths.values():
dr = os.path.dirname(pth)
if not os.path.isdir(dr):
os.makedirs(dr)
return paths
def analyse_facets(project, ds_id):
    """
    Split a dataset identifier into its facets, using the facet order defined
    for the project in options.facet_rules.
    :param project: top-level project, e.g. "cmip5", "cmip6" or "cordex" (case-insensitive)
    :param ds_id: dataset identifier (DSID), a '.'-separated string of facet values
    :return: dictionary mapping facet names to the values in `ds_id`
    """
facet_names = options.facet_rules[project]
facet_values = ds_id.split('.')
return dict(zip(facet_names, facet_values))
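# For illustration, with a hypothetical facet order of ['project', 'model', 'variable']:
#
#   analyse_facets(project, 'cmip6.modelX.tas')
#   -> {'project': 'cmip6', 'model': 'modelX', 'variable': 'tas'}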
def is_registered(json_path):
    """Return True if a JSON output file already exists for this dataset."""
    return os.path.exists(json_path)
def _check_for_min_max(json_path):
    """Return True if the JSON output file already contains max and min values."""
    with open(json_path) as reader:
        data = json.load(reader)
    mx = data["data"]["max"]
    mn = data["data"]["min"]
    return bool(mx and mn)
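# The JSON files written by to_json() are read back by _check_for_min_max() above and
# scan_dataset() below via data["scan_metadata"]["mode"] and data["data"]["max"/"min"].
# A minimal sketch of the expected shape (real files produced by extract_character
# contain more fields):
#
#   {
#       "scan_metadata": {"mode": "full"},
#       "data": {"min": 210.3, "max": 312.7}
#   }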
def scan_dataset(project, ds_id, ds_path, mode, location):
"""
Scans a set of files found under the `ds_path`.
The scanned datasets are characterised and the output is written to a JSON file
if no errors occurred.
    Keeps track of whether the job was successful or not.
    Produces an error file if an error occurs; a successful scan is recorded by
    its JSON output file.
    :param project: top-level project, e.g. "cmip5", "cmip6" or "cordex" (case-insensitive)
    :param ds_id: dataset identifier (DSID)
    :param ds_path: directory under which to scan data files.
    :param mode: Scanning mode: can be either quick or full. A full scan returns
                 max and min values while a quick scan excludes them. Defaults to quick.
    :param location: location of the scan, e.g. "ceda".
    :return: Boolean - indicating success or failure of the scan.
"""
if project not in options.known_projects:
raise Exception(f'Project must be one of known projects: {options.known_projects}')
print(f'[INFO] Scanning dataset: {ds_id}\n\t\t{ds_path} in {mode} mode ')
facets = analyse_facets(project, ds_id)
# Generate output file paths
outputs = _get_output_paths(project, ds_id)
# check json file exists
registration = is_registered(outputs["json"])
if registration:
        try:
            # If the JSON file exists, get the mode it was previously scanned in
            with open(outputs["json"]) as reader:
                data = json.load(reader)
            mode = data["scan_metadata"]["mode"]
            if mode == 'quick':
                print(f'[INFO] Already ran for: {ds_id} in quick mode')
                return True
            if mode == 'full':
                check = _check_for_min_max(outputs["json"])
                if check:
                    print(f'[INFO] Already ran for: {ds_id} in full mode')
                    return True
        # A corrupt JSON file exists: delete it and re-run the scan
        except json.decoder.JSONDecodeError:
            os.remove(outputs["json"])
            print('[INFO] Corrupt JSON file. Deleting and re-running.')
# Delete previous failure files and log files
for file_key in ('no_files_error', 'extract_error', 'write_error'):
err_file = outputs[file_key]
if os.path.exists(err_file):
os.remove(err_file)
# Get data files
nc_files = glob.glob(f'{ds_path}/*.nc')
if not nc_files:
print(f'[ERROR] No data files found for: {ds_path}/*.nc')
open(outputs['no_files_error'], 'w')
return False
# Open files with Xarray and get character
expected_facets = options.facet_rules[project]
var_id = options.get_facet('variable', facets, project)
try:
character = extract_character(nc_files, location, var_id=var_id,
mode=mode, expected_attrs=expected_facets)
except Exception as exc:
print(f'[ERROR] Could not load Xarray Dataset for: {ds_path}')
print(f'[ERROR] Files: {nc_files}')
print(f'[ERROR] Exception was: {exc}')
# Create error file if can't open dataset
with open(outputs['extract_error'], 'w') as writer:
writer.write(str(exc))
return False
    # Output to JSON file
    try:
        to_json(character, outputs['json'])
    except Exception as exc:
        print(f'[ERROR] Could not write JSON output: {outputs["json"]}')
        # Create an error file if the JSON output can't be written
        with open(outputs['write_error'], 'w') as writer:
            writer.write(str(exc))
        return False
    print(f'[INFO] Wrote JSON file: {outputs["json"]}')
    return True
def main():
"""
    Runs the script when called from the command line.
"""
project, ds_ids, paths, facets, exclude, mode, location = parse_args()
scan_datasets(project, mode, location, ds_ids, paths, facets, exclude)
if __name__ == "__main__":
main()