-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathpermissions.py
343 lines (276 loc) · 15.1 KB
/
permissions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
from functools import wraps
from flask import g
from flask import request
from werkzeug.exceptions import BadRequest
from marshmallow import INCLUDE
from webargs import fields as webargs_fields, flaskparser
from dataactcore.interfaces.db import GlobalDB
from dataactcore.models.domainModels import SubTierAgency
from dataactcore.models.jobModels import Submission
from dataactcore.models.lookups import (ALL_PERMISSION_TYPES_DICT, PERMISSION_SHORT_DICT, DABS_PERMISSION_ID_LIST,
FABS_PERMISSION_ID_LIST)
from dataactcore.utils.jsonResponse import JsonResponse
from dataactcore.utils.ResponseError import ResponseError
from dataactcore.utils.statusCode import StatusCode
from dataactcore.utils.requestDictionary import RequestDictionary
# Message sent back when a logged-in user lacks the rights needed for the requested action
NOT_AUTHORIZED_MSG = "You are not authorized to perform the requested task. Please contact your administrator."
# Permission values stored under the 'w' and 's' short codes (presumably DABS writer/submitter - confirm in lookups)
DABS_PERMS = [PERMISSION_SHORT_DICT['w'], PERMISSION_SHORT_DICT['s']]
# Permission value stored under the 'f' short code (presumably the FABS permission - confirm in lookups)
FABS_PERM = PERMISSION_SHORT_DICT['f']
# Shared webargs parser used by the decorators below; unknown=INCLUDE keeps request fields that are not declared in
# the argument map instead of rejecting them
webargs_parser = flaskparser.FlaskParser(unknown=INCLUDE)
def requires_login(func):
    """ Decorator that only lets a logged-in user reach the wrapped function (anonymous sessions are turned away)

        Args:
            func: the function being decorated

        Returns:
            A wrapper that yields a LOGIN_REQUIRED JsonResponse when there is no current user and otherwise
            returns whatever the decorated function returns
    """
    @wraps(func)
    def login_checked(*args, **kwargs):
        # A user object on the request context means the session is not anonymous
        if g.user is not None:
            return func(*args, **kwargs)
        return JsonResponse.create(StatusCode.LOGIN_REQUIRED, {'message': "Login Required"})

    return login_checked
def requires_admin(func):
    """ Decorator that only lets website admins reach the wrapped function

        Args:
            func: the function being decorated

        Returns:
            A wrapper that yields a LOGIN_REQUIRED JsonResponse when there is no current user ("Login Required")
            or when the current user is not a website admin (the not-authorized message), and otherwise returns
            whatever the decorated function returns
    """
    @wraps(func)
    def admin_checked(*args, **kwargs):
        current_user = g.user
        # Anonymous session: nobody is logged in at all
        if current_user is None:
            return JsonResponse.create(StatusCode.LOGIN_REQUIRED, {'message': "Login Required"})
        if current_user.website_admin:
            return func(*args, **kwargs)
        # Logged in, but without the admin flag
        return JsonResponse.create(StatusCode.LOGIN_REQUIRED, {'message': NOT_AUTHORIZED_MSG})

    return admin_checked
def active_user_can(permission, cgac_code=None, frec_code=None):
    """ Validate whether the active user can perform the act (described by the permission level) for the given
        cgac_code or frec_code

        Args:
            permission: string key into ALL_PERMISSION_TYPES_DICT naming the permission being requested
            cgac_code: code identifying a CGAC agency, compared against each affiliation's cgac.cgac_code
            frec_code: code identifying a FREC agency, compared against each affiliation's frec.frec_code

        Returns:
            True if the user is a website admin or has an affiliation with the given agency whose permission is
            greater than or equal to permission; False otherwise (including when nobody is logged in or the
            permission name is unknown)
    """
    # The user attribute can be absent from the request context or set to None for an anonymous session; in both
    # cases there is nobody to grant permissions to
    user = getattr(g, 'user', None)
    if user is None:
        return False

    # Website admins can do everything, so there is no reason to check their affiliations
    if user.website_admin:
        return True

    # Ensure the permission exists and retrieve its ID
    try:
        permission_id = ALL_PERMISSION_TYPES_DICT[permission]
    except KeyError:
        return False

    # Only affiliations from the same family (FABS or DABS) as the requested permission can satisfy it
    if permission_id in FABS_PERMISSION_ID_LIST:
        permission_list = FABS_PERMISSION_ID_LIST
    else:
        permission_list = DABS_PERMISSION_ID_LIST

    # Loop through user's affiliations and return True if any match the permission
    for aff in user.affiliations:
        matches_cgac = aff.cgac and aff.cgac.cgac_code == cgac_code
        matches_frec = aff.frec and aff.frec.frec_code == frec_code
        if not (matches_cgac or matches_frec):
            continue

        aff_perm_id = aff.permission_type_id
        # We can check for reader overall regardless of FABS or DABS or permission level because DABS permissions
        # give read access to FABS submissions so it should pass as long as there are any permissions for the agency
        if permission == 'reader' or (aff_perm_id in permission_list and aff_perm_id >= permission_id):
            return True

    return False
