Skip to content

Commit 3f21c24

Browse files
committed
Initial commit.
0 parents  commit 3f21c24

3 files changed

Lines changed: 253 additions & 0 deletions

File tree

CaptivePortal.py

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
import usocket
2+
import utime
3+
import _thread
4+
import dataCall
5+
import log
6+
from misc import USBNET
7+
from misc import Power
8+
9+
class CaptivePortal:
10+
def __init__(self, target_url, dns_whitelist, real_dns_server=("8.8.8.8", 53)):
11+
"""
12+
Initializing Captive Portal parameters。
13+
:param target_url: The target URL for the redirect (如 "https://example.com/")
14+
:param dns_whitelist: A list of DNS whitelists that allow normal resolution
15+
:param real_dns_server: Real DNS server address and port
16+
"""
17+
self.target_url = target_url
18+
self.dns_whitelist = dns_whitelist
19+
self.real_dns_server = real_dns_server
20+
self.portal_active = True
21+
self.http_thread = None
22+
self.dns_thread = None
23+
log.basicConfig(level=log.INFO)
24+
self.cp_log = log.getLogger("CaptivePortal")
25+
26+
def get_local_ip(self):
27+
"""Get the local IP address that will be used to bind the forwarding Socket for DNS requests."""
28+
try:
29+
return dataCall.getInfo(1, 0)[2][2]
30+
except Exception as e:
31+
self.cp_log.warning("Error getting local IP: {}".format(e))
32+
return "0.0.0.0"
33+
34+
def forward_to_real_dns(self, request, local_ip):
35+
"""Forward DNS requests to the real DNS server."""
36+
sock = None
37+
try:
38+
sock = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM)
39+
sock.settimeout(5)
40+
sock.bind((local_ip, 0))
41+
sock.sendto(request, self.real_dns_server)
42+
response, _ = sock.recvfrom(512)
43+
return response
44+
except Exception as e:
45+
serr = str(e)
46+
if serr != "[Errno 113] EHOSTUNREACH":
47+
self.cp_log.warning("Error forwarding DNS request: {}".format(e))
48+
return None
49+
finally:
50+
if sock:
51+
sock.close()
52+
53+
def start_http_server(self):
54+
"""Start the HTTP redirection server."""
55+
ip = "192.168.43.1" # USBNET Gateway IP
56+
addr = (ip, 80)
57+
s = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
58+
s.setsockopt(usocket.SOL_SOCKET, usocket.SO_REUSEADDR, 1)
59+
s.bind(addr)
60+
s.listen(3)
61+
self.cp_log.info("HTTP Server running at http://{}:80/".format(ip))
62+
63+
while self.portal_active:
64+
try:
65+
res = s.accept()
66+
if not res or len(res) < 2:
67+
continue
68+
client_sock = res[0]
69+
client_addr = (res[1], res[2])
70+
self.cp_log.debug("Connection from {}".format(client_addr))
71+
72+
try:
73+
request = client_sock.recv(1024).decode('utf-8')
74+
if not request:
75+
self.cp_log.debug("Empty request, closing connection.")
76+
continue
77+
78+
self.cp_log.debug("Request: {}".format(request))
79+
response = "HTTP/1.1 302 Found\r\nLocation: {}\r\n\r\n".format(self.target_url)
80+
client_sock.send(response)
81+
except Exception as e:
82+
self.cp_log.warning("Request handling error: {}".format(e))
83+
finally:
84+
client_sock.close()
85+
utime.sleep_ms(100)
86+
except Exception as e:
87+
self.cp_log.warning("HTTP server error: {}".format(e))
88+
continue
89+
if client_sock:
90+
client_sock.close()
91+
s.close()
92+
93+
def start_dns_server(self):
94+
"""Start a DNS hijacking server."""
95+
ip = "192.168.43.1" # Captive Portal IP
96+
local_ip = self.get_local_ip()
97+
dns_sock = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM)
98+
dns_sock.bind(("0.0.0.0", 53))
99+
self.cp_log.info("DNS Server running...")
100+
101+
while self.portal_active:
102+
try:
103+
data, addr = dns_sock.recvfrom(1024)
104+
self.cp_log.debug("DNS request from {}".format(addr))
105+
106+
# Resolving DNS requests
107+
p = DNSQuery(data)
108+
109+
if any(whitelisted in p.domain for whitelisted in self.dns_whitelist):
110+
self.cp_log.debug("Domain {} is in whitelist, forwarding to real DNS.".format(p.domain))
111+
response = self.forward_to_real_dns(data, local_ip)
112+
if response is None:
113+
self.cp_log.debug("Failed to get response from real DNS, sending empty response.")
114+
response = b"\x00" * len(data)
115+
else:
116+
response = p.response(ip)
117+
self.cp_log.debug('Hijacking: {:s} -> {:s}'.format(p.domain, ip))
118+
119+
dns_sock.sendto(response, addr)
120+
except Exception as e:
121+
self.cp_log.warning("DNS handling error: {}".format(e))
122+
utime.sleep_ms(100)
123+
dns_sock.close()
124+
125+
def start(self):
126+
"""Start a Captive Portal, including HTTP and DNS services."""
127+
self.portal_active = True
128+
self.dns_thread = _thread.start_new_thread(self.start_dns_server, ())
129+
self.http_thread = _thread.start_new_thread(self.start_http_server, ())
130+
self.cp_log.info("Captive Portal started.")
131+
132+
def stop(self):
133+
"""Close the Captive Portal and stop HTTP and DNS services."""
134+
self.portal_active = False
135+
self.cp_log.info("Captive Portal stopped.")
136+
137+
138+
# Resolve the DNS request Class
139+
class DNSQuery:
140+
def __init__(self, data):
141+
self.data = data
142+
self.domain = ''
143+
try:
144+
m = data[2] # Flags
145+
tipo = (m >> 3) & 15 # Opcode bits
146+
if tipo == 0:
147+
ini = 12
148+
lon = data[ini]
149+
while lon != 0:
150+
self.domain += data[ini+1:ini+lon+1].decode("utf-8") + '.'
151+
ini += lon + 1
152+
lon = data[ini]
153+
except Exception as e:
154+
self.domain = ''
155+
log.warning("Error parsing DNS query: {}".format(e))
156+
157+
def response(self, ip):
158+
"""Building a DNS hijacking response"""
159+
packet = b''
160+
if self.domain:
161+
packet += self.data[:2] + b"\x81\x80"
162+
packet += self.data[4:6] + self.data[4:6] + b'\x00\x00\x00\x00'
163+
packet += self.data[12:]
164+
packet += b'\xc0\x0c'
165+
packet += b'\x00\x01\x00\x01\x00\x00\x00\x3c\x00\x04'
166+
packet += bytes(map(int, ip.split('.')))
167+
return packet
168+
169+
# Example
170+
if __name__ == "__main__":
171+
# Windows: Type_RNDIS
172+
# Linux/Android/IOS: Type_ECM
173+
if (USBNET.get_worktype() != USBNET.Type_RNDIS):
174+
USBNET.set_worktype(USBNET.Type_RNDIS)
175+
Power.powerRestart()
176+
177+
portal = CaptivePortal(
178+
target_url="https://python.quectel.com/",
179+
dns_whitelist=["www.python.quectel.com", "python.quectel.com"]
180+
)
181+
portal.start()
182+
183+
cnt = 0
184+
while True:
185+
utime.sleep(1)
186+
ret = USBNET.open()
187+
cnt = cnt + 1
188+
if ret == 0:
189+
print("USBNET status: ",USBNET.get_status())
190+
print("USBNET type: ",USBNET.get_worktype())
191+
break
192+
if cnt == 60:
193+
print("USBNET open fail!")
194+
portal.stop()
195+
break
196+
197+

