Skip to content

Commit e6089aa

Browse files
committed
schemas: trigger update on ES and reindexing when schema mapping changed
* send signals on mapping updates
* recreate indices on mapping update (delete -> create)
* reindex all records belonging to the updated schema
* change `cap fixtures schemas` to update schemas in the db (previously a schema was just skipped if it already existed in the db), triggering ES changes and reindexing

Signed-off-by: Anna Trzcinska <[email protected]>
1 parent 16cbabf commit e6089aa

File tree

11 files changed

+334
-72
lines changed

11 files changed

+334
-72
lines changed

cap/modules/deposit/ext.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
"""Initialize extension."""
22

33
from __future__ import absolute_import, print_function
4-
from cap.modules.schemas.models import Schema
4+
55
from invenio_search import current_search
66

7+
from cap.modules.schemas.models import Schema
8+
9+
from .receivers import handle_deposit_mapping_updated
10+
711

812
class CAPDeposit(object):
913
"""CAPDeposit extension."""
10-
1114
def __init__(self, app=None):
1215
"""Extension initialization."""
1316
if app:

cap/modules/deposit/receivers.py

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# This file is part of CERN Analysis Preservation Framework.
4+
# Copyright (C) 2016 CERN.
5+
#
6+
# CERN Analysis Preservation Framework is free software; you can redistribute
7+
# it and/or modify it under the terms of the GNU General Public License as
8+
# published by the Free Software Foundation; either version 2 of the
9+
# License, or (at your option) any later version.
10+
#
11+
# CERN Analysis Preservation Framework is distributed in the hope that it will
12+
# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14+
# General Public License for more details.
15+
#
16+
# You should have received a copy of the GNU General Public License
17+
# along with CERN Analysis Preservation Framework; if not, write to the
18+
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
19+
# MA 02111-1307, USA.
20+
#
21+
# In applying this license, CERN does not
22+
# waive the privileges and immunities granted to it by virtue of its status
23+
# as an Intergovernmental Organization or submit itself to any jurisdiction.
24+
"""Registered signal handlers for deposit module."""
25+
from invenio_jsonschemas.proxies import current_jsonschemas
26+
27+
from cap.modules.records.utils import reindex_by_schema_url
28+
from cap.modules.schemas.signals import deposit_mapping_updated
29+
30+
31+
@deposit_mapping_updated.connect
32+
def handle_deposit_mapping_updated(schema):
33+
"""Reindex all the deposits when mapping in ES got updated."""
34+
schema_url = current_jsonschemas.path_to_url(schema.deposit_path)
35+
reindex_by_schema_url(schema_url, 'depid')

cap/modules/records/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@
2121
# In applying this license, CERN does not
2222
# waive the privileges and immunities granted to it by virtue of its status
2323
# as an Intergovernmental Organization or submit itself to any jurisdiction.
24-
2524
"""Data model package."""
25+
26+
from .receivers import handle_record_mapping_updated

cap/modules/records/receivers.py

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# This file is part of CERN Analysis Preservation Framework.
4+
# Copyright (C) 2016 CERN.
5+
#
6+
# CERN Analysis Preservation Framework is free software; you can redistribute
7+
# it and/or modify it under the terms of the GNU General Public License as
8+
# published by the Free Software Foundation; either version 2 of the
9+
# License, or (at your option) any later version.
10+
#
11+
# CERN Analysis Preservation Framework is distributed in the hope that it will
12+
# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14+
# General Public License for more details.
15+
#
16+
# You should have received a copy of the GNU General Public License
17+
# along with CERN Analysis Preservation Framework; if not, write to the
18+
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
19+
# MA 02111-1307, USA.
20+
#
21+
# In applying this license, CERN does not
22+
# waive the privileges and immunities granted to it by virtue of its status
23+
# as an Intergovernmental Organization or submit itself to any jurisdiction.
24+
"""Registered signal handlers for records module."""
25+
from invenio_jsonschemas.proxies import current_jsonschemas
26+
27+
from cap.modules.schemas.signals import record_mapping_updated
28+
29+
from .utils import reindex_by_schema_url
30+
31+
32+
@record_mapping_updated.connect
33+
def handle_record_mapping_updated(schema):
34+
"""Reindex all the records when mapping in ES got updated."""
35+
schema_url = current_jsonschemas.path_to_url(schema.record_path)
36+
37+
reindex_by_schema_url(schema_url, 'recid')

cap/modules/records/utils.py

+31-1
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,15 @@
2727
import string
2828

2929
from flask import url_for
30+
from invenio_db import db
31+
from invenio_indexer.api import RecordIndexer
3032
from invenio_pidstore.errors import PIDDoesNotExistError
31-
from invenio_pidstore.models import PersistentIdentifier
33+
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
34+
from invenio_records.models import RecordMetadata
3235
from six.moves.urllib import parse
36+
from sqlalchemy import cast
37+
from sqlalchemy.dialects.postgresql import JSONB
38+
from sqlalchemy.dialects.sqlite import JSON
3339

3440

