Skip to content

Support multiple UDP source ports (multiport) #768

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/smoke.yml
Original file line number Diff line number Diff line change
@@ -52,4 +52,12 @@ jobs:
working-directory: ./.github/workflows/smoke
run: NAME="smoke-p256" ./smoke.sh

- name: setup docker image for multiport
working-directory: ./.github/workflows/smoke
run: NAME="smoke-multiport" MULTIPORT_TX=true MULTIPORT_RX=true MULTIPORT_HANDSHAKE=true ./build.sh

- name: run smoke
working-directory: ./.github/workflows/smoke
run: NAME="smoke-multiport" ./smoke.sh

timeout-minutes: 10
4 changes: 4 additions & 0 deletions .github/workflows/smoke/genconfig.sh
Original file line number Diff line number Diff line change
@@ -48,6 +48,10 @@ listen:

tun:
dev: ${TUN_DEV:-tun0}
multiport:
tx_enabled: ${MULTIPORT_TX:-false}
rx_enabled: ${MULTIPORT_RX:-false}
tx_handshake: ${MULTIPORT_HANDSHAKE:-false}

firewall:
inbound_action: reject
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -227,6 +227,10 @@ smoke-relay-docker: bin-docker
cd .github/workflows/smoke/ && ./build-relay.sh
cd .github/workflows/smoke/ && ./smoke-relay.sh

smoke-multiport-docker: bin-docker
cd .github/workflows/smoke/ && NAME="smoke-multiport" MULTIPORT_TX=true MULTIPORT_RX=true MULTIPORT_HANDSHAKE=true ./build.sh
cd .github/workflows/smoke/ && NAME="smoke-multiport" ./smoke.sh

smoke-docker-race: BUILD_ARGS = -race
smoke-docker-race: CGO_ENABLED = 1
smoke-docker-race: smoke-docker
41 changes: 41 additions & 0 deletions examples/config.yml
Original file line number Diff line number Diff line change
@@ -280,6 +280,47 @@ tun:
# SO_RCVBUFFORCE is used to avoid having to raise the system wide max
#use_system_route_table_buffer_size: 0

# EXPERIMENTAL: This option may change or disappear in the future.
# Multiport spreads outgoing UDP packets across multiple UDP send ports,
# which allows nebula to work around any issues on the underlay network.
# Some example issues this could work around:
# - UDP rate limits on a per flow basis.
# - Partial underlay network failure in which some flows work and some don't
# Agreement is done during the handshake to decide if multiport mode will
# be used for a given tunnel (one side must have tx_enabled set, the other
# side must have rx_enabled set)
#
# NOTE: you cannot use multiport on a host if you are relying on UDP hole
# punching to get through a NAT or firewall.
#
# NOTE: Linux only (uses raw sockets to send). Also currently only works
# with IPv4 underlay network remotes.
#
# The default values are listed below:
#multiport:
# This host supports sending via multiple UDP ports.
#tx_enabled: false
#
# This host supports receiving packets sent from multiple UDP ports.
#rx_enabled: false
#
# How many UDP ports to use when sending. The lowest source port will be
# listen.port and go up to (but not including) listen.port + tx_ports.
#tx_ports: 100
#
# NOTE: All of your hosts must be running a version of Nebula that supports
# multiport if you want to enable this feature. Older versions of Nebula
# will be confused by these multiport handshakes.
#
# If handshakes are not getting a response, attempt to transmit handshakes
# using random UDP source ports (to get around partial underlay network
# failures).
#tx_handshake: false
#
# How many unresponded handshakes we should send before we attempt to
# send multiport handshakes.
#tx_handshake_delay: 2

# Configure logging level
logging:
# panic, fatal, error, warning, info, or debug. Default is info and is reloadable.
28 changes: 28 additions & 0 deletions firewall/packet.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package firewall
import (
"encoding/json"
"fmt"
mathrand "math/rand"
"net/netip"
)

@@ -60,3 +61,30 @@ func (fp Packet) MarshalJSON() ([]byte, error) {
"Fragment": fp.Fragment,
})
}

// UDPSendPort calculates the UDP port to send from when using multiport mode.
// The result will be from [0, numBuckets). Because the result is a uint16,
// numBuckets values above 65536 are treated as 65536.
//
// Packets that carry a local port are mapped deterministically from their
// (LocalPort, RemotePort) pair, so a given flow always uses the same bucket.
// Packets without a local port get a random bucket.
func (fp Packet) UDPSendPort(numBuckets int) uint16 {
	if numBuckets <= 1 {
		return 0
	}

	// A uint16 can only address 65536 buckets. Without this clamp,
	// uint16(numBuckets) below would be 0 for multiples of 65536 (division by
	// zero) and the random branch would truncate out-of-range values.
	const maxBuckets = 1 << 16
	if numBuckets > maxBuckets {
		numBuckets = maxBuckets
	}

	// If there is no port (like an ICMP packet), pick a random UDP send port
	if fp.LocalPort == 0 {
		return uint16(mathrand.Intn(numBuckets))
	}

	// A decent enough 32bit hash function
	// Prospecting for Hash Functions
	// - https://nullprogram.com/blog/2018/07/31/
	// - https://github.com/skeeto/hash-prospector
	// [16 21f0aaad 15 d35a2d97 15] = 0.10760229515479501
	x := (uint32(fp.LocalPort) << 16) | uint32(fp.RemotePort)
	x ^= x >> 16
	x *= 0x21f0aaad
	x ^= x >> 15
	x *= 0xd35a2d97
	x ^= x >> 15

	// Reduce the low 16 bits using 32-bit arithmetic. This yields the same
	// bucket as uint16(x) % uint16(numBuckets) for every numBuckets <= 65535,
	// and stays well defined when numBuckets == 65536.
	return uint16((x & 0xffff) % uint32(numBuckets))
}
70 changes: 68 additions & 2 deletions handshake_ix.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/slackhq/nebula/cert"
"github.com/slackhq/nebula/header"
"github.com/slackhq/nebula/udp"
)

