Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion examples/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,28 @@ tun:

# Unsafe routes allows you to route traffic over nebula to non-nebula nodes
# Unsafe routes should be avoided unless you have hosts/services that cannot run nebula
# NOTE: The nebula certificate of the "via" node *MUST* have the "route" defined as a subnet in its certificate
# Supports weighted ECMP if you define a list of gateways, this can be used for load balancing or redundancy to hosts outside of nebula
# NOTES:
# * You will only see a single gateway in the routing table if you are not on linux
# * If a gateway is not reachable through the overlay another gateway will be selected to send the traffic through, ignoring weights
#
# unsafe_routes:
# # Multiple gateways without defining a weight defaults to a weight of 1, this will balance traffic equally between the three gateways
# - route: 192.168.87.0/24
# via:
# - gateway: 10.0.0.1
# - gateway: 10.0.0.2
# - gateway: 10.0.0.3
# # Multiple gateways with a weight, this will balance traffic accordingly
# - route: 192.168.87.0/24
# via:
# - gateway: 10.0.0.1
# weight: 10
# - gateway: 10.0.0.2
# weight: 5
#
# NOTE: The nebula certificate of the "via" node(s) *MUST* have the "route" defined as a subnet in its certificate
# `via`: single node or list of gateways to use for this route
# `mtu`: will default to tun mtu if this option is not specified
# `metric`: will default to 0 if this option is not specified
# `install`: will default to true, controls whether this route is installed in the systems routing table.
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
google.golang.org/protobuf v1.36.5
gopkg.in/yaml.v2 v2.4.0
gvisor.dev/gvisor v0.0.0-20240423190808-9d7a357edefe
github.com/zeebo/xxh3 v1.0.2
)

require (
Expand All @@ -54,4 +55,5 @@ require (
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.22.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
Expand Down Expand Up @@ -151,6 +153,8 @@ github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1Y
github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down
118 changes: 108 additions & 10 deletions inside.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/slackhq/nebula/header"
"github.com/slackhq/nebula/iputil"
"github.com/slackhq/nebula/noiseutil"
"github.com/slackhq/nebula/routing"
"github.com/zeebo/xxh3"
)

func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet, nb, out []byte, q int, localCache firewall.ConntrackCache) {
Expand Down Expand Up @@ -49,7 +51,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet
return
}

