-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathble_scanner.py
More file actions
194 lines (159 loc) · 6.36 KB
/
ble_scanner.py
File metadata and controls
194 lines (159 loc) · 6.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
#!/usr/bin/env python3
"""
BLE device scanner for Home Assistant via MQTT.
Scans continuously and publishes presence/absence via MQTT discovery.
"""
import asyncio
import configparser
import json
import logging
import os
import socket
import signal
import sys
import time
from datetime import datetime, timezone
from bleak import BleakScanner
import paho.mqtt.client as mqtt
# --- Load configuration ---
_CONF_SEARCH = [
os.path.join(os.path.dirname(os.path.abspath(__file__)), "ble_scanner.conf"),
"/etc/ble_scanner.conf",
]
_cfg = configparser.ConfigParser(delimiters=("=",))
_loaded = _cfg.read(_CONF_SEARCH)
if not _loaded:
print(f"ERROR: no config file found (searched: {_CONF_SEARCH})", file=sys.stderr)
sys.exit(1)
SCAN_DURATION = _cfg.getfloat("bluetooth", "scan_duration", fallback=2.0)
EXPIRY_TIME = _cfg.getfloat("bluetooth", "expiry_time", fallback=60.0)
if not _cfg.has_section("devices") or not _cfg.items("devices"):
print("ERROR: no [devices] entries in config", file=sys.stderr)
sys.exit(1)
# {MAC_UPPER: nickname}
DEVICES = {mac.upper(): nick for mac, nick in _cfg.items("devices")}
MQTT_HOST = _cfg.get("mqtt", "host")
MQTT_PORT = _cfg.getint("mqtt", "port", fallback=1883)
MQTT_USER = _cfg.get("mqtt", "user")
MQTT_PASSWORD = _cfg.get("mqtt", "password")
HOSTNAME = socket.gethostname()
MQTT_TOPIC_BASE = _cfg.get("mqtt", "topic", fallback=f"homeassistant/{HOSTNAME}").replace("$(hostname)", HOSTNAME)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger(__name__)
def state_topic(nickname: str) -> str:
return f"{MQTT_TOPIC_BASE}/{nickname}/state"
def attr_topic(nickname: str) -> str:
return f"{MQTT_TOPIC_BASE}/{nickname}/attributes"
def discovery_topic(nickname: str) -> str:
return f"homeassistant/device_tracker/{HOSTNAME}_{nickname}/config"
def build_discovery_payload(mac: str, nickname: str) -> dict:
return {
"name": f"{nickname} {mac}",
"unique_id": f"{HOSTNAME}_{nickname}",
"state_topic": state_topic(nickname),
"json_attributes_topic": attr_topic(nickname),
"payload_home": "home",
"payload_not_home": "not_home",
"source_type": "bluetooth_le",
"device": {
"identifiers": [f"{HOSTNAME}_{nickname}"],
"name": f"{nickname} {mac}",
"connections": [["mac", mac]],
},
}
# Pre-built once; reused on every MQTT reconnect
_DISCOVERY_PAYLOADS = {
mac: json.dumps(build_discovery_payload(mac, nick))
for mac, nick in DEVICES.items()
}
def mqtt_connect() -> mqtt.Client:
client = mqtt.Client(client_id=f"ble_scanner_{HOSTNAME}", clean_session=True)
client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
def on_connect(c, userdata, flags, rc):
if rc == 0:
log.info("MQTT connected")
for mac, nick in DEVICES.items():
c.publish(discovery_topic(nick), _DISCOVERY_PAYLOADS[mac], retain=True)
else:
log.error("MQTT connect failed, rc=%d", rc)
def on_disconnect(c, userdata, rc):
log.warning("MQTT disconnected rc=%d, will reconnect", rc)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.connect_async(MQTT_HOST, MQTT_PORT, keepalive=60)
client.loop_start()
return client
async def scan_loop(client: mqtt.Client):
# Populated only when a device has actually been seen
last_seen: dict[str, float] = {}
last_rssi: dict[str, int] = {}
last_state: dict[str, str] = {}
def publish_state(mac: str, nick: str, state: str, rssi: int | None = None):
if state != last_state.get(mac):
client.publish(state_topic(nick), state, retain=True)
log.info("[%s] state -> %s", nick, state)
last_state[mac] = state
# Only publish attributes when the device is present
if rssi is not None:
attrs = {
"last_seen": datetime.now(timezone.utc).isoformat(),
"rssi": rssi,
"mac": mac,
}
client.publish(attr_topic(nick), json.dumps(attrs), retain=True)
def detection_callback(device, advertisement_data):
mac = device.address.upper()
if mac in DEVICES:
last_seen[mac] = time.monotonic()
last_rssi[mac] = advertisement_data.rssi
# Publish home immediately on first detection (don't wait for the check loop)
publish_state(mac, DEVICES[mac], "home", advertisement_data.rssi)
log.info("Scanning for %d device(s), expiry=%ss", len(DEVICES), EXPIRY_TIME)
while True:
try:
async with BleakScanner(detection_callback=detection_callback):
while True:
await asyncio.sleep(SCAN_DURATION)
now = time.monotonic()
for mac, nick in DEVICES.items():
# Unseen devices use a virtual last_seen of EXPIRY_TIME ago
elapsed = now - last_seen.get(mac, now - EXPIRY_TIME)
if elapsed >= EXPIRY_TIME:
log.debug("[%s] not seen (%.0fs ago)", nick, elapsed)
publish_state(mac, nick, "not_home")
else:
log.debug("[%s] seen RSSI=%s", nick, last_rssi.get(mac))
except asyncio.CancelledError:
raise
except Exception as exc:
log.error("Scan error: %s", exc)
if "powered" in str(exc).lower() or "adapter" in str(exc).lower():
log.info("Waiting 10s for Bluetooth adapter...")
await asyncio.sleep(10)
async def main():
client = mqtt_connect()
# Give MQTT a moment to connect
await asyncio.sleep(1)
loop = asyncio.get_running_loop()
stop = asyncio.Event()
def _signal_handler(*_):
log.info("Shutting down...")
stop.set()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, _signal_handler)
scan_task = asyncio.create_task(scan_loop(client))
await stop.wait()
scan_task.cancel()
try:
await scan_task
except asyncio.CancelledError:
pass
for mac, nick in DEVICES.items():
client.publish(state_topic(nick), "not_home", retain=True)
client.loop_stop()
client.disconnect()
if __name__ == "__main__":
asyncio.run(main())