Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schemas: when schema mapping got updated, recreate indices in ES and reindex records #1346

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ notifications:

sudo: false

dist: trusty
dist: bionic

language: python

Expand Down
7 changes: 5 additions & 2 deletions cap/modules/deposit/ext.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""Initialize extension."""

from __future__ import absolute_import, print_function
from cap.modules.schemas.models import Schema

from invenio_search import current_search

from cap.modules.schemas.models import Schema

from .receivers import handle_deposit_mapping_updated


class CAPDeposit(object):
"""CAPDeposit extension."""

def __init__(self, app=None):
"""Extension initialization."""
if app:
Expand Down
35 changes: 35 additions & 0 deletions cap/modules/deposit/receivers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
#
# This file is part of CERN Analysis Preservation Framework.
# Copyright (C) 2016 CERN.
#
# CERN Analysis Preservation Framework is free software; you can redistribute
# it and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# CERN Analysis Preservation Framework is distributed in the hope that it will
# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with CERN Analysis Preservation Framework; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
"""Registered signal handlers for records module."""
from invenio_jsonschemas.proxies import current_jsonschemas

from cap.modules.records.utils import reindex_by_schema_url
from cap.modules.schemas.signals import deposit_mapping_updated


@deposit_mapping_updated.connect
def handle_deposit_mapping_updated(schema):
    """Reindex every deposit of *schema* after its ES mapping was updated."""
    reindex_by_schema_url(
        current_jsonschemas.path_to_url(schema.deposit_path), 'depid')
3 changes: 2 additions & 1 deletion cap/modules/records/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.

"""Data model package."""

from .receivers import handle_record_mapping_updated
37 changes: 37 additions & 0 deletions cap/modules/records/receivers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
#
# This file is part of CERN Analysis Preservation Framework.
# Copyright (C) 2016 CERN.
#
# CERN Analysis Preservation Framework is free software; you can redistribute
# it and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# CERN Analysis Preservation Framework is distributed in the hope that it will
# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with CERN Analysis Preservation Framework; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
"""Registered signal handlers for deposit module."""
from invenio_jsonschemas.proxies import current_jsonschemas

from cap.modules.schemas.signals import record_mapping_updated

from .utils import reindex_by_schema_url


@record_mapping_updated.connect
def handle_record_mapping_updated(schema):
    """Reindex every record of *schema* after its ES mapping was updated."""
    reindex_by_schema_url(
        current_jsonschemas.path_to_url(schema.record_path), 'recid')
36 changes: 35 additions & 1 deletion cap/modules/records/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,15 @@
import string

from flask import url_for
from invenio_db import db
from invenio_indexer.api import RecordIndexer
from invenio_pidstore.errors import PIDDoesNotExistError
from invenio_pidstore.models import PersistentIdentifier
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
from invenio_records.models import RecordMetadata
from six.moves.urllib import parse
from sqlalchemy import cast
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.dialects.sqlite import JSON


def generate_recid(experiment):
Expand Down Expand Up @@ -71,3 +77,31 @@ def api_url_for(endpoint, pid, **kwargs):
**kwargs)

return url_to_api_url(url)


def reindex_by_schema_url(schema_url, pid_type):
    """Reindex all records of given pid_type belonging to that schema.

    :param schema_url: full ``$schema`` URL records are matched against.
    :param pid_type: PID type ('depid'/'recid') used to pick the records.
    """
    def _json_cast_type():
        """Pick JSONB on postgres, generic JSON otherwise (e.g. sqlite)."""
        if db.session.bind.dialect.name == 'postgresql':
            return JSONB
        return JSON

    indexer = RecordIndexer()

    # UUIDs of all record rows whose $schema matches the given URL.
    record_uuids = [
        row[0] for row in RecordMetadata.query.filter(
            RecordMetadata.json['$schema'] == cast(
                schema_url, _json_cast_type())).values(RecordMetadata.id)
    ]

    if record_uuids:
        # Keep only registered PIDs of the requested type (lazy generator,
        # consumed directly by the bulk indexer).
        uuids_to_index = (
            row[0] for row in PersistentIdentifier.query.filter(
                PersistentIdentifier.object_type == 'rec',
                PersistentIdentifier.pid_type == pid_type,
                PersistentIdentifier.status == PIDStatus.REGISTERED,
                PersistentIdentifier.object_uuid.in_(record_uuids),
            ).values(PersistentIdentifier.object_uuid))

        print('{} records will be reindexed...'.format(schema_url))

        indexer.bulk_index(uuids_to_index)
        indexer.process_bulk_queue()
13 changes: 7 additions & 6 deletions cap/modules/schemas/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,19 @@ def add_schema_from_fixture(data=None):
with db.session.begin_nested():
with db.session.begin_nested():
try:
schema = Schema.get(name=data['name'],
version=data['version'])
click.secho('{} already exist in the db.'.format(
str(name)))
return
schema = Schema.get(name=name, version=data['version'])
schema.update(**data)
msg, fg = '{} updated.'.format(str(name)), 'green'

except JSONSchemaNotFound:
schema = Schema(**data)
db.session.add(schema)
msg, fg = '{} added.'.format(str(name)), 'green'

if allow_all:
schema.add_read_access_for_all_users()
else:
schema.revoke_access_for_all_users()

except IntegrityError:
click.secho('Error occured during adding {} to the db. \n'.format(
Expand All @@ -89,4 +90,4 @@ def add_schema_from_fixture(data=None):
return

db.session.commit()
click.secho('{} has been added.'.format(str(name)), fg='green')
click.secho(msg, fg=fg)
126 changes: 96 additions & 30 deletions cap/modules/schemas/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
from six.moves.urllib.parse import urljoin
from sqlalchemy import UniqueConstraint, event
from sqlalchemy.orm import validates
from sqlalchemy.orm.base import NO_VALUE
from sqlalchemy.orm.exc import NoResultFound
from werkzeug.utils import import_string

from cap.types import json_type

from .permissions import SchemaAdminAction, SchemaReadAction
from .serializers import resolved_schemas_serializer, schema_serializer
from .signals import deposit_mapping_updated, record_mapping_updated

ES_FORBIDDEN = r' ,"\<*>|?'

Expand Down Expand Up @@ -208,11 +210,30 @@ def add_read_access_for_all_users(self):
"""Give read access to all authenticated users."""
assert self.id

db.session.add(
ActionSystemRoles.allow(SchemaReadAction(self.id),
role=authenticated_user))
try:
ActionSystemRoles.query.filter(
ActionSystemRoles.action == 'schema-object-read',
ActionSystemRoles.argument == str(self.id),
ActionSystemRoles.role_name == 'authenticated_user').one()
except NoResultFound:
db.session.add(
ActionSystemRoles.allow(SchemaReadAction(self.id),
role=authenticated_user))
db.session.flush()

def revoke_access_for_all_users(self):
"""Revoke read access to all authenticated users."""
assert self.id

try:
db.session.delete(
ActionSystemRoles.query.filter(
ActionSystemRoles.action == 'schema-object-read',
ActionSystemRoles.argument == str(self.id),
ActionSystemRoles.role_name == 'authenticated_user').one())
except NoResultFound:
pass

def give_admin_access_for_user(self, user):
"""Give admin access for users."""
assert self.id
Expand Down Expand Up @@ -270,39 +291,55 @@ def name_to_es_name(name):
return name.replace('/', '-')


def create_index(index_name, mapping_body, aliases):
"""Create index in elasticsearch, add under given aliases."""
if not es.indices.exists(index_name):
current_search.mappings[index_name] = {} # invenio search needs it

es.indices.create(index=index_name,
body={'mappings': mapping_body},
ignore=False)

for alias in aliases:
es.indices.update_aliases(
{'actions': [{
'add': {
'index': index_name,
'alias': alias
}
}]})


@event.listens_for(Schema, 'after_insert')
def after_insert_schema(target, value, schema):
"""On schema insert, create corresponding indexes and aliases in ES."""
if schema.is_indexed:
create_index(schema.deposit_index, schema.deposit_mapping,
schema.deposit_aliases)
create_index(schema.record_index, schema.record_mapping,
schema.record_aliases)
_recreate_deposit_mapping_in_ES(schema, schema.deposit_mapping)
_recreate_record_mapping_in_ES(schema, schema.record_mapping)

# invenio search needs it
mappings_imp = current_app.config.get('SEARCH_GET_MAPPINGS_IMP')
current_cache.delete_memoized(import_string(mappings_imp))


@event.listens_for(Schema.deposit_mapping, 'set')
def after_deposit_mapping_updated(target, value, oldvalue, initiator):
    """Recreate the deposit index in ES when the deposit mapping is updated.

    Recreating the index also emits the ``deposit_mapping_updated`` signal
    (inside ``_recreate_deposit_mapping_in_ES``) so records get reindexed.

    Skip if:
        * triggered on creation of schema (not update)
        * schema not indexed in ES
    """
    # NO_VALUE is a sentinel singleton -- compare by identity, not equality.
    if oldvalue is NO_VALUE or not target.is_indexed:
        return

    _recreate_deposit_mapping_in_ES(target, value)

    # With this flag the record index mirrors the deposit mapping,
    # so it must be recreated from the same mapping as well.
    if target.use_deposit_as_record:
        _recreate_record_mapping_in_ES(target, value)


@event.listens_for(Schema.record_mapping, 'set')
def after_record_mapping_updated(target, value, oldvalue, initiator):
    """Recreate the record index in ES when the record mapping is updated.

    Recreating the index also emits the ``record_mapping_updated`` signal
    (inside ``_recreate_record_mapping_in_ES``) so records get reindexed.

    Skip if:
        * triggered on creation of schema (not update)
        * schema not indexed in ES
        * flag use_deposit_as_record, so record mapping changes can be ignored
    """
    # NO_VALUE is a sentinel singleton -- compare by identity, not equality.
    if oldvalue is NO_VALUE or not target.is_indexed or \
            target.use_deposit_as_record:
        return

    _recreate_record_mapping_in_ES(target, value)


@event.listens_for(Schema, 'after_delete')
def before_delete_schema(mapper, connect, schema):
"""On schema delete, delete corresponding indexes and aliases in ES."""
Expand All @@ -316,7 +353,36 @@ def before_delete_schema(mapper, connect, schema):
current_cache.delete_memoized(import_string(mappings_imp))


@db.event.listens_for(Schema, 'before_update', propagate=True)
def timestamp_before_update(mapper, connection, target):
"""Update `updated` property with current time on `before_update` event."""
target.updated = datetime.utcnow()
def _create_index(index_name, mapping_body, aliases):
    """Create index in elasticsearch, add under given aliases.

    No-op when the index already exists.
    """
    if es.indices.exists(index_name):
        return

    current_search.mappings[index_name] = {}  # invenio search needs it

    es.indices.create(index=index_name,
                      body={'mappings': mapping_body},
                      ignore=False)

    for alias in aliases:
        add_action = {'add': {'index': index_name, 'alias': alias}}
        es.indices.update_aliases({'actions': [add_action]})


def _recreate_deposit_mapping_in_ES(schema, mapping):
    """Drop and recreate the deposit index with *mapping*, then notify."""
    index = schema.deposit_index
    if es.indices.exists(index):
        es.indices.delete(index=index)

    _create_index(index, mapping, schema.deposit_aliases)
    deposit_mapping_updated.send(schema)


def _recreate_record_mapping_in_ES(schema, mapping):
    """Drop and recreate the record index with *mapping*, then notify."""
    index = schema.record_index
    if es.indices.exists(index):
        es.indices.delete(index=index)

    _create_index(index, mapping, schema.record_aliases)
    record_mapping_updated.send(schema)
14 changes: 12 additions & 2 deletions cap/modules/schemas/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import copy

from flask import url_for
from invenio_jsonschemas.proxies import current_jsonschemas
from marshmallow import Schema, ValidationError, fields, pre_load, validate

from cap.utils import url_to_api_url
from invenio_jsonschemas.proxies import current_jsonschemas
from marshmallow import Schema, ValidationError, fields, pre_load, validate

from .validators import JSONSchemaValidator

Expand Down Expand Up @@ -102,6 +102,15 @@ def filter_out_fields_that_cannot_be_updated(self, data, **kwargs):
return data


class SimplifiedSchemaSerializer(Schema):
    """Simplified serializer exposing only name, version and fullname."""

    name = fields.Str(dump_only=True, required=True)
    # Dots are escaped so the version must be digit groups separated by
    # literal dots -- the previous pattern's bare `.` matched any character.
    # NOTE(review): Regexp uses re.match (start-anchored only); trailing
    # text after "X.Y.Z" still passes -- append `$` if that should fail.
    version = fields.Str(dump_only=True, required=True,
                         validate=validate.Regexp(
                             regex=r"(\d+)\.(\d+)\.(\d+)"))
    fullname = fields.Str(dump_only=True)


class ResolvedSchemaSerializer(SchemaSerializer):
"""Schema serializer with resolved jsonschemas."""

Expand All @@ -125,5 +134,6 @@ def get_resolved_record_schema(self, obj):


# Module-level singleton instances -- import and reuse these instead of
# constructing new serializers at call sites.
schema_serializer = SchemaSerializer()
simplified_schema_serializer = SimplifiedSchemaSerializer()
update_schema_serializer = UpdateSchemaSerializer()
resolved_schemas_serializer = ResolvedSchemaSerializer()
Loading