-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinflux_ruuvi.py
147 lines (127 loc) · 5.19 KB
/
influx_ruuvi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from influxdb import InfluxDBClient
from datetime import datetime, timedelta
from ruuvi import RuuviTagSensor
from os import path
import yaml
class DataCollector:
def __init__(self, influx_client, sensor_yaml, timeout, mock=False, mock_days=3):
self.influx_client = influx_client
self.sensor_yaml = sensor_yaml
self.timeout = timeout
self.log_data = not mock
self.max_iterations = None # run indefinitely by default
self.mock = False
self.sensor_map = None
self.sensor_map_last_change = -1
print('Sensors:')
for mac in sorted(self.get_sensors()):
print('\t', mac, '<-->', self.sensor_map[mac])
if mock:
import sensor_mock
self.mock = sensor_mock.SensorMock(mock_days=int(mock_days))
self._mock_time = self.mock.mock_time_generator()
self._mock_datas = self.mock.mock_data_generator()
self.max_iterations = self.mock.max_iter if int(mock_days) > 0 else None
def get_sensors(self):
assert path.exists(self.sensor_yaml), 'Sensor map not found: %s' % self.sensor_yaml
if path.getmtime(self.sensor_yaml) != self.sensor_map_last_change:
try:
print('Reloading sensor map as file changed')
new_map = yaml.load(open(self.sensor_yaml))
self.sensor_map = new_map
self.sensor_map_last_change = path.getmtime(self.sensor_yaml)
except Exception as e:
print('Failed to re-load sensor map, going on with the old one. Error:')
print(e)
return self.sensor_map
def collect_and_store(self):
metrics = ['battery',
'pressure',
'humidity',
'acceleration',
'acceleration_x',
'acceleration_y',
'acceleration_z',
'temperature']
if self.mock:
sensors = self.mock.mac_to_name
t_utc = next(self._mock_time)
datas = next(self._mock_datas)
else:
sensors = self.get_sensors()
t_utc = datetime.utcnow()
datas = RuuviTagSensor.get_data_for_sensors(macs=[], search_duratio_sec=int(self.timeout))
t_str = t_utc.isoformat() + 'Z'
print('\n' + t_str)
if len(datas) == 0:
print('No data from sensors.')
return
for mac in datas:
if mac in sensors:
print('Seeing sensor %s (%s).' % (sensors[mac], mac))
else:
print('MAC not in sensors.yml: ' + mac)
json_body = [
{
'measurement': 'ruuvi',
'tags': {
'sensor': sensors[mac],
},
'time': t_str,
'fields': {metric: datas[mac][metric] for metric in metrics if metric in datas[mac]}
}
for mac in datas if mac in sensors
]
if len(json_body) > 0:
if not self.influx_client.write_points(json_body):
print('Data not written!')
else:
print('Data written for %d sensors.' % len(json_body))
else:
print(t_utc, 'No data sent.')
def repeat(interval_sec, max_iter, func, *args, **kwargs):
from itertools import count
import time
starttime = time.time()
for i in count():
if i % 1000 == 0:
print('Collected %d samples' % i)
try:
func(*args, **kwargs)
except Exception as ex:
print('Error!')
print(ex)
if interval_sec > 0:
time.sleep(interval_sec - ((time.time() - starttime) % interval_sec))
if max_iter and i >= max_iter:
return
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--mock', action='store_true',
help='generate a database of mock sensor values')
parser.add_argument('--mock_days', help='days of data to mock', default=30, type=int)
parser.add_argument('--interval', default=60,
help='sensor readout interval (seconds), default 60')
parser.add_argument('--sensors', default='sensors.yml',
help='YAML file mapping sensor MACs to names, default "sensors.yml"')
args = parser.parse_args()
if args.mock:
interval = 5
else:
interval = int(args.interval)
# Create the InfluxDB object
influx_config = yaml.load(open('influx_config.yml'))
client = InfluxDBClient(influx_config['host'],
influx_config['port'],
influx_config['user'],
influx_config['password'],
influx_config['dbname'])
collector = DataCollector(influx_client=client,
sensor_yaml='sensors.yml',
timeout=int(0.75 * interval),
mock=args.mock,
mock_days=args.mock_days)
repeat(interval,
max_iter=collector.max_iterations,
func=lambda: collector.collect_and_store())