forked from swingerman/ha-dual-smart-thermostat
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpcap.py
More file actions
187 lines (153 loc) · 5.37 KB
/
pcap.py
File metadata and controls
187 lines (153 loc) · 5.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""Lightweight ctypes-based wrapper around libpcap for basic operations.
This is a minimal shim that allows compiling and setting BPF filters and opening
live captures without requiring a C-extension build (useful for Python 3.13
dev environments where upstream wheels are not available).

It intentionally covers only a small surface: findalldevs, open_live, compile,
setfilter and close are wrapped; next_ex only has its ctypes prototype declared
and has no Python wrapper yet. Not a full replacement for pcapy/pypcap.
"""
from ctypes import (
CDLL,
POINTER,
Structure,
byref,
c_char,
c_char_p,
c_int,
c_uint,
c_void_p,
)
from ctypes.util import find_library
# Resolve and load the libpcap shared library once, at import time. Everything
# else in this module goes through the resulting ``_pcap`` handle, so a missing
# library is reported immediately as an ImportError.
libname = find_library("pcap")
if libname:
    _pcap = CDLL(libname)
else:
    raise ImportError("libpcap not found on system; install libpcap-dev/libpcap")
class PcapBpfProgram(Structure):
    """ctypes layout for the compiled-filter struct filled in by ``pcap_compile``.

    Instances are passed by reference to ``pcap_compile``, ``pcap_setfilter``
    and ``pcap_freecode``.
    """

    _fields_ = [
        # Unsigned length field.
        ("bf_len", c_uint),
        # Instruction buffer, kept as an opaque pointer; this module never
        # dereferences it.
        ("bf_insns", c_void_p),
    ]
class PcapIf(Structure):
    """One node of the linked device list filled in by ``pcap_findalldevs``."""


# The field list is attached after the class statement because ``next`` points
# back at PcapIf itself, which cannot be named inside its own class body.
PcapIf._fields_ = [
    ("next", POINTER(PcapIf)),  # following node; a NULL pointer ends the list
    ("name", c_char_p),  # device name as a C string (may be NULL)
    ("description", c_char_p),  # optional description as a C string (may be NULL)
    ("addresses", c_void_p),  # kept opaque; this module does not read it
    ("flags", c_uint),
]
# Declare the C prototype of every libpcap entry point this module calls, as
# (symbol, argtypes, restype) rows applied in order. Handles returned by
# pcap_open_live / pcap_open_dead are carried around as plain c_void_p values.
for _symbol, _argtypes, _restype in (
    ("pcap_findalldevs", [POINTER(POINTER(PcapIf)), c_char_p], c_int),
    ("pcap_freealldevs", [POINTER(PcapIf)], None),
    ("pcap_open_live", [c_char_p, c_int, c_int, c_int, c_char_p], c_void_p),
    ("pcap_close", [c_void_p], None),
    (
        "pcap_compile",
        [c_void_p, POINTER(PcapBpfProgram), c_char_p, c_int, c_uint],
        c_int,
    ),
    ("pcap_freecode", [POINTER(PcapBpfProgram)], None),
    ("pcap_setfilter", [c_void_p, POINTER(PcapBpfProgram)], c_int),
    ("pcap_next_ex", [c_void_p, POINTER(c_void_p), POINTER(c_void_p)], c_int),
    # pcap_open_dead allows compiling filters without opening a live device
    ("pcap_open_dead", [c_int, c_int], c_void_p),
):
    _func = getattr(_pcap, _symbol)
    _func.argtypes = _argtypes
    _func.restype = _restype
# Drop the loop temporaries so the module namespace is left unchanged.
del _symbol, _argtypes, _restype, _func
def findalldevs():
    """Return the capture devices reported by ``pcap_findalldevs``.

    Returns:
        A list of ``(name, description)`` tuples in the order libpcap lists
        them. Either element is ``None`` when libpcap supplies a NULL pointer
        for it.

    Raises:
        OSError: if ``pcap_findalldevs`` returns a non-zero status; the message
            carries whatever libpcap wrote into the error buffer.
    """
    head = POINTER(PcapIf)()
    errbuf = (c_char * 256)()
    if _pcap.pcap_findalldevs(byref(head), errbuf) != 0:
        raise OSError(
            f"pcap_findalldevs failed: {errbuf.value.decode(errors='ignore')}"
        )
    devs = []
    try:
        node = head
        while node:
            dev = node.contents
            # Reading a c_char_p field yields a Python bytes copy, so the
            # decoded strings stay valid after the list is freed below.
            name = dev.name.decode() if dev.name else None
            desc = dev.description.decode() if dev.description else None
            devs.append((name, desc))
            node = dev.next
    finally:
        # The list was allocated by libpcap; release it even if decoding a
        # name or description raised, otherwise it would be leaked.
        _pcap.pcap_freealldevs(head)
    return devs
class Pcap:
    """Live-capture handle opened with ``pcap_open_live``.

    Usable as a context manager: leaving the ``with`` block closes the handle.
    After ``close()`` the filter methods raise ``OSError`` instead of handing a
    NULL pointer to libpcap.
    """

    def __init__(self, device, snaplen=65535, promisc=1, to_ms=1000):
        """Open ``device`` for live capture.

        Args:
            device: interface name; a ``str`` is encoded before the call, any
                other value (e.g. ``bytes``) is passed through unchanged.
            snaplen: forwarded to ``pcap_open_live`` as its second argument.
            promisc: forwarded to ``pcap_open_live`` as its third argument.
            to_ms: forwarded to ``pcap_open_live`` as its fourth argument.

        Raises:
            OSError: if ``pcap_open_live`` returns NULL; the message carries
                the contents of libpcap's error buffer.
        """
        errbuf = (c_char * 256)()
        self._p = _pcap.pcap_open_live(
            device.encode() if isinstance(device, str) else device,
            snaplen,
            promisc,
            to_ms,
            errbuf,
        )
        if not self._p:
            raise OSError(
                f"pcap_open_live failed: {errbuf.value.decode(errors='ignore')}"
            )

    def _require_open(self):
        """Return the raw handle, or raise ``OSError`` if it has been closed."""
        if not self._p:
            raise OSError("pcap handle is closed")
        return self._p

    def _failure(self, what):
        """Build the ``OSError`` for a failed libpcap call named ``what``.

        The message is ``"<what> failed: <pcap_geterr text>"`` when the library
        exports ``pcap_geterr``, and plain ``"<what> failed"`` otherwise.
        """
        try:
            geterr = _pcap.pcap_geterr
        except AttributeError:
            # Symbol not exported by this libpcap build.
            return OSError(f"{what} failed")
        geterr.argtypes = [c_void_p]
        geterr.restype = c_char_p
        raw = geterr(self._p)
        detail = raw.decode(errors="replace") if raw else "unknown"
        return OSError(f"{what} failed: {detail}")

    def compile_filter(self, filter_expr, optimize=True, netmask=0xFFFFFFFF):
        """Compile ``filter_expr`` against this handle.

        Args:
            filter_expr: filter expression as ``str`` (encoded before the call)
                or ``bytes`` (passed through unchanged).
            optimize: truthy to pass 1 as the optimize flag, falsy to pass 0.
            netmask: forwarded to ``pcap_compile`` as its last argument.

        Returns:
            The filled-in ``PcapBpfProgram``. This class never calls
            ``pcap_freecode`` on it; releasing it is left to the caller.

        Raises:
            OSError: if the handle is closed or ``pcap_compile`` fails.
        """
        handle = self._require_open()
        expr = filter_expr.encode() if isinstance(filter_expr, str) else filter_expr
        prog = PcapBpfProgram()
        res = _pcap.pcap_compile(
            handle, byref(prog), expr, 1 if optimize else 0, netmask
        )
        if res != 0:
            # Raised outside any try block so the detailed message is not
            # caught and replaced by a generic one.
            raise self._failure("pcap_compile")
        return prog

    def setfilter(self, prog):
        """Install the compiled program ``prog`` on this handle.

        Raises:
            OSError: if the handle is closed or ``pcap_setfilter`` fails.
        """
        handle = self._require_open()
        if _pcap.pcap_setfilter(handle, byref(prog)) != 0:
            raise self._failure("pcap_setfilter")

    def close(self):
        """Close the handle if it is still open; calling it again is a no-op."""
        if self._p:
            _pcap.pcap_close(self._p)
            self._p = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
def compile_filter_on_device(filter_expr):
    """Check that ``filter_expr`` compiles as a BPF filter, without a live capture.

    Despite the name, no device is looked up or opened: the expression is
    compiled against a handle from ``pcap_open_dead`` (link type 1, snapshot
    length 65535). The compiled program is freed again, so this is purely a
    validity check.

    Args:
        filter_expr: filter expression as ``str`` (encoded before the call) or
            ``bytes`` (passed through unchanged).

    Returns:
        None.

    Raises:
        OSError: if ``pcap_open_dead`` returns NULL or ``pcap_compile`` fails.
            The compile error includes the ``pcap_geterr`` text when the
            library exports that symbol.
    """
    # Use pcap_open_dead so we can compile filters without opening a live capture
    DLT_EN10MB = 1
    SNAPLEN = 65535
    dead = _pcap.pcap_open_dead(DLT_EN10MB, SNAPLEN)
    if not dead:
        raise OSError("pcap_open_dead failed")
    try:
        expr = filter_expr.encode() if isinstance(filter_expr, str) else filter_expr
        prog = PcapBpfProgram()
        if _pcap.pcap_compile(dead, byref(prog), expr, 1, 0xFFFFFFFF) != 0:
            # Fetch the error text while the handle is still open.
            try:
                geterr = _pcap.pcap_geterr
            except AttributeError:
                # Symbol not exported by this libpcap build.
                raise OSError("pcap_compile failed") from None
            geterr.argtypes = [c_void_p]
            geterr.restype = c_char_p
            raw = geterr(dead)
            detail = raw.decode(errors="replace") if raw else "unknown"
            raise OSError(f"pcap_compile failed: {detail}")
        _pcap.pcap_freecode(byref(prog))
    finally:
        # Always release the dead handle, including on the failure path.
        _pcap.pcap_close(dead)
if __name__ == "__main__":
    # Manual smoke test: show which library was loaded, list the devices, then
    # try compiling a DHCP filter and report the outcome.
    print("libpcap shim using:", libname)
    available = findalldevs()
    print("devices:", available)
    dhcp_filter = "port 67 or port 68"
    try:
        compile_filter_on_device(dhcp_filter)
        print("compiled DHCP filter OK")
    except Exception as err:
        print("compile failed:", err)