Commit b1523ae

schemas: trigger update on ES and reindexing when schema mapping changed
* signals on mappings update
* recreate indices on mappings update (delete -> create)
* reindex all records belonging to the updated schema
* change `cap fixtures schemas` to update schemas in the db (previously it just skipped schemas that already existed in the db), which now triggers the ES changes and reindexing

Signed-off-by: Anna Trzcinska <[email protected]>
1 parent d2b2a25 commit b1523ae
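
In short, this commit makes an Elasticsearch mapping follow its schema: assigning a new mapping on a `Schema` row recreates the ES index and emits a signal whose receiver reindexes the affected records. A hedged end-to-end sketch of the flow (the schema name and mapping below are illustrative, not taken from the diff):

```python
from invenio_db import db

from cap.modules.schemas.models import Schema

# Hypothetical schema and mapping, for illustration only.
schema = Schema.get(name='test-analysis', version='0.0.1')
schema.deposit_mapping = {'properties': {'title': {'type': 'keyword'}}}
# Assigning the mapping fires the SQLAlchemy 'set' listener added in
# cap/modules/schemas/models.py, which deletes and recreates the deposit
# index in ES and sends deposit_mapping_updated; the receiver in
# cap/modules/deposit/receivers.py then reindexes every deposit that
# uses this schema.
db.session.commit()
```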

12 files changed: +318 -96 lines changed
.travis.yml (+1 -1)

@@ -29,7 +29,7 @@ notifications:
 
 sudo: false
 
-dist: trusty
+dist: bionic
 
 language: python
 

cap/modules/deposit/ext.py (+5 -2)

@@ -1,13 +1,16 @@
 """Initialize extension."""
 
 from __future__ import absolute_import, print_function
-from cap.modules.schemas.models import Schema
+
 from invenio_search import current_search
 
+from cap.modules.schemas.models import Schema
+
+from .receivers import handle_deposit_mapping_updated
+
 
 class CAPDeposit(object):
     """CAPDeposit extension."""
-
     def __init__(self, app=None):
         """Extension initialization."""
         if app:

cap/modules/deposit/receivers.py (new file, +35)

@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of CERN Analysis Preservation Framework.
+# Copyright (C) 2016 CERN.
+#
+# CERN Analysis Preservation Framework is free software; you can redistribute
+# it and/or modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# CERN Analysis Preservation Framework is distributed in the hope that it will
+# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with CERN Analysis Preservation Framework; if not, write to the
+# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
+# MA 02111-1307, USA.
+#
+# In applying this license, CERN does not
+# waive the privileges and immunities granted to it by virtue of its status
+# as an Intergovernmental Organization or submit itself to any jurisdiction.
+"""Registered signal handlers for records module."""
+from invenio_jsonschemas.proxies import current_jsonschemas
+
+from cap.modules.records.utils import reindex_by_schema_url
+from cap.modules.schemas.signals import deposit_mapping_updated
+
+
+@deposit_mapping_updated.connect
+def handle_deposit_mapping_updated(schema):
+    """Reindex all the deposits when mapping in ES got updated."""
+    schema_url = current_jsonschemas.path_to_url(schema.deposit_path)
+    reindex_by_schema_url(schema_url, 'depid')
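
The handler turns the schema's deposit path into the full `$schema` URL before querying. A hedged illustration of what `current_jsonschemas.path_to_url` returns (the path and host are examples; the host comes from the instance's JSONSCHEMAS configuration):

```python
from invenio_jsonschemas.proxies import current_jsonschemas

# Hypothetical deposit schema path, for illustration only.
current_jsonschemas.path_to_url('deposits/records/test-analysis-v0.0.1.json')
# -> e.g. 'https://<JSONSCHEMAS_HOST>/schemas/deposits/records/test-analysis-v0.0.1.json'
```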

cap/modules/records/__init__.py (+2 -1)

@@ -21,5 +21,6 @@
 # In applying this license, CERN does not
 # waive the privileges and immunities granted to it by virtue of its status
 # as an Intergovernmental Organization or submit itself to any jurisdiction.
-
 """Data model package."""
+
+from .receivers import handle_record_mapping_updated

cap/modules/records/receivers.py (new file, +37)

@@ -0,0 +1,37 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of CERN Analysis Preservation Framework.
+# Copyright (C) 2016 CERN.
+#
+# CERN Analysis Preservation Framework is free software; you can redistribute
+# it and/or modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# CERN Analysis Preservation Framework is distributed in the hope that it will
+# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with CERN Analysis Preservation Framework; if not, write to the
+# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
+# MA 02111-1307, USA.
+#
+# In applying this license, CERN does not
+# waive the privileges and immunities granted to it by virtue of its status
+# as an Intergovernmental Organization or submit itself to any jurisdiction.
+"""Registered signal handlers for deposit module."""
+from invenio_jsonschemas.proxies import current_jsonschemas
+
+from cap.modules.schemas.signals import record_mapping_updated
+
+from .utils import reindex_by_schema_url
+
+
+@record_mapping_updated.connect
+def handle_record_mapping_updated(schema):
+    """Reindex all the record when mapping in ES got updated."""
+    schema_url = current_jsonschemas.path_to_url(schema.record_path)
+
+    reindex_by_schema_url(schema_url, 'recid')

cap/modules/records/utils.py (+22 -12)

@@ -27,9 +27,15 @@
 import string
 
 from flask import url_for
+from invenio_db import db
+from invenio_indexer.api import RecordIndexer
 from invenio_pidstore.errors import PIDDoesNotExistError
-from invenio_pidstore.models import PersistentIdentifier
+from invenio_pidstore.models import PersistentIdentifier, PIDStatus
+from invenio_records.models import RecordMetadata
 from six.moves.urllib import parse
+from sqlalchemy import cast
+from sqlalchemy.dialects.postgresql import JSONB
+from sqlalchemy.dialects.sqlite import JSON
 
 
 def generate_recid(experiment):
@@ -81,17 +87,21 @@ def _get_json_type():
 
     indexer = RecordIndexer()
 
-    ids = (x[0] for x in RecordMetadata.query.filter(
-        RecordMetadata.json['$schema'] == cast(
-            schema_url, _get_json_type())).values(RecordMetadata.id))
+    ids = [
+        x[0] for x in RecordMetadata.query.filter(
+            RecordMetadata.json['$schema'] == cast(
+                schema_url, _get_json_type())).values(RecordMetadata.id)
+    ]
 
-    filtered_by_pid_type = (x[0] for x in PersistentIdentifier.query.filter(
-        PersistentIdentifier.status == PIDStatus.REGISTERED,
-        PersistentIdentifier.object_type == 'rec', PersistentIdentifier.
-        pid_type == pid_type, PersistentIdentifier.object_uuid.in_(
-            ids)).values(PersistentIdentifier.object_uuid))
+    if ids:
+        filtered_by_pid_type = (
+            x[0] for x in PersistentIdentifier.query.filter(
+                PersistentIdentifier.object_type == 'rec', PersistentIdentifier
+                .pid_type == pid_type, PersistentIdentifier.status ==
+                PIDStatus.REGISTERED, PersistentIdentifier.object_uuid.in_(
+                    ids)).values(PersistentIdentifier.object_uuid))
 
-    print('{} records will be reindexed...'.format(schema_url))
+        print('{} records will be reindexed...'.format(schema_url))
 
-    indexer.bulk_index(filtered_by_pid_type)
-    indexer.process_bulk_queue(es_bulk_kwargs={'raise_on_error': True})
+        indexer.bulk_index(filtered_by_pid_type)
+        indexer.process_bulk_queue()
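
For orientation, a hedged sketch of how the full `reindex_by_schema_url` helper plausibly reads after this hunk. The diff only shows the function body; the signature, the nested `_get_json_type` helper, and the dialect check are assumptions inferred from the hunk context and from the two-argument calls in the new receivers:

```python
from invenio_db import db
from invenio_indexer.api import RecordIndexer
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
from invenio_records.models import RecordMetadata
from sqlalchemy import cast
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.dialects.sqlite import JSON


def reindex_by_schema_url(schema_url, pid_type):
    """Reindex all records of given pid_type pointing to schema_url (sketch)."""
    def _get_json_type():
        # Assumption: pick the JSON cast type matching the active DB backend.
        return JSONB if db.session.bind.dialect.name == 'postgresql' else JSON

    indexer = RecordIndexer()

    # Materialize the matching record UUIDs so the emptiness check below works.
    ids = [
        x[0] for x in RecordMetadata.query.filter(
            RecordMetadata.json['$schema'] == cast(
                schema_url, _get_json_type())).values(RecordMetadata.id)
    ]

    if ids:
        # Keep only registered 'rec' PIDs of the requested type (recid/depid).
        filtered_by_pid_type = (
            x[0] for x in PersistentIdentifier.query.filter(
                PersistentIdentifier.object_type == 'rec',
                PersistentIdentifier.pid_type == pid_type,
                PersistentIdentifier.status == PIDStatus.REGISTERED,
                PersistentIdentifier.object_uuid.in_(ids)
            ).values(PersistentIdentifier.object_uuid))

        print('{} records will be reindexed...'.format(schema_url))

        indexer.bulk_index(filtered_by_pid_type)
        indexer.process_bulk_queue()
```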

cap/modules/schemas/cli.py (+7 -6)

@@ -69,18 +69,19 @@ def add_schema_from_fixture(data=None):
         with db.session.begin_nested():
             with db.session.begin_nested():
                 try:
-                    schema = Schema.get(name=data['name'],
-                                        version=data['version'])
-                    click.secho('{} already exist in the db.'.format(
-                        str(name)))
-                    return
+                    schema = Schema.get(name=name, version=data['version'])
+                    schema.update(**data)
+                    msg, fg = '{} updated.'.format(str(name)), 'green'
 
                 except JSONSchemaNotFound:
                     schema = Schema(**data)
                     db.session.add(schema)
+                    msg, fg = '{} added.'.format(str(name)), 'green'
 
             if allow_all:
                 schema.add_read_access_for_all_users()
+            else:
+                schema.revoke_access_for_all_users()
 
     except IntegrityError:
         click.secho('Error occured during adding {} to the db. \n'.format(
@@ -89,4 +90,4 @@ def add_schema_from_fixture(data=None):
             return
 
     db.session.commit()
-    click.secho('{} has been added.'.format(str(name)), fg='green')
+    click.secho(msg, fg=fg)
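
A hedged usage sketch of the new fixture behaviour: loading the same fixture twice now updates the stored schema instead of skipping it, which in turn triggers the ES mapping recreation and reindexing described above. The fixture fields, and whether `allow_all` is read from the fixture dict, are assumptions for illustration:

```python
from cap.modules.schemas.cli import add_schema_from_fixture

fixture = {
    'name': 'test-analysis',   # hypothetical schema name
    'version': '0.0.1',
    'deposit_mapping': {'properties': {'title': {'type': 'text'}}},
    'allow_all': True,         # assumed to grant read access to all users
}

# First run: the schema is not in the db yet, so it is added.
add_schema_from_fixture(data=dict(fixture))   # -> "test-analysis added."

# Second run with a changed mapping: Schema.get() finds the row and
# schema.update(**data) assigns the new deposit_mapping, firing the
# 'set' listener that recreates the ES index and reindexes the deposits.
fixture['deposit_mapping'] = {'properties': {'title': {'type': 'keyword'}}}
add_schema_from_fixture(data=dict(fixture))   # -> "test-analysis updated."
```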

cap/modules/schemas/models.py (+96 -30)

@@ -37,13 +37,15 @@
 from six.moves.urllib.parse import urljoin
 from sqlalchemy import UniqueConstraint, event
 from sqlalchemy.orm import validates
+from sqlalchemy.orm.base import NO_VALUE
 from sqlalchemy.orm.exc import NoResultFound
 from werkzeug.utils import import_string
 
 from cap.types import json_type
 
 from .permissions import SchemaAdminAction, SchemaReadAction
 from .serializers import resolved_schemas_serializer, schema_serializer
+from .signals import deposit_mapping_updated, record_mapping_updated
 
 ES_FORBIDDEN = r' ,"\<*>|?'
 
@@ -208,11 +210,30 @@ def add_read_access_for_all_users(self):
         """Give read access to all authenticated users."""
         assert self.id
 
-        db.session.add(
-            ActionSystemRoles.allow(SchemaReadAction(self.id),
-                                    role=authenticated_user))
+        try:
+            ActionSystemRoles.query.filter(
+                ActionSystemRoles.action == 'schema-object-read',
+                ActionSystemRoles.argument == str(self.id),
+                ActionSystemRoles.role_name == 'authenticated_user').one()
+        except NoResultFound:
+            db.session.add(
+                ActionSystemRoles.allow(SchemaReadAction(self.id),
+                                        role=authenticated_user))
         db.session.flush()
 
+    def revoke_access_for_all_users(self):
+        """Revoke read access to all authenticated users."""
+        assert self.id
+
+        try:
+            db.session.delete(
+                ActionSystemRoles.query.filter(
+                    ActionSystemRoles.action == 'schema-object-read',
+                    ActionSystemRoles.argument == str(self.id),
+                    ActionSystemRoles.role_name == 'authenticated_user').one())
+        except NoResultFound:
+            pass
+
     def give_admin_access_for_user(self, user):
         """Give admin access for users."""
         assert self.id
@@ -270,39 +291,55 @@ def name_to_es_name(name):
     return name.replace('/', '-')
 
 
-def create_index(index_name, mapping_body, aliases):
-    """Create index in elasticsearch, add under given aliases."""
-    if not es.indices.exists(index_name):
-        current_search.mappings[index_name] = {}  # invenio search needs it
-
-        es.indices.create(index=index_name,
-                          body={'mappings': mapping_body},
-                          ignore=False)
-
-        for alias in aliases:
-            es.indices.update_aliases(
-                {'actions': [{
-                    'add': {
-                        'index': index_name,
-                        'alias': alias
-                    }
-                }]})
-
-
 @event.listens_for(Schema, 'after_insert')
 def after_insert_schema(target, value, schema):
     """On schema insert, create corresponding indexes and aliases in ES."""
     if schema.is_indexed:
-        create_index(schema.deposit_index, schema.deposit_mapping,
-                     schema.deposit_aliases)
-        create_index(schema.record_index, schema.record_mapping,
-                     schema.record_aliases)
+        _recreate_deposit_mapping_in_ES(schema, schema.deposit_mapping)
+        _recreate_record_mapping_in_ES(schema, schema.record_mapping)
 
     # invenio search needs it
     mappings_imp = current_app.config.get('SEARCH_GET_MAPPINGS_IMP')
     current_cache.delete_memoized(import_string(mappings_imp))
 
 
+@event.listens_for(Schema.deposit_mapping, 'set')
+def after_deposit_mapping_updated(target, value, oldvalue, initiator):
+    """If deposit mapping field was updated:
+    * trigger mapping update in ES
+    * send signal
+
+    Skip if:
+    * triggered on creation of schema (not update)
+    * schema not indexed in ES
+    """
+    if oldvalue == NO_VALUE or not target.is_indexed:
+        return
+
+    _recreate_deposit_mapping_in_ES(target, value)
+
+    if target.use_deposit_as_record:
+        _recreate_record_mapping_in_ES(target, value)
+
+
+@event.listens_for(Schema.record_mapping, 'set')
+def after_record_mapping_updated(target, value, oldvalue, initiator):
+    """If record mapping field was updated:
+    * trigger mapping update in ES
+    * send signal
+
+    Skip if:
+    * triggered on creation of schema (not update)
+    * schema not indexed in ES
+    * flag use_deposit_as_record, so record mapping changes can be ignored
+    """
+    if oldvalue == NO_VALUE or not target.is_indexed or \
+            target.use_deposit_as_record:
+        return
+
+    _recreate_record_mapping_in_ES(target, value)
+
+
 @event.listens_for(Schema, 'after_delete')
 def before_delete_schema(mapper, connect, schema):
     """On schema delete, delete corresponding indexes and aliases in ES."""
@@ -316,7 +353,36 @@ def before_delete_schema(mapper, connect, schema):
     current_cache.delete_memoized(import_string(mappings_imp))
 
 
-@db.event.listens_for(Schema, 'before_update', propagate=True)
-def timestamp_before_update(mapper, connection, target):
-    """Update `updated` property with current time on `before_update` event."""
-    target.updated = datetime.utcnow()
+def _create_index(index_name, mapping_body, aliases):
+    """Create index in elasticsearch, add under given aliases."""
+    if not es.indices.exists(index_name):
+        current_search.mappings[index_name] = {}  # invenio search needs it
+
+        es.indices.create(index=index_name,
+                          body={'mappings': mapping_body},
+                          ignore=False)
+
+        for alias in aliases:
+            es.indices.update_aliases(
+                {'actions': [{
+                    'add': {
+                        'index': index_name,
+                        'alias': alias
+                    }
+                }]})
+
+
+def _recreate_deposit_mapping_in_ES(schema, mapping):
+    if es.indices.exists(schema.deposit_index):
+        es.indices.delete(index=schema.deposit_index)
+
+    _create_index(schema.deposit_index, mapping, schema.deposit_aliases)
+    deposit_mapping_updated.send(schema)
+
+
+def _recreate_record_mapping_in_ES(schema, mapping):
+    if es.indices.exists(schema.record_index):
+        es.indices.delete(index=schema.record_index)
+
+    _create_index(schema.record_index, mapping, schema.record_aliases)
+    record_mapping_updated.send(schema)
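
The `oldvalue == NO_VALUE` guard in the new listeners relies on how SQLAlchemy attribute events behave: on the very first assignment of a mapped attribute (for example while constructing a new Schema) there is no previous value, so the listener receives a sentinel instead of real data and can tell creation apart from an update. A minimal standalone sketch of that behaviour (the `Thing` model is illustrative, not part of CAP; on some SQLAlchemy versions the sentinel is `NEVER_SET` rather than `NO_VALUE`):

```python
from sqlalchemy import Column, Integer, String, event
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm.base import NO_VALUE

Base = declarative_base()


class Thing(Base):  # illustrative model, not part of CAP
    __tablename__ = 'thing'
    id = Column(Integer, primary_key=True)
    mapping = Column(String)


@event.listens_for(Thing.mapping, 'set')
def on_mapping_set(target, value, oldvalue, initiator):
    # First assignment (object creation): SQLAlchemy passes a sentinel
    # (NO_VALUE here) because no previous value was ever loaded.
    if oldvalue == NO_VALUE:
        return  # mirrors the "skip on creation" guard in models.py
    print('mapping updated: {!r} -> {!r}'.format(oldvalue, value))


t = Thing(mapping='{"properties": {}}')       # first set, guard returns early
t.mapping = '{"properties": {"x": {}}}'       # real update, handler prints
```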

tests/unit/test_views.py → cap/modules/schemas/signals.py (renamed, +7 -11)

@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 #
 # This file is part of CERN Analysis Preservation Framework.
-# Copyright (C) 2018 CERN.
+# Copyright (C) 2016, 2017 CERN.
 #
 # CERN Analysis Preservation Framework is free software; you can redistribute
 # it and/or modify it under the terms of the GNU General Public License as
@@ -21,18 +21,14 @@
 # In applying this license, CERN does not
 # waive the privileges and immunities granted to it by virtue of its status
 # as an Intergovernmental Organization or submit itself to any jurisdiction.
-# or submit itself to any jurisdiction.
-"""Unit tests for Cap general views."""
+"""Signals for schemas module."""
 
-from flask import url_for
+from __future__ import absolute_import, print_function
 
+from blinker import Namespace
 
-def test_view_ping(app):
-    with app.test_request_context():
-        url = url_for('cap.ping')
+_signals = Namespace()
 
-    with app.test_client() as client:
-        resp = client.get(url)
+deposit_mapping_updated = _signals.signal('deposit_mapping-updated')
 
-    assert resp.status_code == 200
-    assert resp.data == 'Pong!'
+record_mapping_updated = _signals.signal('record_mapping-updated')
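
Since this module only defines the blinker signals, any other code can subscribe to them besides the receivers added in this commit. A hedged sketch of an additional subscriber (the tracking list and function are illustrative, not code from this commit):

```python
from cap.modules.schemas.signals import deposit_mapping_updated

received = []


@deposit_mapping_updated.connect
def _track_mapping_updates(schema):
    # blinker passes the sender as the first positional argument;
    # here the sender is the Schema instance whose mapping changed.
    received.append(schema)

# After code elsewhere assigns a new schema.deposit_mapping and the
# listener in models.py recreates the ES index and sends the signal,
# `received` will contain the updated Schema instance.
```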
