Skip to content

Commit 6914e06

Browse files
committed
chg: refactor and cleanup
1 parent 123ae87 commit 6914e06

File tree

1 file changed

+93
-60
lines changed

1 file changed

+93
-60
lines changed

bin/wireproxy_manager.py

Lines changed: 93 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import socket
1010
import urllib.request
1111

12+
from collections import defaultdict
1213
from dataclasses import dataclass
1314
from logging import Logger
1415
from pathlib import Path
@@ -19,7 +20,7 @@
1920
FileCreatedEvent, FileDeletedEvent, DirModifiedEvent, FileModifiedEvent)
2021
from watchdog.observers import Observer
2122

22-
from lacus.default import AbstractManager, get_config, get_homedir
23+
from lacus.default import AbstractManager, get_config, get_homedir, safe_create_dir
2324
from lacus.default.exceptions import ConfigError
2425

2526
logging.config.dictConfig(get_config('logging'))
@@ -28,6 +29,7 @@
2829
@dataclass
2930
class WireProxy:
3031

32+
name: str
3133
process: Popen[bytes]
3234
config_file: Path
3335
health_endpoint: str
@@ -75,44 +77,51 @@ def __init__(self, path_to_wireproxy: Path, configs_dir: Path, logger: Logger) -
7577
self._init_configs()
7678
self.launch_all_wireproxies()
7779

78-
def _check_local_port_in_config(self, config_name: str, address: str, port: int | str) -> None:
80+
def _add_local_port_in_config(self, config_name: str, address: str, port: int | str) -> None:
81+
"""Add port in the dict of local ports currently in use."""
7982
if address in ['127.0.0.1', 'localhost']:
8083
p = int(port)
8184
if p in self.used_local_ports and self.used_local_ports[p] != config_name:
8285
raise ConfigError(f"Port {p} already in use by another proxy: {self.used_local_ports[p]}.")
8386
self.used_local_ports[p] = config_name
8487

85-
def check_port_in_use(self, port: int) -> bool:
88+
def _port_in_use(self, port: int) -> bool:
89+
"""Check if the port in currently in use on the machine."""
8690
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
8791
return s.connect_ex(('127.0.0.1', port)) == 0
8892

8993
def _init_configs(self) -> None:
94+
"""Initialize the proxies.json config file used by Lacus to expose the proxies"""
9095
# Load the lacus proxy config file
9196
if not self.proxies_config_path.exists():
9297
self.proxies = {}
9398
else:
9499
with self.proxies_config_path.open('r') as f:
95100
self.proxies = json.load(f)
96101

102+
# Add the ports in the config in the used ports
97103
for name, p in self.proxies.items():
98104
if p.get('proxy_url'):
99105
proxy_url = urlparse(p['proxy_url'])
100-
self._check_local_port_in_config(name, proxy_url.hostname, proxy_url.port)
106+
self._add_local_port_in_config(name, proxy_url.hostname, proxy_url.port)
101107

102-
# Load the wireguard configs
108+
# Add the ports in the wireguard config in the used ports
103109
for config_file in self.configs_dir.glob('*.conf'):
104110
config_name = config_file.stem
105111
wg_config = configparser.ConfigParser()
106112
wg_config.read(config_file)
107113
if socks5_address := wg_config.get('Socks5', 'BindAddress', fallback=None):
108114
address, _port = socks5_address.split(':')
109-
self._check_local_port_in_config(config_name, address, _port)
115+
self._add_local_port_in_config(config_name, address, _port)
110116

117+
# Apply config to all wireguard configs
111118
for config_file in self.configs_dir.glob('*.conf'):
112-
self._sync_wireguard_proxies(config_file)
119+
self.sync_wireguard_proxies(config_file)
113120

114-
def _sync_wireguard_proxies(self, wiregard_config_path: Path) -> bool:
121+
def sync_wireguard_proxies(self, wiregard_config_path: Path) -> bool:
122+
"""Synchronize the wireguard config with the proxies.json config file"""
115123
config_name = wiregard_config_path.stem
124+
wg_config_changed = False
116125
wg_config = configparser.ConfigParser()
117126
wg_config.read(wiregard_config_path)
118127
proxy_config = self.proxies.get(config_name)
@@ -123,20 +132,20 @@ def _sync_wireguard_proxies(self, wiregard_config_path: Path) -> bool:
123132
'meta': {'provider': 'wireguard'}
124133
}
125134

