-
Notifications
You must be signed in to change notification settings - Fork 31
/
Copy pathmodels.py
388 lines (306 loc) · 13.4 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
# -*- coding: utf-8 -*-
#
# This file is part of CERN Analysis Preservation Framework.
# Copyright (C) 2016 CERN.
#
# CERN Analysis Preservation Framework is free software; you can redistribute
# it and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# CERN Analysis Preservation Framework is distributed in the hope that it will
# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with CERN Analysis Preservation Framework; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
"""Models for schemas."""
import re
from datetime import datetime
from flask import current_app
from invenio_access.models import ActionSystemRoles, ActionUsers
from invenio_access.permissions import authenticated_user
from invenio_cache import current_cache
from invenio_db import db
from invenio_jsonschemas.errors import JSONSchemaNotFound
from invenio_search import current_search
from invenio_search import current_search_client as es
from six.moves.urllib.parse import urljoin
from sqlalchemy import UniqueConstraint, event
from sqlalchemy.orm import validates
from sqlalchemy.orm.base import NO_VALUE
from sqlalchemy.orm.exc import NoResultFound
from werkzeug.utils import import_string
from cap.types import json_type
from .permissions import SchemaAdminAction, SchemaReadAction
from .serializers import resolved_schemas_serializer, schema_serializer
from .signals import deposit_mapping_updated, record_mapping_updated
# Characters that may not appear in names used to build Elasticsearch
# index/alias names (checked by Schema.validate_name and name_to_es_name).
ES_FORBIDDEN = r' ,"\<*>|?'

