Skip to content

[client] Fix/pkg loss #3338

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e9b3b62
Improve WireGuard handshake success rate
pappz Dec 20, 2024
bfa6df1
The non handshake initiator peer start the handshake after timeout
pappz Jan 20, 2025
6a0f6ef
Always stop timer
pappz Jan 20, 2025
ffe7436
Code cleaning
pappz Feb 14, 2025
1f088b7
Extend the proxy interface with RedirectTo function and implement it …
pappz Feb 16, 2025
06a17f0
Implement redirect to in eBPF proxy
pappz Feb 16, 2025
4db73a1
Implement redirect logic in UDP proxy
pappz Feb 17, 2025
d5042f6
Fix interface changes
pappz Feb 17, 2025
2d5b5f5
Remove log line
pappz Feb 17, 2025
b17c1d9
[client] Improve WireGuard handshake success rate (#3092)
pappz Feb 17, 2025
082452e
Add info log line
pappz Feb 17, 2025
335866a
Close unused rawsocket
pappz Feb 17, 2025
aca443b
Build UDP proxy on Linux only
pappz Feb 17, 2025
775b4fe
Fix close operation
pappz Feb 17, 2025
360c713
Add unit test
pappz Feb 17, 2025
1963644
Add close test for all implementation
pappz Feb 17, 2025
3d80a25
Fix possible blocker if the bind will be closed earlier then proxy
pappz Feb 17, 2025
1f83ba4
Ignore err in tests
pappz Feb 17, 2025
1eacff2
Remove WireGuard kernel code from FreeBSD
pappz Feb 18, 2025
f0020ad
Fallback to package loss solution if the raw socket does not work.
pappz Feb 18, 2025
648b4cd
Update client/iface/wgproxy/udp/proxy.go
pappz Feb 21, 2025
d496d21
Apply same pausedCond logic on all implementation
pappz Feb 21, 2025
651e88d
Eliminate code duplication
pappz Feb 21, 2025
62d1049
Merge branch 'main' into fix/pkg-loss
pappz Feb 21, 2025
06d7125
Build rawsocket code on linux only.
pappz Feb 21, 2025
559d347
Revert async WireGuard handshake
pappz Feb 21, 2025
2e6a44a
Merge branch 'main' into fix/pkg-loss
pappz Feb 24, 2025
990aa8f
Remove wg initiator logic
pappz Feb 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion client/iface/bind/endpoint.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package bind

import wgConn "golang.zx2c4.com/wireguard/conn"
import (
"net"

wgConn "golang.zx2c4.com/wireguard/conn"
)

type Endpoint = wgConn.StdNetEndpoint

func EndpointToUDPAddr(e Endpoint) *net.UDPAddr {
return &net.UDPAddr{
IP: e.Addr().AsSlice(),
Port: int(e.Port()),
}
}
15 changes: 12 additions & 3 deletions client/iface/bind/ice_bind.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bind

import (
"context"
"fmt"
"net"
"net/netip"
Expand Down Expand Up @@ -38,7 +39,7 @@ func (rc receiverCreator) CreateIPv4ReceiverFn(pc *ipv4.PacketConn, conn *net.UD
// use the port because in the Send function the wgConn.Endpoint the port info is not exported.
type ICEBind struct {
*wgConn.StdNetBind
RecvChan chan RecvMessage
recvChan chan RecvMessage

transportNet transport.Net
filterFn FilterFn
Expand All @@ -58,7 +59,7 @@ func NewICEBind(transportNet transport.Net, filterFn FilterFn) *ICEBind {
b, _ := wgConn.NewStdNetBind().(*wgConn.StdNetBind)
ib := &ICEBind{
StdNetBind: b,
RecvChan: make(chan RecvMessage, 1),
recvChan: make(chan RecvMessage, 1),
transportNet: transportNet,
filterFn: filterFn,
endpoints: make(map[netip.Addr]net.Conn),
Expand Down Expand Up @@ -155,6 +156,14 @@ func (b *ICEBind) Send(bufs [][]byte, ep wgConn.Endpoint) error {
return nil
}

func (b *ICEBind) Recv(ctx context.Context, msg RecvMessage) {
select {
case <-ctx.Done():
return
case b.recvChan <- msg:
}
}

func (s *ICEBind) createIPv4ReceiverFn(pc *ipv4.PacketConn, conn *net.UDPConn, rxOffload bool, msgsPool *sync.Pool) wgConn.ReceiveFunc {
s.muUDPMux.Lock()
defer s.muUDPMux.Unlock()
Expand Down Expand Up @@ -264,7 +273,7 @@ func (c *ICEBind) receiveRelayed(buffs [][]byte, sizes []int, eps []wgConn.Endpo
select {
case <-c.closedChan:
return 0, net.ErrClosed
case msg, ok := <-c.RecvChan:
case msg, ok := <-c.recvChan:
if !ok {
return 0, net.ErrClosed
}
Expand Down
40 changes: 40 additions & 0 deletions client/iface/iface_new_freebsd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//go:build freebsd

package iface

import (
"fmt"

"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/device"
"github.com/netbirdio/netbird/client/iface/netstack"
"github.com/netbirdio/netbird/client/iface/wgproxy"
)

// NewWGIFace Creates a new WireGuard interface instance
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
wgAddress, err := device.ParseWGAddress(opts.Address)
if err != nil {
return nil, err
}

wgIFace := &WGIface{}

if netstack.IsEnabled() {
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn)
wgIFace.tun = device.NewNetstackDevice(opts.IFaceName, wgAddress, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind, netstack.ListenAddr())
wgIFace.userspaceBind = true
wgIFace.wgProxyFactory = wgproxy.NewUSPFactory(iceBind)
return wgIFace, nil
}

if device.ModuleTunIsLoaded() {
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn)
wgIFace.tun = device.NewUSPDevice(opts.IFaceName, wgAddress, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind)
wgIFace.userspaceBind = true
wgIFace.wgProxyFactory = wgproxy.NewUSPFactory(iceBind)
return wgIFace, nil
}

return nil, fmt.Errorf("couldn't check or load tun module")
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//go:build (linux && !android) || freebsd
//go:build linux && !android

package iface

Expand Down
101 changes: 71 additions & 30 deletions client/iface/wgproxy/bind/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,66 +13,99 @@ import (
"github.com/netbirdio/netbird/client/iface/bind"
)

type IceBind interface {
SetEndpoint(addr *net.UDPAddr, conn net.Conn) (*net.UDPAddr, error)
RemoveEndpoint(addr *net.UDPAddr)
Recv(ctx context.Context, msg bind.RecvMessage)
}

type ProxyBind struct {
Bind *bind.ICEBind

wgAddr *net.UDPAddr
wgEndpoint *bind.Endpoint
remoteConn net.Conn
ctx context.Context
cancel context.CancelFunc
closeMu sync.Mutex
closed bool

pausedMu sync.Mutex
paused bool
isStarted bool
bind IceBind

// wgEndpoint is a fake address that generated by the Bind.SetEndpoint based on the remote NetBird peer address
wgRelayedEndpoint *bind.Endpoint
wgCurrentUsed *bind.Endpoint
remoteConn net.Conn
ctx context.Context
cancel context.CancelFunc
closeMu sync.Mutex
closed bool

paused bool
pausedCond *sync.Cond
isStarted bool
}

func NewProxyBind(bind IceBind) *ProxyBind {
return &ProxyBind{
bind: bind,
pausedCond: sync.NewCond(&sync.Mutex{}),
}
}

// AddTurnConn adds a new connection to the bind.
// endpoint is the NetBird address of the remote peer. The SetEndpoint return with the address what will be used in the
// WireGuard configuration.
//
// Parameters:
// - ctx: Context is used for proxyToLocal to avoid unnecessary error messages
// - nbAddr: The NetBird UDP address of the remote peer, it required to generate fake address
// - remoteConn: The established TURN connection to the remote peer
func (p *ProxyBind) AddTurnConn(ctx context.Context, nbAddr *net.UDPAddr, remoteConn net.Conn) error {
addr, err := p.Bind.SetEndpoint(nbAddr, remoteConn)
fakeAddr, err := p.bind.SetEndpoint(nbAddr, remoteConn)
if err != nil {
return err
}

p.wgAddr = addr
p.wgEndpoint = addrToEndpoint(addr)
p.wgRelayedEndpoint = addrToEndpoint(fakeAddr)
p.remoteConn = remoteConn
p.ctx, p.cancel = context.WithCancel(ctx)
return err

}

func (p *ProxyBind) EndpointAddr() *net.UDPAddr {
return p.wgAddr
return bind.EndpointToUDPAddr(*p.wgRelayedEndpoint)
}

func (p *ProxyBind) Work() {
if p.remoteConn == nil {
return
}

p.pausedMu.Lock()
p.pausedCond.L.Lock()
p.paused = false
p.pausedMu.Unlock()

p.wgCurrentUsed = p.wgRelayedEndpoint

// Start the proxy only once
if !p.isStarted {
p.isStarted = true
go p.proxyToLocal(p.ctx)
}

p.pausedCond.L.Unlock()
// todo: review to should be inside the lock scope
p.pausedCond.Signal()
}

func (p *ProxyBind) Pause() {
if p.remoteConn == nil {
return
}

p.pausedMu.Lock()
p.pausedCond.L.Lock()
p.paused = true
p.pausedMu.Unlock()
p.pausedCond.L.Unlock()
}

func (p *ProxyBind) RedirectAs(endpoint *net.UDPAddr) {
p.pausedCond.L.Lock()
p.paused = false

p.wgCurrentUsed = addrToEndpoint(endpoint)

p.pausedCond.L.Unlock()
p.pausedCond.Signal()
}

func (p *ProxyBind) CloseConn() error {
Expand All @@ -83,6 +116,10 @@ func (p *ProxyBind) CloseConn() error {
}

func (p *ProxyBind) close() error {
if p.remoteConn == nil {
return nil
}

p.closeMu.Lock()
defer p.closeMu.Unlock()

Expand All @@ -93,7 +130,12 @@ func (p *ProxyBind) close() error {

p.cancel()

p.Bind.RemoveEndpoint(p.wgAddr)
p.pausedCond.L.Lock()
p.paused = false
p.pausedCond.L.Unlock()
p.pausedCond.Signal()

p.bind.RemoveEndpoint(bind.EndpointToUDPAddr(*p.wgRelayedEndpoint))

if rErr := p.remoteConn.Close(); rErr != nil && !errors.Is(rErr, net.ErrClosed) {
return rErr
Expand All @@ -119,18 +161,17 @@ func (p *ProxyBind) proxyToLocal(ctx context.Context) {
return
}

p.pausedMu.Lock()
if p.paused {
p.pausedMu.Unlock()
continue
p.pausedCond.L.Lock()
for p.paused {
p.pausedCond.Wait()
}

msg := bind.RecvMessage{
Endpoint: p.wgEndpoint,
Endpoint: p.wgCurrentUsed,
Buffer: buf[:n],
}
p.Bind.RecvChan <- msg
p.pausedMu.Unlock()
p.bind.Recv(ctx, msg)
p.pausedCond.L.Unlock()
}
}

Expand Down
59 changes: 11 additions & 48 deletions client/iface/wgproxy/ebpf/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import (
"context"
"fmt"
"net"
"os"
"sync"
"syscall"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
Expand All @@ -17,6 +15,7 @@ import (
log "github.com/sirupsen/logrus"

nberrors "github.com/netbirdio/netbird/client/errors"
"github.com/netbirdio/netbird/client/iface/wgproxy/rawsocket"
"github.com/netbirdio/netbird/client/internal/ebpf"
ebpfMgr "github.com/netbirdio/netbird/client/internal/ebpf/manager"
nbnet "github.com/netbirdio/netbird/util/net"
Expand All @@ -26,6 +25,10 @@ const (
loopbackAddr = "127.0.0.1"
)

var (
localHostNetIP = net.ParseIP("127.0.0.1")
)

// WGEBPFProxy definition for proxy with EBPF support
type WGEBPFProxy struct {
localWGListenPort int
Expand Down Expand Up @@ -61,7 +64,7 @@ func (p *WGEBPFProxy) Listen() error {
return err
}

p.rawConn, err = p.prepareSenderRawSocket()
p.rawConn, err = rawsocket.PrepareSenderRawSocket()
if err != nil {
return err
}
Expand Down Expand Up @@ -211,57 +214,17 @@ generatePort:
return p.lastUsedPort, nil
}

func (p *WGEBPFProxy) prepareSenderRawSocket() (net.PacketConn, error) {
// Create a raw socket.
fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_RAW)
if err != nil {
return nil, fmt.Errorf("creating raw socket failed: %w", err)
}

// Set the IP_HDRINCL option on the socket to tell the kernel that headers are included in the packet.
err = syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_HDRINCL, 1)
if err != nil {
return nil, fmt.Errorf("setting IP_HDRINCL failed: %w", err)
}

// Bind the socket to the "lo" interface.
err = syscall.SetsockoptString(fd, syscall.SOL_SOCKET, syscall.SO_BINDTODEVICE, "lo")
if err != nil {
return nil, fmt.Errorf("binding to lo interface failed: %w", err)
}

// Set the fwmark on the socket.
err = nbnet.SetSocketOpt(fd)
if err != nil {
return nil, fmt.Errorf("setting fwmark failed: %w", err)
}

// Convert the file descriptor to a PacketConn.
file := os.NewFile(uintptr(fd), fmt.Sprintf("fd %d", fd))
if file == nil {
return nil, fmt.Errorf("converting fd to file failed")
}
packetConn, err := net.FilePacketConn(file)
if err != nil {
return nil, fmt.Errorf("converting file to packet conn failed: %w", err)
}

return packetConn, nil
}

func (p *WGEBPFProxy) sendPkg(data []byte, port int) error {
localhost := net.ParseIP("127.0.0.1")

func (p *WGEBPFProxy) sendPkg(data []byte, endpointAddr *net.UDPAddr) error {
payload := gopacket.Payload(data)
ipH := &layers.IPv4{
DstIP: localhost,
SrcIP: localhost,
DstIP: localHostNetIP,
SrcIP: endpointAddr.IP,
Version: 4,
TTL: 64,
Protocol: layers.IPProtocolUDP,
}
udpH := &layers.UDP{
SrcPort: layers.UDPPort(port),
SrcPort: layers.UDPPort(endpointAddr.Port),
DstPort: layers.UDPPort(p.localWGListenPort),
}

Expand All @@ -276,7 +239,7 @@ func (p *WGEBPFProxy) sendPkg(data []byte, port int) error {
if err != nil {
return fmt.Errorf("serialize layers: %w", err)
}
if _, err = p.rawConn.WriteTo(layerBuffer.Bytes(), &net.IPAddr{IP: localhost}); err != nil {
if _, err = p.rawConn.WriteTo(layerBuffer.Bytes(), &net.IPAddr{IP: localHostNetIP}); err != nil {
return fmt.Errorf("write to raw conn: %w", err)
}
return nil
Expand Down
Loading
Loading