Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ce668bf
add elasticsearch daemon task
felliott Feb 27, 2014
e1ce884
first pass at new elasticsearchstorage.py
felliott Mar 2, 2014
031cdc8
add Elasticsearch test object
felliott Mar 2, 2014
56dc7ec
collection != index: user must provide index to ES
felliott Mar 2, 2014
5e2f3e5
fix crash in ElasticsearchQuerySet
felliott Mar 2, 2014
2f3968e
delete ES test index when done
felliott Mar 2, 2014
44b53b0
refresh ES index after inserting fixtures
felliott Mar 2, 2014
51c7e16
fill out storage retrieval methods
felliott Mar 2, 2014
6932b08
add search by query functionality
felliott Mar 2, 2014
dc0fec8
EsQuerySet now copies PickleQuerySet
felliott Mar 2, 2014
5b221ce
add stub for converting ES resp. to native types
felliott Mar 2, 2014
fdb87ea
doc/style cleanup
felliott Mar 3, 2014
633a2ea
fix ES delete query syntax
felliott Mar 10, 2014
0cf5fa8
add hacky workaround to get backrefs working
felliott Mar 11, 2014
a2a8863
fix linting complaints in test_simple_queries
felliott Mar 31, 2014
bf355de
make Elasticsearch evaluate lazily
felliott Mar 31, 2014
61dd33e
fix lint complaints in elasticsearch storage
felliott Mar 31, 2014
d989498
add NullHandler to silence ES.trace logging
felliott Mar 31, 2014
374c8df
fix foolish unpacking of ES matches
felliott Mar 31, 2014
3b170cb
add refresh() method to Storage base
felliott Mar 31, 2014
9dda082
require ES v1.0 or greater
felliott Jul 7, 2014
31c8ba0
add option to tasks.py to run ES as daemon
felliott Jul 7, 2014
2b50b38
update ES delete syntax for v1.0
felliott Jul 7, 2014
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions modularodm/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
from .mongostorage import MongoStorage
from .picklestorage import PickleStorage
from .ephemeralstorage import EphemeralStorage
from .elasticsearchstorage import ElasticsearchStorage
5 changes: 5 additions & 0 deletions modularodm/storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,8 @@ def find(self, query=None, **kwargs):
"""Query the database and return a query set.
"""
raise NotImplementedError

def refresh(self):
    """Synchronize the backing index so reads see recent writes.

    Default implementation does nothing; near-real-time backends
    (e.g. Elasticsearch) override this.
    """
    return None
267 changes: 267 additions & 0 deletions modularodm/storage/elasticsearchstorage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
import elasticsearch
from elasticsearch import helpers, NotFoundError

from .base import Storage
from ..query.queryset import BaseQuerySet
from ..query.query import QueryGroup
from ..query.query import RawQuery
from modularodm.exceptions import NoResultsFound, MultipleResultsFound

# Query operator groups, used to map modular-odm query operators onto
# Elasticsearch filter clauses (see ElasticsearchStorage._build_query).
EQUALITY_OPERATORS = ('eq', 'ne')  # -> 'term' filter
SET_OPERATORS = ('in', 'nin')  # -> 'terms' filter
RANGE_OPERATORS = ('gt', 'gte', 'lt', 'lte')  # -> 'range' filter
NEGATION_OPERATORS = ('ne', 'nin')  # result is wrapped in a 'not' filter

# Substring operators implemented via Elasticsearch 'regexp' filters.
STRING_OPERATORS = ('contains', 'icontains', 'endswith')
# %-style templates expanded with the search argument (_stringop_to_regex).
# NOTE(review): 'icontains' uses the same pattern as 'contains', and ES
# regexp filters are case-sensitive by default — case-insensitive matching
# appears unimplemented; confirm intended behavior.
STRINGOP_MAP = {
    'contains': '.*%s.*',
    'icontains': '.*%s.*',
    'endswith': '.*%s',
}