# When a schema has use_deposit_as_record set, reading one of these record
# attributes is redirected to the deposit attribute it maps to.
SERVE_DEPOSIT_AS_RECORD_MAP = dict(
    record_schema='deposit_schema',
    record_mapping='deposit_mapping',
    record_options='deposit_options',
)
class Schema(db.Model):
    """Model defining analysis JSON schemas.

    One row per ``(name, major, minor, patch)`` (enforced by the
    ``unique_schema_version`` constraint). Each row stores the JSON schema,
    options and Elasticsearch mapping for both the deposit and the record
    variant of the schema.
    """

    id = db.Column(db.Integer, primary_key=True)

    name = db.Column(db.String(128), unique=False, nullable=False)
    fullname = db.Column(db.String(128), unique=False, nullable=True)

    # version
    major = db.Column(db.Integer, unique=False, nullable=False, default=1)
    minor = db.Column(db.Integer, unique=False, nullable=False, default=0)
    patch = db.Column(db.Integer, unique=False, nullable=False, default=0)

    experiment = db.Column(db.String(128), unique=False, nullable=True)

    deposit_schema = db.Column(json_type,
                               default=lambda: dict(),
                               nullable=True)
    deposit_options = db.Column(json_type,
                                default=lambda: dict(),
                                nullable=True)
    deposit_mapping = db.Column(json_type,
                                default=lambda: dict(),
                                nullable=True)

    record_schema = db.Column(json_type, default=lambda: dict(), nullable=True)
    record_options = db.Column(json_type,
                               default=lambda: dict(),
                               nullable=True)
    record_mapping = db.Column(json_type,
                               default=lambda: dict(),
                               nullable=True)

    # When True, record_* attributes listed in SERVE_DEPOSIT_AS_RECORD_MAP
    # are served from the corresponding deposit_* columns.
    use_deposit_as_record = db.Column(db.Boolean(create_constraint=False),
                                      unique=False,
                                      default=False)
    # Gates creation/deletion of this schema's Elasticsearch indexes.
    is_indexed = db.Column(db.Boolean(create_constraint=False),
                           unique=False,
                           default=False)

    created = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)
    updated = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)

    __tablename__ = 'schema'
    __table_args__ = (UniqueConstraint('name',
                                       'major',
                                       'minor',
                                       'patch',
                                       name='unique_schema_version'), )

    def __init__(self, *args, **kwargs):
        """Create a schema; ``version='X.Y.Z'`` may be passed as a string.

        The ``version`` keyword is consumed here and split into
        ``major``/``minor``/``patch``. All other arguments are forwarded to
        the model constructor.
        """
        version = kwargs.pop('version', None)
        if version:
            self.version = version
        super(Schema, self).__init__(*args, **kwargs)

    def __getattribute__(self, attr):
        """Map record attribute to deposit one.

        Applies only if ``use_deposit_as_record`` is ``True``.
        """
        # Reading self.use_deposit_as_record re-enters this method, but that
        # name is not a key of the map, so there is no infinite recursion
        # (the membership test short-circuits first).
        if attr in SERVE_DEPOSIT_AS_RECORD_MAP and self.use_deposit_as_record:
            attr = SERVE_DEPOSIT_AS_RECORD_MAP[attr]
        return object.__getattribute__(self, attr)

    def __str__(self):
        """Stringify schema object as ``<name>-v<major>.<minor>.<patch>``."""
        return '{name}-v{version}'.format(name=self.name, version=self.version)

    @staticmethod
    def _parse_version(string):
        """Split a ``'<major>.<minor>.<patch>'`` string into three ints.

        The dots are escaped in the pattern so that arbitrary separators
        (e.g. ``'1a0b0'``) are rejected. Like before, only the beginning of
        the string has to match.

        :param string: version string.
        :returns: ``(major, minor, patch)`` tuple of ints.
        :raises ValueError: if ``string`` does not start with the pattern.
        """
        matched = re.match(r"(\d+)\.(\d+)\.(\d+)", string)
        if matched is None:
            raise ValueError(
                'Version has to be passed as string <major>.<minor>.<patch>')
        return tuple(int(part) for part in matched.groups())

    @property
    def version(self):
        """Return stringified version ``'<major>.<minor>.<patch>'``."""
        return "{0}.{1}.{2}".format(self.major, self.minor, self.patch)

    @version.setter
    def version(self, string):
        """Set ``major``/``minor``/``patch`` from a version string.

        :raises ValueError: if ``string`` is not ``<major>.<minor>.<patch>``.
        """
        self.major, self.minor, self.patch = self._parse_version(string)

    @property
    def deposit_path(self):
        """Return deposit schema path (``SCHEMAS_DEPOSIT_PREFIX`` + file)."""
        prefix = current_app.config['SCHEMAS_DEPOSIT_PREFIX']
        path = urljoin(prefix, "{0}.json".format(self))
        return path

    @property
    def record_path(self):
        """Return record schema path (``SCHEMAS_RECORD_PREFIX`` + file)."""
        prefix = current_app.config['SCHEMAS_RECORD_PREFIX']
        path = urljoin(prefix, "{0}.json".format(self))
        return path

    @property
    def deposit_index(self):
        """Get deposit index name (ES-safe form of the deposit prefix path)."""
        path = urljoin(current_app.config['SCHEMAS_DEPOSIT_PREFIX'], str(self))
        return name_to_es_name(path)

    @property
    def record_index(self):
        """Get record index name (ES-safe form of the record prefix path)."""
        path = urljoin(current_app.config['SCHEMAS_RECORD_PREFIX'], str(self))
        return name_to_es_name(path)

    @property
    def deposit_aliases(self):
        """Get ES deposits aliases."""
        name = name_to_es_name(self.name)
        return ['deposits', 'deposits-records', 'deposits-{}'.format(name)]

    @property
    def record_aliases(self):
        """Get ES records aliases."""
        name = name_to_es_name(self.name)
        return ['records', 'records-{}'.format(name)]

    @validates('name')
    def validate_name(self, key, name):
        """Validate if name is ES compatible.

        :raises ValueError: if ``name`` contains a character from
            ``ES_FORBIDDEN``.
        """
        if any(x in ES_FORBIDDEN for x in name):
            raise ValueError('Name cannot contain the following characters'
                             '[, ", *, \\, <, | , , , > , ?]')
        return name

    def serialize(self, resolve=False):
        """Serialize schema model.

        :param resolve: use the resolving serializer instead of the plain one.
        """
        serializer = resolved_schemas_serializer if resolve else schema_serializer  # noqa
        return serializer.dump(self).data

    def update(self, **kwargs):
        """Update schema instance by setting each keyword as an attribute."""
        for key, value in kwargs.items():
            setattr(self, key, value)
        return self

    def add_read_access_for_all_users(self):
        """Give read access to all authenticated users.

        No-op if the permission row already exists.
        """
        # The schema id is the action argument, so the row must have an id.
        assert self.id
        try:
            ActionSystemRoles.query.filter(
                ActionSystemRoles.action == 'schema-object-read',
                ActionSystemRoles.argument == str(self.id),
                ActionSystemRoles.role_name == 'authenticated_user').one()
        except NoResultFound:
            db.session.add(
                ActionSystemRoles.allow(SchemaReadAction(self.id),
                                        role=authenticated_user))
        db.session.flush()

    def revoke_access_for_all_users(self):
        """Revoke read access to all authenticated users.

        No-op if the permission row does not exist.
        """
        assert self.id
        try:
            db.session.delete(
                ActionSystemRoles.query.filter(
                    ActionSystemRoles.action == 'schema-object-read',
                    ActionSystemRoles.argument == str(self.id),
                    ActionSystemRoles.role_name == 'authenticated_user').one())
        except NoResultFound:
            pass

    def give_admin_access_for_user(self, user):
        """Give admin access on this schema to ``user``."""
        assert self.id
        db.session.add(ActionUsers.allow(SchemaAdminAction(self.id),
                                         user=user))
        db.session.flush()

    @classmethod
    def get_latest(cls, name):
        """Get the latest version of schema with given name.

        :raises JSONSchemaNotFound: if no schema named ``name`` exists.
        """
        latest = cls.query \
            .filter_by(name=name) \
            .order_by(cls.major.desc(),
                      cls.minor.desc(),
                      cls.patch.desc()) \
            .first()
        if latest is None:
            raise JSONSchemaNotFound(schema=name)
        return latest

    @classmethod
    def get(cls, name, version):
        """Get schema by name and version.

        :param name: schema name.
        :param version: ``'<major>.<minor>.<patch>'`` string.
        :raises ValueError: if ``version`` is malformed.
        :raises JSONSchemaNotFound: if no such schema exists.
        """
        major, minor, patch = cls._parse_version(version)
        try:
            schema = cls.query \
                .filter_by(name=name, major=major, minor=minor, patch=patch) \
                .one()
        except NoResultFound:
            raise JSONSchemaNotFound("{}-v{}".format(name, version))
        return schema
