Skip to content

Commit bd51b94

Browse files
committed
[timeseries] Add initial support for elasticsearch #99
1 parent 57b0ded commit bd51b94

15 files changed

Lines changed: 546 additions & 26 deletions

File tree

.travis.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,16 @@ addons:
2525
services:
2626
- docker
2727
- redis-server
28+
- elasticsearch
2829

2930
branches:
3031
only:
3132
- master
3233
- dev
3334

3435
before_install:
35-
- docker run -d --name influxdb -e INFLUXDB_DB=openwisp2 -p 8086:8086 influxdb:alpine
36+
# - docker run -d --name influxdb -e INFLUXDB_DB=openwisp2 -p 8086:8086 influxdb:alpine
37+
# - docker run -p 9200:9200 docker.elastic.co/elasticsearch/elasticsearch:7.8.0
3638
- pip install -U pip wheel setuptools
3739
- pip install $DJANGO
3840
- pip install -U -r requirements-test.txt

docker-compose.yml

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,25 @@ services:
1111
INFLUXDB_DB: openwisp2
1212
INFLUXDB_USER: openwisp
1313
INFLUXDB_USER_PASSWORD: openwisp
14-
14+
# clustered version of elasticsearch is used as that might be used in production
15+
elasticsearch:
16+
image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
17+
container_name: es01
18+
environment:
19+
- node.name: openwisp2
20+
- cluster.name: openwisp2
21+
- bootstrap.memory_lock: true
22+
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
23+
ulimits:
24+
memlock:
25+
soft: -1
26+
hard: -1
27+
volumes:
28+
- data01:/usr/share/elasticsearch/data
29+
ports:
30+
- 9200:9200
31+
networks:
32+
- elastic
1533
redis:
1634
image: redis:5.0-alpine
1735
ports:
@@ -20,3 +38,9 @@ services:
2038

2139
volumes:
2240
influxdb-data:
41+
data01:
42+
driver: local
43+
44+
networks:
45+
elastic:
46+
driver: bridge

openwisp_monitoring/db/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
from .backends import timeseries_db

# re-export the query definitions of the configured timeseries backend
chart_query = timeseries_db.queries.chart_query
device_data_query = timeseries_db.queries.device_data_query

__all__ = ['timeseries_db', 'chart_query', 'device_data_query']

openwisp_monitoring/db/backends/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
4848
except ImportError as e:
4949
# The database backend wasn't found. Display a helpful error message
5050
# listing all built-in database backends.
51-
builtin_backends = ['influxdb']
51+
builtin_backends = ['influxdb', 'elasticsearch']
5252
if backend_name not in [
5353
f'openwisp_monitoring.db.backends.{b}' for b in builtin_backends
5454
]:

openwisp_monitoring/db/backends/elasticsearch/__init__.py