// NOISE IX Handshakes
@@ -69,6 +70,15 @@ func ixHandshakeStage0(f *Interface, hh *HandshakeHostInfo) bool {
},
}

if f.multiPort.Tx || f.multiPort.Rx {
hs.Details.InitiatorMultiPort = &MultiPortDetails{
RxSupported: f.multiPort.Rx,
TxSupported: f.multiPort.Tx,
BasePort: uint32(f.multiPort.TxBasePort),
TotalPorts: uint32(f.multiPort.TxPorts),
}
}

hsBytes, err := hs.Marshal()
if err != nil {
f.l.WithError(err).WithField("vpnAddrs", hh.hostinfo.vpnAddrs).
@@ -241,13 +251,35 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
return
}

var multiportTx, multiportRx bool
if f.multiPort.Rx || f.multiPort.Tx {
if hs.Details.InitiatorMultiPort != nil {
multiportTx = hs.Details.InitiatorMultiPort.RxSupported && f.multiPort.Tx
multiportRx = hs.Details.InitiatorMultiPort.TxSupported && f.multiPort.Rx
}

hs.Details.ResponderMultiPort = &MultiPortDetails{
TxSupported: f.multiPort.Tx,
RxSupported: f.multiPort.Rx,
BasePort: uint32(f.multiPort.TxBasePort),
TotalPorts: uint32(f.multiPort.TxPorts),
}
}
if hs.Details.InitiatorMultiPort != nil && hs.Details.InitiatorMultiPort.BasePort != uint32(addr.Port()) {
// The other side sent us a handshake from a different port, make sure
// we send responses back to the BasePort
addr = netip.AddrPortFrom(addr.Addr(), uint16(hs.Details.InitiatorMultiPort.BasePort))
}

