-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathweather_station.py
357 lines (270 loc) · 13.8 KB
/
weather_station.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
#!/usr/bin/python
'''*****************************************************************************************************************
Raspberry Pi + Raspbian Weather Station
By Uladzislau Bayouski
https://www.linkedin.com/in/uladzislau-bayouski-a7474111b/
A Raspberry Pi based weather station that measures temperature, humidity and pressure using
the Astro Pi Sense HAT then uploads the data to a Weather Underground weather station.
Calculates dew point. Completely configurable and working asyncroniously in multi threads.
Uses stick for choosing different weather entities and visual styles.
Uses logger to log runtime issues/errors.
Inspired by http://makezine.com/projects/raspberry-pi-weather-station-mount/ project.
********************************************************************************************************************'''
from __future__ import print_function
from collections import deque
from sense_hat import SenseHat, ACTION_RELEASED, DIRECTION_UP, DIRECTION_DOWN, DIRECTION_LEFT, DIRECTION_RIGHT
from threading import Timer
from urllib import urlencode
import datetime
import logging
import os
import signal
import sys
import time
import urllib2
from config import Config
from weather_entities import DEFAULT_WEATHER_ENTITIES, CarouselContainer, WeatherEntityType
class WeatherStation(CarouselContainer):
"""Weather Station controlling class, setups and manages station run time."""
# Constants
SMOOTH_READINGS_NUMBER = 3
READINGS_PRINT_TEMPLATE = 'Temp: %sC (%sF), Humidity: %s%%, Pressure: %s inHg'
def __init__(self):
super(WeatherStation, self).__init__()
self._sense_hat = None
self._log_timer = None
self._upload_timer = None
self._update_timer = None
self._last_readings = None
@property
def carousel_items(self):
return DEFAULT_WEATHER_ENTITIES
@property
def current_style(self):
return self.current_item.current_style
def activate_sensors(self):
"""Activates sensors by requesting first values and assigning handlers."""
self._sense_hat = SenseHat()
# Scroll Init message over HAT screen
self._show_message('Init Sensors', (255, 255, 0), (0, 0, 255))
# Init sensors, to be sure first effective run uses correct sensors values
self._sense_hat.get_humidity()
self._sense_hat.get_pressure()
# Setup Sense Hat stick
self._sense_hat.stick.direction_up = self._change_weather_entity
self._sense_hat.stick.direction_down = self._change_weather_entity
self._sense_hat.stick.direction_left = self._change_weather_entity
self._sense_hat.stick.direction_right = self._change_weather_entity
def start_station(self):
"""Launches multiple threads to handle configured behavior."""
if Config.LOG_TO_CONSOLE and Config.LOG_INTERVAL:
self._log_results(first_time=True)
if Config.WEATHER_UPLOAD and Config.UPLOAD_INTERVAL:
self._upload_results(first_time=True)
if Config.UPDATE_DISPLAY and Config.UPDATE_INTERVAL:
self._update_display()
def stop_station(self):
"""Tries to stop active threads and clean up screen."""
if self._sense_hat:
self._sense_hat.clear()
if self._log_timer:
self._log_timer.cancel()
if self._upload_timer:
self._upload_timer.cancel()
if self._update_timer:
self._update_timer.cancel()
@staticmethod
def to_fahrenheit(value):
"""Converts celsius temperature to fahrenheit."""
return (value * 1.8) + 32
@staticmethod
def calculate_dew_point(temp, hum):
"""
Calculates dewpoint in celsius, uses simplified formula less accurate but obvious.
https://en.wikipedia.org/wiki/Dew_point#Calculating_the_dew_point
"""
return temp - (100 - hum) / 5
def get_temperature(self):
"""
Gets temperature and adjusts it with environmental impacts (like cpu temperature).
There are some issues, getting an accurate temperature reading from the
Sense HAT is improbable, see here:
https://www.raspberrypi.org/forums/viewtopic.php?f=104&t=111457
We need to take CPU temp into account. The Pi foundation recommendeds using the following:
http://yaab-arduino.blogspot.co.uk/2016/08/accurate-temperature-reading-sensehat.html
"""
# Get temp readings from both sensors
humidity_temp = self._sense_hat.get_temperature_from_humidity()
pressure_temp = self._sense_hat.get_temperature_from_pressure()
# avg_temp becomes the average of the temperatures from both sensors
# We need to check for pressure_temp value is not 0, to not ruin avg_temp calculation
avg_temp = (humidity_temp + pressure_temp) / 2 if pressure_temp else humidity_temp
# Get the CPU temperature
cpu_temp = self._get_cpu_temp()
# Calculate temperature compensation for CPU heating
# Depending on Raspberry Pi model (2, 3 etc.) and case you may try different formulas
# The commented option below is recommended for Raspberry Pi 3 by:
# http://yaab-arduino.blogspot.co.uk/2016/08/accurate-temperature-reading-sensehat.html
# adj_temp = avg_temp - (cpu_temp - avg_temp) / 1.5
# However I found this one more efficient and used it for Raspberry Pi 2 and Zebra case:
adj_temp = avg_temp - (cpu_temp - avg_temp) / 0.69
print('\033[92mCPU temp: %s, Avg temp: %s, Adj temp: %s\033[0m' % (cpu_temp, avg_temp, adj_temp))
# Average out value across the last three readings
return self._get_smooth(adj_temp)
def get_humidity(self):
"""Gets humidity sensor value."""
return self._sense_hat.get_humidity()
def get_pressure(self):
"""Gets humidity sensor value and converts pressure from millibars to inHg before posting."""
return self._sense_hat.get_pressure() * 0.0295300
def get_sensors_data(self):
"""Returns sensors data tuple."""
temp_in_celsius = self.get_temperature()
return (
round(temp_in_celsius, 1),
round(self.to_fahrenheit(temp_in_celsius), 1),
round(self.get_humidity(), 0),
round(self.get_pressure(), 1)
)
def _change_weather_entity(self, event):
"""Internal. Switches to next/previous weather entity or next/previous visual style."""
# We need to handle release event state
if event.action == ACTION_RELEASED:
self._sense_hat.clear()
if event.direction == DIRECTION_UP:
next_entity = self.next_item
self._show_message(next_entity.entity_messsage, next_entity.positive_color)
elif event.direction == DIRECTION_DOWN:
previous_entity = self.previous_item
self._show_message(previous_entity.entity_messsage, previous_entity.positive_color)
elif event.direction == DIRECTION_LEFT:
self.current_item.previous_item
else:
self.current_item.next_item
self._update_display(loop=False)
def _show_message(self, message, message_color, background_color=(0, 0, 0)):
"""Internal. Shows message by scrolling it over HAT screen."""
# Need to be sure we revert any changes to rotation
self._sense_hat.rotation = 0
self._sense_hat.show_message(message, Config.SCROLL_TEXT_SPEED, message_color, background_color)
def _log_results(self, first_time=False):
"""Internal. Continuously logs sensors values."""
if not first_time:
print(self.READINGS_PRINT_TEMPLATE % self.get_sensors_data())
self._log_timer = self._start_timer(Config.LOG_INTERVAL, self._log_results)
def _update_display(self, loop=True):
"""Internal. Continuously updates screen with new sensors values."""
sensors_data = self.get_sensors_data()
if self.current_item.entity_type is WeatherEntityType.TEMPERATURE:
pixels = self.current_item.show_pixels(sensors_data[0])
elif self.current_item.entity_type is WeatherEntityType.HUMIDITY:
pixels = self.current_item.show_pixels(sensors_data[2])
else:
pixels = self.current_item.show_pixels(sensors_data[3])
self._sense_hat.set_rotation(self.current_style.rotation)
self._sense_hat.set_pixels(pixels)
if loop:
self._update_timer = self._start_timer(Config.UPDATE_INTERVAL, self._update_display)
def _upload_results(self, first_time=False):
"""Internal. Continuously uploads new sensors values to Weather Underground."""
if not first_time:
print('Uploading data to Weather Underground')
sensors_data = self.get_sensors_data()
# Build a weather data object http://wiki.wunderground.com/index.php/PWS_-_Upload_Protocol
weather_data = {
'action': 'updateraw',
'ID': Config.STATION_ID,
'PASSWORD': Config.STATION_KEY,
'dateutc': 'now',
'tempf': sensors_data[1],
'humidity': sensors_data[2],
'baromin': sensors_data[3],
'dewptf': self.to_fahrenheit(self.calculate_dew_point(sensors_data[0], sensors_data[2]))
}
for plugin in Config.PLUGINS:
try:
data = plugin.get_data()
for error in plugin.errors:
logging.warning('%s got an error: %s', plugin.plugin_name, error.message)
if data:
print('\033[94m%s got data: %s\033[0m' % (plugin.plugin_name, data))
weather_data.update(data)
else:
print('\033[94m%s has no data\033[0m' % plugin.plugin_name)
logging.warning('%s has no data', plugin.plugin_name)
except:
logging.warning('Unexpected error occured in %s', plugin.plugin_name, exc_info=True)
try:
upload_url = Config.WU_URL + '?' + urlencode(weather_data)
response = urllib2.urlopen(upload_url)
html = response.read()
print('Server response: ', html)
# Close response object
response.close()
except:
print('Could not upload to Weather Underground')
logging.warning('Could not upload to Weather Underground\r\nWeather Data: %s\r\nUpload URL: %s', weather_data, upload_url, exc_info=True)
self._upload_timer = self._start_timer(Config.UPLOAD_INTERVAL, self._upload_results)
def _start_timer(self, interval, callback):
"""Internal. Starts timer with given interval and callback function."""
timer = Timer(interval, callback)
timer.daemon = True
timer.start()
return timer
def _get_cpu_temp(self):
""""
Internal.
Executes a command at the OS to pull in the CPU temperature.
Thanks to https://www.raspberrypi.org/forums/viewtopic.php?f=104&t=111457
"""
res = os.popen('vcgencmd measure_temp').readline()
return float(res.replace('temp=', '').replace("'C\n", ''))
def _get_smooth(self, value):
"""Moving average to smooth reading."""
# We use deque here as it is more efficient for in/out behaviour than regular list/tuple
if not self._last_readings:
self._last_readings = deque((value, ) * self.SMOOTH_READINGS_NUMBER, self.SMOOTH_READINGS_NUMBER)
else:
self._last_readings.appendleft(value)
# Average last temperature readings
return sum(self._last_readings) / self.SMOOTH_READINGS_NUMBER
# Check prerequisites and launch Weather Station
if __name__ == '__main__':
# Setup logger, to log warning/errors during execution
logging.basicConfig(
filename='/home/pi/weather_station/error.log',
format='\r\n%(asctime)s %(levelname)s %(message)s',
level=logging.WARNING
)
# Read Weather Underground Configuration Parameters
if Config.STATION_ID is None or Config.STATION_KEY is None:
print('Missing values from the Weather Underground configuration file\n')
logging.warning('Missing values from the Weather Underground configuration file')
sys.exit(1)
# Make sure we don't have an upload interval more than 3600 seconds
if Config.UPLOAD_INTERVAL > 3600:
print('The application\'s upload interval cannot be greater than 3600 seconds')
logging.warning('The application\'s upload interval cannot be greater than 3600 seconds')
sys.exit(1)
print('Successfully read Weather Underground configuration values')
print('Station ID: ', Config.STATION_ID)
def _terminate_application(signal=None, frame=None):
"""Nested. Internal. Tries to terminate weather station and make a clean up."""
# We need to check if station was initialized
if 'station' in globals():
station.stop_station()
logging.warning('Application terminated', exc_info=not signal)
print('\nExiting application')
# Subscribe to signals events
signal.signal(signal.SIGTERM, _terminate_application)
try:
station = WeatherStation()
station.activate_sensors()
print('Successfully initialized sensors')
print(station.READINGS_PRINT_TEMPLATE % station.get_sensors_data())
station.start_station()
print('Weather Station successfully launched')
signal.pause()
except:
_terminate_application()
sys.exit(0)