Skip to content

Commit 7bad1b0

Browse files
committed
[timeseries] Requested Changes
1 parent 75edfe3 commit 7bad1b0

7 files changed

Lines changed: 30 additions & 31 deletions

File tree

README.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ Follow the setup instructions of `openwisp-controller
109109
}
110110
111111
In case, you wish to use ``Elasticsearch`` for timeseries data storage and retrieval,
112-
make use i=of the following settings
112+
make use of the following settings
113113

114114
.. code-block:: python
115115
TIMESERIES_DATABASE = {
@@ -387,7 +387,7 @@ MB (megabytes) instead of GB (Gigabytes) you can use:
387387
}
388388
}
389389
390-
# This needs to be declared separately but only for elasticsearch
390+
# Please declare the operations separately in case you use elasticsearch as done below
391391
OPENWISP_MONITORING_ADDITIONAL_CHART_OPERATIONS = {
392392
'upload': {'operator': '/', 'value': 1000000},
393393
'download': {'operator': '/', 'value': 1000000},

openwisp_monitoring/db/backends/elasticsearch/client.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from openwisp_utils.utils import deep_merge_dicts
1818

1919
from .. import TIMESERIES_DB
20-
from .index import MetricIndex, Point, find_metric
20+
from .index import MetricDocument, Point, find_metric
2121
from .queries import default_chart_query, math_map, operator_lookup
2222
from .retention_policies import _make_policy, default_rp_policy
2323

@@ -151,7 +151,7 @@ def write(self, name, values, **kwargs):
151151
tags = kwargs.get('tags')
152152
timestamp = kwargs.get('timestamp')
153153
metric_id, index = find_metric(self.get_db, name, tags, rp, add=True)
154-
metric_index = MetricIndex.get(metric_id, index=index, using=self.get_db)
154+
metric_index = MetricDocument.get(metric_id, index=index, using=self.get_db)
155155
point = Point(fields=values, time=timestamp or datetime.now())
156156
metric_index.points.append(point)
157157
metric_index.save()
@@ -168,7 +168,7 @@ def read(self, key, fields, tags=None, limit=1, order='time', **kwargs):
168168
metric_id, index = find_metric(self.get_db, key, tags)
169169
except TypeError:
170170
return []
171-
metric_index = MetricIndex.get(index=index, id=metric_id, using=self.get_db)
171+
metric_index = MetricDocument.get(index=index, id=metric_id, using=self.get_db)
172172
# distinguish between traffic and clients
173173
points = []
174174
for point in list(metric_index.points):
@@ -458,5 +458,4 @@ def _device_data(self, key, tags, fields, **kwargs):
458458

459459

460460
# TODO:
461-
# _fill_points should not work when group_by not specified in the query (need an option to disable it)
462461
# _fill_points has a while which shouldn't be required

openwisp_monitoring/db/backends/elasticsearch/index.py

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,13 @@
44
from elasticsearch.exceptions import NotFoundError
55
from elasticsearch_dsl import Date, Document, InnerDoc, Nested, Q, Search
66

7-
# TODO: Remove this. Current Structure
8-
# Index Templates <-- Indexes <-- Documents <-- Points
9-
107

118
class Point(InnerDoc):
129
time = Date(required=True, default_timezone=settings.TIME_ZONE)
1310
fields = Nested(dynamic=True, required=True, multi=True)
1411

1512

16-
class MetricIndex(Document):
13+
class MetricDocument(Document):
1714
tags = Nested(dynamic=True, required=False, multi=True)
1815
points = Nested(Point)
1916

@@ -47,18 +44,20 @@ def find_metric(client, index, tags, retention_policy=None, add=False):
4744
return result['id'], result['index']
4845
except (NotFoundError, AttributeError, IndexError):
4946
if add:
50-
obj = add_doc(client, index, tags, retention_policy=retention_policy)
51-
return obj['_id'], obj['_index']
47+
document = create_document(
48+
client, index, tags, retention_policy=retention_policy
49+
)
50+
return document['_id'], document['_index']
5251
return None
5352

5453

55-
def add_doc(client, key, tags, _id=None, retention_policy=None):
54+
def create_document(client, key, tags, _id=None, retention_policy=None):
5655
"""
57-
Add index to elasticsearch using ``keys``, ``tags`` and ``id`` provided.
56+
Adds document to relevant index using ``keys``, ``tags`` and ``id`` provided.
5857
If no ``id`` is provided a random ``uuid`` would be used.
5958
"""
6059
_id = str(_id or uuid.uuid1())
61-
# If index exists, add the doc and return
60+
# If index exists, create the document and return
6261
try:
6362
index_aliases = client.indices.get_alias(index=key)
6463
for k, v in index_aliases.items():
@@ -70,24 +69,24 @@ def add_doc(client, key, tags, _id=None, retention_policy=None):
7069
pass
7170
# Create a new index if it doesn't exist
7271
name = f'{key}-000001'
73-
obj = MetricIndex(meta={'id': _id})
74-
obj._index = obj._index.clone(name)
72+
document = MetricDocument(meta={'id': _id})
73+
document._index = document._index.clone(name)
7574
# Create a new index template if it doesn't exist
7675
if not client.indices.exists_template(name=key):
77-
obj._index.settings(**{'lifecycle.rollover_alias': key})
76+
document._index.settings(**{'lifecycle.rollover_alias': key})
7877
if retention_policy:
79-
obj._index.settings(**{'lifecycle.name': retention_policy})
80-
# index pattern is added for Index Lifecycle Management
81-
obj._index.as_template(key, f'{key}-*').save(using=client)
82-
obj.init(using=client, index=name)
83-
obj.meta.index = name
84-
obj.tags = tags
85-
obj.save(using=client, index=name)
78+
document._index.settings(**{'lifecycle.name': retention_policy})
79+
# add index pattern is added for Index Lifecycle Management
80+
document._index.as_template(key, f'{key}-*').save(using=client)
81+
document.init(using=client, index=name)
82+
document.meta.index = name
83+
document.tags = tags
84+
document.save(using=client, index=name)
8685
client.indices.put_alias(index=name, name=key, body={'is_write_index': True})
8786
if retention_policy:
8887
client.indices.put_settings(
8988
body={'lifecycle.name': retention_policy}, index=name
9089
)
9190
client.indices.put_settings(body={'lifecycle.rollover_alias': key}, index=name)
9291
client.indices.refresh(index=key)
93-
return obj.to_dict(include_meta=True)
92+
return document.to_dict(include_meta=True)

openwisp_monitoring/db/backends/elasticsearch/queries.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ def _get_chart_query():
122122
'disk': {'disk_usage': {'avg': {'field': 'points.fields.used_disk'}}},
123123
}
124124
query = {}
125-
for k, v in aggregation_dict.items():
126-
query[k] = {'elasticsearch': _make_query(v)}
125+
for key, value in aggregation_dict.items():
126+
query[key] = {'elasticsearch': _make_query(value)}
127127
return query
128128

129129

openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,5 +248,7 @@ def test_read_multiple(self):
248248
def test_ilm_disabled(self):
249249
with patch.object(timeseries_db, 'ilm_enabled', False):
250250
self.assertFalse(timeseries_db.ilm_enabled)
251-
self.assertIsNone(timeseries_db.create_or_alter_retention_policy(name='default'))
251+
self.assertIsNone(
252+
timeseries_db.create_or_alter_retention_policy(name='default')
253+
)
252254
self.assertIsNone(timeseries_db.get_list_retention_policies())

openwisp_monitoring/monitoring/tests/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77

88
from ...db import timeseries_db
99
from ...db.backends import TIMESERIES_DB
10-
from ..charts import register_chart, unregister_chart
1110
from ...db.backends.elasticsearch.queries import _make_query
11+
from ..charts import register_chart, unregister_chart
1212

1313
start_time = now()
1414
ten_minutes_ago = start_time - timedelta(minutes=10)

openwisp_monitoring/monitoring/tests/test_charts.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ def test_read_summary_not_aggregate(self):
4646
m = self._create_object_metric(name='summary_hidden')
4747
c = self._create_chart(metric=m)
4848
data = c.read()
49-
# TODO: elasticsearch is returning {'value': 6.0} as default query is returned for summary
5049
self.assertEqual(data['summary'], {'value': None})
5150

5251
def test_read_summary_top_fields(self):

0 commit comments

Comments
 (0)