Commit f527e8a

[DL Streamer Pipeline Server] Added InfluxDB publisher for publishing metadata to influxdb (#218)
Co-authored-by: Ashish Jagadish <ashish.jagadish@intel.com>
1 parent 1552b73 commit f527e8a

12 files changed

Lines changed: 775 additions & 326 deletions

File tree

microservices/dlstreamer-pipeline-server/Dockerfile

Lines changed: 319 additions & 319 deletions
Large diffs are not rendered by default.
microservices/dlstreamer-pipeline-server/configs/sample_influx/config.json

Lines changed: 29 additions & 0 deletions

```json
{
    "config": {
        "pipelines": [
            {
                "name": "pallet_defect_detection",
                "source": "gstreamer",
                "queue_maxsize": 50,
                "pipeline": "{auto_source} name=source ! decodebin ! videoconvert ! gvadetect name=detection model-instance-id=inst0 ! queue ! gvafpscounter ! gvametaconvert add-empty-results=true name=metaconvert ! appsink name=destination",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "detection-properties": {
                            "element": {
                                "name": "detection",
                                "format": "element-properties"
                            }
                        }
                    }
                },
                "auto_start": false,
                "influx_write": {
                    "bucket": "dlstreamer-pipeline-results",
                    "org": "my-org",
                    "measurement": "dlsps"
                }
            }
        ]
    }
}
```
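As an aside (not part of the commit), the `influx_write` block of this sample config can be sanity-checked with a few lines of stdlib Python; the config text below is abridged to the keys relevant to InfluxDB publishing:

```python
import json

# Abridged copy of the sample config above, keeping only the influx_write keys
config_text = '''
{
  "config": {
    "pipelines": [
      {
        "name": "pallet_defect_detection",
        "influx_write": {
          "bucket": "dlstreamer-pipeline-results",
          "org": "my-org",
          "measurement": "dlsps"
        }
      }
    ]
  }
}
'''

cfg = json.loads(config_text)
influx = cfg["config"]["pipelines"][0]["influx_write"]
# bucket and org are required; measurement falls back to "dlsps" in the publisher
assert {"bucket", "org"} <= influx.keys()
print(influx.get("measurement", "dlsps"))  # dlsps
```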

microservices/dlstreamer-pipeline-server/docker/.env

Lines changed: 7 additions & 1 deletion
```diff
@@ -75,4 +75,10 @@ PROMETHEUS_PORT=9999
 # Webrtc related config
 WHIP_SERVER_IP=
 WHIP_SERVER_PORT=
-WHIP_SERVER_TIMEOUT=10s
+WHIP_SERVER_TIMEOUT=10s
+
+# InfluxDB related config
+INFLUXDB_HOST=
+INFLUXDB_PORT=
+INFLUXDB_USER=
+INFLUXDB_PASS=
```

microservices/dlstreamer-pipeline-server/docker/docker-compose.yml

Lines changed: 5 additions & 1 deletion
```diff
@@ -90,6 +90,10 @@ services:
       - OTEL_COLLECTOR_HOST=${OTEL_COLLECTOR_HOST}
       - OTEL_COLLECTOR_PORT=${OTEL_COLLECTOR_PORT}
       - OTEL_EXPORT_INTERVAL_MILLIS=${OTEL_EXPORT_INTERVAL_MILLIS}
+      - INFLUXDB_HOST=${INFLUXDB_HOST}
+      - INFLUXDB_PORT=${INFLUXDB_PORT}
+      - INFLUXDB_USER=${INFLUXDB_USER}
+      - INFLUXDB_PASS=${INFLUXDB_PASS}
     volumes:
       # - "../resources:/home/pipeline-server/resources/"
       # - "../configs/default/config.json:/home/pipeline-server/config.json"
@@ -131,4 +135,4 @@ volumes:
     driver: local
     driver_opts:
       type: tmpfs
-      device: tmpfs
+      device: tmpfs
```
microservices/dlstreamer-pipeline-server/docs/user-guide/how-to-store-metadata-influxdb.md

Lines changed: 147 additions & 0 deletions

# How to publish metadata to InfluxDB

## Steps

DL Streamer Pipeline Server supports storing frame metadata in InfluxDB.

First, add the server configuration details such as host, port, and credentials as environment variables to DL Streamer Pipeline Server.

If you are launching InfluxDB alongside DL Streamer Pipeline Server, add the InfluxDB service details to DL Streamer Pipeline Server's docker-compose.yml file at `[WORKDIR]/edge-ai-libraries/microservices/dlstreamer-pipeline-server/docker/docker-compose.yml`. This tutorial follows that approach.

For demonstration purposes, we use InfluxDB v2.7.11 to store the metadata, launched together with DL Streamer Pipeline Server. To get started, follow the steps below.

1. Modify environment variables in the `[WORKDIR]/edge-ai-libraries/microservices/dlstreamer-pipeline-server/docker/.env` file.
    - Provide the InfluxDB details and credentials.

      ```sh
      INFLUXDB_HOST=influxdb
      INFLUXDB_PORT=8086
      INFLUXDB_USER=<DATABASE USERNAME> # example: INFLUXDB_USER=influxadmin
      INFLUXDB_PASS=<DATABASE PASSWORD> # example: INFLUXDB_PASS=influxadmin
      ```
2. Add the `influxdb` service to the docker compose file.
    - Modify the docker-compose.yml file with the following changes. Add an `influxdb` service under the `services` section and adjust the values to your requirements.

      ```yaml
      services:
        influxdb:
          image: influxdb:latest
          container_name: influxdb
          hostname: influxdb
          ports:
            - "8086:8086"
          volumes:
            - influxdb:/var/lib/influxdb
          restart: unless-stopped
          networks:
            - app_network
      ```
    - Also add `influxdb` to the `volumes` section of your docker-compose.yml.

      ```yaml
      volumes:
        vol_pipeline_root:
          driver: local
          driver_opts:
            type: tmpfs
            device: tmpfs
        influxdb:
      ```

    - Update the `environment` section of the `dlstreamer-pipeline-server` service: add the `influxdb` container name to the `no_proxy` parameter and include the InfluxDB environment variables.

      ```yaml
      services:
        dlstreamer-pipeline-server:
          environment:
            - no_proxy=$no_proxy,multimodal-data-visualization-streaming,${RTSP_CAMERA_IP},${OTEL_COLLECTOR_HOST},${S3_STORAGE_HOST},${INFLUXDB_HOST},influxdb
            - INFLUXDB_HOST=${INFLUXDB_HOST}
            - INFLUXDB_PORT=${INFLUXDB_PORT}
            - INFLUXDB_USER=${INFLUXDB_USER}
            - INFLUXDB_PASS=${INFLUXDB_PASS}
      ```

    > **Note** The value added to `no_proxy` must match the value of `container_name` specified in the `influxdb` service section of the docker compose file (`[WORKDIR]/edge-ai-libraries/microservices/dlstreamer-pipeline-server/docker/docker-compose.yml`). In our example, it is `influxdb`.
3. A sample config for this demonstration is provided at `[WORKDIR]/edge-ai-libraries/microservices/dlstreamer-pipeline-server/configs/sample_influx/config.json`. Volume mount the sample config file in the `[WORKDIR]/edge-ai-libraries/microservices/dlstreamer-pipeline-server/docker/docker-compose.yml` file. Refer to the snippet below:

    ```yaml
    volumes:
      # Volume mount [WORKDIR]/edge-ai-libraries/microservices/dlstreamer-pipeline-server/configs/sample_influx/config.json to the config file that the DL Streamer Pipeline Server container loads.
      - "../configs/sample_influx/config.json:/home/pipeline-server/config.json"
    ```
4. Start DL Streamer Pipeline Server and InfluxDB.

    ```sh
    cd [WORKDIR]/edge-ai-libraries/microservices/dlstreamer-pipeline-server/docker
    docker compose up -d
    ```
5. Set up InfluxDB and create a bucket.
    - DL Streamer Pipeline Server expects InfluxDB to be set up, with a bucket created, before the pipeline is launched.

      Here is a sample Python script (requires the `requests` package). It initializes an InfluxDB 2.x server by creating the first admin user, org, and bucket, calling the `/api/v2/setup` endpoint with the required parameters. Adjust the credentials and names as needed before running.

      ```python
      import requests

      url = "http://localhost:8086/api/v2/setup"
      payload = {
          "username": "influxadmin",
          "password": "influxadmin",
          "org": "my-org",
          "bucket": "dlstreamer-pipeline-results",
          "retentionPeriodSeconds": 0
      }
      response = requests.post(url, json=payload)
      if response.status_code == 201:
          print("Setup successful!")
      else:
          print("Setup failed:", response.text)
      ```
    - Save the script above as `influx_setup.py` in your current directory and execute it in a Python environment that has the `requests` package installed.

      ```sh
      python3 influx_setup.py
      ```
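As a small aside not covered by the tutorial: the setup call only succeeds once per instance. InfluxDB 2.x exposes `GET /api/v2/setup`, which (per the InfluxDB v2 API) returns an `allowed` flag indicating whether first-time setup is still possible, so you can check before running the script:

```python
def setup_needed(setup_response: dict) -> bool:
    """True if the InfluxDB 2.x instance still allows first-time setup.

    GET /api/v2/setup returns {"allowed": true} until the initial
    user/org/bucket have been created via /api/v2/setup.
    """
    return bool(setup_response.get("allowed", False))

# Usage (requires the requests package and a running InfluxDB):
#   import requests
#   resp = requests.get("http://localhost:8086/api/v2/setup")
#   if setup_needed(resp.json()):
#       print("Not initialized yet - safe to run influx_setup.py")
```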
6. Launch the pipeline by sending the following curl request.

    ```sh
    curl http://localhost:8080/pipelines/user_defined_pipelines/pallet_defect_detection -X POST -H 'Content-Type: application/json' -d '{
        "source": {
            "uri": "file:///home/pipeline-server/resources/videos/warehouse.avi",
            "type": "uri"
        },
        "destination": {
            "metadata": {
                "type": "influx_write",
                "bucket": "dlstreamer-pipeline-results",
                "org": "my-org",
                "measurement": "dlsps"
            }
        },
        "parameters": {
            "detection-properties": {
                "model": "/home/pipeline-server/resources/models/geti/pallet_defect_detection/deployment/Detection/model/model.xml",
                "device": "CPU"
            }
        }
    }'
    ```

    The frame destination sub-config for `influx_write` specifies that frame metadata will be written to an InfluxDB instance under the organization `my-org` and bucket `dlstreamer-pipeline-results`. Every frame's metadata is recorded under the same measurement, which defaults to `dlsps` if the `measurement` field is not explicitly provided. In this example, frame metadata is written to the measurement `dlsps` in the bucket `dlstreamer-pipeline-results` within the organization `my-org`.
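The same launch request can also be sent from Python; this is a convenience sketch, not part of the original tutorial, assuming the same endpoint and payload as the curl command above:

```python
def build_launch_payload(bucket: str, org: str, measurement: str = "dlsps") -> dict:
    """Build the pipeline launch body with an influx_write metadata destination."""
    return {
        "source": {
            "uri": "file:///home/pipeline-server/resources/videos/warehouse.avi",
            "type": "uri",
        },
        "destination": {
            "metadata": {
                "type": "influx_write",
                "bucket": bucket,
                "org": org,
                "measurement": measurement,
            }
        },
        "parameters": {
            "detection-properties": {
                "model": "/home/pipeline-server/resources/models/geti/pallet_defect_detection/deployment/Detection/model/model.xml",
                "device": "CPU",
            }
        },
    }

# To send it (requires the requests package and a running server):
#   import requests
#   requests.post(
#       "http://localhost:8080/pipelines/user_defined_pipelines/pallet_defect_detection",
#       json=build_launch_payload("dlstreamer-pipeline-results", "my-org"),
#       timeout=10,
#   )
```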
129+
**Note**: DL Streamer Pipeline Server supports only writing of metadata to InfluxDB. It does not support creating, maintaining or deletion of buckets. It also does not support reading or deletion of metadata from InfluxDB. Also, as mentioned before DL Streamer Pipeline Server assumes that the user already has a InfluxDB with buckets configured.
130+
131+
7. Once you start DL Streamer Pipeline Server with above changes, you should be able to see metadata written to InfluxDB. Since we are using InfluxDB 2.x for our demonstration, you can see the frames being written to InfluxDB by logging into InfluxDB console. You can access the console in your browser - `http://<INFLUXDB_HOST>:8086`. Use the credentials specified above in the `[WORKDIR]/docker/.env` to login into console. After logging into console, you can go to your desired buckets and check the metadata stored.
132+
You can also use the Query Builder in the InfluxDB UI to write and run the following query to view all data written to InfluxDB:
133+
```sh
134+
from(bucket: "dlstreamer-pipeline-results")
135+
|> range(start: -3h)
136+
|> filter(fn: (r) => r["_measurement"] == "dlsps")
137+
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
138+
|> group()
139+
|> sort(columns: ["_time"])
140+
```
141+
Example of metadata stored in InfluxDB:
142+
![Metadata stored in InfluxDB](./images/influx_metadata.png)
143+
144+
8. To stop DL Streamer Pipeline Server and other services, run the following. Since the data is stored inside the InfluxDB container for this demonstration, the metadata will not persists after the containers are brought down.
145+
```sh
146+
docker compose down -v
147+
```

microservices/dlstreamer-pipeline-server/docs/user-guide/index.rst

Lines changed: 1 addition & 0 deletions
```diff
@@ -18,6 +18,7 @@
    how-to-launch-configurable-pipelines
    how-to-start-evam-mqtt-publish
    how-to-store-s3-frame
+   how-to-store-metadata-influxdb
    how-to-launch-and-manage-pipeline
    how-to-use-rtsp-camera-as-video-source
    how-to-run-udf-pipelines
```
Lines changed: 34 additions & 0 deletions

```python
from marshmallow import Schema, fields, post_dump, EXCLUDE
import json


class DataSchema(Schema):
    height = fields.Int(required=True)
    width = fields.Int(required=True)
    channels = fields.Int(required=True)
    caps = fields.Str(required=True)
    img_format = fields.Str(required=True)
    img_handle = fields.Str(required=True)

    objects = fields.List(fields.Raw(), required=False)
    resolution = fields.Dict(required=True)
    pipeline = fields.Dict(required=True)
    gva_meta = fields.List(fields.Raw(), required=False)

    encoding_type = fields.Str(required=False)
    encoding_level = fields.Int(required=False)
    frame_id = fields.Int(required=True)

    timestamp = fields.Int(load_default=None)
    tags = fields.Dict(load_default={})
    time = fields.Int(required=True)
    S3_meta = fields.Dict(required=False)

    class Meta:
        unknown = EXCLUDE  # Ignore fields not defined in the schema

    @post_dump
    def stringify_complex(self, data, **kwargs):
        # Serialize nested structures to JSON strings so they can be stored as flat field values
        for key in ["objects", "resolution", "pipeline", "gva_meta", "tags", "S3_meta"]:
            if key in data:
                data[key] = json.dumps(data[key])
        return data
```
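The effect of the `post_dump` hook can be illustrated without marshmallow; this stdlib sketch (the sample metadata values are hypothetical) mimics how nested structures are flattened to JSON strings before being written as InfluxDB field values:

```python
import json

# Same key list as DataSchema.stringify_complex above
COMPLEX_KEYS = ["objects", "resolution", "pipeline", "gva_meta", "tags", "S3_meta"]

def stringify_complex(data: dict) -> dict:
    """JSON-encode nested structures in place, leaving scalar fields untouched."""
    for key in COMPLEX_KEYS:
        if key in data:
            data[key] = json.dumps(data[key])
    return data

# Hypothetical frame metadata after a schema dump
meta = {
    "frame_id": 7,
    "resolution": {"width": 1920, "height": 1080},
    "tags": {},
}
flat = stringify_complex(meta)
print(flat["resolution"])  # {"width": 1920, "height": 1080} (as a JSON string)
```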
Lines changed: 110 additions & 0 deletions

```python
#
# Apache v2 license
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
#

"""Influx Writer.
Publishes the metadata of frames to InfluxDB.
"""

# pylint: disable=wrong-import-position
import os
import time
import threading as th
from collections import deque

from src.common.log import get_logger
from utils.influx_client import InfluxClient


DEFAULT_APPDEST_INFLUX_QUEUE_SIZE = 1000


class InfluxdbWriter:
    """Influx Writer."""

    def __init__(self, config, qsize=DEFAULT_APPDEST_INFLUX_QUEUE_SIZE):
        """Constructor

        :param json config: Influx publisher config
        """
        self.queue = deque(maxlen=qsize)
        self.stop_ev = th.Event()
        self.host = os.getenv("INFLUXDB_HOST")
        self.port = os.getenv("INFLUXDB_PORT")
        self.username = os.getenv("INFLUXDB_USER")
        self.password = os.getenv("INFLUXDB_PASS")
        self.org = config.get("org")
        self.influx_bucket_name = config.get("bucket")
        self.influx_measurement = config.get("measurement", "dlsps")
        self.influxwrite_complete = th.Event()

        self.th = None
        self.log = get_logger(f'{__name__} ({self.influx_bucket_name})')
        self.initialized = True
        if not self.host:
            self.log.error('Empty value given for INFLUXDB_HOST. It cannot be blank')
            self.initialized = False
        if not self.port:
            self.log.error('Empty value given for INFLUXDB_PORT. It cannot be blank')
            self.initialized = False
        else:
            self.port = int(self.port)
        if not self.org:
            self.log.error('Empty value given for INFLUXDB_ORG. It cannot be blank')
            self.initialized = False
        if not self.username or not self.password:
            self.log.error('Empty value given for INFLUXDB_USER or INFLUXDB_PASS. They cannot be blank')
            self.initialized = False
        if not self.initialized:
            return

        self.log.info(f'Initializing Influx Writer for bucket - {self.influx_bucket_name}')
        self.influx_client = InfluxClient(self.host, self.port, self.org, self.username, self.password)
        self.log.info("InfluxDB Writer initialized")

    def start(self):
        """Start publisher."""
        self.log.info("Starting influx writer thread")
        self.th = th.Thread(target=self._run)
        self.th.start()

    def stop(self):
        """Stop publisher."""
        self.influxwrite_complete.set()
        if self.stop_ev.is_set():
            return
        self.stop_ev.set()
        # Only join when called from another thread; joining the worker
        # from its own error handler would raise RuntimeError
        if self.th and self.th is not th.current_thread():
            self.th.join()
            self.th = None
        self.log.info('Influx writer thread stopped')

    def error_handler(self, msg):
        self.log.error('Error in influx thread: {}'.format(msg))
        self.stop()

    def _run(self):
        """Run method for publisher."""
        self.log.info("Influx writer thread started")
        try:
            while not self.stop_ev.is_set():
                try:
                    _, metadata = self.queue.popleft()
                    self._publish(metadata)
                except IndexError:
                    self.log.debug("No data in client queue")
                    time.sleep(0.005)

        except Exception as e:
            self.error_handler(e)

    def _publish(self, metadata):
        """Write object data to influx storage.

        :param metadata: Meta data
        :type: Dict
        """
        self.influx_client.publish(self.influx_bucket_name, self.influx_measurement, metadata)
        self.influxwrite_complete.set()
```
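The writer's core loop, a bounded `deque` drained by a background thread, can be exercised in isolation. This self-contained sketch uses a stand-in client (not the real `InfluxClient`) and reproduces the same popleft/IndexError/sleep pattern:

```python
import threading
import time
from collections import deque

class FakeClient:
    """Stand-in for InfluxClient: records published metadata instead of writing to InfluxDB."""
    def __init__(self):
        self.published = []

    def publish(self, bucket, measurement, metadata):
        self.published.append((bucket, measurement, metadata))

def drain(queue, client, stop_ev, bucket="dlstreamer-pipeline-results", measurement="dlsps"):
    # Same drain pattern as InfluxdbWriter._run: popleft until empty, back off briefly when idle
    while not stop_ev.is_set():
        try:
            _, metadata = queue.popleft()
            client.publish(bucket, measurement, metadata)
        except IndexError:
            time.sleep(0.005)

queue = deque(maxlen=1000)  # oldest entries are dropped when full, bounding memory
client = FakeClient()
stop_ev = threading.Event()
worker = threading.Thread(target=drain, args=(queue, client, stop_ev))
worker.start()

for i in range(3):
    queue.append((None, {"frame_id": i}))  # (frame, metadata) tuples; frame unused here

time.sleep(0.1)  # give the worker time to drain the queue
stop_ev.set()
worker.join()
print(len(client.published))  # 3
```

The bounded deque is the design point worth noting: when InfluxDB falls behind, old metadata is silently dropped rather than growing memory without limit.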