126-
wg_config_changed = False
127135
if proxy_config.get('proxy_url'):
128-
# Case 1: Proxy config exists in proxy config only
136+
# Case 1: Proxy config exists in proxy config, force that config in the wireproxy config
129137
self.logger.debug(f'Setting proxy {config_name} in {config_name}.')
130138
proxy_url = urlparse(proxy_config['proxy_url'])
131-
self._check_local_port_in_config(config_name, proxy_url.hostname, proxy_url.port)
139+
self._add_local_port_in_config(config_name, proxy_url.hostname, proxy_url.port)
132140
if wg_config.get('Socks5', 'BindAddress', fallback=None) != proxy_url.netloc:
141+
# If the proxy URL in the wireguard config is None or different, set it to the value from proxies.json
133142
wg_config['Socks5'] = {'BindAddress': proxy_url.netloc}
134143
wg_config_changed = True
135144
else:
136-
# Case 2: Proxy config does not exist (create new one)
145+
# Case 2: Proxy URL does not exist (create new one)
137146
# find first unused port in interval
138147
for port in range(self.min_port, self.max_port):
139-
if port not in self.used_local_ports and not self.check_port_in_use(port):
148+
if port not in self.used_local_ports and not self._port_in_use(port):
140149
# got a free port
141150
break
142151
else:
@@ -147,12 +156,12 @@ def _sync_wireguard_proxies(self, wiregard_config_path: Path) -> bool:
147156
wg_config_changed = True
148157
self.used_local_ports[port] = config_name
149158

150-
# Make sure the DNS is set in the proxy config
159+
# Make sure the DNS is set in the proxies.json config
151160
if wg_config.get('Interface', 'DNS', fallback=None):
152161
self.logger.debug(f'Setting DNS resolver for {config_name}.')
153162
proxy_config['dns_resolver'] = wg_config['Interface']['DNS']
154163

155-
# Make sure the checkalive is set in the wg config
164+
# Make sure a checkalive is set in the wireguard config
156165
if not wg_config.get('Interface', 'CheckAlive', fallback=None):
157166
self.logger.debug(f'Setting CheckAlive for {config_name}.')
158167
wg_config['Interface']['CheckAlive'] = self.default_checkalive
@@ -163,15 +172,16 @@ def _sync_wireguard_proxies(self, wiregard_config_path: Path) -> bool:
163172
wg_config.write(f)
164173

165174
if self.proxies.get(config_name) != proxy_config:
166-
# It's been changed
175+
# It's been changed, update and save
167176
self.proxies[config_name] = proxy_config
168177
with self.proxies_config_path.open('w') as f:
169178
json.dump(self.proxies, f, indent=4, sort_keys=True)
170179
return wg_config_changed
171180

172181
def on_created(self, event: DirCreatedEvent | FileCreatedEvent) -> None:
173-
# if wireguard *.conf -> update proxies.json -> initialize wireproxy config
174-
# if proxies.json -> N/A
182+
"""A new file was created. Only for new wireguard config files.
183+
Steps: update proxies.json -> initialize wireproxy config -> launch wireproxy
184+
"""
175185
filepath = Path(str(event.src_path))
176186
if isinstance(event, FileCreatedEvent) and filepath.suffix == '.conf':
177187
if filepath.stem in self.proxies:
@@ -181,22 +191,22 @@ def on_created(self, event: DirCreatedEvent | FileCreatedEvent) -> None:
181191
else:
182192
self.logger.info(f'Got new wireguard config file: {filepath.stem}.')
183193
try:
184-
self._sync_wireguard_proxies(filepath)
194+
self.sync_wireguard_proxies(filepath)
185195
self.launch_wireproxy(filepath.stem)
186196
except ConfigError as e:
187197
self.logger.warning(f"Unable to create the new proxy: {e}")
188198

189199
def on_modified(self, event: DirModifiedEvent | FileModifiedEvent) -> None:
190-
# NOTE: modification **only** happens in the proxies.json
191-
# if wireguard *.conf -> N/A
192-
# if proxies.json -> update wireguard config
200+
""" A file was modified. Only for proxies.json, any change directly made in a wireproxy config file is reverted.
201+
Steps: update wireproxy config file -> restart wireproxy
202+
"""
193203
filepath = Path(str(event.src_path))
194204
if isinstance(event, FileModifiedEvent) and filepath.suffix == '.conf':
195205
# Modifying the wireproxy config file isn't allowed, but if it happens, we revert it.
196206
try:
197207
self.logger.info(f'Wireproxy file modified: {filepath}. Apply the json config')
198-
if self._sync_wireguard_proxies(filepath):
199-
# Do nothing here, the wireproxy config has been reverted, just warn te user.
208+
if self.sync_wireguard_proxies(filepath):
209+
# The wireproxy config has been reverted, stop and restart
200210
self.stop_wireproxy(filepath.stem)
201211
self.launch_wireproxy(filepath.stem)
202212
except ConfigError as e:
@@ -209,24 +219,12 @@ def on_modified(self, event: DirModifiedEvent | FileModifiedEvent) -> None:
209219
return
210220

