-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmiscalegwtest.py
More file actions
128 lines (107 loc) · 4.23 KB
/
Copy pathmiscalegwtest.py
File metadata and controls
128 lines (107 loc) · 4.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/python3
from __future__ import print_function
import argparse
import binascii
import os
import sys
from bluepy import btle
import paho.mqtt.client as mqtt
# ---------------------------------------------------------------------------
# User configuration
# ---------------------------------------------------------------------------
height = 173  # height in cm; used for the BMI calculation
MISCALE_MAC = 'MI:SC:AL:EM:AC:AD'  # compared lower-cased against dev.addr
MQTT_USERNAME = 'username'
MQTT_PASSWORD = 'password'
MQTT_HOST = 'ho.st.ip.add'
MQTT_PORT = 1883
# Passed as the third positional argument of mqtt.Client.connect(), which
# paho-mqtt documents as the keepalive interval in seconds.
MQTT_TIMEOUT = 60

# Module-level defaults for the derived body metrics.  A ``global`` statement
# has no effect at module scope, so plain assignments are all that is needed.
bmi = 0.00
water = 0.00
fat = 0.00
leanfat = 0.00

# ---------------------------------------------------------------------------
# Terminal colours: enabled unless the environment variable C is set to "0".
# ---------------------------------------------------------------------------
if os.getenv('C', '1') == '0':
    ANSI_RED = ''
    ANSI_GREEN = ''
    ANSI_YELLOW = ''
    ANSI_CYAN = ''
    ANSI_WHITE = ''
    ANSI_OFF = ''
else:
    ANSI_CSI = "\033["
    ANSI_RED = ANSI_CSI + '31m'
    ANSI_GREEN = ANSI_CSI + '32m'
    ANSI_YELLOW = ANSI_CSI + '33m'
    ANSI_CYAN = ANSI_CSI + '36m'
    ANSI_WHITE = ANSI_CSI + '37m'
    ANSI_OFF = ANSI_CSI + '0m'
