Skip to content

Commit 2e3043c

Browse files
[monitoring] Adding influxDB 2.x version support #274
Fixes #274
1 parent 0366a4b commit 2e3043c

11 files changed

Lines changed: 954 additions & 16 deletions

File tree

Dockerfile

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,38 @@
11
FROM python:3.9.19-slim-bullseye
22

3+
# Install system dependencies
34
RUN apt update && \
45
apt install --yes zlib1g-dev libjpeg-dev gdal-bin libproj-dev \
56
libgeos-dev libspatialite-dev libsqlite3-mod-spatialite \
67
sqlite3 libsqlite3-dev openssl libssl-dev fping && \
78
rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/*
89

10+
# Upgrade pip and install Python dependencies
911
RUN pip install -U pip setuptools wheel
1012

13+
# Copy and install project dependencies
1114
COPY requirements-test.txt requirements.txt /opt/openwisp/
1215
RUN pip install -r /opt/openwisp/requirements.txt && \
1316
pip install -r /opt/openwisp/requirements-test.txt && \
1417
rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/*
1518

19+
# Copy project files and install the project
1620
ADD . /opt/openwisp
1721
RUN pip install -U /opt/openwisp && \
1822
rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/*
23+
24+
# Set working directory
1925
WORKDIR /opt/openwisp/tests/
26+
27+
# Set environment variables
2028
ENV NAME=openwisp-monitoring \
2129
PYTHONBUFFERED=1 \
22-
INFLUXDB_HOST=influxdb \
30+
INFLUXDB1_HOST=influxdb \
31+
INFLUXDB2_HOST=influxdb2 \
2332
REDIS_HOST=redis
24-
CMD ["sh", "docker-entrypoint.sh"]
33+
34+
# Expose the application port
2535
EXPOSE 8000
36+
37+
# Command to run the application
38+
CMD ["sh", "docker-entrypoint.sh"]

docker-compose.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ services:
1010
- "8000:8000"
1111
depends_on:
1212
- influxdb
13+
- influxdb2
1314
- redis
1415

1516
influxdb:
@@ -28,6 +29,20 @@ services:
2829
INFLUXDB_USER: openwisp
2930
INFLUXDB_USER_PASSWORD: openwisp
3031

32+
influxdb2:
33+
image: influxdb:2.0-alpine
34+
volumes:
35+
- influxdb2-data:/var/lib/influxdb2
36+
ports:
37+
- "8087:8086"
38+
environment:
39+
DOCKER_INFLUXDB_INIT_MODE: setup
40+
DOCKER_INFLUXDB_INIT_USERNAME: openwisp
41+
DOCKER_INFLUXDB_INIT_PASSWORD: openwisp
42+
DOCKER_INFLUXDB_INIT_ORG: openwisp
43+
DOCKER_INFLUXDB_INIT_BUCKET: openwisp2
44+
DOCKER_INFLUXDB_INIT_RETENTION: 1w
45+
3146
redis:
3247
image: redis:5.0-alpine
3348
ports:
@@ -36,3 +51,4 @@ services:
3651

3752
volumes:
3853
influxdb-data: {}
54+
influxdb2-data: {}

openwisp_monitoring/db/backends/__init__.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
3030
"""
3131
try:
3232
assert 'BACKEND' in TIMESERIES_DB, 'BACKEND'
33+
if 'BACKEND' in TIMESERIES_DB and '2' in TIMESERIES_DB['BACKEND']:
34+
# InfluxDB 2.x specific checks
35+
assert 'TOKEN' in TIMESERIES_DB, 'TOKEN'
36+
assert 'ORG' in TIMESERIES_DB, 'ORG'
37+
assert 'BUCKET' in TIMESERIES_DB, 'BUCKET'
38+
else:
39+
# InfluxDB 1.x specific checks
40+
assert 'USER' in TIMESERIES_DB, 'USER'
41+
assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD'
42+
assert 'NAME' in TIMESERIES_DB, 'NAME'
3343
assert 'USER' in TIMESERIES_DB, 'USER'
3444
assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD'
3545
assert 'NAME' in TIMESERIES_DB, 'NAME'
@@ -48,7 +58,7 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
4858
except ImportError as e:
4959
# The database backend wasn't found. Display a helpful error message
5060
# listing all built-in database backends.
51-
builtin_backends = ['influxdb']
61+
builtin_backends = ['influxdb', 'influxdb2']
5262
if backend_name not in [
5363
f'openwisp_monitoring.db.backends.{b}' for b in builtin_backends
5464
]:
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import logging
2+
3+
from django.utils.functional import cached_property
4+
5+
from openwisp_monitoring.utils import retry
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
class BaseDatabaseClient:
11+
def __init__(self, db_name=None):
12+
self._db = None
13+
self.db_name = db_name
14+
15+
@cached_property
16+
def db(self):
17+
raise NotImplementedError("Subclasses must implement `db` method")
18+
19+
@retry
20+
def create_database(self):
21+
raise NotImplementedError("Subclasses must implement `create_database` method")
22+
23+
@retry
24+
def drop_database(self):
25+
raise NotImplementedError("Subclasses must implement `drop_database` method")
26+
27+
@retry
28+
def query(self, query):
29+
raise NotImplementedError("Subclasses must implement `query` method")
30+
31+
def write(self, name, values, **kwargs):
32+
raise NotImplementedError("Subclasses must implement `write` method")
33+
34+
def get_list_retention_policies(self, name=None):
35+
raise NotImplementedError(
36+
"Subclasses must implement `get_list_retention_policies` method"
37+
)
38+
39+
def create_or_alter_retention_policy(self, name, duration):
40+
raise NotImplementedError(
41+
"Subclasses must implement `create_or_alter_retention_policy` method"
42+
)
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import logging
2+
3+
from django.utils.functional import cached_property
4+
from influxdb_client import InfluxDBClient, Point
5+
from influxdb_client.client.exceptions import InfluxDBError
6+
from influxdb_client.client.write_api import SYNCHRONOUS
7+
8+
from openwisp_monitoring.utils import retry
9+
10+
from ...exceptions import TimeseriesWriteException
11+
from .. import TIMESERIES_DB
12+
from ..base import BaseDatabaseClient
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
class DatabaseClient(BaseDatabaseClient):
18+
backend_name = 'influxdb2'
19+
20+
def __init__(self, db_name=None):
21+
super().__init__(db_name)
22+
self.client_error = InfluxDBError
23+
24+
@cached_property
25+
def db(self):
26+
return InfluxDBClient(
27+
url=f"http://{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}",
28+
token=TIMESERIES_DB['TOKEN'],
29+
org=TIMESERIES_DB['ORG'],
30+
bucket=self.db_name,
31+
)
32+
33+
@retry
34+
def create_database(self):
35+
self.write_api = self.db.write_api(write_options=SYNCHRONOUS)
36+
self.query_api = self.db.query_api()
37+
logger.debug('Initialized APIs for InfluxDB 2.0')
38+
39+
@retry
40+
def drop_database(self):
41+
pass # Implement as needed for InfluxDB 2.0
42+
43+
@retry
44+
def query(self, query):
45+
return self.query_api.query(query)
46+
47+
def write(self, name, values, **kwargs):
48+
point = Point(name).time(self._get_timestamp(kwargs.get('timestamp')))
49+
tags = kwargs.get('tags', {})
50+
for tag, value in tags.items():
51+
point.tag(tag, value)
52+
for field, value in values.items():
53+
point.field(field, value)
54+
try:
55+
self.write_api.write(bucket=self.db_name, record=point)
56+
except InfluxDBError as e:
57+
raise TimeseriesWriteException(str(e))
58+
59+
@retry
60+
def get_list_retention_policies(self, name=None):
61+
bucket = self.db.buckets_api().find_bucket_by_name(name)
62+
if bucket:
63+
return bucket.retention_rules
64+
return []
65+
66+
@retry
67+
def create_or_alter_retention_policy(self, name, duration):
68+
bucket = self.db.buckets_api().find_bucket_by_name(name)
69+
retention_rules = [{"type": "expire", "everySeconds": duration}]
70+
if bucket:
71+
bucket.retention_rules = retention_rules
72+
self.db.buckets_api().update_bucket(bucket=bucket)
73+
else:
74+
self.db.buckets_api().create_bucket(
75+
bucket_name=name,
76+
retention_rules=retention_rules,
77+
org=TIMESERIES_DB["ORG"],
78+
)

0 commit comments

Comments
 (0)