-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathpolicies.py
702 lines (539 loc) · 29.1 KB
/
policies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
"""iRODS policy implementations."""
__copyright__ = 'Copyright (c) 2020-2024, Utrecht University'
__license__ = 'GPLv3, see LICENSE'
import re
import session_vars
import datarequest
import folder
import policies_datapackage_status
import policies_datarequest_status
import policies_folder_status
import policies_intake
import replication
import revisions
import vault
from policies_utils import is_safe_genquery_inp
from util import *
# Policy check functions {{{
# These can be called from anywhere to check the authorization for a certain
# operation. A check function, prefixed with 'can_', will always return either
# policy.succeed() or policy.fail(). If the result is policy.fail(), the reason
# for the failure can be obtained in the 'reason' property of the result object.
# This can be used for reporting authorization failures to clients.
# Authorize I/O operations {{{
# Separate from ACLs, we deny certain operations on collections and data in
# research or deposit folders when paths are locked.
def can_coll_create(ctx: rule.Context, actor: str, coll: str) -> policy.Succeed | policy.Fail:
"""Disallow creating collections in locked folders."""
log.debug(ctx, 'check coll create <{}>'.format(coll))
if pathutil.info(coll).space in [pathutil.Space.RESEARCH, pathutil.Space.DEPOSIT]:
if folder.is_locked(ctx, pathutil.dirname(coll)) and not user.is_admin(ctx, actor):
return policy.fail('Parent folder is locked')
if pathutil.info(coll).space is pathutil.Space.INTAKE:
if policies_intake.is_coll_in_locked_dataset(ctx, user.user_and_zone(ctx), pathutil.chop(coll)[0]):
return policy.fail('Collection part of a locked dataset')
return policy.succeed()
def can_coll_delete(ctx: rule.Context, actor: str, coll: str) -> policy.Succeed | policy.Fail:
"""Disallow deleting collections in locked folders and collections containing locked folders."""
log.debug(ctx, 'check coll delete <{}>'.format(coll))
if re.match(r'^/[^/]+/home/[^/]+$', coll) and not user.is_admin(ctx, actor):
return policy.fail('Cannot delete or move collections directly under /home')
if pathutil.info(coll).space in [pathutil.Space.RESEARCH, pathutil.Space.DEPOSIT]:
if not user.is_admin(ctx, actor) and folder.has_locks(ctx, coll):
return policy.fail('Folder or subfolder is locked')
if pathutil.info(coll).space is pathutil.Space.INTAKE:
if policies_intake.coll_in_path_of_locked_dataset(ctx, user.user_and_zone(ctx), coll):
return policy.fail('Collection part of a locked dataset')
return policy.succeed()
def can_coll_move(ctx: rule.Context, actor: str, src: str, dst: str) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'check coll move <{}> -> <{}>'.format(src, dst))
return policy.all(can_coll_delete(ctx, actor, src),
can_coll_create(ctx, actor, dst))
def can_data_create(ctx: rule.Context, actor: str, path: str) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'check data create <{}>'.format(path))
if pathutil.info(path).space in [pathutil.Space.RESEARCH, pathutil.Space.DEPOSIT]:
if folder.is_locked(ctx, pathutil.dirname(path)):
# Parent coll locked?
if not user.is_admin(ctx, actor):
return policy.fail('Folder is locked')
elif folder.is_data_locked(ctx, path):
# If the parent coll is not locked, there might still be a lock on
# an existing destination data object (though this situation cannot
# arise through portal actions).
if not user.is_admin(ctx, actor):
return policy.fail('Destination is locked')
if pathutil.info(path).space is pathutil.Space.INTAKE:
if policies_intake.is_data_in_locked_dataset(ctx, user.user_and_zone(ctx), path):
return policy.fail('Data part of a locked dataset')
return policy.succeed()
def can_data_write(ctx: rule.Context, actor: str, path: str) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'check data write <{}>'.format(path))
# Disallow writing to locked objects in research and deposit folders.
if pathutil.info(path).space in [pathutil.Space.RESEARCH, pathutil.Space.DEPOSIT]:
if folder.is_data_locked(ctx, path) and not user.is_admin(ctx, actor):
return policy.fail('Data object is locked')
# Disallow writing to locked datasets in intake.
if pathutil.info(path).space is pathutil.Space.INTAKE:
if policies_intake.is_data_in_locked_dataset(ctx, user.user_and_zone(ctx), path):
return policy.fail('Data part of a locked dataset')
return policy.succeed()
def can_data_delete(ctx: rule.Context, actor: str, path: str) -> policy.Succeed | policy.Fail:
if re.match(r'^/[^/]+/home/[^/]+$', path) and not user.is_admin(ctx, actor):
return policy.fail('Cannot delete or move data directly under /home')
if pathutil.info(path).space in [pathutil.Space.RESEARCH, pathutil.Space.DEPOSIT]:
if not user.is_admin(ctx, actor) and folder.is_data_locked(ctx, path):
return policy.fail('Folder is locked')
if pathutil.info(path).space is pathutil.Space.INTAKE:
if policies_intake.is_data_in_locked_dataset(ctx, user.user_and_zone(ctx), path):
return policy.fail('Data part of a locked dataset')
return policy.succeed()
def can_data_copy(ctx: rule.Context, actor: str, src: str, dst: str) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'check data copy <{}> -> <{}>'.format(src, dst))
return can_data_create(ctx, actor, dst)
def can_data_move(ctx: rule.Context, actor: str, src: str, dst: str) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'check data move <{}> -> <{}>'.format(src, dst))
return policy.all(can_data_delete(ctx, actor, src),
can_data_create(ctx, actor, dst))
# }}}
# Hooking pre peps to the above check functions {{{
# Ideally we would use only dynamic PEPs, as they are more consistent and
# appear to be the preferred (by iRODS) way of implementing policies.
# However dynamic API PEPs, in contrast with their apparently equivalent
# static PEPs, are not triggered by MSIs and therefore cannot be relied upon.
# So we again fall back to static PEPs, and add only a few dynamic PEPs where
# they provide benefits that the static PEPs cannot provide.
#
# The actual static PEPs are currently in the rule language part of the ruleset.
# Most of them 'cut' and call identically named Python functions in this file.
@policy.require()
def py_acPreprocForCollCreate(ctx: rule.Context) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'py_acPreprocForCollCreate')
# print(jsonutil.dump(session_vars.get_map(ctx.rei)))
return can_coll_create(ctx, user.user_and_zone(ctx),
str(session_vars.get_map(ctx.rei)['collection']['name']))
@policy.require()
def py_acPreprocForRmColl(ctx: rule.Context) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'py_acPreprocForRmColl')
# print(jsonutil.dump(session_vars.get_map(ctx.rei)))
return can_coll_delete(ctx, user.user_and_zone(ctx),
str(session_vars.get_map(ctx.rei)['collection']['name']))
@policy.require()
def py_acPreprocForDataObjOpen(ctx: rule.Context) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'py_acPreprocForDataObjOpen')
# data object reads are always allowed.
# writes are blocked e.g. when the object is locked (unless actor is a rodsadmin).
if session_vars.get_map(ctx.rei)['data_object']['write_flag'] == 1:
return can_data_write(ctx, user.user_and_zone(ctx),
str(session_vars.get_map(ctx.rei)['data_object']['object_path']))
else:
return policy.succeed()
@policy.require()
def py_acDataDeletePolicy(ctx: rule.Context) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'py_acDataDeletePolicy')
return (policy.succeed()
if can_data_delete(ctx, user.user_and_zone(ctx),
str(session_vars.get_map(ctx.rei)['data_object']['object_path']))
else ctx.msiDeleteDisallowed())
@policy.require()
def py_acPreProcForObjRename(ctx: rule.Context, src: str, dst: str) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'py_acPreProcForObjRename')
# irods/lib/api/include/dataObjInpOut.h
RENAME_DATA_OBJ = 11
RENAME_COLL = 12
if session_vars.get_map(ctx.rei)['operation_type'] == RENAME_DATA_OBJ:
return can_data_move(ctx, user.user_and_zone(ctx), src, dst)
elif session_vars.get_map(ctx.rei)['operation_type'] == RENAME_COLL:
return can_coll_move(ctx, user.user_and_zone(ctx), src, dst)
return policy.succeed()
@policy.require()
def py_acPostProcForPut(ctx: rule.Context) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'py_acPostProcForPut')
# Data object creation cannot be prevented by API dynpeps and static PEPs,
# due to how MSIs work. Thus, this ugly workaround specifically for MSIs.
path = str(session_vars.get_map(ctx.rei)['data_object']['object_path'])
x = can_data_create(ctx, user.user_and_zone(ctx), path)
if not x:
data_object.remove(ctx, path, force=True)
return x
@policy.require()
def py_acPostProcForCopy(ctx: rule.Context) -> policy.Succeed | policy.Fail:
# See py_acPostProcForPut.
log.debug(ctx, 'py_acPostProcForCopy')
path = str(session_vars.get_map(ctx.rei)['data_object']['object_path'])
x = can_data_create(ctx, user.user_and_zone(ctx), path)
if not x:
data_object.remove(ctx, path, force=True)
return x
# Disabled: caught by acPreprocForCollCreate
# @policy.require()
# def pep_api_coll_create_pre(ctx, instance_name, rs_comm, coll_create_inp):
# log.debug(ctx, 'pep_api_coll_create_pre')
# return can_coll_create(ctx, user.user_and_zone(ctx),
# str(coll_create_inp.collName))
# Disabled: caught by acPreprocForRmColl
# @policy.require()
# def pep_api_rm_coll_pre(ctx, instance_name, rs_comm, rm_coll_inp, coll_opr_stat):
# log.debug(ctx, 'pep_api_rm_coll_pre')
# return can_coll_delete(ctx, user.user_and_zone(ctx),
# str(rm_coll_inp.collName))
# Disabled: caught by acPostProcForPut
# @policy.require()
# def pep_api_data_obj_put_pre(ctx, instance_name, rs_comm, data_obj_inp, data_obj_inp_bbuf, portal_opr_out):
# # Matches data object creation/overwrite via iput.
# log.debug(ctx, 'pep_api_data_obj_put_pre')
# return can_data_create(ctx, user.user_and_zone(ctx),
# str(data_obj_inp.objPath))
@policy.require()
def pep_api_data_obj_create_pre(ctx: rule.Context,
instance_name: str,
rs_comm: object,
data_obj_inp: object) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'pep_api_data_obj_create_pre')
# Catch object creation/overwrite via Davrods and PRC.
# This should also catch object creation by any other client that isn't so
# nice as to set the "PUT_OPR" flag, and thereby bypasses the static PUT postproc above.
# Note that this should only be needed for create actions, not open in general:
# for overwriting there is still a PRE static PEP that applies - acPreprocForDataObjOpen.
return can_data_create(ctx, user.user_and_zone(ctx),
str(data_obj_inp.objPath))
@policy.require()
def pep_api_data_obj_create_and_stat_pre(ctx: rule.Context,
instance_name: str,
rs_comm: object,
data_obj_inp: object,
open_stat: object) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'pep_api_data_obj_create_and_stat_pre')
# Not triggered by any of our clients currently, but needed for completeness.
return can_data_create(ctx, user.user_and_zone(ctx),
str(data_obj_inp.objPath))
# Disabled: caught by acPostProcForCopy
# @policy.require()
# def pep_api_data_obj_copy_pre(ctx, instance_name, rs_comm, data_obj_copy_inp, trans_stat):
# log.debug(ctx, 'pep_api_data_obj_copy_pre')
# return can_data_create(ctx, user.user_and_zone(ctx),
# str(data_obj_copy_inp.destDataObjInp.objPath))
# Disabled: caught by acPreProcForObjRename
# @policy.require()
# def pep_api_data_obj_rename_pre(ctx, instance_name, rs_comm, data_obj_rename_inp):
# log.debug(ctx, 'pep_api_data_obj_rename_pre')
# # API name says data_obj, but it applies to both data and collections
# # depending on 'oprType'.
# # irods/lib/api/include/dataObjInpOut.h
# RENAME_DATA_OBJ = 11
# RENAME_COLL = 12
# if data_obj_rename_inp.srcDataObjInp.oprType == RENAME_DATA_OBJ:
# return can_data_move(ctx, user.user_and_zone(ctx),
# str(data_obj_rename_inp.srcDataObjInp.objPath),
# str(data_obj_rename_inp.destDataObjInp.objPath))
# elif data_obj_rename_inp.srcDataObjInp.oprType == RENAME_COLL:
# return can_coll_move(ctx, user.user_and_zone(ctx),
# str(data_obj_rename_inp.srcDataObjInp.objPath),
# str(data_obj_rename_inp.destDataObjInp.objPath))
@policy.require()
def pep_api_data_obj_trim_pre(ctx: rule.Context,
instance_name: str,
rs_comm: object,
data_obj_inp: object) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'pep_api_data_obj_trim_pre')
return can_data_write(ctx, user.user_and_zone(ctx),
str(data_obj_inp.objPath))
@policy.require()
def pep_api_data_obj_truncate_pre(ctx: rule.Context,
instance_name: str,
rs_comm: object,
data_obj_truncate_inp: object) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'pep_api_data_obj_truncate_pre')
return can_data_write(ctx, user.user_and_zone(ctx),
str(data_obj_truncate_inp.objPath))
# Disabled: caught by acDataDeletePolicy
# @policy.require()
# def pep_api_data_obj_unlink_pre(ctx, instance_name, rs_comm, data_obj_unlink_inp):
# log.debug(ctx, 'pep_api_data_obj_unlink_pre')
# return can_data_delete(ctx, user.user_and_zone(ctx),
# str(data_obj_unlink_inp.objPath))
# }}}
# Authorize metadata operations {{{
# Disabled: caught by py_acPreProcForModifyAVUMetadata
# @policy.require()
# def pep_api_mod_avu_metadata_pre(ctx, instance_name, rs_comm, mod_avumetadata_inp):
# log.debug(ctx, 'pep_api_mod_avu_metadata_pre')
# return can_meta_modify(ctx, actor, AvuOpr(mod_avumetadata_inp))
# Policy for most AVU changes
@policy.require()
def py_acPreProcForModifyAVUMetadata(ctx: rule.Context,
option: str,
obj_type: str,
obj_name: str,
attr: str,
value: str,
unit: str) -> policy.Succeed | policy.Fail:
actor = user.user_and_zone(ctx)
if obj_type not in ['-d', '-C']:
# Metadata policies below only apply to collections.
return policy.succeed()
space = pathutil.info(obj_name).space
if space in [pathutil.Space.RESEARCH, pathutil.Space.DEPOSIT] and attr == constants.IISTATUSATTRNAME:
# Research or deposit folder status change. Validate.
if unit != '':
return policy.fail('Invalid status attribute')
if option not in ['set', 'rm', 'rmw']:
# "add" has no meaning on the status attribute, as there must
# always be either 0 or 1 instance of this attr.
return policy.fail('Only "set" and "rm" operations allowed on folder status attribute')
if option in ['rm', 'rmw']:
option, value = 'set', ''
x = policies_folder_status.can_set_folder_status_attr(ctx, actor, obj_name, value)
if not x:
return x
return policies_folder_status.pre_status_transition(ctx, obj_name, x[0], x[1])
elif (space in [pathutil.Space.RESEARCH, pathutil.Space.DEPOSIT]
and attr in [constants.UUORGMETADATAPREFIX + "revision_scheduled",
constants.UUORGMETADATAPREFIX + "replication_scheduled"]):
# Research or deposit organizational metadata.
if user.is_admin(ctx, actor):
return policy.succeed()
if option in ['add']:
return policy.succeed()
else:
return policy.fail('Only "add" operations allowed on attribute')
elif space is pathutil.Space.VAULT and attr == constants.IIVAULTSTATUSATTRNAME:
if not user.is_admin(ctx, actor):
return policy.fail('No permission to change vault status')
x = policies_datapackage_status.can_set_datapackage_status_attr(ctx, actor, obj_name, value)
if not x:
return x
return policies_datapackage_status.pre_status_transition(ctx, obj_name, x[0], x[1])
elif obj_type == '-C' and space is pathutil.Space.RESEARCH and unit.startswith(constants.UUUSERMETADATAROOT + '_'):
# Research package metadata, set when saving the metadata form.
# Allow if object is not locked.
if (not folder.is_locked(ctx, obj_name)) or user.is_admin(ctx, actor):
return policy.succeed()
else:
return policy.fail('Folder is locked')
elif space is pathutil.Space.DATAREQUEST and attr == datarequest.DATAREQUESTSTATUSATTRNAME:
# Check if user is permitted to change the status
if not user.is_admin(ctx, actor):
return policy.fail('No permission to change datarequest status')
# Datarequest status change. Validate.
return policies_datarequest_status.can_set_datarequest_status(ctx, obj_name, value)
else:
# Allow metadata operations in general if they do not affect reserved
# attributes.
return policy.succeed()
# imeta mod
@policy.require()
def py_acPreProcForModifyAVUMetadata_mod(ctx: rule.Context,
option: str,
obj_type: str,
obj_name: str,
a_attr: str,
a_value: str,
a_unit: str,
b_name: str,
b_value: str,
b_unit: str) -> policy.Succeed | policy.Fail:
actor = user.user_and_zone(ctx)
if user.is_admin(ctx, actor):
return policy.succeed()
if obj_type not in ['-d', '-C']:
return policy.succeed()
if pathutil.info(obj_name).space in [pathutil.Space.RESEARCH, pathutil.Space.DEPOSIT, pathutil.Space.VAULT]:
return policy.fail('Metadata mod not allowed')
return policy.succeed()
# imeta cp
@policy.require()
def py_acPreProcForModifyAVUMetadata_cp(ctx: rule.Context, option: str, t_src: str, t_dst: str, src: str, dst: str) -> policy.Succeed | policy.Fail:
actor = user.user_and_zone(ctx)
if user.is_admin(ctx, actor):
return policy.succeed()
if t_dst not in ['-d', '-C']:
return policy.succeed()
if pathutil.info(dst).space in [pathutil.Space.RESEARCH, pathutil.Space.DEPOSIT, pathutil.Space.VAULT]:
# Prevent imeta cp. Previously this was blocked by a buggy policy.
# We now block this explicitly in research & vault because the action is
# too difficult to reliably validate w.r.t. for example folder status transitions,
# and no rules make use of this code path.
return policy.fail('Metadata copy not allowed')
return policy.succeed()
# This PEP is called after an AVU is added (option = 'add'), set (option =
# 'set') or removed (option = 'rm') in the research area or the vault. Post
# conditions defined in folder.py and iiVaultTransitions.r
# are called here.
@rule.make()
def py_acPostProcForModifyAVUMetadata(ctx: rule.Context,
option: str,
obj_type: str,
obj_name: str,
attr: str,
value: str,
unit: str) -> None:
info = pathutil.info(obj_name)
if attr == constants.IISTATUSATTRNAME and info.space in [pathutil.Space.RESEARCH, pathutil.Space.DEPOSIT]:
status = constants.research_package_state.FOLDER.value if option in ['rm', 'rmw'] else value
policies_folder_status.post_status_transition(ctx, obj_name, str(user.user_and_zone(ctx)), status)
elif info.space is pathutil.Space.VAULT:
if attr == constants.IIVAULTSTATUSATTRNAME:
policies_datapackage_status.post_status_transition(ctx, obj_name, str(user.user_and_zone(ctx)), value)
elif attr.startswith(constants.UUORGMETADATAPREFIX) and attr != constants.IIARCHIVEATTRNAME:
vault.update_archive(ctx, obj_name, attr)
# Send emails after datarequest status transition if appropriate
elif attr == datarequest.DATAREQUESTSTATUSATTRNAME and info.space is pathutil.Space.DATAREQUEST:
policies_datarequest_status.post_status_transition(ctx, obj_name, value)
# }}}
# Authorize access control operations {{{
# ichmod
@policy.require()
def pep_api_mod_access_control_pre(ctx: rule.Context,
instance_name: str,
rs_comm: object,
mod_access_control_inp: object) -> policy.Succeed | policy.Fail:
log.debug(ctx, 'pep_api_mod_access_control_pre')
actor = user.user_and_zone(ctx)
if user.is_admin(ctx, actor):
return policy.succeed()
path = str(mod_access_control_inp.path)
if pathutil.info(path).space in [pathutil.Space.RESEARCH, pathutil.Space.DEPOSIT]:
# Prevent ichmod in research and deposit space by normal users.
return policy.fail('Mod access control not allowed')
return policy.succeed()
# }}}
# ExecCmd {{{
@policy.require()
def py_acPreProcForExecCmd(ctx: rule.Context,
cmd: str,
args: str,
addr: str,
hint: str) -> policy.Succeed | policy.Fail:
actor = user.user_and_zone(ctx)
# No restrictions for rodsadmin and priv group.
if user.is_admin(ctx, actor):
return policy.succeed()
if user.is_member_of(ctx, 'priv-execcmd-all', actor):
return policy.succeed()
if not (hint == addr == ''):
return policy.fail('Disallowed hint/addr in execcmd')
# Allow scheduled admin scripts.
if cmd.startswith('admin-scheduled-'):
return policy.succeed()
# Allow 'admin-*' scripts, if first arg is the actor username&zone.
if cmd.startswith('admin-'):
if args == str(actor) or args.startswith(str(actor) + ' '):
return policy.succeed()
else:
return policy.fail('Actor not given as first arg to admin- execcmd')
return policy.fail('No execcmd privileges for this command')
# Internal function to determine whether changes to data objects on a particular
# resource need to trigger policies (e.g. asynchronous replication) by default.
def resource_should_trigger_policies(resource: str) -> bool:
if resource in config.resource_primary:
return True
if resource in config.resource_vault:
return True
for pattern in config.resource_trigger_pol:
if re.match(pattern, resource):
return True
return False
@rule.make()
def pep_resource_modified_post(ctx: rule.Context,
instance_name: str,
_ctx: rule.Context,
out: str) -> None:
if not resource_should_trigger_policies(instance_name):
return
path = _ctx.map()['logical_path']
zone = _ctx.map()['user_rods_zone']
username = _ctx.map()['user_user_name']
info = pathutil.info(path)
for resource in config.resource_replica:
replication.replicate_asynchronously(ctx, path, instance_name, resource)
try:
# Import metadata if a metadata JSON file was changed.
# Example matches:
# "/tempZone/home/research-any/possible/path/to/yoda-metadata.json"
# "/tempZone/home/deposit-any/deposit[123]/yoda-metadata.json"
# "/tempZone/home/vault-any/possible/path/to/yoda-metadata[123][1].json"
# "/tempZone/home/datamanager-category/vault-path/to/yoda-metadata.json"
if ((info.space in (pathutil.Space.RESEARCH, pathutil.Space.DEPOSIT, pathutil.Space.DATAMANAGER)
and pathutil.basename(info.subpath) == constants.IIJSONMETADATA)
or (info.space is pathutil.Space.VAULT
# Vault jsons have a [timestamp] in the file name.
and re.match(r'{}\[[^/]+\]\.{}$'.format(*map(re.escape, pathutil.chopext(constants.IIJSONMETADATA))),
pathutil.basename(info.subpath)))):
# Path is a metadata file, ingest.
log.write(ctx, 'metadata JSON <{}> modified by {}, ingesting'.format(path, username))
ctx.rule_meta_modified_post(path, username, zone)
elif (info.space is pathutil.Space.DATAREQUEST
and pathutil.basename(info.subpath) == datarequest.DATAREQUEST + datarequest.JSON_EXT):
request_id = pathutil.dirname(info.subpath)
log.write(ctx, 'datarequest JSON <{}> modified by {}, ingesting'.format(path, username))
datarequest.datarequest_sync_avus(ctx, request_id)
except Exception as e:
# The rules on metadata are run synchronously and could fail.
# Log errors, but continue with revisions.
log.write(ctx, 'rule_meta_modified_post failed: ' + str(e))
# ctx.uuResourceModifiedPostRevision(instance_name, zone, path)
revisions.resource_modified_post_revision(ctx, instance_name, zone, path)
@rule.make()
def py_acPostProcForObjRename(ctx: rule.Context, src: str, dst: str) -> None:
# Update ACLs to give correct group ownership when an object is moved into
# a different research- or grp- collection.
info = pathutil.info(dst)
if info.space is pathutil.Space.RESEARCH or info.group.startswith(constants.IIGRPPREFIX):
if len(info.subpath) and info.group != pathutil.info(src).group:
ctx.uuEnforceGroupAcl(dst)
@rule.make(inputs=[0, 1, 2, 3, 4, 5, 6], outputs=[2])
def pep_resource_resolve_hierarchy_pre(ctx: rule.Context,
resource: str,
_ctx: rule.Context,
out: str,
operation: str,
host: str,
parser: str,
vote: str) -> str | None:
if not config.arb_enabled or operation != "CREATE":
return None
arb_data = arb_data_manager.ARBDataManager()
arb_status = arb_data.get(ctx, resource)
if arb_status == constants.arb_status.FULL:
return "read=1.0;write=0.0"
else:
return "read=1.0;write=1.0"
@rule.make(inputs=[0], outputs=[1])
def rule_check_anonymous_access_allowed(ctx: rule.Context, address: str) -> str:
"""Check if access to the anonymous account is allowed from a particular network
address. Non-local access to the anonymous account should only be allowed from
DavRODS servers, for security reasons.
:param ctx: Combined type of a callback and rei struct
:param address: Network address to check
:returns: 'true' if access from this network address is allowed; otherwise 'false'
"""
permit_list = ["127.0.0.1"] + config.remote_anonymous_access
return "true" if address in permit_list else "false"
@rule.make(inputs=[], outputs=[0])
def rule_check_max_connections_exceeded(ctx: rule.Context) -> str:
"""Check if user exceeds the maximum number of connections.
:param ctx: Combined type of a callback and rei struct
:returns: 'true' if maximum number of connections is exceeded; otherwise 'false'.
Also returns 'false' if the maximum number of connections check has been
disabled, or if the maximum number does not apply to the present user.
"""
if not config.user_max_connections_enabled:
return "false"
elif user.name(ctx) in ['anonymous', 'rods']:
return "false"
else:
connections = user.number_of_connections(ctx)
return "false" if connections <= config.user_max_connections_number else "true"
@rule.make(inputs=[0, 1, 2, 3, 4], outputs=[])
def pep_database_gen_query_pre(ctx: rule.Context,
dbtype: str,
_ctx: rule.Context,
results: str,
genquery_inp: object,
genquery_out: object) -> None:
if not is_safe_genquery_inp(genquery_inp):
# We can't use log here, because the REI is not (always) available.
print("Refused unsafe query: " + str(genquery_inp))
ctx.msiOprDisallowed()
# }}}
# }}}