class ElasticsearchQuerySet(BaseQuerySet):
    """Lazily-evaluated query set over raw Elasticsearch hit documents.

    ``sort()``, ``offset()``, and ``limit()`` only record their arguments;
    the recorded operations are applied to ``self.data`` the first time the
    set is consumed (indexed, iterated, or measured).
    """

    def __init__(self, schema, data):
        """
        :param schema: the StoredObject schema used to load primary keys
        :param data: iterable of documents already translated to native types
        """
        super(ElasticsearchQuerySet, self).__init__(schema)
        self.data = list(data)
        self._sort = None
        self._offset = None
        self._limit = None

    def _eval(self):
        """Apply any pending sort/offset/limit operations to ``self.data``.

        BUGFIX: each pending operation is cleared once applied. Previously,
        every consumption re-ran the slicing, so e.g. ``len(qs)`` followed by
        iteration applied offset/limit twice to already-sliced data.
        """
        if self._sort is not None:
            # Sort by the last key first: ``sorted`` is stable, so earlier
            # keys end up taking precedence.
            for key in self._sort[::-1]:
                if key.startswith('-'):
                    reverse = True
                    key = key[1:]  # strip exactly one leading '-' marker
                else:
                    reverse = False
                self.data = sorted(
                    self.data,
                    key=lambda record, k=key: record[k],
                    reverse=reverse,
                )
            self._sort = None

        if self._offset is not None:
            self.data = self.data[self._offset:]
            self._offset = None

        if self._limit is not None:
            self.data = self.data[:self._limit]
            self._limit = None

        return self

    def __getitem__(self, index, raw=False):
        """Return the object at ``index``; the bare primary key if ``raw``."""
        super(ElasticsearchQuerySet, self).__getitem__(index)
        self._eval()
        key = self.data[index][self.primary]
        if raw:
            return key
        return self.schema.load(key)

    def __iter__(self, raw=False):
        self._eval()
        keys = [obj[self.primary] for obj in self.data]
        if raw:
            # NOTE: returns a list, not an iterator; get_keys() relies on it.
            return keys
        return (self.schema.load(key) for key in keys)

    def __len__(self):
        self._eval()
        return len(self.data)

    count = __len__

    def get_key(self, index):
        """Return the primary key of the record at ``index``."""
        return self.__getitem__(index, raw=True)

    def get_keys(self):
        """Return the primary keys of all records as a list."""
        return list(self.__iter__(raw=True))

    def sort(self, *keys):
        """Record sort keys; a ``-`` prefix means descending order."""
        self._sort = keys
        return self

    def offset(self, n):
        """Record that the first ``n`` records should be skipped."""
        self._offset = n
        return self

    def limit(self, n):
        """Record that at most ``n`` records should be returned."""
        self._limit = n
        return self