class ScanProcessor():
    """bluepy scan delegate that publishes Mi Scale readings over MQTT.

    An MQTT client is created and connected as soon as the object is
    constructed.  Each advertisement from ``MISCALE_MAC`` that bluepy reports
    as a new device is decoded, and the weight plus derived body metrics are
    published under the ``miscale/`` topics.
    """

    # Unit code (hex characters 4-5 of the payload) -> (unit label, divisor
    # applied to the decoded value).  Only the "kg" codes are halved.
    _UNIT_CODES = {
        '03': ('lbs', 1), 'b3': ('lbs', 1),
        '12': ('jin', 1), 'b2': ('jin', 1),
        '02': ('kg', 2), '22': ('kg', 2), 'a2': ('kg', 2),
    }

    # Factors converting a displayed weight to kilograms.  The body-metric
    # formulas are metric (height is in cm), so they must be fed kilograms.
    _KG_PER_UNIT = {'kg': 1.0, 'lbs': 0.45359237, 'jin': 0.5}

    # The weight is sliced from data[26:30]; anything shorter cannot be decoded.
    _MIN_PAYLOAD_LEN = 30

    def __init__(self):
        self.mqtt_client = None
        self.connected = False
        self._start_client()

    @staticmethod
    def _parse_measurement(data):
        """Decode a hex payload into ``(weight, unit)``.

        Returns ``None`` when *data* is too short to contain the weight
        field.  ``unit`` is ``''`` when the unit code is not recognised.
        """
        if len(data) < ScanProcessor._MIN_PAYLOAD_LEN:
            return None
        # The two weight bytes are byte-swapped before being parsed as hex.
        # NOTE(review): an earlier variant of this script read the weight from
        # data[6:10]; presumably for a different advertisement layout - confirm.
        measured = int(data[28:30] + data[26:28], 16) * 0.01
        unit, divisor = ScanProcessor._UNIT_CODES.get(data[4:6], ('', 1))
        return measured / divisor, unit

    @staticmethod
    def _body_metrics(weight_kg, height_cm):
        """Return ``(bmi, water, fat, leanfat)`` for a weight in kilograms."""
        bmi = (weight_kg / (height_cm * height_cm)) * 10000
        water = 0.72 * (-1.976 + 0.907 * weight_kg)
        fat = (1.281 * bmi) - 10.13
        # NOTE(review): `fat` looks like a percentage while `weight_kg` is a
        # mass, so this subtraction mixes units.  Formula kept as-is so the
        # published values for kg readings are unchanged - confirm intent.
        leanfat = weight_kg - fat
        return bmi, water, fat, leanfat

    def handleDiscovery(self, dev, isNewDev, isNewData):
        """bluepy callback: decode and publish a reading from the scale.

        Only acts when *dev* has the configured MAC address and bluepy
        flags it as newly discovered; every other advertisement is ignored.
        """
        if dev.addr != MISCALE_MAC.lower() or not isNewDev:
            return

        print(' Device: %s (%s), %d dBm %s. ' %
              (
                  ANSI_WHITE + dev.addr + ANSI_OFF,
                  dev.addrType,
                  dev.rssi,
                  ('' if dev.connectable else '(not connectable)'))
              , end='')

        for (sdid, desc, data) in dev.getScanData():
            if sdid != 22 or not data.startswith('1b18'):
                continue

            reading = self._parse_measurement(data)
            if reading is None:
                # Previously a short payload raised ValueError or decoded a
                # wrong value; report it instead.
                print("Ignoring truncated scale data.")
                continue

            measured, unit = reading
            if not unit:
                print("Scale is sleeping.")
                continue

            # Derived metrics are always computed from kilograms, whatever
            # unit the scale is displaying.
            weight_kg = measured * self._KG_PER_UNIT[unit]
            bmi, water, fat, leanfat = self._body_metrics(weight_kg, height)
            print('')
            self._publish(round(measured, 2), unit, bmi, water, fat, leanfat)

        if not dev.scanData:
            print('\t(no data)')
        # The device line above is printed with end=''; terminate it.  (A bare
        # `print` is a no-op expression when print is a function.)
        print()

    def _start_client(self):
        """Create the MQTT client, connect, and start its network thread."""
        self.mqtt_client = mqtt.Client()
        self.mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

        def _on_connect(client, _, flags, return_code):
            # Only a zero return code means the broker accepted the
            # connection; refused connections must not count as connected.
            self.connected = (return_code == 0)
            print("MQTT connection returned result: %s" % mqtt.connack_string(return_code))

        self.mqtt_client.on_connect = _on_connect
        self.mqtt_client.connect(MQTT_HOST, MQTT_PORT, MQTT_TIMEOUT)
        self.mqtt_client.loop_start()

    def _publish(self, weight, unit, bmi, water, fat, leanfat):
        """Publish the weight and derived metrics as retained QoS 1 messages.

        The weight goes to ``miscale/weight/<unit>``; the other values go to
        fixed ``miscale/...`` topics.

        Raises:
            Exception: if the broker has not (yet) accepted the connection.
        """
        if not self.connected:
            raise Exception('not connected to MQTT server')

        prefix = '{}/{}'.format('miscale/weight', unit)
        derived = (
            ('miscale/bmi', bmi),
            ('miscale/fat', fat),
            ('miscale/water', water),
            ('miscale/leanfat', leanfat),
        )

        self.mqtt_client.publish(prefix, weight, qos=1, retain=True)
        for topic, value in derived:
            self.mqtt_client.publish(topic, value, qos=1, retain=True)

        print('\tSent data to topic %s: %s %s' % (prefix, weight, unit))
        for topic, value in derived:
            print('\tSent data to topic %s: %s ' % (topic, value))
def main():
    """Scan for BLE advertisements for five seconds, then shut MQTT down.

    Discovered devices are handed to a ``ScanProcessor`` delegate, which
    publishes any reading from the configured scale.
    """
    processor = ScanProcessor()
    scanner = btle.Scanner().withDelegate(processor)
    print(ANSI_RED + "Scanning for devices..." + ANSI_OFF)
    try:
        # The scan result list is not needed; all work happens in the delegate.
        scanner.scan(5)
    finally:
        # Close the session and join the network thread started by
        # loop_start(), so queued publishes get a chance to be sent before
        # the interpreter exits.
        processor.mqtt_client.disconnect()
        processor.mqtt_client.loop_stop()


if __name__ == "__main__":
    main()