def active_user_can_on_submission(perm, submission, check_owner=True):
    """ Submissions add another permission possibility: if a user created a submission, they can do anything to it,
        regardless of submission agency

        Args:
            perm: string PermissionType value
            submission: Submission object
            check_owner: allows the functionality if the user is the owner of the Submission; default True

        Returns:
            Boolean result on whether the user owns the submission (when check_owner is set) or has permissions
            greater than or equal to perm for the submission's agency. Always False when nobody is logged in.
    """
    # The user attribute can be absent from the request context or None for an anonymous session; without a user
    # there is neither an owner match nor any agency permission
    user = getattr(g, 'user', None)
    if user is None:
        return False

    # Ownership alone is enough when the caller allows it, so the agency lookup can be skipped
    if check_owner and submission.user_id == user.user_id:
        return True

    return active_user_can(perm, cgac_code=submission.cgac_code, frec_code=submission.frec_code)
def requires_submission_perms(perm, check_owner=True, check_fabs=None):
    """ Decorator factory that loads the submission named by submission_id and verifies the current user may act on
        it. The decorated function is expected to be called with submission_id as its first argument.

        Args:
            perm: the type of permission we are checking for
            check_owner: a boolean indicating if being the owner of the submission is enough to pass the check
            check_fabs: permission to check instead of perm when the Submission is FABS; default None

        Returns:
            A decorator whose wrapper calls the decorated function with the Submission object in place of the
            submission_id (followed by the other args/kwargs that were initially provided)

        Raises:
            ResponseError: If the submission doesn't exist or the user doesn't have permission to access it at the
                level requested
    """
    def decorator(fn):
        @requires_login
        @wraps(fn)
        def wrapper(submission_id, *args, **kwargs):
            query = GlobalDB.db().session.query(Submission).filter_by(submission_id=submission_id)
            found = query.one_or_none()
            if found is None:
                # @todo - why don't we use 404s?
                raise ResponseError('No such submission', StatusCode.CLIENT_ERROR)

            # The FABS-specific permission replaces perm only when one was given and the submission is FABS
            required = perm
            if check_fabs and found.is_fabs:
                required = check_fabs

            if active_user_can_on_submission(required, found, check_owner):
                return fn(found, *args, **kwargs)
            raise ResponseError("User does not have permission to access that submission",
                                StatusCode.PERMISSION_DENIED)

        return wrapper

    return decorator
def requires_agency_perms(perm):
    """ Decorator factory that validates the current user's permissions against an agency taken from the request
        form. It reads existing_submission_id, cgac_code, and frec_code from the form data.

        Args:
            perm: the type of permission we are checking for

        Returns:
            A decorator whose wrapper calls the decorated function with the args/kwargs that were initially
            provided once the permission check has passed

        Raises:
            ResponseError: If none of the three parameters was provided or the user doesn't have permission at the
                level requested
    """
    def decorator(fn):
        @requires_login
        @wraps(fn)
        def wrapper(*args, **kwargs):
            arg_map = {
                'existing_submission_id': webargs_fields.Int(load_default=None),
                'cgac_code': webargs_fields.String(load_default=None),
                'frec_code': webargs_fields.String(load_default=None)
            }
            parsed = webargs_parser.parse(arg_map, request, location='form')
            submission_id = parsed['existing_submission_id']
            cgac_code = parsed['cgac_code']
            frec_code = parsed['frec_code']

            # At least one of the three identifiers has to be present
            if all(value is None for value in (submission_id, cgac_code, frec_code)):
                raise ResponseError('Missing required parameter: cgac_code, frec_code, or existing_submission_id',
                                    StatusCode.CLIENT_ERROR)

            if submission_id is not None:
                # An existing submission takes precedence over any agency codes in the request
                check_existing_submission_perms(perm, submission_id)
            elif not active_user_can(perm, cgac_code=cgac_code, frec_code=frec_code):
                raise ResponseError("User does not have permissions to write to that agency",
                                    StatusCode.PERMISSION_DENIED)

            return fn(*args, **kwargs)

        return wrapper

    return decorator
def requires_agency_code_perms(perm):
    """ Decorator factory that validates the current user's permissions against an agency_code taken from the
        request. The query string is consulted first and the JSON body second.

        Args:
            perm: the type of permission we are checking for

        Returns:
            A decorator whose wrapper calls the decorated function with the args/kwargs that were initially
            provided once the permission check has passed

        Raises:
            ResponseError: If no agency_code was provided or the user doesn't have permission for that agency at
                the level requested
    """
    def decorator(fn):
        @requires_login
        @wraps(fn)
        def wrapper(*args, **kwargs):
            agency_code = None
            # Stop at the first request location that actually carries an agency_code
            for source in ('query', 'json'):
                parsed = webargs_parser.parse(
                    {'agency_code': webargs_fields.String(load_default=None)},
                    request,
                    location=source
                )
                agency_code = parsed.get('agency_code', None)
                if agency_code is not None:
                    break

            if agency_code is None:
                raise ResponseError('Missing required parameter: agency_code', StatusCode.CLIENT_ERROR)

            # The same code is tried as both a CGAC and a FREC code
            allowed = active_user_can(perm, cgac_code=agency_code, frec_code=agency_code)
            if not allowed:
                raise ResponseError("User does not have permissions for that agency", StatusCode.PERMISSION_DENIED)

            return fn(*args, **kwargs)

        return wrapper

    return decorator