3541
def generate_recid(experiment):
@@ -71,3 +77,27 @@ def api_url_for(endpoint, pid, **kwargs):
7177
**kwargs)
7278

7379
return url_to_api_url(url)
80+
81+
82+
def reindex_by_schema_url(schema_url, pid_type):
83+
"""Reindex all records of given pid_type belonging to that schema."""
84+
def _get_json_type():
85+
"""Return JSONB if the db dialect is postgresql, else JSON."""
86+
return JSONB if db.session.bind.dialect.name == 'postgresql' else JSON
87+
88+
indexer = RecordIndexer()
89+
90+
ids = (x[0] for x in RecordMetadata.query.filter(
91+
RecordMetadata.json['$schema'] == cast(
92+
schema_url, _get_json_type())).values(RecordMetadata.id))
93+
94+
filtered_by_pid_type = (x[0] for x in PersistentIdentifier.query.filter(
95+
PersistentIdentifier.object_type == 'rec',
96+
PersistentIdentifier.pid_type == pid_type, PersistentIdentifier.status
97+
== PIDStatus.REGISTERED, PersistentIdentifier.object_uuid.in_(
98+
ids)).values(PersistentIdentifier.object_uuid))
99+
100+
print('{} records will be reindexed...'.format(schema_url))
101+
102+
indexer.bulk_index(filtered_by_pid_type)
103+
indexer.process_bulk_queue(es_bulk_kwargs={'raise_on_error': True})

cap/modules/schemas/cli.py

+8-8
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,18 @@ def add_schema_from_fixture(data=None):
6969
with db.session.begin_nested():
7070
with db.session.begin_nested():
7171
try:
72-
schema = Schema.get(name=data['name'],
73-
version=data['version'])
74-
click.secho('{} already exist in the db.'.format(
75-
str(name)))
76-
return
72+
schema = Schema.get(name=name, version=data['version'])
73+
schema.update(**data)
74+
msg, fg = '{} updated.'.format(str(name)), 'green'
7775

7876
except JSONSchemaNotFound:
79-
schema = Schema(**data)
80-
db.session.add(schema)
77+
db.session.add(Schema(**data))
78+
msg, fg = '{} added.'.format(str(name)), 'green'
8179

8280
if allow_all:
8381
schema.add_read_access_for_all_users()
82+
else:
83+
schema.revoke_access_for_all_users()
8484