211221
self.logger.info('Proxies file changed.')
212-
self.proxies = proxies
213222
self._init_configs()
214-
for name in self.proxies.keys():
215-
filepath = self.configs_dir / f'{name}.conf'
216-
if not filepath.exists():
217-
self.logger.debug(f"Config file for proxy {name} does not exist.")
218-
continue
219-
try:
220-
if self._sync_wireguard_proxies(filepath):
221-
# it changed, stop ond config & start new one
222-
self.stop_wireproxy(name)
223-
self.launch_wireproxy(name)
224-
except ConfigError as e:
225-
self.logger.warning(f"Unable to update the proxy: {e}")
223+
self.launch_all_wireproxies()
226224

227225
def on_deleted(self, event: DirDeletedEvent | FileDeletedEvent) -> None:
228-
# if wireguard *.conf -> update proxies.json
229-
# if proxies.json -> re-run init
226+
""" A file was deleted. If it is a wireguard config file, remove it from the proxies.json config file.
227+
It is it the proxies.json file, reinitialize it from the wireguard config files."""
230228
filepath = Path(str(event.src_path))
231229
if filepath.exists():
232230
# NOTE: sometimes, modifying the file triggers a delete event
@@ -244,61 +242,93 @@ def on_deleted(self, event: DirDeletedEvent | FileDeletedEvent) -> None:
244242
elif isinstance(event, FileDeletedEvent) and filepath.name == 'proxies.json':
245243
self.logger.info(f'Proxies file deleted: {filepath}, reseting.')
246244
self._init_configs()
245+
self.launch_all_wireproxies()
247246

248247
def remove_proxy(self, name: str) -> None:
248+
"""Remove the proxy entry from proxies.json."""
249249
if self.proxies.pop(name, None):
250250
with self.proxies_config_path.open('w') as f:
251251
json.dump(self.proxies, f, indent=4, sort_keys=True)
252252

253253
# #### Manage proxy services #### #
254254

255255
def launch_wireproxy(self, name: str) -> None:
256+
"""Launch wireproxy on a config file, auto-generate a port for healthchecks."""
256257
if name in self.wireproxies:
257258
self.logger.info(f"Wireproxy for {name} already exists.")
258-
return
259+
if self.wireproxies[name].is_running():
260+
self.logger.info(f"Wireproxy for {name} is already running.")
261+
return
262+
else:
263+
self.logger.warning(f"Wireproxy for {name} is not running, restarting it.")
264+
self.wireproxies.pop(name, None)
259265

260266
config_file = self.configs_dir / f'{name}.conf'
261267
wg_config = configparser.ConfigParser()
262268
wg_config.read(config_file)
263269

264270
address, _ = wg_config.get('Socks5', 'BindAddress').split(':')
265271
for port in range(self.min_port, self.max_port):
266-
if port not in self.used_local_ports and not self.check_port_in_use(port):
272+
if port not in self.used_local_ports and not self._port_in_use(port):
267273
self.used_local_ports[port] = f'{config_file.stem}_health'
268274
break
269275
else:
270276
raise Exception(f"[Health] No free port found in range {self.min_port}-{self.max_port}")
271277
health_endpoint = f'{address}:{port}'
272278
process = Popen([self.wireproxy, '--config', config_file, '--info', health_endpoint], stdout=PIPE, stderr=PIPE)
273279

