-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathastwifid.py
More file actions
265 lines (228 loc) · 9.42 KB
/
astwifid.py
File metadata and controls
265 lines (228 loc) · 9.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
"""
Astoria WiFi Daemon.
Manages a WiFi hotspot for the robot.
"""
import asyncio
import logging
import os
import signal
import tempfile
from json import JSONDecodeError, loads
from pathlib import Path
from typing import IO, Match, Optional
import click
from pydantic import ValidationError, parse_obj_as
from astoria.common.components import StateManager
from astoria.common.ipc import MetadataManagerMessage, WiFiManagerMessage
from astoria.common.metadata import Metadata
LOGGER = logging.getLogger(__name__)
loop = asyncio.get_event_loop()
@click.command("astwifid")
@click.option("-v", "--verbose", is_flag=True)
@click.option("-c", "--config-file", type=click.Path(exists=True))
def main(*, verbose: bool, config_file: Optional[str]) -> None:
"""The WiFi Daemon Application Entrypoint."""
wifid = WiFiHotspotDaemon(verbose, config_file)
loop.run_until_complete(wifid.run())
class WiFiHotspotDaemon(StateManager[WiFiManagerMessage]):
"""
Hotspot management daemon.
Receives metadata information from astmetad and manages the WiFi hotspot.
"""
name = "astwifid"
dependencies = ["astmetad"]
def _init(self) -> None:
self._lifecycle: Optional[WiFiHotspotLifeCycle] = None
self._mqtt.subscribe("astmetad", self.handle_astmetad_message)
async def main(self) -> None:
"""Main routine for astwifid."""
# Wait whilst the program is running.
await self.wait_loop()
# Stop the hotspot when we shutdown
if self._lifecycle:
await self._lifecycle.stop_hotspot()
@property
def offline_status(self) -> WiFiManagerMessage:
"""
Status to publish when the manager goes offline.
This status should ensure that any other components relying
on this data go into a safe state.
"""
return WiFiManagerMessage(
status=WiFiManagerMessage.Status.STOPPED,
hotspot_running=False,
)
async def handle_astmetad_message(
self,
match: Match[str],
payload: str,
) -> None:
"""Event handler for metadata changes."""
if payload:
try:
data = loads(payload)
metadata_manager_message = parse_obj_as(MetadataManagerMessage, data)
await self.handle_metadata(metadata_manager_message.metadata)
except ValidationError:
LOGGER.warning("Received bad metadata manager message.")
except JSONDecodeError:
LOGGER.warning("Received bad JSON in metadata manager message.")
else:
LOGGER.warning("Received empty metadata manager message.")
async def handle_metadata(self, metadata: Metadata) -> None:
"""
Update the state of the hotspot based on the current metadata.
:param metadata: The metadata included in the update.
"""
wifi_interface = Path(f"/sys/class/net/{self.config.wifi.interface}")
if self._lifecycle:
if metadata.is_wifi_valid() and wifi_interface.exists():
if self._lifecycle.has_metadata_changed(metadata):
await self._lifecycle.stop_hotspot()
self._lifecycle = WiFiHotspotLifeCycle(
# The types here are checked by is_wifi_valid
metadata.wifi_ssid, # type: ignore
metadata.wifi_psk, # type: ignore
metadata.wifi_region, # type: ignore
self.config.wifi.interface,
self.config.wifi.bridge,
self.config.wifi.enable_wpa3,
)
self.status = WiFiManagerMessage(
status=WiFiManagerMessage.Status.RUNNING,
hotspot_running=True,
)
asyncio.ensure_future(self._lifecycle.run_hotspot())
else:
# Turn it off!
await self._lifecycle.stop_hotspot()
self.status = WiFiManagerMessage(
status=WiFiManagerMessage.Status.RUNNING,
hotspot_running=False,
)
self._lifecycle = None
else:
if metadata.is_wifi_valid() and wifi_interface.exists():
# Turn it on!
self._lifecycle = WiFiHotspotLifeCycle(
# The types here are checked by is_wifi_valid
metadata.wifi_ssid, # type: ignore
metadata.wifi_psk, # type: ignore
metadata.wifi_region, # type: ignore
self.config.wifi.interface,
self.config.wifi.bridge,
self.config.wifi.enable_wpa3,
)
self.status = WiFiManagerMessage(
status=WiFiManagerMessage.Status.RUNNING,
hotspot_running=True,
)
asyncio.ensure_future(self._lifecycle.run_hotspot())
class WiFiHotspotLifeCycle:
"""Manages the lifecycle of the hostapd process."""
HOSTAPD_BINARY: str = "hostapd"
def __init__(
self,
ssid: str,
psk: str,
region: str,
interface: str,
bridge: str,
enable_wpa3: bool,
) -> None:
LOGGER.info("Starting WiFi Hotspot lifecycle")
self._ssid: str = ssid
self._psk: str = psk
self._region: str = region
self._interface: str = interface
self._bridge: str = bridge
self._enable_wpa3: bool = enable_wpa3
self._config_file: Optional[IO[bytes]] = None
self._proc: Optional[asyncio.subprocess.Process] = None
self._running: bool = False
def has_metadata_changed(self, metadata: Metadata) -> bool:
"""Checks if the hotspot's properties match that of a set of metadata."""
return not all(
[
self._ssid == metadata.wifi_ssid,
self._psk == metadata.wifi_psk,
self._region == metadata.wifi_region,
],
)
async def run_hotspot(self) -> None:
"""Starts the hostapd process."""
self._running = True
LOGGER.info(f"Starting WiFi Hotspot \"{self._ssid}\" on {self._interface}")
self.generate_hostapd_config()
if self._config_file is not None:
while self._running:
self._proc = await asyncio.create_subprocess_exec(
self.HOSTAPD_BINARY,
self._config_file.name,
)
LOGGER.info(f"{self.HOSTAPD_BINARY} started wth PID: {self._proc.pid}")
sc = await self._proc.wait()
if sc is None:
continue
LOGGER.info(f"{self.HOSTAPD_BINARY} terminated with status code {sc}")
else:
raise RuntimeError( # pragma: nocover
"Tried to start hotspot, but the config file was not set.",
)
def generate_hostapd_config(self) -> None:
"""Generates a configuration file for hostapd based on the current metadata."""
self._config_file = tempfile.NamedTemporaryFile(delete=False)
LOGGER.debug(
f"Writing {self.HOSTAPD_BINARY} configuration to {self._config_file.name}",
)
config = {
"interface": self._interface,
"bridge": self._bridge,
"ssid": self._ssid,
"country_code": self._region,
"channel": 6,
"hw_mode": "g",
# Bit field: bit0 = WPA, bit1 = WPA2
"wpa": 2,
# Bit field: 1=wpa, 2=wep, 3=both
"auth_algs": 1,
# Set of accepted cipher suites; disabling insecure TKIP
"wpa_pairwise": "CCMP",
# Set of accepted key management algorithms
# SAE = WPA3, WPA-PSK = WPA2
"wpa_key_mgmt": "WPA-PSK",
"wpa_passphrase": self._psk,
}
if self._enable_wpa3:
config["wpa_key_mgmt"] = "SAE WPA-PSK"
# Management frame support (802.11w)
# Most client devices will not connect to a
# WPA3-SAE secured AP unless it is using 802.11w.
# This must be supported by both the AP and client.
config["ieee80211w"] = 2
config["sae_require_mfp"] = 1
contents = "\n".join(f"{k}={v}" for k, v in config.items())
self._config_file.write(contents.encode())
self._config_file.close()
async def stop_hotspot(self) -> None:
"""Stops the hostapd process."""
self._running = False
LOGGER.info("Stopping WiFi Hotspot")
if self._proc is not None:
self._proc.send_signal(signal.SIGINT)
try:
await asyncio.wait_for(self._proc.communicate(), timeout=5.0)
except asyncio.TimeoutError:
if self._proc is not None:
LOGGER.info(f"Sent SIGKILL to pid {self._proc.pid}")
self._proc.send_signal(signal.SIGKILL)
except AttributeError:
# Under some circumstances, there is a race condition such that
# _proc becomes None whilst the communicate timeout is running.
# We want to catch and discard this error.
pass
self._proc = None
if self._config_file:
os.unlink(self._config_file.name)
if __name__ == "__main__":
main()