def requires_sub_agency_perms(perm):
    """ Decorator that checks the current user's permissions and validates them against a sub tier agency or an
        existing submission. It reads an agency_code (a sub tier agency code) and an existing_submission_id from
        the request.

        Args:
            perm: the type of permission we are checking for

        Returns:
            A decorator whose wrapper calls the decorated function with the args/kwargs that were initially
            provided once the permission check has passed

        Raises:
            ResponseError: If the request could not be read, neither parameter was provided as a string, the
                agency_code doesn't match a sub tier agency, or the user doesn't have permission at the level
                requested
    """
    def inner(fn):
        @requires_login
        @wraps(fn)
        def wrapped(*args, **kwargs):
            try:
                # Derive the request dictionary once and read both parameters from it
                request_dict = RequestDictionary.derive(request)
                agency_code = request_dict.get('agency_code', None)
                existing_submission_id = request_dict.get('existing_submission_id', None)
            except (ValueError, TypeError) as e:
                raise ResponseError(e, StatusCode.CLIENT_ERROR)
            except BadRequest:
                raise ResponseError('Bad request: agency_code or existing_submission_id not included properly',
                                    StatusCode.CLIENT_ERROR)

            if agency_code is None and existing_submission_id is None:
                raise ResponseError('Missing required parameter: agency_code or existing_submission_id',
                                    StatusCode.CLIENT_ERROR)
            if not isinstance(agency_code, str) and not isinstance(existing_submission_id, str):
                raise ResponseError('Bad request: agency_code or existing_submission_id '
                                    'required and must be strings', StatusCode.CLIENT_ERROR)

            if existing_submission_id is not None:
                # An existing submission takes precedence over the agency_code
                check_existing_submission_perms(perm, existing_submission_id)
            else:
                # The session is only needed when the sub tier agency has to be looked up
                sess = GlobalDB.db().session
                sub_tier_agency = sess.query(SubTierAgency).\
                    filter(SubTierAgency.sub_tier_agency_code == agency_code).one_or_none()
                if sub_tier_agency is None:
                    raise ResponseError('sub_tier_agency must be a valid sub_tier_agency_code',
                                        StatusCode.CLIENT_ERROR)

                # Permissions are held at the CGAC/FREC level, so resolve the sub tier agency's parents
                cgac_code = sub_tier_agency.cgac.cgac_code if sub_tier_agency.cgac_id else None
                frec_code = sub_tier_agency.frec.frec_code if sub_tier_agency.frec_id else None
                if not active_user_can(perm, cgac_code=cgac_code, frec_code=frec_code):
                    raise ResponseError("User does not have permissions to write to that subtier agency",
                                        StatusCode.PERMISSION_DENIED)
            return fn(*args, **kwargs)
        return wrapped
    return inner
def separate_affiliations(affiliations, app_type):
    """ Separates the agency IDs of CGAC and FREC UserAffiliations and leaves out affiliations with permissions
        outside of the specified application (FABS or DABS)

        Args:
            affiliations: list of UserAffiliations
            app_type: string deciding which application to use; 'fabs' (case-insensitive) selects FABS and any
                other value selects DABS

        Returns:
            A list of cgac_id values from the affiliations within the app_type application that have no FREC
            A list of frec_id values from the affiliations within the app_type application that have a FREC
    """
    cgac_ids, frec_ids = [], []
    app_permissions = FABS_PERMISSION_ID_LIST if app_type.lower() == 'fabs' else DABS_PERMISSION_ID_LIST

    for affiliation in affiliations:
        if affiliation.permission_type_id not in app_permissions:
            continue
        # An affiliation that has a FREC is counted as a FREC affiliation, even if a CGAC is also set
        if affiliation.frec:
            frec_ids.append(affiliation.frec.frec_id)
        elif affiliation.cgac:
            # Affiliations with neither agency set are skipped instead of failing on a missing cgac
            cgac_ids.append(affiliation.cgac.cgac_id)

    return cgac_ids, frec_ids
def check_existing_submission_perms(perm, submission_id):
    """ Verifies that the current user may act, at the level of perm, on the submission identified by submission_id

        Args:
            perm: the type of permission we are checking for
            submission_id: the ID of the Submission that the user input

        Raises:
            ResponseError: If no submission has that ID or the user doesn't have permission to access the
                submission at the level requested
    """
    query = GlobalDB.db().session.query(Submission).filter(Submission.submission_id == submission_id)
    existing = query.one_or_none()

    # The ID has to point at a real submission
    if existing is None:
        raise ResponseError("existing_submission_id must be a valid submission_id",
                            StatusCode.CLIENT_ERROR)

    # Submission owners pass this check as well (check_owner is left at its default)
    if active_user_can_on_submission(perm, existing):
        return
    raise ResponseError("User does not have permissions to write to that submission",
                        StatusCode.PERMISSION_DENIED)