274-
self.wireproxies[config_file.stem] = WireProxy(process=process,
280+
self.wireproxies[config_file.stem] = WireProxy(name=name, process=process,
275281
config_file=config_file,
276282
health_endpoint=health_endpoint)
277283

278284
def launch_all_wireproxies(self) -> None:
285+
"""Launch wireproxies on each of the config files in the directory."""
279286
for config_file in self.configs_dir.glob('*.conf'):
280287
self.launch_wireproxy(config_file.stem)
281288
self.logger.info("All wireproxies launched.")
282289

283290
def stop_wireproxy(self, name: str) -> None:
291+
"""Stop a specific wireproxy, update the proxies.json config file."""
284292
if name not in self.wireproxies:
285293
self.logger.debug(f"Wireproxy {name} is not running.")
286294
return
287295
self.wireproxies[name].stop()
288296
self.wireproxies.pop(name, None)
297+
self.remove_proxy(name)
289298
self.logger.info(f"Wireproxy for {name} stopped.")
290299

291300
def stop_all_wireproxies(self) -> None:
301+
"""Stop all the wireproxies."""
292302
for config_file in self.configs_dir.glob('*.conf'):
293303
self.stop_wireproxy(config_file.stem)
294304
self.logger.info("All wireproxies stopped.")
295305

296306
def clean_used_ports(self) -> None:
297-
# once everything is running, regularly check which ports are actually un use
307+
"""Once everything is running, make sure the ports are still in use. (they won't if a wireproxy is stopped)"""
298308
for port in list(self.used_local_ports.keys()):
299-
if not self.check_port_in_use(port):
309+
if not self._port_in_use(port):
300310
self.used_local_ports.pop(port)
301311

312+
def is_wireproxy_runinng(self, name: str) -> bool:
313+
"""Check if the wireproxy is running"""
314+
if name not in self.wireproxies:
315+
return False
316+
return self.wireproxies[name].is_running()
317+
318+
def is_wireproxy_healthy(self, name: str) -> bool:
319+
"""Check if the wireproxy is healthy"""
320+
if name not in self.wireproxies:
321+
self.logger.warning("Unable to check health of wireproxy, {name} is unknown.")
322+
return False
323+
if self.wireproxies[name].is_healthy():
324+
return True
325+
if self.wireproxies[name].failed_healthcheck > self.max_failed_healthcheck:
326+
self.logger.warning(f"{name} failed too many healthcheck.")
327+
return False
328+
else:
329+
self.logger.info(f"{name} failed healthcheck, retry later.")
330+
return True
331+
302332

303333
class WireProxyManager(AbstractManager):
304334

@@ -311,6 +341,11 @@ def __init__(self, loglevel: int | None=None) -> None:
311341
urllib3_logger = logging.getLogger('urllib3')
312342
urllib3_logger.setLevel(logging.WARNING)
313343

344+
# The max amount of time a proxy can be restarted before it is archived
345+
self.max_restarts = 3
346+
self.restart_counter: dict[str, int] = defaultdict(int)
347+
safe_create_dir(get_homedir() / 'config' / 'archived_wireproxies')
348+
314349
path_to_wireproxy = Path(get_config('generic', 'wireproxy_path'))
315350
if not path_to_wireproxy.exists() or not path_to_wireproxy.is_file():
316351
raise ConfigError(f"Wireproxy executable not found at {path_to_wireproxy}.")
@@ -325,24 +360,22 @@ def __init__(self, loglevel: int | None=None) -> None:
325360
def _to_run_forever(self) -> None:
326361
# Monitor the status of the proxies
327362
for config_file in self.configs_dir.glob('*.conf'):
328-
if (config_file.stem not in self.wpm.wireproxies
329-
or not self.wpm.wireproxies[config_file.stem].is_running()):
330-
self.logger.info(f'{config_file.stem} is not running.')
331-
self.wpm.remove_proxy(config_file.stem)
332-
self.wpm.launch_wireproxy(config_file.stem)
333-
continue
334-
335-
self.logger.debug(f'{config_file.stem} is running.')
336-
if self.wpm.wireproxies[config_file.stem].is_healthy():
337-
self.logger.debug(f'{config_file.stem} is healthy.')
338-
pass
339-
else:
340-
if self.wpm.wireproxies[config_file.stem].failed_healthcheck > self.wpm.max_failed_healthcheck:
341-
self.logger.warning(f'{config_file.stem} failed too many healthcheck.')
363+
if self.wpm.is_wireproxy_runinng(config_file.stem):
364+
self.logger.debug(f'{config_file.stem} is running.')
365+
if self.wpm.is_wireproxy_healthy(config_file.stem):
366+
self.logger.debug(f'{config_file.stem} is healthy.')
367+
else:
342368
self.wpm.stop_wireproxy(config_file.stem)
343-
self.wpm.remove_proxy(config_file.stem)
369+
else:
370+
if self.restart_counter[config_file.stem] >= self.max_restarts:
371+
self.logger.warning(f'{config_file.stem} has been restarted too many times, archiving.')
372+
config_file.rename(get_homedir() / 'config' / 'archived_wireproxies' / config_file.name)
373+
self.logger.info(f"Wireproxy {config_file.name} archived.")
344374
else:
345-
self.logger.info(f'{config_file.stem} failed healthcheck, retry later.')
375+
self.wpm.sync_wireguard_proxies(config_file)
376+
self.wpm.launch_wireproxy(config_file.stem)
377+
self.restart_counter[config_file.stem] += 1
378+
self.logger.info(f'{config_file.stem} was not running, restart counter: {self.restart_counter[config_file.stem]}')
346379
self.wpm.clean_used_ports()
347380

348381
def _wait_to_finish(self) -> None:

0 commit comments

Comments
 (0)