Skip to content

Commit 243d503

Browse files
authored
UDP expose connection tracking (#104)
- Add connection tracking to the UDP expose functionality. - Fix a bug in the TCP proxy used by tunnel and expose that would sometimes close one end of the proxy too soon - Added server messages when expose connections are made, similar to tunnel connection messages - Server UDP tunnel connection messages now only trigger on first connections (when tracking starts), not every packet - Bump dockerfile golang version to 1.24 to match current code requirements - docker-compose file for demo environment now also mounts the src folder to /wiretap-live to make development testing much faster; code can be edited on the host and is immediately available for compilation in all containers.
1 parent e631dbe commit 243d503

File tree

6 files changed

+202
-16
lines changed

6 files changed

+202
-16
lines changed

docker-compose.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ services:
5151
- net.ipv6.conf.all.disable_ipv6=0
5252
environment:
5353
- DISPLAY=host.docker.internal:0
54+
volumes:
55+
- ./src:/wiretap-live/
5456
server:
5557
depends_on:
5658
- client
@@ -71,6 +73,8 @@ services:
7173
- net.ipv6.conf.all.disable_ipv6=0
7274
environment:
7375
- DISPLAY=host.docker.internal:0
76+
volumes:
77+
- ./src:/wiretap-live/
7478
target:
7579
depends_on:
7680
- client
@@ -87,6 +91,8 @@ services:
8791
- SYS_MODULE
8892
sysctls:
8993
- net.ipv6.conf.all.disable_ipv6=0
94+
volumes:
95+
- ./src:/wiretap-live/
9096
target2:
9197
depends_on:
9298
- client
@@ -100,6 +106,8 @@ services:
100106
- SYS_MODULE
101107
sysctls:
102108
- net.ipv6.conf.all.disable_ipv6=0
109+
volumes:
110+
- ./src:/wiretap-live/
103111

104112

105113
networks:

src/cmd/serve.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ import (
2323
"gvisor.dev/gvisor/pkg/tcpip/network/ipv4"
2424
"gvisor.dev/gvisor/pkg/tcpip/network/ipv6"
2525
"gvisor.dev/gvisor/pkg/tcpip/stack"
26+
gicmp "gvisor.dev/gvisor/pkg/tcpip/transport/icmp"
2627
"gvisor.dev/gvisor/pkg/tcpip/transport/raw"
2728
gtcp "gvisor.dev/gvisor/pkg/tcpip/transport/tcp"
2829
gudp "gvisor.dev/gvisor/pkg/tcpip/transport/udp"
29-
gicmp "gvisor.dev/gvisor/pkg/tcpip/transport/icmp"
3030

3131
"wiretap/peer"
3232
"wiretap/transport/api"

src/transport/api/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ func handleExpose(tnet *netstack.Net, exposeMap *map[ExposeTuple]ExposeConn, exp
533533
}
534534

535535
// Bind successful, expose port.
536-
go transport.ForwardUdpPort(
536+
go transport.ForwardUdpPortWithTracking(
537537
tnet.Stack(),
538538
c,
539539
localAddr,

src/transport/transport.go

Lines changed: 186 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"net/netip"
1212
"strconv"
1313
"sync"
14+
"time"
1415

1516
"github.com/armon/go-socks5"
1617
"github.com/google/gopacket"
@@ -39,6 +40,7 @@ type ConnCounts struct {
3940
}
4041

4142
var connCounts ConnCounts
43+
const UDP_TIMEOUT = 60 * time.Second
4244

4345
func init() {
4446
connCounts.counts = make(map[netip.Addr]int)
@@ -139,24 +141,30 @@ func SendPacket(s *stack.Stack, packet []byte, addr *tcpip.FullAddress, netProto
139141
}
140142

141143
func Proxy(src net.Conn, dst net.Conn) {
144+
defer src.Close()
145+
defer dst.Close()
142146
var wg sync.WaitGroup
143147

148+
//log.Printf("Proxying between %v <-> %v and %v <-> %v\n", src.LocalAddr(), src.RemoteAddr(), dst.LocalAddr(), dst.RemoteAddr())
149+
144150
wg.Add(1)
145151
go func() {
152+
defer wg.Done()
146153
_, err := io.Copy(src, dst)
147154
if err != nil {
148155
log.Printf("error copying between connections: %v\n", err)
149156
}
150-
src.Close()
151-
wg.Done()
152157
}()
153158

154159
// Copy from peer to new connection.
155-
_, nerr := io.Copy(dst, src)
156-
if nerr != nil {
157-
log.Printf("error copying between connections: %v\n", nerr)
158-
}
159-
dst.Close()
160+
wg.Add(1)
161+
go func() {
162+
defer wg.Done()
163+
_, nerr := io.Copy(dst, src)
164+
if nerr != nil {
165+
log.Printf("error copying between connections: %v\n", nerr)
166+
}
167+
}()
160168

161169
// Wait for both copies to finish.
162170
wg.Wait()
@@ -172,9 +180,12 @@ func ForwardTcpPort(s *stack.Stack, l net.Listener, localAddr tcpip.FullAddress,
172180
return
173181
}
174182

183+
log.Printf("(client [%v]:%v) <- Expose: TCP <- %v", remoteAddr.Addr, remoteAddr.Port, conn.RemoteAddr().String())
184+
175185
// Proxy between conns.
176186
go func() {
177187
var nc net.Conn
188+
// since the localAddr has a port value of 0 it should auto-select a random available source port
178189
nc, err = gonet.DialTCPWithBind(
179190
ctx,
180191
s,
@@ -195,6 +206,7 @@ func ForwardTcpPort(s *stack.Stack, l net.Listener, localAddr tcpip.FullAddress,
195206

196207
// ForwardUdpPort proxies UDP datagrams by forwarding datagrams to a peer, and then returns responses to the last remote address to talk to this endpoint.
197208
// No connection tracking is in place at this time.
209+
// DEPRECATED. Use ForwardUdpPortWithTracking instead.
198210
func ForwardUdpPort(s *stack.Stack, conn *net.UDPConn, localAddr tcpip.FullAddress, remoteAddr tcpip.FullAddress, np tcpip.NetworkProtocolNumber) {
199211
var wg sync.WaitGroup
200212
var clientAddr *netip.AddrPort
@@ -248,6 +260,7 @@ func ForwardUdpPort(s *stack.Stack, conn *net.UDPConn, localAddr tcpip.FullAddre
248260
}
249261
lock.Lock()
250262
if clientAddr == nil {
263+
log.Println("UDP client addr null, cannot forward response")
251264
lock.Unlock()
252265
continue
253266
}
@@ -263,6 +276,172 @@ func ForwardUdpPort(s *stack.Stack, conn *net.UDPConn, localAddr tcpip.FullAddre
263276
wg.Wait()
264277
}
265278

279+
// ForwardUdpPortWithTracking proxies UDP datagrams by forwarding datagrams to and from a peer. All connections are tracked so that the client responses can be sent to the correct sender
280+
//
281+
// "conn" is the listening socket on the "real" network (the port being forwarded).
282+
// "LocalAddr" should be the API listener for this server inside Wiretap's network (src), but port 0 (so a random ephemeral port is assigned).
283+
// "remoteAddr" is the Client's IP(v6) address and port in Wiretap's network (dst)
284+
func ForwardUdpPortWithTracking(s *stack.Stack, conn *net.UDPConn, localAddr tcpip.FullAddress, remoteAddr tcpip.FullAddress, np tcpip.NetworkProtocolNumber) {
285+
var wg sync.WaitGroup
286+
const bufSize = 65535
287+
type trackedConn struct {
288+
udpConn *gonet.UDPConn
289+
lastActive time.Time
290+
}
291+
var connTrack = make(map[netip.AddrPort]*trackedConn)
292+
293+
var ctLock sync.Mutex
294+
var newConn = make(chan netip.AddrPort)
295+
296+
// Sanity check that we can create a connection to the target client port
297+
_, err := gonet.DialUDP(s, &localAddr, &remoteAddr, np)
298+
if err != nil {
299+
log.Println("failed to proxy UDP conn:", err)
300+
conn.Close()
301+
return
302+
}
303+
304+
305+
// Watchdog: periodically check for idle connections
306+
var stopWatchdog = make(chan bool)
307+
wg.Add(1)
308+
go func() {
309+
defer wg.Done()
310+
ticker := time.NewTicker(30 * time.Second)
311+
defer ticker.Stop()
312+
313+
for {
314+
select {
315+
case <-ticker.C:
316+
expire := time.Now().Add(-1 * UDP_TIMEOUT)
317+
ctLock.Lock()
318+
for addr, clientConn := range connTrack {
319+
if clientConn.lastActive.Before(expire) {
320+
log.Printf("Closing idle UDP forward: (client %v) <- Expose: UDP <- %v", clientConn.udpConn.RemoteAddr().String(), addr.String())
321+
clientConn.udpConn.Close()
322+
delete(connTrack, addr)
323+
}
324+
}
325+
ctLock.Unlock()
326+
case <-stopWatchdog:
327+
return
328+
}
329+
}
330+
}()
331+
332+
// Process incoming packets (new or existing “connections”)
333+
wg.Add(1)
334+
go func() {
335+
defer wg.Done()
336+
buf := make([]byte, bufSize)
337+
338+
for {
339+
n, addr, err := conn.ReadFromUDPAddrPort(buf)
340+
if err != nil {
341+
// This "listener" gets closed when the forward is removed via API
342+
log.Println("conn closed:", err)
343+
close(newConn) //signal the other goroutines to shut down
344+
stopWatchdog <-true
345+
346+
ctLock.Lock()
347+
for _, t := range connTrack {
348+
t.udpConn.Close()
349+
}
350+
ctLock.Unlock()
351+
352+
return
353+
}
354+
355+
ctLock.Lock()
356+
clientConn, exists := connTrack[addr]
357+
ctLock.Unlock()
358+
if !exists {
359+
// If this connection is not already being tracked, set it up
360+
udpConn, err := gonet.DialUDP(s, &localAddr, &remoteAddr, np)
361+
if err != nil {
362+
log.Println("failed to establish new proxy conn:", err)
363+
continue
364+
}
365+
clientConn = &trackedConn{udpConn: udpConn, lastActive: time.Now()}
366+
ctLock.Lock()
367+
connTrack[addr] = clientConn
368+
ctLock.Unlock()
369+
370+
// This blocks until it gets picked up by the response handler routine below
371+
newConn <- addr
372+
373+
} else {
374+
clientConn.lastActive = time.Now()
375+
}
376+
377+
// Forward payload to the client
378+
// We currently have no way to capture Dest Unreachable ICMP here, so the first Write() will never fail due to that.
379+
// But the network stack will remember them, so the second Write() will fail and trigger the cleanup.
380+
_, err = clientConn.udpConn.Write(buf[:n])
381+
if err != nil {
382+
log.Println("failed to forward UDP packet to client:", err)
383+
clientConn.udpConn.Close()
384+
continue
385+
}
386+
}
387+
}()
388+
389+
// Process new connections – set up routines that handle return (outgoing) packets
390+
for {
391+
ncAddr, ok := <-newConn
392+
if !ok {
393+
break
394+
}
395+
396+
ctLock.Lock()
397+
clientConn, exists := connTrack[ncAddr]
398+
ctLock.Unlock()
399+
if !exists {
400+
log.Printf("new UDP forward connection %v marked for processing but not found\n", ncAddr)
401+
continue
402+
}
403+
404+
clientRemoteAddr := clientConn.udpConn.RemoteAddr().String()
405+
log.Printf("(client %v) <- Expose: UDP <- %v", clientRemoteAddr, ncAddr.String())
406+
407+
// Spawn routine to forward responses back to the original sender
408+
wg.Add(1)
409+
go func(targetAddr netip.AddrPort) {
410+
defer wg.Done()
411+
buf := make([]byte, bufSize)
412+
413+
for {
414+
n, err := clientConn.udpConn.Read(buf)
415+
if err != nil {
416+
log.Printf("response conn closed: (client %v) <- Expose: UDP <- %v : %s", clientRemoteAddr, targetAddr.String(), err)
417+
clientConn.udpConn.Close()
418+
419+
ctLock.Lock()
420+
delete(connTrack, targetAddr)
421+
ctLock.Unlock()
422+
break
423+
}
424+
425+
ctLock.Lock()
426+
if t, ok := connTrack[targetAddr]; ok {
427+
t.lastActive = time.Now()
428+
}
429+
ctLock.Unlock()
430+
431+
_, err = conn.WriteToUDPAddrPort(buf[:n], targetAddr)
432+
if err != nil {
433+
log.Println("failed to send client UDP response:", err)
434+
continue
435+
}
436+
}
437+
}(ncAddr)
438+
}
439+
440+
441+
wg.Wait()
442+
log.Printf("All routines for UDP forward %v successfully shut down\n", conn.LocalAddr().String())
443+
}
444+
266445
// ForwardTcpPort proxies TCP connections by accepting connections and proxying them back to the client.
267446
func ForwardDynamic(s *stack.Stack, l *net.Listener, localAddr tcpip.FullAddress, remoteAddr tcpip.FullAddress, np tcpip.NetworkProtocolNumber) {
268447
dialer := func(ctx context.Context, network, addr string) (net.Conn, error) {

src/transport/udp/udp.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ type Config struct {
5757
// TODO: Clean this up. Can't use UDPForwarder because it doesn't offer a way to return false, which is required to send Unreachables.
5858
func Handler(c Config) func(stack.TransportEndpointID, *stack.PacketBuffer) bool {
5959
return func(teid stack.TransportEndpointID, pkb *stack.PacketBuffer) bool {
60-
log.Printf("(client %s) - Transport: UDP -> %s", net.JoinHostPort(teid.RemoteAddress.String(), fmt.Sprint(teid.RemotePort)), net.JoinHostPort(teid.LocalAddress.String(), fmt.Sprint(teid.LocalPort)))
61-
6260
packetClone := pkb.Clone()
6361
go func() {
6462
newPacket(packetClone, c.Tnet.Stack())
@@ -129,7 +127,7 @@ func getDataFromPacket(packet *stack.PacketBuffer) []byte {
129127
return transHeader.Payload()
130128
}
131129

132-
// NewPacket handles every new packet and sending it to the proper UDP dialer.
130+
// NewPacket handles every new packet and sends it to the proper UDP dialer.
133131
func newPacket(packet *stack.PacketBuffer, s *stack.Stack) {
134132
netHeader := packet.Network()
135133
transHeader := header.UDP(netHeader.Payload())
@@ -160,6 +158,7 @@ func newPacket(packet *stack.PacketBuffer, s *stack.Stack) {
160158
pktChan = make(chan *stack.PacketBuffer, 1)
161159
connMapWrite(conn, pktChan)
162160

161+
log.Printf("(client %v) - Transport: UDP -> %v", source, dest)
163162
go handleConn(conn, port, s)
164163

165164
pktChan <- packet.Clone()
@@ -202,7 +201,7 @@ func handleConn(conn udpConn, port int, s *stack.Stack) {
202201
sourceMapDecrement(conn.Source)
203202
}()
204203

205-
err = newConn.SetDeadline(time.Now().Add(30 * time.Second))
204+
err = newConn.SetDeadline(time.Now().Add(transport.UDP_TIMEOUT))
206205
if err != nil {
207206
log.Println("failed to set deadline", err)
208207
}
@@ -230,7 +229,7 @@ func handleConn(conn udpConn, port int, s *stack.Stack) {
230229
}
231230

232231
// Reset timer, we got a packet.
233-
err = newConn.SetDeadline(time.Now().Add(30 * time.Second))
232+
err = newConn.SetDeadline(time.Now().Add(transport.UDP_TIMEOUT))
234233
if err != nil {
235234
log.Println("failed to set deadline:", err)
236235
}
@@ -261,7 +260,7 @@ func handleConn(conn udpConn, port int, s *stack.Stack) {
261260
}
262261

263262
// Reset timer, we got a packet.
264-
err = newConn.SetDeadline(time.Now().Add(30 * time.Second))
263+
err = newConn.SetDeadline(time.Now().Add(transport.UDP_TIMEOUT))
265264
if err != nil {
266265
log.Println("failed to set deadline:", err)
267266
}

wiretap.Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM golang:1.23
1+
FROM golang:1.24
22

33
ARG http_proxy
44
ARG https_proxy

0 commit comments

Comments
 (0)