Skip to content

Commit 5647cd4

Browse files
[monitoring] Updated files and corrected failing qa checks #274
Fixes #274
1 parent 13f3044 commit 5647cd4

11 files changed

Lines changed: 453 additions & 1050 deletions

File tree

openwisp_monitoring/db/backends/__init__.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,18 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
6464
"Try using 'openwisp_monitoring.db.backends.XXX', where XXX is one of:\n"
6565
f"{builtin_backends}"
6666
) from e
67+
else:
68+
raise e
69+
6770

71+
if '2' in TIMESERIES_DB['BACKEND']:
72+
timeseries_db = load_backend_module(module='client').DatabaseClient(
73+
bucket=TIMESERIES_DB['BUCKET'],
74+
org=TIMESERIES_DB['ORG'],
75+
token=TIMESERIES_DB['TOKEN'],
76+
url=f"http://{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}",
77+
)
78+
else:
79+
timeseries_db = load_backend_module(module='client').DatabaseClient()
6880

69-
timeseries_db = load_backend_module(module='client').DatabaseClient()
7081
timeseries_db.queries = load_backend_module(module='queries')
Lines changed: 204 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,221 @@
11
import logging
2+
import re
3+
from datetime import datetime
24

3-
import influxdb_client
45
from django.conf import settings
5-
from django.utils.functional import cached_property
6+
from django.core.exceptions import ValidationError
7+
from django.utils.translation import gettext_lazy as _
8+
from influxdb_client import InfluxDBClient, Point
69
from influxdb_client.client.write_api import SYNCHRONOUS
710

8-
from openwisp_monitoring.utils import retry
11+
from ...exceptions import TimeseriesWriteException
912

1013
logger = logging.getLogger(__name__)
1114

1215

13-
class DatabaseClient:
14-
backend_name = 'influxdb2'
16+
class DatabaseClient(object):
17+
_AGGREGATE = [
18+
'COUNT',
19+
'DISTINCT',
20+
'INTEGRAL',
21+
'MEAN',
22+
'MEDIAN',
23+
'MODE',
24+
'SPREAD',
25+
'STDDEV',
26+
'SUM',
27+
'BOTTOM',
28+
'FIRST',
29+
'LAST',
30+
'MAX',
31+
'MIN',
32+
'PERCENTILE',
33+
'SAMPLE',
34+
'TOP',
35+
'CEILING',
36+
'CUMULATIVE_SUM',
37+
'DERIVATIVE',
38+
'DIFFERENCE',
39+
'ELAPSED',
40+
'FLOOR',
41+
'HISTOGRAM',
42+
'MOVING_AVERAGE',
43+
'NON_NEGATIVE_DERIVATIVE',
44+
'HOLT_WINTERS',
45+
]
46+
_FORBIDDEN = ['drop', 'create', 'delete', 'alter', 'into']
47+
backend_name = 'influxdb'
1548