class ElasticsearchStorage(Storage):
    """Storage backend backed by an Elasticsearch index.

    Documents are stored under ``es_index``/``collection``
    (Elasticsearch index / doc_type). Requires elasticsearch >= 1.0.
    """

    QuerySet = ElasticsearchQuerySet

    def __init__(self, client, es_index, collection):
        """
        :param client: an ``elasticsearch.Elasticsearch`` client instance
        :param es_index: name of the Elasticsearch index to use
        :param collection: document type (doc_type) within the index
        """
        self.client = client
        self.collection = collection
        self.es_index = es_index

    def find(self, query=None, **kwargs):
        """Return all matching documents, translated to native types.

        elasticsearch *always* limits search results. The scan() helper
        in the elasticsearch package makes repeated requests until no
        more results are returned.
        """
        elasticsearch_query = self._translate_query(query)
        return [
            self._from_elastic_types(match)
            for match in helpers.scan(
                self.client,
                query=elasticsearch_query,
                index=self.es_index,
                doc_type=self.collection,
            )
        ]

    def find_one(self, query=None, **kwargs):
        """ Gets a single object from the collection.

        If no matching documents are found, raises ``NoResultsFound``.
        If >1 matching documents are found, raises ``MultipleResultsFound``.

        :params: One or more ``Query`` or ``QuerySet`` objects may be passed

        :returns: The selected document
        """
        elasticsearch_query = self._translate_query(query)
        matches = self.client.search(
            index=self.es_index,
            doc_type=self.collection,
            body=elasticsearch_query,
        )['hits']['hits']

        if len(matches) == 1:
            return self._from_elastic_types(matches[0])

        if len(matches) == 0:
            raise NoResultsFound()

        raise MultipleResultsFound(
            'Query for find_one must return exactly one result; '
            'returned {0}'.format(len(matches))
        )

    def get(self, primary_name, key):
        """Fetch a single document by id, or ``None`` if it does not exist."""
        try:
            match = self.client.get(
                index=self.es_index, doc_type=self.collection, id=key,
            )
        except NotFoundError:
            return None

        return self._from_elastic_types(match)

    def insert(self, primary_name, key, value):
        """Create a new document with id ``key``.

        ``primary_name`` is unused: Elasticsearch keys documents by their
        ``_id``, which is set from ``key``.
        """
        self.client.create(
            index=self.es_index, doc_type=self.collection, id=key,
            body=self._to_elastic_types(value),
        )

    def update(self, query, data):
        """Apply a partial update ``data`` to every document matching ``query``."""
        data = self._to_elastic_types(data)
        # NOTE(review): find() returns '_source' dicts; this assumes the
        # document body contains an '_id' field (mongo-style primary key) —
        # confirm against how callers name the primary field.
        for doc in self.find(query):
            self.client.update(
                index=self.es_index,
                doc_type=self.collection, id=doc['_id'],
                body={'doc': data}
            )

    def remove(self, query=None):
        """Delete every document matching ``query`` (v1.0 delete syntax)."""
        elasticsearch_query = self._translate_query(query)
        delete_query = {"query": {"filtered": {
            "query": {"match_all": {}},
            "filter": elasticsearch_query['filter'],
        }}}
        self.client.delete_by_query(
            index=self.es_index,
            doc_type=self.collection,
            body=delete_query
        )

    def flush(self):
        """No-op; writes are sent to Elasticsearch immediately."""
        pass

    def __repr__(self):
        # BUGFIX: previously returned self.find() — a list — which is an
        # invalid __repr__ return and raised TypeError. Return a string.
        return '<ElasticsearchStorage: {0}/{1}>'.format(
            self.es_index, self.collection,
        )

    def _translate_query(self, query=None, elasticsearch_query=None):
        """Wrap a translated query in the top-level 'filter' envelope."""
        elasticsearch_query = self._build_query(query, elasticsearch_query)
        return {'filter': elasticsearch_query}

    def _build_query(self, query=None, elasticsearch_query=None):
        """Turn a query object into a valid elasticsearch filter dict"""
        elasticsearch_query = elasticsearch_query or {}

        if isinstance(query, RawQuery):
            attribute, operator, argument = \
                query.attribute, query.operator, query.argument

            if operator in EQUALITY_OPERATORS:
                elasticsearch_query['term'] = {attribute: argument}

            elif operator in RANGE_OPERATORS:
                elasticsearch_query['range'] = {
                    attribute: {operator: argument}
                }

            elif operator in SET_OPERATORS:
                elasticsearch_query['terms'] = {attribute: argument}

            elif operator == 'startswith':
                elasticsearch_query['prefix'] = {attribute: argument}

            elif operator in STRING_OPERATORS:
                elasticsearch_query['regexp'] = {
                    attribute: self._stringop_to_regex(operator, argument)
                }

            # 'ne' and 'nin' reuse the positive filter built above, negated.
            if operator in NEGATION_OPERATORS:
                elasticsearch_query = {"not": elasticsearch_query}

        elif isinstance(query, QueryGroup):
            if query.operator == 'and':
                return {'and': [self._build_query(node) for node in query.nodes]}

            elif query.operator == 'or':
                return {'or': [self._build_query(node) for node in query.nodes]}

            elif query.operator == 'not':
                return {'not': self._build_query(query.nodes[0])}

            else:
                raise ValueError('QueryGroup operator must be <and>, <or>, or <not>.')

        elif query is None:
            return {}

        else:
            raise TypeError('Query must be a QueryGroup or Query object.')

        return elasticsearch_query

    def _stringop_to_regex(self, operator, argument):
        """Expand a string-operator template with the search argument."""
        return STRINGOP_MAP[operator] % argument

    def _to_elastic_types(self, match):
        """Convert native values to Elasticsearch-storable ones, in place.

        Tuples have their first element stringified; presumably this encodes
        backreference pairs — TODO confirm (see 'hacky workaround' note in
        project history).
        """
        for field in match:
            if type(match[field]) is tuple:
                match[field] = (str(match[field][0]), match[field][1])

        return match

    def _from_elastic_types(self, match):
        """Extract a hit's ``_source`` and restore native types, in place.

        Inverse of ``_to_elastic_types``: tuple first elements back to int.
        """
        result = match['_source']
        for field in result:
            if type(result[field]) is tuple:
                result[field] = (int(result[field][0]), result[field][1])

        return result

    def refresh(self):
        """Force an index refresh so subsequent searches see recent writes."""
        indices = elasticsearch.client.IndicesClient(self.client)
        indices.refresh(index=self.es_index)