def name_to_es_name(name):
    r"""Return ``name`` converted into an Elasticsearch-compatible name.

    Every '/' is turned into '-'.
    [, ", *, \\, <, | , , , > , ?] are forbidden and make this raise.

    :raises AssertionError: if ``name`` contains a character from
        ``ES_FORBIDDEN``.
    """
    has_forbidden = any(char in ES_FORBIDDEN for char in name)
    if not has_forbidden:
        return '-'.join(name.split('/'))
    raise AssertionError('Name cannot contain the following characters'
                         '[, ", *, \\, <, | , , , > , ?]')
@event.listens_for(Schema, 'after_insert')
def after_insert_schema(target, value, schema):
    """Create the ES indexes and aliases of a freshly inserted schema.

    SQLAlchemy calls ``after_insert`` listeners positionally with
    ``(mapper, connection, target)``. Here ``target`` therefore receives the
    mapper, ``value`` the connection, and ``schema`` the inserted instance.

    Does nothing unless ``schema.is_indexed`` is set.
    """
    if not schema.is_indexed:
        return

    _recreate_deposit_mapping_in_ES(schema, schema.deposit_mapping)
    # Resolved via Schema.__getattribute__, so this is the deposit mapping
    # when use_deposit_as_record is on.
    _recreate_record_mapping_in_ES(schema, schema.record_mapping)

    # invenio search needs it: invalidate the memoized result of the
    # configured mappings getter.
    mappings_getter = import_string(
        current_app.config.get('SEARCH_GET_MAPPINGS_IMP'))
    current_cache.delete_memoized(mappings_getter)
@event.listens_for(Schema.deposit_mapping, 'set')
def after_deposit_mapping_updated(target, value, oldvalue, initiator):
    """If deposit mapping field was updated:

    * trigger mapping update in ES
    * send signal (done by the recreate helpers)

    Skip if:

    * triggered on creation of schema (not update)
    * schema not indexed in ES

    :param target: the Schema instance being modified.
    :param value: the new deposit mapping.
    :param oldvalue: the previous value, or the ``NO_VALUE`` sentinel.
    :param initiator: SQLAlchemy event token (unused).
    """
    # NO_VALUE is a sentinel singleton, so compare by identity.
    # NOTE(review): SQLAlchemy also passes NO_VALUE when the previous value
    # is not loaded (e.g. expired after commit) and active_history is off,
    # in which case the ES update is skipped as well. Confirm this is
    # intended.
    if oldvalue is NO_VALUE or not target.is_indexed:
        return

    _recreate_deposit_mapping_in_ES(target, value)

    # With the flag on, record_mapping is served from deposit_mapping, so
    # the record index must follow the new deposit mapping too.
    if target.use_deposit_as_record:
        _recreate_record_mapping_in_ES(target, value)
