-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinflux_mock.py
59 lines (46 loc) · 1.42 KB
/
influx_mock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import time
from datetime import datetime, timedelta, timezone

import numpy as np
from influxdb import InfluxDBClient
# Connection settings for the local InfluxDB instance.
host = 'localhost'
port = 8086
user = 'admin'
password = 'admin'
# The database we created
dbname = 'first'

# Number of simulated sensors.
N = 3


def build_points(timestamp, temp, pressure, humidity):
    """Build one InfluxDB point per simulated sensor for a single sample.

    Args:
        timestamp: ``datetime`` shared by every point of this sample.
        temp: Temperature readings, one per sensor.
        pressure: Pressure readings, one per sensor.
        humidity: Humidity readings, one per sensor.

    Returns:
        A list of point dicts (measurement ``'ruuvi'``, tagged
        ``'sensor <index>'``) suitable for ``InfluxDBClient.write_points``.
        The list is empty when the reading sequences are empty.
    """
    stamp = timestamp.strftime('%Y-%m-%d %H:%M:%S')
    return [
        {
            'measurement': 'ruuvi',
            'tags': {
                'sensor': 'sensor %i' % idx,
            },
            'time': stamp,
            'fields': {
                # Plain floats keep the payload independent of NumPy
                # scalar types.
                'temp': float(t_val),
                'pressure': float(p_val),
                'humidity': float(h_val),
            },
        }
        for idx, (t_val, p_val, h_val)
        in enumerate(zip(temp, pressure, humidity))
    ]


def main():
    """Write a simulated random walk of sensor readings until interrupted.

    Starts seven days in the past and advances the sample time by one
    minute per write, pausing 0.2 s between writes. Stops cleanly on
    Ctrl-C (``KeyboardInterrupt``) and closes the client on exit.
    """
    # Create the InfluxDB object
    client = InfluxDBClient(host, port, user, password, dbname)
    try:
        temp = np.random.normal(loc=20, scale=5, size=N)
        pressure = np.random.normal(loc=1000, scale=100, size=N)
        humidity = np.random.normal(loc=50, scale=10, size=N)
        # The formatted time string carries no UTC offset and InfluxDB
        # treats such timestamps as UTC, so start from UTC "now" rather
        # than naive local time to avoid shifting the data by the local
        # offset.
        t = datetime.now(timezone.utc) - timedelta(days=7)

        # Run until keyboard out
        while True:
            # Write JSON to InfluxDB
            client.write_points(build_points(t, temp, pressure, humidity))
            # Wait for next sample
            time.sleep(0.2)
            print(t.strftime('%Y-%m-%d %H:%M:%S'), temp)
            # Random-walk every series in place for the next sample.
            temp += np.random.normal(scale=1, size=N)
            pressure += np.random.normal(scale=10, size=N)
            humidity += np.random.normal(scale=2, size=N)
            t += timedelta(minutes=1)
    except KeyboardInterrupt:
        pass
    finally:
        # Release the underlying HTTP session.
        client.close()


if __name__ == '__main__':
    main()