LICENSE

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
Copyright (c) Quectel Wireless Solution, Co., Ltd.All Rights Reserved.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.

readme.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# CaptivePortal lib for QuecPython
2+
3+
## Overview
4+
5+
**Captive Portal** is a network access control mechanism commonly used in public Wi-Fi networks. When users connect to a network requiring authentication, they are automatically redirected to a webpage where they must enter a username and password or accept the terms of use. In addition to authentication, Captive Portals can also be used for displaying advertisements, informing users of network usage policies, and more.
6+
7+
Integrating **Captive Portal functionality into a USB dongle** allows you to manage devices connected to your network more conveniently and provide a personalized network experience. This feature can enhance security, enable user interaction through customized portals, and facilitate the deployment of network usage policies, advertisements, or service announcements.
8+
9+
**QuecPython** provides an implementation of the Captive Portal technology, allowing users to specify the webpage they want the Captive Portal to redirect to. This enables users to implement their own network management strategies effectively.
10+
11+
## Example
12+
13+
For demonstration purposes, this example specifies **https://python.quectel.com** as the target page for Captive Portal redirection.
14+
15+
```python
16+
# Example
17+
if __name__ == "__main__":
18+
# Windows: Type_RNDIS
19+
# Linux/Android/IOS: Type_ECM
20+
if (USBNET.get_worktype() != USBNET.Type_RNDIS):
21+
USBNET.set_worktype(USBNET.Type_RNDIS)
22+
Power.powerRestart()
23+
24+
portal = CaptivePortal(
25+
target_url="https://python.quectel.com/",
26+
dns_whitelist=["www.python.quectel.com", "python.quectel.com"]
27+
)
28+
portal.start()
29+
30+
cnt = 0
31+
while True:
32+
utime.sleep(1)
33+
ret = USBNET.open()
34+
cnt = cnt + 1
35+
if ret == 0:
36+
print("USBNET status: ",USBNET.get_status())
37+
print("USBNET type: ",USBNET.get_worktype())
38+
break
39+
if cnt == 60:
40+
print("USBNET open fail!")
41+
portal.stop()
42+
break
43+
```

0 commit comments

Comments
 (0)