Whitespace-only changes.
Lines changed: 323 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
import json
2+
import logging
3+
from copy import deepcopy
4+
from datetime import datetime, timedelta
5+
6+
from django.conf import settings
7+
from django.core.exceptions import ValidationError
8+
from django.utils.functional import cached_property
9+
from elasticsearch import Elasticsearch
10+
from elasticsearch.exceptions import ElasticsearchException, NotFoundError
11+
from elasticsearch_dsl import Search
12+
from elasticsearch_dsl.connections import connections
13+
from pytz import timezone as tz
14+
15+
from openwisp_utils.utils import deep_merge_dicts
16+
17+
from .. import TIMESERIES_DB
18+
from .index import MetricIndex, Point, find_metric
19+
from .queries import default_chart_query, math_map, operator_lookup
20+
21+
logger = logging.getLogger(__name__)
22+
23+
24+
class DatabaseClient(object):
    """
    Elasticsearch backend for the timeseries database API.

    Points are stored inside per-metric documents (see ``MetricIndex`` /
    ``Point`` in ``.index``) and generic chart queries are expressed as
    Elasticsearch DSL dicts (see ``.queries``).
    """

    # aggregation names accepted inside chart queries; the aggregations
    # found by ``_is_aggregate`` must form a subset of this list
    # (fix: the original list contained 'global' twice)
    _AGGREGATE = [
        'filters',
        'children',
        'parent',
        'date_histogram',
        'auto_date_histogram',
        'date_range',
        'geo_distance',
        'geohash_grid',
        'geotile_grid',
        'global',
        'geo_centroid',
        'ip_range',
        'missing',
        'nested',
        'range',
        'reverse_nested',
        'significant_terms',
        'significant_text',
        'sampler',
        'terms',
        'diversified_sampler',
        'composite',
        'top_hits',
        'avg',
        'weighted_avg',
        'cardinality',
        'extended_stats',
        'geo_bounds',
        'max',
        'min',
        'percentiles',
        'percentile_ranks',
        'scripted_metric',
        'stats',
        'sum',
        'value_count',
    ]
    backend_name = 'elasticsearch'

    def __init__(self, db_name='metric'):
        # NOTE(review): 'metric' is truthy, so the TIMESERIES_DB['NAME']
        # fallback is only reached when a caller explicitly passes a falsy
        # db_name — confirm the default should not be ``None`` instead
        self.db_name = db_name or TIMESERIES_DB['NAME']
        # exceptions callers are expected to catch for this backend
        self.client_error = ElasticsearchException

    def create_database(self):
        """ creates connection to elasticsearch """
        connections.create_connection(hosts=[TIMESERIES_DB['HOST']])
        # evaluated for its side effect: instantiates and caches the client
        self.get_db

    def drop_database(self):
        """ deletes all indices """
        self.delete_metric_data()
        self.get_db.close()
        logger.debug('Deleted all indices from Elasticsearch')

    @cached_property
    def get_db(self):
        """ Returns an ``Elasticsearch Client`` instance """
        # TODO: AUTHENTICATION remains see `SecurityClient`
        return Elasticsearch(
            [f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"],
            http_auth=(TIMESERIES_DB['USER'], TIMESERIES_DB['PASSWORD']),
            retry_on_timeout=True,
        )

    def create_or_alter_retention_policy(self, name, duration):
        """ creates or alters existing retention policy if necessary """
        # TODO: implement via Elasticsearch index lifecycle management
        pass

    def query(self, query, precision=None):
        """
        Executes ``query`` against the index stored under its ``key`` entry
        and returns the raw Elasticsearch response as a dict.

        NOTE: pops ``key`` from the caller's dict (mutates the argument);
        ``precision`` is currently unused by this backend.
        """
        index = query.pop('key')
        return Search(index=index).from_dict(query).execute().to_dict()

    def write(self, name, values, **kwargs):
        """
        Appends a point (``values`` + timestamp) to the metric document
        identified by ``name`` and the ``tags`` keyword argument, creating
        the metric document if it does not exist yet.
        """
        # TODO: Add support for retention policy
        tags = kwargs.get('tags')
        timestamp = kwargs.get('timestamp')
        metric_id = find_metric(name, tags, add=True)
        metric_index = MetricIndex().get(metric_id, index=name)
        # NOTE(review): datetime.now() is naive local time — confirm whether
        # UTC should be used here to match ``_format_time``
        point = Point(fields=values, time=timestamp or datetime.now())
        metric_index.points.append(point)
        metric_index.save()

    def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
        """
        Reads up to ``limit`` points of the metric identified by ``key`` and
        ``tags``, sorted by ``order`` ('time' ascending, '-time' descending).

        ``fields`` is a single field name (despite the plural); points which
        do not contain it are skipped. ``extra_fields`` may be a list of
        additional field names to include, or '*' for all fields.
        Returns a list of dicts (empty when nothing matches);
        raises ``self.client_error`` on an invalid ``order``.
        """
        extra_fields = kwargs.get('extra_fields')
        time_format = kwargs.get('time_format')
        # since = kwargs.get('since')
        metric_id = find_metric(key, tags)
        if not metric_id:
            return list()
        try:
            metric_index = MetricIndex().get(metric_id, index=key)
        except NotFoundError:
            return []
        if order == 'time':
            points = list(metric_index.points[0:limit])
        elif order == '-time':
            points = list(reversed(metric_index.points))[0:limit]
        else:
            raise self.client_error(
                f'Invalid order "{order}" passed.\nYou may pass "time" / "-time" to get '
                'result sorted in ascending /descending order respectively.'
            )
        if not points:
            return list()
        # distinguish between traffic and clients:
        # drop points that do not carry the requested field
        # (iterate over a copy so removal is safe)
        for point in list(points):
            if fields not in point.fields.to_dict():
                points.remove(point)
        if extra_fields and extra_fields != '*':
            assert isinstance(extra_fields, list)
            _points = []
            for point in points:
                point = point.to_dict()
                _point = {
                    'time': self._format_time(point['time'], time_format),
                    fields: point['fields'][fields],
                }
                # include only the extra fields actually present on the point
                for extra_field in extra_fields:
                    if point['fields'].get(extra_field) is not None:
                        _point.update({extra_field: point['fields'][extra_field]})
                _points.append(_point)
            points = _points
        elif extra_fields == '*':
            points = [
                deep_merge_dicts(
                    p.fields.to_dict(), {'time': self._format_time(p.time, time_format)}
                )
                for p in points
            ]
        else:
            points = [
                deep_merge_dicts(
                    {fields: p.fields.to_dict()[fields]},
                    {'time': self._format_time(p.time, time_format)},
                )
                for p in points
            ]
        # if since:
        # TODO:
        return points

    def _format_time(self, obj, time_format=None):
        """ returns datetime object in isoformat / unix timestamp and UTC timezone """
        if time_format == 'isoformat':
            return obj.astimezone(tz=tz('UTC')).isoformat(timespec='seconds')
        return int(obj.astimezone(tz=tz('UTC')).timestamp())

    def get_list_query(self, query, precision='s'):
        """
        Runs an aggregation chart query and returns the bucketed points as a
        list of flat dicts, with gaps filled by ``_fill_points``.
        """
        response = self.query(query, precision)
        points = response['aggregations']['GroupByTime']['buckets']
        list_points = self._fill_points(
            query, [self._format(point) for point in points]
        )
        return list_points

    def _fill_points(self, query, points):
        """
        Pads ``points`` with null-valued dummy points so the series covers
        the whole time range requested by the query, and trims points that
        fall before the range start.
        """
        _range = next(
            (item for item in query['query']['bool']['must'] if item.get('range')), None
        )
        # nothing to fill without a time range; the interval computation
        # below needs at least two points
        # (fix: the original raised IndexError on a single-point result)
        if not _range or len(points) < 2:
            return points
        # assumes the range lower bound looks like "now-<days>d/d" — TODO confirm
        days = int(_range['range']['points.time']['from'][4:-3])
        start_time = datetime.now()
        end_time = start_time - timedelta(days=days)  # include today
        dummy_point = deepcopy(points[0])
        # points arrive newest-first, so consecutive keys give the bucket size
        interval = points[0]['time'] - points[1]['time']
        start_ts = points[0]['time'] + interval
        end_ts = points[-1]['time'] - interval
        for field in dummy_point.keys():
            dummy_point[field] = None
        # pad the head (most recent side) up to "now"
        while start_ts < start_time.timestamp():
            dummy_point['time'] = start_ts
            points.insert(0, deepcopy(dummy_point))
            start_ts += interval
        # TODO: This needs to be fixed and shouldn't be required since intervals are set
        while points[-1]['time'] < end_time.timestamp():
            points.pop(-1)
        # pad the tail (oldest side) down to the range start
        while end_ts > end_time.timestamp():
            dummy_point['time'] = end_ts
            points.append(deepcopy(dummy_point))
            end_ts -= interval
        return points

    def delete_metric_data(self, key=None, tags=None):
        """
        deletes a specific metric based on the key and tags
        provided, you may also choose to delete all metrics
        """
        if key and tags:
            metric_id = find_metric(key, tags)
            self.get_db.delete(index=key, id=metric_id)
        else:
            # wipe every index; ignore "bad request" / "not found" responses
            self.get_db.indices.delete(index='*', ignore=[400, 404])

    # Chart related functions below

    def validate_query(self, query):
        """
        Validates the ``query`` section server-side and returns whether the
        query's aggregations are all supported (see ``_is_aggregate``).
        Raises ``ValidationError`` when Elasticsearch rejects the query.
        """
        if isinstance(query, str):
            query = json.loads(query)
        # Elasticsearch currently supports validation of only query section,
        # aggs, size, _source etc. are not supported
        valid_check = self.get_db.indices.validate_query(body={'query': query['query']})
        # Show a helpful message for failure
        if not valid_check['valid']:
            raise ValidationError(valid_check['error'])
        return self._is_aggregate(query)

    def _is_aggregate(self, q):
        """ returns True when every aggregation used in ``q`` is supported """
        agg_dict = q['aggs']['GroupByTime']['aggs'].values()
        agg = []
        for item in agg_dict:
            agg.append(next(iter(item)))
        return set(agg) <= set(self._AGGREGATE)

    def get_query(
        self,
        chart_type,
        params,
        time,
        group_map,
        summary=False,
        fields=None,
        query=None,
        timezone=settings.TIME_ZONE,
    ):
        """
        Builds the final Elasticsearch query dict for a chart: substitutes
        ``{placeholders}`` from ``params``, applies the time grouping and,
        for summaries, drops the range filter so the whole period is
        aggregated into a single bucket.
        NOTE: mutates ``params`` (pops ``key``) and ``query``.
        """
        query['key'] = params.pop('key')
        query = json.dumps(query)
        for k, v in params.items():
            query = query.replace('{' + k + '}', v)
        query = self._group_by(query, time, chart_type, group_map, strip=summary)
        query = json.loads(query)
        if summary:
            _range = next(
                (item for item in query['query']['bool']['must'] if item.get('range')),
                None,
            )
            if _range:
                query['query']['bool']['must'].remove(_range)
            query['aggs']['GroupByTime']['date_histogram']['time_zone'] = timezone
        return query

    def _group_by(self, query, time, chart_type, group_map, strip=False):
        """
        Rewrites the placeholder time range ('1d/d') and bucket interval
        ('10m') inside the serialized ``query`` according to ``time`` and
        ``group_map``; with ``strip=True`` the whole period becomes one bucket.
        """
        if not self.validate_query(query):
            return query
        if not strip and chart_type != 'histogram':
            value = group_map[time]
            query = query.replace('1d/d', f'{time}/d')
            query = query.replace('10m', value)
        if strip:
            query = query.replace('10m', time)
        return query

    # TODO:
    def _get_top_fields(
        self,
        query,
        params,
        chart_type,
        group_map,
        number,
        time,
        timezone=settings.TIME_ZONE,
    ):
        pass

    def _format(self, point):
        """ flattens an aggregation bucket into ``{'time': ..., field: value}`` """
        pt = {}
        # Convert time from milliseconds -> seconds precision
        pt['time'] = int(point['key'] / 1000)
        for key, value in point.items():
            if isinstance(value, dict):
                pt[key] = self._transform_field(key, value['value'])
        return pt

    def _transform_field(self, field, value):
        """ Performs arithmetic operations on the field if required """
        if value is None:
            return value
        if field in math_map:
            op = operator_lookup.get(math_map[field]['operator'])
            if op is not None:
                value = op(value, math_map[field]['value'])
        return value

    def default_chart_query(self, tags):
        """ returns a copy of the default chart query, without tag filters if ``tags`` is falsy """
        q = deepcopy(default_chart_query)
        if not tags:
            q['query']['bool']['must'].pop(0)
            # NOTE(review): after the pop above indices shift, so this removes
            # the clause originally at index 2 — confirm this is intended
            # (pop(0) twice would remove the first two clauses instead)
            q['query']['bool']['must'].pop(1)
        return q
318+
319+
320+
# Old data - delete by query (inefficient) / retention policy - Index lifecycle management
321+
# Fix Average - currently it's computing average over all fields!
322+
# Time Interval - fix range
323+
# Device query

0 commit comments

Comments
 (0)