1 change: 1 addition & 0 deletions modularodm/storedobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,7 @@ def save(self, force=False):
else:
self.insert(self._primary_key, storage_data)

self._storage[0].refresh()
# if primary key has changed, follow back references and update
# AND
# run after_save or after_save_on_difference
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ flask
blinker
pymongo
python-dateutil
elasticsearch>=1.0
9 changes: 9 additions & 0 deletions tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ def mongo(daemon=False, port=20771):
cmd += " --fork"
run(cmd)

@task
def elastic(daemon=False, port=20772, clustername='modular-odm-test'):
    """Run the elasticsearch process.

    :param daemon: run elasticsearch in the background (-d)
    :param port: HTTP port for the node
    :param clustername: cluster name, isolated from any local default cluster
    """
    template = "elasticsearch -Des.http.port={0} -Des.cluster.name={1}"
    cmd = template.format(port, clustername)
    if daemon:
        cmd = cmd + " -d"
    run(cmd)

@task
def test(coverage=False, browse=False):
command = "nosetests"
Expand Down
38 changes: 37 additions & 1 deletion tests/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import elasticsearch
import logging
import inspect
import os
Expand All @@ -7,9 +8,16 @@
import uuid

from modularodm import StoredObject
from modularodm.storage import MongoStorage, PickleStorage, EphemeralStorage
from modularodm.storage import MongoStorage, PickleStorage, EphemeralStorage, ElasticsearchStorage


class NullHandler(logging.Handler):
    """Logging handler that silently discards every record.

    (Equivalent to ``logging.NullHandler``, which is only available on
    Python >= 2.7 / 3.1.)
    """

    def emit(self, record):
        # Intentionally drop the record.
        pass

# Module-level logger for the test helpers.
logger = logging.getLogger(__name__)
# Attach a no-op handler to the chatty 'elasticsearch.trace' logger so its
# per-request trace output is silenced during tests.
logging.getLogger('elasticsearch.trace').addHandler(NullHandler())


class TestObject(StoredObject):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -83,6 +91,33 @@ def clean_up_storage(self):
self.mongo_client.drop_collection(c)


class ElasticsearchStorageMixin(object):
    """Test mixin that runs the generated fixture suite against
    ElasticsearchStorage."""

    fixture_suffix = 'Elasticsearch'

    # DB settings
    DB_HOST = os.environ.get('ES_HOST', 'localhost')
    DB_PORT = int(os.environ.get('ES_PORT', '20772'))
    DB_INDEX = '--modmtest--'

    # Created once at class level so all generated cases share one client.
    client = elasticsearch.Elasticsearch([{"host": DB_HOST, "port": DB_PORT}])
    indices = elasticsearch.client.IndicesClient(client)

    def make_storage(self):
        # A random 8-character collection name isolates each test's data.
        short_name = str(uuid.uuid4())[:8]
        return ElasticsearchStorage(
            client=self.client,
            es_index=self.DB_INDEX,
            collection=short_name,
        )

    def clean_up_storage(self):
        self.client.flush()
        self.indices.delete(self.DB_INDEX)

    def setUp(self):
        super(ElasticsearchStorageMixin, self).setUp()
        # Must refresh the index before querying, or some results may not
        # be found.
        self.indices.refresh(index=self.DB_INDEX)


class MultipleBackendMeta(type):
def __new__(mcs, name, bases, dct):

Expand All @@ -101,6 +136,7 @@ def __new__(mcs, name, bases, dct):
PickleStorageMixin,
MongoStorageMixin,
EphemeralStorageMixin,
ElasticsearchStorageMixin,
):
new_name = '{}{}'.format(name, mixin.fixture_suffix)
frame.f_globals[new_name] = type.__new__(
Expand Down
Loading