1+ import configparser
12import json
23import os
4+ import socket
35import subprocess
46import time
5- from enum import IntEnum
67from functools import cached_property , lru_cache
78from pathlib import Path
89
1516from openpilot .system .hardware .tici .pins import GPIO
1617from openpilot .system .hardware .tici .amplifier import Amplifier
1718
18- NM = 'org.freedesktop.NetworkManager'
19- NM_CON_ACT = NM + '.Connection.Active'
20- NM_DEV = NM + '.Device'
21- NM_DEV_WL = NM + '.Device.Wireless'
22- NM_AP = NM + '.AccessPoint'
23- DBUS_PROPS = 'org.freedesktop.DBus.Properties'
24-
25- class NMMetered (IntEnum ):
26- NM_METERED_UNKNOWN = 0
27- NM_METERED_YES = 1
28- NM_METERED_NO = 2
29- NM_METERED_GUESS_YES = 3
30- NM_METERED_GUESS_NO = 4
31-
3219MODEM_STATE_PATH = "/dev/shm/modem"
33- TIMEOUT = 0.1
3420
3521NetworkType = log .DeviceState .NetworkType
3622NetworkStrength = log .DeviceState .NetworkStrength
@@ -52,16 +38,27 @@ def get_device_type():
5238 model = f .read ().strip ('\x00 ' )
5339 return model .split ('comma ' )[- 1 ]
5440
55- class Tici (HardwareBase ):
56- @cached_property
57- def bus (self ):
58- import dbus
59- return dbus .SystemBus ()
41+ def wpa_supplicant_cmd (cmd : str , timeout : float = 0.2 ) -> dict [str , str ]:
42+ with socket .socket (socket .AF_UNIX , socket .SOCK_DGRAM ) as sock :
43+ sock .settimeout (timeout )
44+ sock .bind (f"\0 openpilot-wpa-{ os .getpid ()} -{ time .monotonic_ns ()} " )
45+ sock .connect ("/run/wpa_supplicant/wlan0" )
46+ sock .send (cmd .encode ())
6047
61- @cached_property
62- def nm (self ):
63- return self .bus .get_object (NM , '/org/freedesktop/NetworkManager' )
48+ while True :
49+ out = sock .recv (8192 ).decode ("utf-8" , "replace" )
50+ if out .startswith ("<" ):
51+ continue
52+ if out .startswith ("FAIL" ):
53+ return {}
54+ return dict (l .split ("=" , 1 ) for l in out .splitlines () if "=" in l )
6455
56+ def get_default_route_iface ():
57+ with open ("/proc/net/route" ) as f :
58+ routes = [(int (route [6 ]), route [0 ]) for line in f .readlines ()[1 :] if (route := line .split ())[1 ] == "00000000" and int (route [3 ], 16 ) & 0x1 ]
59+ return min (routes )[1 ] if routes else None
60+
61+ class Tici (HardwareBase ):
6562 @cached_property
6663 def amplifier (self ):
6764 if self .get_device_type () == "mici" :
@@ -115,13 +112,11 @@ def set_ir_power(self, percent: int):
115112
116113 def get_network_type (self ):
117114 try :
118- primary_connection = self .nm .Get (NM , 'PrimaryConnection' , dbus_interface = DBUS_PROPS , timeout = TIMEOUT )
119- primary_connection = self .bus .get_object (NM , primary_connection )
120- primary_type = primary_connection .Get (NM_CON_ACT , 'Type' , dbus_interface = DBUS_PROPS , timeout = TIMEOUT )
121- if primary_type == '802-3-ethernet' :
122- return NetworkType .ethernet
123- elif primary_type == '802-11-wireless' :
124- return NetworkType .wifi
115+ if (iface := get_default_route_iface ()):
116+ if iface .startswith ('wlan' ):
117+ return NetworkType .wifi
118+ if iface .startswith ('eth' ):
119+ return NetworkType .ethernet
125120 except Exception :
126121 pass
127122
@@ -138,10 +133,6 @@ def get_network_type(self):
138133 return NetworkType .cell2G
139134 return NetworkType .none
140135
141- def get_wlan (self ):
142- wlan_path = self .nm .GetDeviceByIpIface ('wlan0' , dbus_interface = NM , timeout = TIMEOUT )
143- return self .bus .get_object (NM , wlan_path )
144-
145136 def get_sim_info (self ):
146137 ms = self .get_modem_state ()
147138 sim_id = ms .get ('iccid' , '' )
@@ -191,13 +182,14 @@ def get_network_strength(self, network_type):
191182 try :
192183 if network_type == NetworkType .none :
193184 pass
185+ elif network_type == NetworkType .ethernet :
186+ network_strength = NetworkStrength .great
194187 elif network_type == NetworkType .wifi :
195- wlan = self .get_wlan ()
196- active_ap_path = wlan .Get (NM_DEV_WL , 'ActiveAccessPoint' , dbus_interface = DBUS_PROPS , timeout = TIMEOUT )
197- if active_ap_path != "/" :
198- active_ap = self .bus .get_object (NM , active_ap_path )
199- strength = int (active_ap .Get (NM_AP , 'Strength' , dbus_interface = DBUS_PROPS , timeout = TIMEOUT ))
200- network_strength = self .parse_strength (strength )
188+ rssi = wpa_supplicant_cmd ("SIGNAL_POLL" ).get ("RSSI" )
189+ if rssi is not None :
190+ dbm = int (rssi )
191+ if - 100 < dbm <= 0 :
192+ network_strength = self .parse_strength (120 + max (- 100 , min (- 20 , dbm )))
201193 else : # Cellular
202194 network_strength = self .parse_strength (self .get_modem_state ().get ('signal_quality' , 0 ))
203195 except Exception :
@@ -210,17 +202,32 @@ def get_network_metered(self, network_type) -> bool:
210202 from openpilot .common .params import Params
211203 return Params ().get_bool ("GsmMetered" )
212204 try :
213- primary_connection = self .nm .Get (NM , 'PrimaryConnection' , dbus_interface = DBUS_PROPS , timeout = TIMEOUT )
214- primary_connection = self .bus .get_object (NM , primary_connection )
215- primary_devices = primary_connection .Get (NM_CON_ACT , 'Devices' , dbus_interface = DBUS_PROPS , timeout = TIMEOUT )
216-
217- for dev in primary_devices :
218- dev_obj = self .bus .get_object (NM , str (dev ))
219- metered_prop = dev_obj .Get (NM_DEV , 'Metered' , dbus_interface = DBUS_PROPS , timeout = TIMEOUT )
220-
221- if network_type == NetworkType .wifi :
222- if metered_prop in [NMMetered .NM_METERED_YES , NMMetered .NM_METERED_GUESS_YES ]:
223- return True
205+ if network_type == NetworkType .wifi :
206+ ssid = wpa_supplicant_cmd ("STATUS" ).get ("ssid" , "" )
207+ if ssid :
208+ # wpa_supplicant escapes non-printable bytes as \xNN; NM keyfile stores ASCII SSIDs as a literal and others as a byte;byte; list
209+ ssid_bytes = ssid .encode ().decode ('unicode_escape' ).encode ('latin-1' )
210+ ssid_keyfile_list = ';' .join (str (b ) for b in ssid_bytes ) + ';'
211+
212+ nm_dirs = ("/run/NetworkManager/system-connections" , "/data/etc/NetworkManager/system-connections" )
213+ for fpath in (p for d in nm_dirs for p in Path (d ).glob ("*.nmconnection" )):
214+ raw = sudo_read (str (fpath ))
215+ if not raw :
216+ continue
217+ cp = configparser .ConfigParser (interpolation = None )
218+ try :
219+ cp .read_string (raw )
220+ keyfile_ssid = cp .get ("wifi" , "ssid" , fallback = "" )
221+ if keyfile_ssid != ssid and keyfile_ssid != ssid_keyfile_list :
222+ continue
223+ metered = cp .getint ("connection" , "metered" , fallback = 0 )
224+ except (configparser .Error , ValueError ):
225+ continue
226+ if metered == 1 : # NM_METERED_YES
227+ return True
228+ if metered == 2 : # NM_METERED_NO
229+ return False
230+ break
224231 except Exception :
225232 pass
226233
0 commit comments