hostinfo := &HostInfo{
ConnectionState: ci,
localIndexId: myIndex,
remoteIndexId: hs.Details.InitiatorIndex,
vpnAddrs: vpnAddrs,
HandshakePacket: make(map[uint8][]byte, 0),
lastHandshakeTime: hs.Details.Time,
multiportTx: multiportTx,
multiportRx: multiportRx,
relayState: RelayState{
relays: nil,
relayForByAddr: map[netip.Addr]*Relay{},
@@ -262,6 +294,7 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
WithField("issuer", issuer).
WithField("initiatorIndex", hs.Details.InitiatorIndex).WithField("responderIndex", hs.Details.ResponderIndex).
WithField("remoteIndex", h.RemoteIndex).WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
WithField("multiportTx", multiportTx).WithField("multiportRx", multiportRx).
Info("Handshake message received")

hs.Details.ResponderIndex = myIndex
@@ -338,6 +371,10 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
if err != nil {
switch err {
case ErrAlreadySeen:
if hostinfo.multiportRx {
// The other host is sending to us with multiport, so only grab the IP
addr = netip.AddrPortFrom(addr.Addr(), hostinfo.remote.Port())
}
// Update remote if preferred
if existing.SetRemoteIfPreferred(f.hostMap, addr) {
// Send a test packet to ensure the other side has also switched to
@@ -348,7 +385,14 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
msg = existing.HandshakePacket[2]
f.messageMetrics.Tx(header.Handshake, header.MessageSubType(msg[1]), 1)
if addr.IsValid() {
err := f.outside.WriteTo(msg, addr)
if multiportTx {
// TODO remove alloc here
raw := make([]byte, len(msg)+udp.RawOverhead)
copy(raw[udp.RawOverhead:], msg)
err = f.udpRaw.WriteTo(raw, udp.RandomSendPort.UDPSendPort(f.multiPort.TxPorts), addr)
} else {
err = f.outside.WriteTo(msg, addr)
}
if err != nil {
f.l.WithField("vpnAddrs", existing.vpnAddrs).WithField("udpAddr", addr).
WithField("handshake", m{"stage": 2, "style": "ix_psk0"}).WithField("cached", true).
@@ -417,7 +461,14 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
// Do the send
f.messageMetrics.Tx(header.Handshake, header.MessageSubType(msg[1]), 1)
if addr.IsValid() {
err = f.outside.WriteTo(msg, addr)
if multiportTx {
// TODO remove alloc here
raw := make([]byte, len(msg)+udp.RawOverhead)
copy(raw[udp.RawOverhead:], msg)
err = f.udpRaw.WriteTo(raw, udp.RandomSendPort.UDPSendPort(f.multiPort.TxPorts), addr)
} else {
err = f.outside.WriteTo(msg, addr)
}
if err != nil {
f.l.WithField("vpnAddrs", vpnAddrs).WithField("udpAddr", addr).
WithField("certName", certName).
@@ -513,6 +564,20 @@ func ixHandshakeStage2(f *Interface, addr netip.AddrPort, via *ViaSender, hh *Ha
return true
}

if (f.multiPort.Tx || f.multiPort.Rx) && hs.Details.ResponderMultiPort != nil {
hostinfo.multiportTx = hs.Details.ResponderMultiPort.RxSupported && f.multiPort.Tx
hostinfo.multiportRx = hs.Details.ResponderMultiPort.TxSupported && f.multiPort.Rx
}

if hs.Details.ResponderMultiPort != nil && hs.Details.ResponderMultiPort.BasePort != uint32(addr.Port()) {
// The other side sent us a handshake from a different port, make sure
// we send responses back to the BasePort
addr = netip.AddrPortFrom(
addr.Addr(),
uint16(hs.Details.ResponderMultiPort.BasePort),
)
}

rc, err := cert.Recombine(cert.Version(hs.Details.CertVersion), hs.Details.Cert, ci.H.PeerStatic(), ci.Curve())
if err != nil {
f.l.WithError(err).WithField("udpAddr", addr).
@@ -644,6 +709,7 @@ func ixHandshakeStage2(f *Interface, addr netip.AddrPort, via *ViaSender, hh *Ha
WithField("remoteIndex", h.RemoteIndex).WithField("handshake", m{"stage": 2, "style": "ix_psk0"}).
WithField("durationNs", duration).
WithField("sentCachedPackets", len(hh.packetStore)).
WithField("multiportTx", hostinfo.multiportTx).WithField("multiportRx", hostinfo.multiportRx).
Info("Handshake message received")

// Build up the radix for the firewall if we have subnets in the cert
26 changes: 26 additions & 0 deletions handshake_manager.go
Original file line number Diff line number Diff line change
@@ -61,6 +61,9 @@ type HandshakeManager struct {
f *Interface
l *logrus.Logger

multiPort MultiPortConfig
udpRaw *udp.RawConn

// can be used to trigger outbound handshake for the given vpnIp
trigger chan netip.Addr
}
@@ -236,6 +239,7 @@ func (hm *HandshakeManager) handleOutbound(vpnIp netip.Addr, lighthouseTriggered

// Send the handshake to all known ips, stage 2 takes care of assigning the hostinfo.remote based on the first to reply
var sentTo []netip.AddrPort
var sentMultiport bool
hostinfo.remotes.ForEach(hm.mainHostMap.GetPreferredRanges(), func(addr netip.AddrPort, _ bool) {
hm.messageMetrics.Tx(header.Handshake, header.MessageSubType(hostinfo.HandshakePacket[0][1]), 1)
err := hm.outside.WriteTo(hostinfo.HandshakePacket[0], addr)
@@ -248,6 +252,27 @@ func (hm *HandshakeManager) handleOutbound(vpnIp netip.Addr, lighthouseTriggered
} else {
sentTo = append(sentTo, addr)
}

// Attempt a multiport handshake if we are past the TxHandshakeDelay attempts
if hm.multiPort.TxHandshake && hm.udpRaw != nil && hh.counter >= hm.multiPort.TxHandshakeDelay {
sentMultiport = true
// We need to re-allocate with udp.RawOverhead bytes reserved at the start for the raw socket's IP/UDP headers
raw := hostinfo.HandshakePacket[0x80]
if raw == nil {
raw = make([]byte, len(hostinfo.HandshakePacket[0])+udp.RawOverhead)
copy(raw[udp.RawOverhead:], hostinfo.HandshakePacket[0])
hostinfo.HandshakePacket[0x80] = raw
}

hm.messageMetrics.Tx(header.Handshake, header.MessageSubType(hostinfo.HandshakePacket[0][1]), 1)
err = hm.udpRaw.WriteTo(raw, udp.RandomSendPort.UDPSendPort(hm.multiPort.TxPorts), addr)
if err != nil {
hostinfo.logger(hm.l).WithField("udpAddr", addr).
WithField("initiatorIndex", hostinfo.localIndexId).
WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
WithError(err).Error("Failed to send handshake message")
}
}
})

// Don't be too noisy or confusing if we fail to send a handshake - if we don't get through we'll eventually log a timeout,
@@ -256,6 +281,7 @@ func (hm *HandshakeManager) handleOutbound(vpnIp netip.Addr, lighthouseTriggered
hostinfo.logger(hm.l).WithField("udpAddrs", sentTo).
WithField("initiatorIndex", hostinfo.localIndexId).
WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
WithField("multiportHandshake", sentMultiport).
Info("Handshake message sent")
} else if hm.l.Level >= logrus.DebugLevel {
hostinfo.logger(hm.l).WithField("udpAddrs", sentTo).
6 changes: 6 additions & 0 deletions hostmap.go
Original file line number Diff line number Diff line change
@@ -232,6 +232,12 @@ type HostInfo struct {
networks *bart.Lite
relayState RelayState

// If true, we should send to this remote using multiport
multiportTx bool

// If true, we will receive from this remote using multiport
multiportRx bool

// HandshakePacket records the packets used to create this hostinfo
// We need these to avoid replayed handshake packets creating new hostinfos which causes churn
HandshakePacket map[uint8][]byte
46 changes: 38 additions & 8 deletions inside.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import (
"github.com/slackhq/nebula/iputil"
"github.com/slackhq/nebula/noiseutil"
"github.com/slackhq/nebula/routing"
"github.com/slackhq/nebula/udp"
)

func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet, nb, out []byte, q int, localCache firewall.ConntrackCache) {
@@ -68,7 +69,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet

dropReason := f.firewall.Drop(*fwPacket, false, hostinfo, f.pki.GetCAPool(), localCache)
if dropReason == nil {
f.sendNoMetrics(header.Message, 0, hostinfo.ConnectionState, hostinfo, netip.AddrPort{}, packet, nb, out, q)
f.sendNoMetrics(header.Message, 0, hostinfo.ConnectionState, hostinfo, netip.AddrPort{}, packet, nb, out, q, fwPacket)

} else {
f.rejectInside(packet, out, q)
@@ -117,7 +118,7 @@ func (f *Interface) rejectOutside(packet []byte, ci *ConnectionState, hostinfo *
return
}

f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, out, nb, packet, q)
f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, out, nb, packet, q, nil)
}

// Handshake will attempt to initiate a tunnel with the provided vpn address if it is within our vpn networks. This is a no-op if the tunnel is already established or being established
@@ -228,7 +229,7 @@ func (f *Interface) sendMessageNow(t header.MessageType, st header.MessageSubTyp
return
}

f.sendNoMetrics(header.Message, st, hostinfo.ConnectionState, hostinfo, netip.AddrPort{}, p, nb, out, 0)
f.sendNoMetrics(header.Message, st, hostinfo.ConnectionState, hostinfo, netip.AddrPort{}, p, nb, out, 0, nil)
}

// SendMessageToVpnAddr handles real addr:port lookup and sends to the current best known address for vpnAddr
@@ -258,12 +259,12 @@ func (f *Interface) SendMessageToHostInfo(t header.MessageType, st header.Messag

func (f *Interface) send(t header.MessageType, st header.MessageSubType, ci *ConnectionState, hostinfo *HostInfo, p, nb, out []byte) {
f.messageMetrics.Tx(t, st, 1)
f.sendNoMetrics(t, st, ci, hostinfo, netip.AddrPort{}, p, nb, out, 0)
f.sendNoMetrics(t, st, ci, hostinfo, netip.AddrPort{}, p, nb, out, 0, nil)
}

func (f *Interface) sendTo(t header.MessageType, st header.MessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote netip.AddrPort, p, nb, out []byte) {
f.messageMetrics.Tx(t, st, 1)
f.sendNoMetrics(t, st, ci, hostinfo, remote, p, nb, out, 0)
f.sendNoMetrics(t, st, ci, hostinfo, remote, p, nb, out, 0, nil)
}

// SendVia sends a payload through a Relay tunnel. No authentication or encryption is done
@@ -331,10 +332,27 @@ func (f *Interface) SendVia(via *HostInfo,
f.connectionManager.RelayUsed(relay.LocalIndex)
}

func (f *Interface) sendNoMetrics(t header.MessageType, st header.MessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote netip.AddrPort, p, nb, out []byte, q int) {
func (f *Interface) sendNoMetrics(t header.MessageType, st header.MessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote netip.AddrPort, p, nb, out []byte, q int, udpPortGetter udp.SendPortGetter) {
if ci.eKey == nil {
return
}

multiport := f.multiPort.Tx && hostinfo.multiportTx
rawOut := out
if multiport {
if len(out) < udp.RawOverhead {
// NOTE: This is because some spots in the code send us `out[:0]`, so
// we need to expand the slice back out to get our 8 bytes back.
out = out[:udp.RawOverhead]
}
// Preserve bytes needed for the raw socket
out = out[udp.RawOverhead:]

if udpPortGetter == nil {
udpPortGetter = udp.RandomSendPort
}
}

useRelay := !remote.IsValid() && !hostinfo.remote.IsValid()
fullOut := out

@@ -384,13 +402,25 @@ func (f *Interface) sendNoMetrics(t header.MessageType, st header.MessageSubType
}

if remote.IsValid() {
err = f.writers[q].WriteTo(out, remote)
if multiport {
rawOut = rawOut[:len(out)+udp.RawOverhead]
port := udpPortGetter.UDPSendPort(f.multiPort.TxPorts)
err = f.udpRaw.WriteTo(rawOut, port, remote)
} else {
err = f.writers[q].WriteTo(out, remote)
}
if err != nil {
hostinfo.logger(f.l).WithError(err).
WithField("udpAddr", remote).Error("Failed to write outgoing packet")
}
} else if hostinfo.remote.IsValid() {
err = f.writers[q].WriteTo(out, hostinfo.remote)
if multiport {
rawOut = rawOut[:len(out)+udp.RawOverhead]
port := udpPortGetter.UDPSendPort(f.multiPort.TxPorts)
err = f.udpRaw.WriteTo(rawOut, port, hostinfo.remote)
} else {
err = f.writers[q].WriteTo(out, hostinfo.remote)
}
if err != nil {
hostinfo.logger(f.l).WithError(err).
WithField("udpAddr", remote).Error("Failed to write outgoing packet")
23 changes: 23 additions & 0 deletions interface.go
Original file line number Diff line number Diff line change
@@ -87,6 +87,9 @@ type Interface struct {

writers []udp.Conn
readers []io.ReadWriteCloser
udpRaw *udp.RawConn

multiPort MultiPortConfig

metricHandshakes metrics.Histogram
messageMetrics *MessageMetrics
@@ -95,6 +98,15 @@ type Interface struct {
l *logrus.Logger
}

// MultiPortConfig holds the runtime multiport settings, loaded from the
// tun.multiport.* config keys.
type MultiPortConfig struct {
	// Tx is true when tun.multiport.tx_enabled is set and a raw socket is
	// available for sending.
	Tx bool
	// Rx is the value of tun.multiport.rx_enabled.
	Rx bool
	// TxBasePort is the listen port; it is advertised to peers as BasePort in
	// the handshake.
	TxBasePort uint16
	// TxPorts is the number of source ports to spread sends across
	// (tun.multiport.tx_ports).
	TxPorts int
	// TxHandshake enables sending handshakes from random source ports
	// (tun.multiport.tx_handshake).
	TxHandshake bool
	// TxHandshakeDelay is the number of handshake attempts to make before
	// multiport handshakes are also sent (tun.multiport.tx_handshake_delay).
	TxHandshakeDelay int64
}

type EncWriter interface {
SendVia(via *HostInfo,
relay *Relay,
@@ -224,6 +236,8 @@ func (f *Interface) activate() {

metrics.GetOrRegisterGauge("routines", nil).Update(int64(f.routines))

metrics.GetOrRegisterGauge("multiport.tx_ports", nil).Update(int64(f.multiPort.TxPorts))

// Prepare n tun queues
var reader io.ReadWriteCloser = f.inside
for i := 0; i < f.routines; i++ {
@@ -412,6 +426,8 @@ func (f *Interface) emitStats(ctx context.Context, i time.Duration) {

udpStats := udp.NewUDPStatsEmitter(f.writers)

var rawStats func()

certExpirationGauge := metrics.GetOrRegisterGauge("certificate.ttl_seconds", nil)
certInitiatingVersion := metrics.GetOrRegisterGauge("certificate.initiating_version", nil)
certMaxVersion := metrics.GetOrRegisterGauge("certificate.max_version", nil)
@@ -430,6 +446,13 @@ func (f *Interface) emitStats(ctx context.Context, i time.Duration) {
certExpirationGauge.Update(int64(defaultCrt.NotAfter().Sub(time.Now()) / time.Second))
certInitiatingVersion.Update(int64(defaultCrt.Version()))

if f.udpRaw != nil {
if rawStats == nil {
rawStats = udp.NewRawStatsEmitter(f.udpRaw)
}
rawStats()
}

// Report the max certificate version we are capable of using
if certState.v2Cert != nil {
certMaxVersion.Update(int64(certState.v2Cert.Version()))
33 changes: 33 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -255,6 +255,39 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
ifce.writers = udpConns
lightHouse.ifce = ifce

// loadMultiPortConfig reads the tun.multiport.* settings into ifce.multiPort,
// lazily creates the raw send socket the first time tx is enabled, and
// mirrors the result onto the handshake manager. It is also registered as a
// config reload callback.
loadMultiPortConfig := func(c *config.C) {
	ifce.multiPort.Rx = c.GetBool("tun.multiport.rx_enabled", false)

	// The handshake advertises TxBasePort as BasePort whenever Rx or Tx is
	// enabled, and the peer redirects its replies to that port. It therefore
	// has to be valid for rx-only hosts too, otherwise they advertise port 0.
	ifce.multiPort.TxBasePort = uint16(port)

	tx := c.GetBool("tun.multiport.tx_enabled", false)

	if tx && ifce.udpRaw == nil {
		rawConn, err := udp.NewRawConn(l, c.GetString("listen.host", "0.0.0.0"), port, uint16(port))
		if err != nil {
			l.WithError(err).Error("Failed to get raw socket for tun.multiport.tx_enabled")
			tx = false
		} else {
			ifce.udpRaw = rawConn
		}
	}

	if tx {
		txPorts := c.GetInt("tun.multiport.tx_ports", 100)
		if txPorts < 1 {
			// A non-positive port count would panic when a random send port is chosen.
			l.WithField("tx_ports", txPorts).Warn("tun.multiport.tx_ports must be at least 1, using the default of 100")
			txPorts = 100
		}
		// Source ports are base port + offset held in a uint16; keep the
		// range below 65536 so it cannot wrap around to low ports.
		if limit := 65536 - port; limit >= 1 && txPorts > limit {
			l.WithField("tx_ports", txPorts).WithField("max", limit).
				Warn("tun.multiport.tx_ports exceeds the ports available above listen.port, clamping")
			txPorts = limit
		}

		ifce.multiPort.TxPorts = txPorts
		ifce.multiPort.TxHandshake = c.GetBool("tun.multiport.tx_handshake", false)
		ifce.multiPort.TxHandshakeDelay = int64(c.GetInt("tun.multiport.tx_handshake_delay", 2))
		ifce.udpRaw.ReloadConfig(c)
	} else {
		// Don't leave multiport handshakes enabled from an earlier config
		// once tx has been turned off; the raw socket may still be open.
		ifce.multiPort.TxHandshake = false
	}
	ifce.multiPort.Tx = tx

	// TODO: if we upstream this, make this cleaner
	handshakeManager.udpRaw = ifce.udpRaw
	handshakeManager.multiPort = ifce.multiPort

	l.WithField("multiPort", ifce.multiPort).Info("Multiport configured")
}

loadMultiPortConfig(c)
c.RegisterReloadCallback(loadMultiPortConfig)

ifce.RegisterConfigChangeCallbacks(c)
ifce.reloadDisconnectInvalid(c)
ifce.reloadSendRecvError(c)
515 changes: 455 additions & 60 deletions nebula.pb.go

Large diffs are not rendered by default.

12 changes: 10 additions & 2 deletions nebula.proto
Original file line number Diff line number Diff line change
@@ -65,15 +65,23 @@ message NebulaHandshake {
bytes Hmac = 2;
}

// MultiPortDetails carries one side's multiport capabilities in a handshake.
message MultiPortDetails {
  // The sender can receive packets sent from multiple UDP source ports.
  bool RxSupported = 1;
  // The sender can transmit from multiple UDP source ports.
  bool TxSupported = 2;
  // The port the peer should address its replies to.
  uint32 BasePort = 3;
  // How many source ports the sender spreads its traffic across.
  uint32 TotalPorts = 4;
}

message NebulaHandshakeDetails {
bytes Cert = 1;
uint32 InitiatorIndex = 2;
uint32 ResponderIndex = 3;
uint64 Cookie = 4;
uint64 Time = 5;
uint32 CertVersion = 8;
// reserved for WIP multiport
reserved 6, 7;

MultiPortDetails InitiatorMultiPort = 6;
MultiPortDetails ResponderMultiPort = 7;
}

message NebulaControl {
10 changes: 10 additions & 0 deletions outside.go
Original file line number Diff line number Diff line change
@@ -232,6 +232,16 @@ func (f *Interface) sendCloseTunnel(h *HostInfo) {

func (f *Interface) handleHostRoaming(hostinfo *HostInfo, udpAddr netip.AddrPort) {
if udpAddr.IsValid() && hostinfo.remote != udpAddr {
if hostinfo.multiportRx {
// If the remote is sending with multiport, we aren't roaming unless
// the IP has changed
if hostinfo.remote.Addr().Compare(udpAddr.Addr()) == 0 {
return
}
// Keep the port from the original hostinfo, because the remote is transmitting from multiport ports
udpAddr = netip.AddrPortFrom(udpAddr.Addr(), hostinfo.remote.Port())
}

if !f.lightHouse.GetRemoteAllowList().AllowAll(hostinfo.vpnAddrs, udpAddr.Addr()) {
hostinfo.logger(f.l).WithField("newAddr", udpAddr).Debug("lighthouse.remote_allow_list denied roaming")
return
16 changes: 16 additions & 0 deletions udp/udp_raw.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package udp

import mathrand "math/rand"

// SendPortGetter selects which of the multiport source ports a packet is
// sent from.
type SendPortGetter interface {
	// UDPSendPort returns the port to use. Callers pass the configured number
	// of send ports as maxPort; the Linux (*RawConn).WriteTo adds the returned
	// value to its base port to form the UDP source port.
	UDPSendPort(maxPort int) uint16
}

// randomSendPort is a send port getter that picks a pseudo-random port for
// every packet.
type randomSendPort struct{}

// UDPSendPort returns a pseudo-random value in [0, maxPort).
//
// A maxPort of 1 or less always yields 0 (rand.Intn panics for n <= 0), and
// values above 65536 are treated as 65536 so the result is never a truncated
// out-of-range number.
func (randomSendPort) UDPSendPort(maxPort int) uint16 {
	if maxPort <= 1 {
		return 0
	}
	const maxPorts = 1 << 16
	if maxPort > maxPorts {
		maxPort = maxPorts
	}
	return uint16(mathrand.Intn(maxPort))
}

// RandomSendPort is the shared randomSendPort instance, used when no
// flow-specific port getter is available.
var RandomSendPort = randomSendPort{}
191 changes: 191 additions & 0 deletions udp/udp_raw_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
//go:build !android && !e2e_testing
// +build !android,!e2e_testing

package udp

import (
"encoding/binary"
"fmt"
"net"
"net/netip"
"syscall"
"unsafe"

"github.com/rcrowley/go-metrics"
"github.com/sirupsen/logrus"
"github.com/slackhq/nebula/config"
"golang.org/x/net/ipv4"
"golang.org/x/sys/unix"
)

// RawOverhead is the number of bytes that need to be reserved at the start of
// the raw bytes passed to (*RawConn).WriteTo. This is used by WriteTo to prefix
// the IP and UDP headers (a 20 byte IPv4 header followed by an 8 byte UDP
// header).
const RawOverhead = 28

// RawConn is a raw IPv4 socket used to send UDP packets from a chosen source
// port.
type RawConn struct {
	// sysFd is the file descriptor of the raw socket.
	sysFd int
	// basePort is added to the port passed to WriteTo to form the UDP source
	// port.
	basePort uint16
	l        *logrus.Logger
}

// NewRawConn opens a raw IPv4 socket for sending UDP packets with arbitrary
// source ports. ip and port are used to bind the socket, and basePort is the
// lowest source port WriteTo will send from. The socket is closed again if
// any setup step fails.
func NewRawConn(l *logrus.Logger, ip string, port int, basePort uint16) (*RawConn, error) {
	syscall.ForkLock.RLock()
	// With IPPROTO_UDP, the linux kernel tries to deliver every UDP packet
	// received in the system to our socket. This constantly overflows our
	// buffer and marks our socket as having dropped packets. This makes the
	// stats on the socket useless.
	//
	// In contrast, IPPROTO_RAW is not delivered any packets and thus our read
	// buffer will not fill up and mark as having dropped packets. The only
	// difference is that we have to assemble the IP header as well, but this
	// is fairly easy since Linux does the checksum for us.
	//
	// TODO: How to get this working with Inet6 correctly? I was having issues
	// with the source address when testing before, probably need to `bind(2)`?
	fd, err := unix.Socket(unix.AF_INET, unix.SOCK_RAW, unix.IPPROTO_RAW)
	if err == nil {
		unix.CloseOnExec(fd)
	}
	syscall.ForkLock.RUnlock()
	if err != nil {
		return nil, err
	}

	// We only want to send, not recv. This will hopefully help the kernel avoid
	// wasting time on us
	if err = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_RCVBUF, 0); err != nil {
		// Best-effort close so the descriptor is not leaked; the setsockopt
		// error is the one worth reporting.
		_ = unix.Close(fd)
		return nil, fmt.Errorf("unable to set SO_RCVBUF: %w", err)
	}

	var lip [16]byte
	copy(lip[:], net.ParseIP(ip))

	// TODO do we need to `bind(2)` so that we send from the correct address/interface?
	// NOTE(review): this binds an AF_INET socket with an IPv6 sockaddr; confirm
	// that is intended before relying on the bound address.
	if err = unix.Bind(fd, &unix.SockaddrInet6{Addr: lip, Port: port}); err != nil {
		_ = unix.Close(fd)
		return nil, fmt.Errorf("unable to bind to socket: %w", err)
	}

	return &RawConn{
		sysFd:    fd,
		basePort: basePort,
		l:        l,
	}, nil
}

// WriteTo sends raw to ip as a single IPv4/UDP datagram whose source port is
// the connection's base port plus fromPort.
//
// WriteTo must be called with raw leaving the first `udp.RawOverhead` bytes empty,
// for the IP/UDP headers; those bytes are overwritten in place. An error is
// returned, and nothing is sent, if raw is shorter than RawOverhead, longer
// than an IPv4 datagram allows, or if ip is not an IPv4 (or IPv4-mapped)
// address.
func (u *RawConn) WriteTo(raw []byte, fromPort uint16, ip netip.AddrPort) error {
	totalLen := len(raw)
	if totalLen < RawOverhead {
		return fmt.Errorf("raw packet is %d bytes, need at least %d for the IP/UDP headers", totalLen, RawOverhead)
	}
	if totalLen > 0xffff {
		// The IPv4 total length field is 16 bits wide.
		return fmt.Errorf("raw packet is %d bytes, exceeds the maximum IPv4 datagram size", totalLen)
	}

	// As4 panics on IPv6 and zero addresses, so reject them up front.
	dst := ip.Addr().Unmap()
	if !dst.Is4() {
		return fmt.Errorf("multiport tx only supports IPv4 remotes, got %s", ip)
	}

	var rsa unix.RawSockaddrInet4
	rsa.Family = unix.AF_INET
	rsa.Addr = dst.As4()

	udpLen := totalLen - ipv4.HeaderLen

	// IP header
	raw[0] = byte(ipv4.Version<<4 | (ipv4.HeaderLen >> 2 & 0x0f))
	raw[1] = 0 // tos
	binary.BigEndian.PutUint16(raw[2:4], uint16(totalLen))
	binary.BigEndian.PutUint16(raw[4:6], 0)   // id (linux does it for us)
	binary.BigEndian.PutUint16(raw[6:8], 0)   // frag options
	raw[8] = byte(64)                         // ttl
	raw[9] = byte(17)                         // protocol
	binary.BigEndian.PutUint16(raw[10:12], 0) // checksum (linux does it for us)
	binary.BigEndian.PutUint32(raw[12:16], 0) // src (linux does it for us)
	copy(raw[16:20], rsa.Addr[:])             // dst

	// UDP header
	// NOTE: the sum is a uint16 and wraps if basePort + fromPort exceeds 65535.
	fromPort = u.basePort + fromPort
	binary.BigEndian.PutUint16(raw[20:22], fromPort)       // src port
	binary.BigEndian.PutUint16(raw[22:24], ip.Port())      // dst port
	binary.BigEndian.PutUint16(raw[24:26], uint16(udpLen)) // UDP length
	binary.BigEndian.PutUint16(raw[26:28], 0)              // checksum (optional)

	_, _, errno := unix.Syscall6(
		unix.SYS_SENDTO,
		uintptr(u.sysFd),
		uintptr(unsafe.Pointer(&raw[0])),
		uintptr(len(raw)),
		uintptr(0),
		uintptr(unsafe.Pointer(&rsa)),
		uintptr(unix.SizeofSockaddrInet4),
	)
	if errno != 0 {
		return &net.OpError{Op: "sendto", Err: errno}
	}

	//TODO: handle incomplete writes

	return nil
}

// ReloadConfig applies listen.write_buffer to the raw socket's send buffer
// and logs the size reported back by the socket. A value of zero or less
// leaves the socket untouched.
func (u *RawConn) ReloadConfig(c *config.C) {
	want := c.GetInt("listen.write_buffer", 0)
	if want <= 0 {
		return
	}

	err := u.SetSendBuffer(want)
	if err != nil {
		u.l.WithError(err).Error("Failed to set listen.write_buffer")
		return
	}

	// Read the value back, since that is what is worth logging.
	got, err := u.GetSendBuffer()
	if err != nil {
		u.l.WithError(err).Warn("Failed to get listen.write_buffer")
		return
	}

	u.l.WithField("size", got).Info("listen.write_buffer was set")
}

// SetSendBuffer sets the socket's send buffer size to n using
// SO_SNDBUFFORCE.
func (u *RawConn) SetSendBuffer(n int) error {
	const (
		level = unix.SOL_SOCKET
		opt   = unix.SO_SNDBUFFORCE
	)
	return unix.SetsockoptInt(u.sysFd, level, opt, n)
}

// GetSendBuffer returns the socket's current SO_SNDBUF value.
func (u *RawConn) GetSendBuffer() (int, error) {
	const (
		level = unix.SOL_SOCKET
		opt   = unix.SO_SNDBUF
	)
	return unix.GetsockoptInt(u.sysFd, level, opt)
}

// getMemInfo fills meminfo with the socket's SO_MEMINFO counters. It returns
// the errno from getsockopt(2) on failure.
func (u *RawConn) getMemInfo(meminfo *[unix.SK_MEMINFO_VARS]uint32) error {
	// Size of the output buffer in bytes: one uint32 per counter.
	optLen := uint32(4 * unix.SK_MEMINFO_VARS)

	_, _, errno := unix.Syscall6(
		unix.SYS_GETSOCKOPT,
		uintptr(u.sysFd),
		uintptr(unix.SOL_SOCKET),
		uintptr(unix.SO_MEMINFO),
		uintptr(unsafe.Pointer(meminfo)),
		uintptr(unsafe.Pointer(&optLen)),
		0,
	)
	if errno == 0 {
		return nil
	}
	return errno
}

// NewRawStatsEmitter returns a function that publishes the raw socket's
// SO_MEMINFO counters as "raw.*" gauges each time it is called. If the first
// SO_MEMINFO query fails, no gauges are registered and the returned function
// does nothing.
func NewRawStatsEmitter(rawConn *RawConn) func() {
	var meminfo [unix.SK_MEMINFO_VARS]uint32

	// Check if our kernel supports SO_MEMINFO before registering the gauges
	if err := rawConn.getMemInfo(&meminfo); err != nil {
		// return no-op because we don't support SO_MEMINFO
		return func() {}
	}

	// One gauge per counter, in the order they appear in meminfo.
	gauges := [unix.SK_MEMINFO_VARS]metrics.Gauge{
		metrics.GetOrRegisterGauge("raw.rmem_alloc", nil),
		metrics.GetOrRegisterGauge("raw.rcvbuf", nil),
		metrics.GetOrRegisterGauge("raw.wmem_alloc", nil),
		metrics.GetOrRegisterGauge("raw.sndbuf", nil),
		metrics.GetOrRegisterGauge("raw.fwd_alloc", nil),
		metrics.GetOrRegisterGauge("raw.wmem_queued", nil),
		metrics.GetOrRegisterGauge("raw.optmem", nil),
		metrics.GetOrRegisterGauge("raw.backlog", nil),
		metrics.GetOrRegisterGauge("raw.drops", nil),
	}

	return func() {
		if err := rawConn.getMemInfo(&meminfo); err != nil {
			return
		}
		for idx := range gauges {
			gauges[idx].Update(int64(meminfo[idx]))
		}
	}
}
29 changes: 29 additions & 0 deletions udp/udp_raw_unsupported.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//go:build !linux || android || e2e_testing
// +build !linux android e2e_testing

package udp

import (
"fmt"
"net/netip"
"runtime"

"github.com/sirupsen/logrus"
"github.com/slackhq/nebula/config"
)

// RawOverhead is 0 in this build because no raw socket headers are ever
// written.
const RawOverhead = 0

// RawConn is a placeholder type for builds without raw socket support.
type RawConn struct{}

// NewRawConn always fails in this build: it returns a nil connection and an
// error naming the current operating system.
func NewRawConn(l *logrus.Logger, ip string, port int, basePort uint16) (*RawConn, error) {
	err := fmt.Errorf("multiport tx is not supported on %s", runtime.GOOS)
	return nil, err
}

// WriteTo never sends anything in this build; it always returns an error
// naming the current operating system.
func (u *RawConn) WriteTo(raw []byte, fromPort uint16, addr netip.AddrPort) error {
	goos := runtime.GOOS
	return fmt.Errorf("multiport tx is not supported on %s", goos)
}

// ReloadConfig is a no-op in this build.
func (u *RawConn) ReloadConfig(c *config.C) {}

// NewRawStatsEmitter returns a function that does nothing in this build.
func NewRawStatsEmitter(rawConn *RawConn) func() { return func() {} }