@event.listens_for(Schema.record_mapping, 'set')
def after_record_mapping_updated(target, value, oldvalue, initiator):
    """If record mapping field was updated:

    * trigger mapping update in ES
    * send signal (done by the recreate helper)

    Skip if:

    * triggered on creation of schema (not update)
    * schema not indexed in ES
    * flag use_deposit_as_record, so record mapping changes can be ignored

    :param target: the Schema instance being modified.
    :param value: the new record mapping.
    :param oldvalue: the previous value, or the ``NO_VALUE`` sentinel.
    :param initiator: SQLAlchemy event token (unused).
    """
    # NO_VALUE is a sentinel singleton, so compare by identity.
    # NOTE(review): SQLAlchemy also passes NO_VALUE when the previous value
    # is not loaded (e.g. expired after commit) and active_history is off.
    # Confirm skipping in that case is intended.
    if oldvalue is NO_VALUE or not target.is_indexed or \
            target.use_deposit_as_record:
        return

    _recreate_record_mapping_in_ES(target, value)
@event.listens_for(Schema, 'after_delete')
def before_delete_schema(mapper, connect, schema):
    """Remove the ES indexes of a deleted schema.

    Despite its name, this is registered for the ``after_delete`` mapper
    event. Nothing happens unless ``schema.is_indexed`` is set. Each of the
    record and deposit indexes is deleted only if it exists.
    """
    if not schema.is_indexed:
        return

    for index_name in [schema.record_index, schema.deposit_index]:
        if not es.indices.exists(index_name):
            continue
        es.indices.delete(index_name)

    # invenio search needs it: invalidate the memoized result of the
    # configured mappings getter.
    mappings_getter = import_string(
        current_app.config.get('SEARCH_GET_MAPPINGS_IMP'))
    current_cache.delete_memoized(mappings_getter)
def _create_index(index_name, mapping_body, aliases):
    """Create index in elasticsearch, add under given aliases.

    Does nothing if ``index_name`` already exists.

    :param index_name: name of the index to create.
    :param mapping_body: value stored under ``mappings`` in the create body.
    :param aliases: iterable of alias names to point at the new index.
    """
    if es.indices.exists(index_name):
        return

    current_search.mappings[index_name] = {}  # invenio search needs it

    # The previous ``ignore=False`` was a no-op: elasticsearch-py turns it
    # into ``ignore=(0,)``, so errors were raised anyway.
    es.indices.create(index=index_name, body={'mappings': mapping_body})

    # Register every alias in a single update_aliases request. This is one
    # round-trip instead of one per alias, and ES applies the actions of one
    # request atomically. Skip the call when there is nothing to add.
    actions = [{'add': {'index': index_name, 'alias': alias}}
               for alias in aliases]
    if actions:
        es.indices.update_aliases({'actions': actions})
def _recreate_deposit_mapping_in_ES(schema, mapping):
    """Drop (if present) and re-create the deposit index of ``schema``.

    The new index uses ``mapping`` and the schema's deposit aliases. Sends
    ``deposit_mapping_updated`` with ``schema`` afterwards.
    """
    index_name = schema.deposit_index
    if es.indices.exists(index_name):
        es.indices.delete(index=index_name)
    _create_index(index_name, mapping, schema.deposit_aliases)
    deposit_mapping_updated.send(schema)
def _recreate_record_mapping_in_ES(schema, mapping):
    """Drop (if present) and re-create the record index of ``schema``.

    The new index uses ``mapping`` and the schema's record aliases. Sends
    ``record_mapping_updated`` with ``schema`` afterwards.
    """
    index_name = schema.record_index
    if es.indices.exists(index_name):
        es.indices.delete(index=index_name)
    _create_index(index_name, mapping, schema.record_aliases)
    record_mapping_updated.send(schema)