8585
except IntegrityError:
8686
click.secho('Error occured during adding {} to the db. \n'.format(
@@ -89,4 +89,4 @@ def add_schema_from_fixture(data=None):
8989
return
9090

9191
db.session.commit()
92-
click.secho('{} has been added.'.format(str(name)), fg='green')
92+
click.secho(msg, fg=fg)

cap/modules/schemas/models.py

+77-26
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444

4545
from .permissions import SchemaAdminAction, SchemaReadAction
4646
from .serializers import resolved_schemas_serializer, schema_serializer
47+
from .signals import deposit_mapping_updated, record_mapping_updated
4748

4849
ES_FORBIDDEN = r' ,"\<*>|?'
4950

@@ -208,11 +209,30 @@ def add_read_access_for_all_users(self):
208209
"""Give read access to all authenticated users."""
209210
assert self.id
210211

211-
db.session.add(
212-
ActionSystemRoles.allow(SchemaReadAction(self.id),
213-
role=authenticated_user))
212+
try:
213+
ActionSystemRoles.query.filter(
214+
ActionSystemRoles.action == 'schema-object-read',
215+
ActionSystemRoles.argument == str(self.id),
216+
ActionSystemRoles.role_name == 'authenticated_user').one()
217+
except NoResultFound:
218+
db.session.add(
219+
ActionSystemRoles.allow(SchemaReadAction(self.id),
220+
role=authenticated_user))
214221
db.session.flush()
215222

223+
def revoke_access_for_all_users(self):
224+
"""Revoke read access from all authenticated users."""
225+
assert self.id
226+
227+
try:
228+
db.session.delete(
229+
ActionSystemRoles.query.filter(
230+
ActionSystemRoles.action == 'schema-object-read',
231+
ActionSystemRoles.argument == str(self.id),
232+
ActionSystemRoles.role_name == 'authenticated_user').one())
233+
except NoResultFound:
234+
pass
235+
216236
def give_admin_access_for_user(self, user):
217237
"""Give admin access for users."""
218238
assert self.id
@@ -270,25 +290,6 @@ def name_to_es_name(name):
270290
return name.replace('/', '-')
271291

272292

273-
def create_index(index_name, mapping_body, aliases):
274-
"""Create index in elasticsearch, add under given aliases."""
275-
if not es.indices.exists(index_name):
276-
current_search.mappings[index_name] = {} # invenio search needs it
277-
278-
es.indices.create(index=index_name,
279-
body={'mappings': mapping_body},
280-
ignore=False)
281-
282-
for alias in aliases:
283-
es.indices.update_aliases(
284-
{'actions': [{
285-
'add': {
286-
'index': index_name,
287-
'alias': alias
288-
}
289-
}]})
290-
291-
292293
@event.listens_for(Schema, 'after_insert')
293294
def after_insert_schema(target, value, schema):
294295
"""On schema insert, create corresponding indexes and aliases in ES."""
@@ -303,6 +304,43 @@ def after_insert_schema(target, value, schema):
303304
current_cache.delete_memoized(import_string(mappings_imp))
304305

305306

307+
@event.listens_for(Schema.deposit_mapping, 'set')
308+
def after_deposit_mapping_updated(target, value, oldvalue, initiator):
309+
"""If deposit mapping field was updated:
310+
* trigger mapping update in ES
311+
* send signal
312+
"""
313+
if target.is_indexed:
314+
if es.indices.exists(target.deposit_index):
315+
es.indices.delete(index=target.deposit_index)
316+
317+
create_index(target.deposit_index, value, target.deposit_aliases)
318+
deposit_mapping_updated.send(target)
319+
320+
if target.use_deposit_as_record:
321+
if es.indices.exists(target.record_index):
322+
es.indices.delete(index=target.record_index)
323+
324+
create_index(target.record_index, value, target.record_aliases)
325+
326+
record_mapping_updated.send(target)
327+
328+
329+
@event.listens_for(Schema.record_mapping, 'set')
330+
def after_record_mapping_updated(target, value, oldvalue, initiator):
331+
"""If record mapping field was updated:
332+
* trigger mapping update in ES
333+
* send signal
334+
"""
335+
if target.is_indexed and not target.use_deposit_as_record:
336+
if es.indices.exists(target.record_index):
337+
es.indices.delete(index=target.record_index)
338+
339+
create_index(target.record_index, value, target.record_aliases)
340+
341+
record_mapping_updated.send(target)
342+
343+
306344
@event.listens_for(Schema, 'after_delete')
307345
def before_delete_schema(mapper, connect, schema):
308346
"""On schema delete, delete corresponding indexes and aliases in ES."""
@@ -316,7 +354,20 @@ def before_delete_schema(mapper, connect, schema):
316354
current_cache.delete_memoized(import_string(mappings_imp))
317355

318356

319-
@db.event.listens_for(Schema, 'before_update', propagate=True)
320-
def timestamp_before_update(mapper, connection, target):
321-
"""Update `updated` property with current time on `before_update` event."""
322-
target.updated = datetime.utcnow()
357+
def create_index(index_name, mapping_body, aliases):
358+
"""Create index in elasticsearch, add under given aliases."""
359+
if not es.indices.exists(index_name):
360+
current_search.mappings[index_name] = {} # invenio search needs it
361+
362+
es.indices.create(index=index_name,
363+
body={'mappings': mapping_body},
364+
ignore=False)
365+
366+
for alias in aliases:
367+
es.indices.update_aliases(
368+
{'actions': [{
369+
'add': {
370+
'index': index_name,
371+
'alias': alias
372+
}
373+
}]})

cap/modules/schemas/signals.py

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# This file is part of CERN Analysis Preservation Framework.
4+
# Copyright (C) 2016, 2017 CERN.
5+
#
6+
# CERN Analysis Preservation Framework is free software; you can redistribute
7+
# it and/or modify it under the terms of the GNU General Public License as
8+
# published by the Free Software Foundation; either version 2 of the
9+
# License, or (at your option) any later version.
10+
#
11+
# CERN Analysis Preservation Framework is distributed in the hope that it will
12+
# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14+
# General Public License for more details.
15+
#
16+
# You should have received a copy of the GNU General Public License
17+
# along with CERN Analysis Preservation Framework; if not, write to the
18+
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
19+
# MA 02111-1307, USA.
20+
#
21+
# In applying this license, CERN does not
22+
# waive the privileges and immunities granted to it by virtue of its status
23+
# as an Intergovernmental Organization or submit itself to any jurisdiction.
24+
"""Signals for schemas module."""
25+
26+
from __future__ import absolute_import, print_function
27+
28+
from blinker import Namespace
29+
30+
_signals = Namespace()
31+
32+
deposit_mapping_updated = _signals.signal('deposit_mapping-updated')
33+
34+
record_mapping_updated = _signals.signal('record_mapping-updated')

setup.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
# temporarily pinned since there are 'connection closed' issues
7777
# on production server
7878
'urllib3[secure]==1.22',
79-
'SQLAlchemy-Continuum==1.3.4',
79+
'sqlalchemy==1.3',
8080

8181
# temporarily pinned since there are 'fs' conflicts between
8282
# 'reana-commons' and 'invenio-files-rest'
@@ -95,15 +95,13 @@
9595
# "raven" versions needed till we FIX dependencies on installation
9696
'raven[flask]>=5.0.0,<5.5',
9797
'invenio-logging[sentry]>=1.0.0b1',
98-
9998
'uWSGI==2.0.17',
10099
'uwsgi-tools==1.1.1',
101100
'uwsgitop==0.10',
102101
]
103102

104103
packages = find_packages()
105104

106-
107105
# Get the version string. Cannot be done with import!
108106
g = {}
109107
with open(os.path.join('cap', 'version.py'), 'rt') as fp:

0 commit comments

Comments
 (0)