hostinfo, ready := f.getOrHandshake(fwPacket.RemoteAddr, func(hh *HandshakeHostInfo) {
hostinfo, ready := f.getOrHandshakeConsiderRouting(fwPacket, func(hh *HandshakeHostInfo) {
hh.cachePacket(f.l, header.Message, 0, packet, f.sendMessageNow, f.cachedPacketMetrics)
})

Expand Down Expand Up @@ -121,22 +123,118 @@ func (f *Interface) rejectOutside(packet []byte, ci *ConnectionState, hostinfo *
f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, out, nb, packet, q)
}

// This should only called for internal nebula traffic
func (f *Interface) Handshake(vpnAddr netip.Addr) {
f.getOrHandshake(vpnAddr, nil)
f.getOrHandshakeNoRouting(vpnAddr, nil)
}

// getOrHandshake returns nil if the vpnAddr is not routable.
// getOrHandshakeNoRouting returns nil if the vpnAddr is not routable.
// If the 2nd return var is false then the hostinfo is not ready to be used in a tunnel
func (f *Interface) getOrHandshake(vpnAddr netip.Addr, cacheCallback func(*HandshakeHostInfo)) (*HostInfo, bool) {
func (f *Interface) getOrHandshakeNoRouting(vpnAddr netip.Addr, cacheCallback func(*HandshakeHostInfo)) (*HostInfo, bool) {
_, found := f.myVpnNetworksTable.Lookup(vpnAddr)
if !found {
vpnAddr = f.inside.RouteFor(vpnAddr)
if !vpnAddr.IsValid() {
return nil, false
if found {
return f.handshakeManager.GetOrHandshake(vpnAddr, cacheCallback)
}

return nil, false
}

func hashPacket(p *firewall.Packet) int {
hasher := xxh3.Hasher{}

hasher.Write(p.LocalAddr.AsSlice())
hasher.Write(p.RemoteAddr.AsSlice())
hasher.Write([]byte{
byte(p.LocalPort & 0xFF),
byte((p.LocalPort >> 8) & 0xFF),
byte(p.RemotePort & 0xFF),
byte((p.RemotePort >> 8) & 0xFF),
byte(p.Protocol),
})

// Uses xxh3 as it is a fast hash with good distribution
return int(hasher.Sum64() & 0x7FFFFFFF)
}

func balancePacket(fwPacket *firewall.Packet, gateways []routing.Gateway) netip.Addr {
hash := hashPacket(fwPacket)

for i := range gateways {
if hash <= gateways[i].UpperBound() {
return gateways[i].Ip()
}
}

return f.handshakeManager.GetOrHandshake(vpnAddr, cacheCallback)
// This should never happen
panic("The packet hash value should always fall inside a gateway bucket")
}

// This is called for external traffic
// getOrHandshake returns nil if the vpnIp is not routable.
// If the 2nd return var is false then the hostinfo is not ready to be used in a tunnel
func (f *Interface) getOrHandshakeConsiderRouting(fwPacket *firewall.Packet, cacheCallback func(*HandshakeHostInfo)) (*HostInfo, bool) {

destinationIp := fwPacket.RemoteAddr

hostinfo, ready := f.getOrHandshakeNoRouting(destinationIp, cacheCallback)

// Host is inside the mesh, no routing required
if hostinfo != nil {
return hostinfo, ready
}

gateways := f.inside.RoutesFor(destinationIp)
if len(gateways) == 0 {
return nil, false
} else if len(gateways) == 1 {
// Single gateway route
return f.handshakeManager.GetOrHandshake(gateways[0].Ip(), cacheCallback)
} else {
// Multi gateway route, perform ECMP categorization
gatewayIp := balancePacket(fwPacket, gateways)

// Do not pass a cacheCallback, if the node is not reachable we will attempt other nodes
// so we don't want to cache the packet to avoid sending duplicate packets if this gateway comes back up.
if hostInfo, ready := f.handshakeManager.GetOrHandshake(gatewayIp, nil); ready {
return hostInfo, true
}

// It appears the selected gateway cannot be reached, find another gateway to fallback on.
// The current implementation breaks ECMP but that seems better than no connectivity.
// If ECMP is also required when a gateway is down then connectivity status
// for each gateway nees to be kept and the weights recalculated when they go up or down.
// This would also need to interact with unsafe_route updates through reloading the config or
// use of the use_system_route_table option

if f.l.Level >= logrus.DebugLevel {
f.l.WithField("destination", destinationIp).
WithField("originalGateway", gatewayIp).
Debugln("Calculated gateway for ECMP not available, attempting other gateways")
}

var hostInfo *HostInfo
var ready bool
var handshakeHostInfo *HandshakeHostInfo
var hhReceiver = func(hh *HandshakeHostInfo) {
handshakeHostInfo = hh
}

for i := range gateways {
// Skip the gateway that failed previously
if gateways[i].Ip() == gatewayIp {
continue
}

// Store the HandshakeHostInfo for the cache callback
if hostInfo, ready = f.handshakeManager.GetOrHandshake(gateways[i].Ip(), hhReceiver); ready {
return hostInfo, true
}
}

// No gateways reachable, cache packet in last gateway attempted
cacheCallback(handshakeHostInfo)
return hostInfo, false
}
}

func (f *Interface) sendMessageNow(t header.MessageType, st header.MessageSubType, hostinfo *HostInfo, p, nb, out []byte) {
Expand All @@ -163,7 +261,7 @@ func (f *Interface) sendMessageNow(t header.MessageType, st header.MessageSubTyp

// SendMessageToVpnAddr handles real addr:port lookup and sends to the current best known address for vpnAddr
func (f *Interface) SendMessageToVpnAddr(t header.MessageType, st header.MessageSubType, vpnAddr netip.Addr, p, nb, out []byte) {
hostInfo, ready := f.getOrHandshake(vpnAddr, func(hh *HandshakeHostInfo) {
hostInfo, ready := f.getOrHandshakeNoRouting(vpnAddr, func(hh *HandshakeHostInfo) {
hh.cachePacket(f.l, t, st, p, f.SendMessageToHostInfo, f.cachedPacketMetrics)
})

Expand Down
102 changes: 102 additions & 0 deletions inside_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package nebula

import (
"net/netip"
"testing"

"github.com/slackhq/nebula/firewall"
"github.com/slackhq/nebula/routing"
"github.com/stretchr/testify/assert"
)

func TestPacketsAreBalancedEqually(t *testing.T) {

gateways := []routing.Gateway{}

gw1Ip := netip.MustParseAddr("1.0.0.1")
gw2Ip := netip.MustParseAddr("1.0.0.2")
gw3Ip := netip.MustParseAddr("1.0.0.3")

gateways = append(gateways, routing.NewGateway(gw1Ip, 1))
gateways = append(gateways, routing.NewGateway(gw2Ip, 1))
gateways = append(gateways, routing.NewGateway(gw3Ip, 1))

routing.RebalanceGateways(gateways)

gw1count := 0
gw2count := 0
gw3count := 0

iterationCount := uint16(65535)
for i := uint16(0); i < iterationCount; i++ {
packet := firewall.Packet{
LocalAddr: netip.MustParseAddr("192.168.1.1"),
RemoteAddr: netip.MustParseAddr("10.0.0.1"),
LocalPort: i,
RemotePort: 65535 - i,
Protocol: 6, // TCP
Fragment: false,
}

selectedGw := balancePacket(&packet, gateways)

switch selectedGw {
case gw1Ip:
gw1count += 1
case gw2Ip:
gw2count += 1
case gw3Ip:
gw3count += 1
}

}

// Assert packets are balanced, allow variation of up to 100 packets per gateway
assert.InDeltaf(t, iterationCount/3, gw1count, 100, "Expected %d +/- 100, but got %d", iterationCount/3, gw1count)
assert.InDeltaf(t, iterationCount/3, gw2count, 100, "Expected %d +/- 100, but got %d", iterationCount/3, gw1count)
assert.InDeltaf(t, iterationCount/3, gw3count, 100, "Expected %d +/- 100, but got %d", iterationCount/3, gw1count)

}

func TestPacketsAreBalancedByPriority(t *testing.T) {

gateways := []routing.Gateway{}

gw1Ip := netip.MustParseAddr("1.0.0.1")
gw2Ip := netip.MustParseAddr("1.0.0.2")

gateways = append(gateways, routing.NewGateway(gw1Ip, 10))
gateways = append(gateways, routing.NewGateway(gw2Ip, 5))

routing.RebalanceGateways(gateways)

gw1count := 0
gw2count := 0

iterationCount := uint16(65535)
for i := uint16(0); i < iterationCount; i++ {
packet := firewall.Packet{
LocalAddr: netip.MustParseAddr("192.168.1.1"),
RemoteAddr: netip.MustParseAddr("10.0.0.1"),
LocalPort: i,
RemotePort: 65535 - i,
Protocol: 6, // TCP
Fragment: false,
}

selectedGw := balancePacket(&packet, gateways)

switch selectedGw {
case gw1Ip:
gw1count += 1
case gw2Ip:
gw2count += 1
}

}

iterationCountAsFloat := float32(iterationCount)

assert.InDeltaf(t, iterationCountAsFloat*(2.0/3.0), gw1count, 100, "Expected %d +/- 100, but got %d", iterationCountAsFloat*(2.0/3.0), gw1count)
assert.InDeltaf(t, iterationCountAsFloat*(1.0/3.0), gw2count, 100, "Expected %d +/- 100, but got %d", iterationCountAsFloat*(1.0/3.0), gw2count)
}
4 changes: 3 additions & 1 deletion overlay/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package overlay
import (
"io"
"net/netip"

"github.com/slackhq/nebula/routing"
)

type Device interface {
io.ReadWriteCloser
Activate() error
Networks() []netip.Prefix
Name() string
RouteFor(netip.Addr) netip.Addr
RoutesFor(netip.Addr) []routing.Gateway
NewMultiQueueReader() (io.ReadWriteCloser, error)
}
Loading
Loading