16-
def __init__(self):
17-
self.token = settings.TIMESERIES_DB['TOKEN']
18-
self.org = settings.TIMESERIES_DB['ORG']
19-
self.bucket = settings.TIMESERIES_DB['BUCKET']
20-
self.url = (
21-
f"http://{settings.TIMESERIES_DB['HOST']}:{settings.TIMESERIES_DB['PORT']}"
49+
def __init__(self, bucket, org, token, url):
50+
self.bucket = bucket
51+
self.org = org
52+
self.token = token
53+
self.url = url
54+
self.client = InfluxDBClient(url=url, token=token, org=org)
55+
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
56+
self.query_api = self.client.query_api()
57+
58+
def create_database(self):
59+
logger.debug('InfluxDB 2.0 does not require explicit database creation.')
60+
61+
def drop_database(self):
62+
logger.debug('InfluxDB 2.0 does not support dropping databases via the client.')
63+
64+
def create_or_alter_retention_policy(self, name, duration):
65+
logger.debug('InfluxDB 2.0 handles retention policies via bucket settings.')
66+
67+
def write(self, name, values, **kwargs):
68+
timestamp = kwargs.get('timestamp', datetime.utcnow().isoformat())
69+
point = (
70+
Point(name)
71+
.tag("object_id", kwargs.get('tags').get('object_id'))
72+
.field(kwargs.get('field'), values)
73+
.time(timestamp)
2274
)
75+
try:
76+
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
77+
except Exception as exception:
78+
logger.warning(f'got exception while writing to tsdb: {exception}')
79+
raise TimeseriesWriteException
80+
81+
def batch_write(self, metric_data):
82+
points = []
83+
for data in metric_data:
84+
timestamp = data.get('timestamp', datetime.utcnow().isoformat())
85+
point = (
86+
Point(data.get('name'))
87+
.tag("object_id", data.get('tags').get('object_id'))
88+
.field(data.get('field'), data.get('values'))
89+
.time(timestamp)
90+
)
91+
points.append(point)
92+
try:
93+
self.write_api.write(bucket=self.bucket, org=self.org, record=points)
94+
except Exception as exception:
95+
logger.warning(f'got exception while writing to tsdb: {exception}')
96+
raise TimeseriesWriteException
2397

24-
@cached_property
25-
def client(self):
26-
return influxdb_client.InfluxDBClient(
27-
url=self.url, token=self.token, org=self.org
98+
def read(self, key, fields, tags=None, **kwargs):
99+
since = kwargs.get('since')
100+
order = kwargs.get('order')
101+
limit = kwargs.get('limit')
102+
query = (
103+
f'from(bucket: "{self.bucket}")'
104+
f' |> range(start: {since if since else "-1h"})' # Use since or default
105+
f' |> filter(fn: (r) => r._measurement == "{key}")'
28106
)
107+
if tags:
108+
tag_query = ' and '.join(
109+
[f'r.{tag} == "{value}"' for tag, value in tags.items()]
110+
)
111+
query += f' |> filter(fn: (r) => {tag_query})'
112+
if fields:
113+
field_query = ' or '.join([f'r._field == "{field}"' for field in fields])
114+
query += f' |> filter(fn: (r) => {field_query})'
115+
if order:
116+
query += f' |> sort(columns: ["_time"], desc: {order == "-time"})'
117+
if limit:
118+
query += f' |> limit(n: {limit})'
119+
result = self.query_api.query(org=self.org, query=query)
120+
return [record.values for table in result for record in table.records]
29121

30-
@cached_property
31-
def write_api(self):
32-
return self.client.write_api(write_options=SYNCHRONOUS)
122+
def delete_metric_data(self, key=None, tags=None):
123+
logger.debug(
124+
'InfluxDB 2.0 does not support deleting specific data points via the client.'
125+
)
33126

34-
@retry
35-
def write(self, name, values, **kwargs):
36-
point = influxdb_client.Point(name).fields(values)
37-
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
127+
def validate_query(self, query):
128+
for word in self._FORBIDDEN:
129+
if word in query.lower():
130+
msg = _(f'the word "{word.upper()}" is not allowed')
131+
raise ValidationError({'configuration': msg})
132+
return self._is_aggregate(query)
133+
134+
def _is_aggregate(self, q):
135+
q = q.upper()
136+
for word in self._AGGREGATE:
137+
if any(['%s(' % word in q, '|%s}' % word in q, '|%s|' % word in q]):
138+
return True
139+
return False
140+
141+
def get_query(
142+
self,
143+
chart_type,
144+
params,
145+
time,
146+
group_map,
147+
summary=False,
148+
fields=None,
149+
query=None,
150+
timezone=settings.TIME_ZONE,
151+
):
152+
query = self._fields(fields, query, params['field_name'])
153+
params = self._clean_params(params)
154+
query = query.format(**params)
155+
query = self._group_by(query, time, chart_type, group_map, strip=summary)
156+
if summary:
157+
query = f'{query} |> limit(n: 1)'
158+
return query
159+
160+
def _fields(self, fields, query, field_name):
161+
matches = re.search(self._fields_regex, query)
162+
if not matches and not fields:
163+
return query
164+
elif matches and not fields:
165+
groups = matches.groupdict()
166+
fields_key = groups.get('group')
167+
fields = [field_name]
168+
if fields and matches:
169+
groups = matches.groupdict()
170+
function = groups['func'] # required
171+
operation = groups.get('op') # optional
172+
fields = [self.__transform_field(f, function, operation) for f in fields]
173+
fields_key = groups.get('group')
174+
else:
175+
fields_key = '{fields}'
176+
if fields:
177+
selected_fields = ', '.join(fields)
178+
return query.replace(fields_key, selected_fields)
179+
180+
def __transform_field(self, field, function, operation=None):
181+
if operation:
182+
operation = f' {operation}'
183+
else:
184+
operation = ''
185+
return f'{function}("{field}"){operation} AS {field.replace("-", "_")}'
186+
187+
def _group_by(self, query, time, chart_type, group_map, strip=False):
188+
if not self.validate_query(query):
189+
return query
190+
if not strip and not chart_type == 'histogram':
191+
value = group_map[time]
192+
group_by = (
193+
f'|> aggregateWindow(every: {value}, fn: mean, createEmpty: false)'
194+
)
195+
else:
196+
group_by = ''
197+
if 'aggregateWindow' not in query:
198+
query = f'{query} {group_by}'
199+
return query
200+
201+
202+
# Example usage
203+
if __name__ == "__main__":
204+
bucket = "mybucket"
205+
org = "myorg"
206+
token = "t8Q3Y5mTWuqqTRdGyVxZuyVLO-8pl3I8KaNTR3jV7uTDr_GVECP5Z7LsrZwILGw79Xp4O8pAWkdqTREgIk073Q=="
207+
url = "http://localhost:9086"
208+
209+
client = DatabaseClient(bucket=bucket, org=org, token=token, url=url)
210+
client.create_database()
38211

39-
@cached_property
40-
def query_api(self):
41-
return self.client.query_api()
212+
# Write example
213+
client.write(
214+
"example_measurement", 99.5, tags={"object_id": "server_01"}, field="uptime"
215+
)
42216

43-
@retry
44-
def query(self, query):
45-
return self.query_api.query(org=self.org, query=query)
217+
# Read example
218+
result = client.read(
219+
"example_measurement", ["uptime"], tags={"object_id": "server_01"}
220+
)
221+
print(result)

0 commit comments

Comments
 (0)