Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/armon/go-radix v1.0.0
github.com/cyberdelia/go-metrics-graphite v0.0.0-20161219230853-39f87cc3b432
github.com/flynn/noise v1.1.0
github.com/gaissmai/bart v0.26.1
github.com/gaissmai/bart v0.27.1
github.com/gogo/protobuf v1.3.2
github.com/google/gopacket v1.1.19
github.com/kardianos/service v1.2.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg=
github.com/flynn/noise v1.1.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
github.com/gaissmai/bart v0.26.1 h1:+w4rnLGNlA2GDVn382Tfe3jOsK5vOr5n4KmigJ9lbTo=
github.com/gaissmai/bart v0.26.1/go.mod h1:GREWQfTLRWz/c5FTOsIw+KkscuFkIV5t8Rp7Nd1Td5c=
github.com/gaissmai/bart v0.27.1 h1:FysPzqETMJa8q9rNkLW5peT1hq25nLOz8ksHbSVoiAk=
github.com/gaissmai/bart v0.27.1/go.mod h1:GREWQfTLRWz/c5FTOsIw+KkscuFkIV5t8Rp7Nd1Td5c=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
Expand Down
6 changes: 2 additions & 4 deletions handshake_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type HandshakeHostInfo struct {
initiatingVersionOverride cert.Version // Should we use a non-default cert version for this handshake?
counter int64 // How many attempts have we made so far
lastRemotes []netip.AddrPort // Remotes that we sent to during the previous attempt
lastRelays []netip.Addr // Relays we attempted to use during the previous attempt
packetStore []*cachedPacket // A set of packets to be transmitted once the handshake completes

hostinfo *HostInfo
Expand Down Expand Up @@ -217,7 +218,6 @@ func (hm *HandshakeManager) handleOutbound(vpnIp netip.Addr, lighthouseTriggered
fields := []any{
"udpAddrs", hh.hostinfo.remotes.CopyAddrs(hm.mainHostMap.GetPreferredRanges()),
"initiatorIndex", hh.hostinfo.localIndexId,
"remoteIndex", hh.hostinfo.remoteIndexId,
"durationNs", time.Since(hh.startTime).Nanoseconds(),
}
// hh.machine can be nil here if buildStage0Packet never succeeded
Expand Down Expand Up @@ -323,7 +323,7 @@ func (hm *HandshakeManager) handleOutbound(vpnIp netip.Addr, lighthouseTriggered
)
}

hm.f.relayManager.StartRelays(hm.f, vpnIp, hostinfo, stage0)
hm.f.relayManager.StartRelays(hm.f, vpnIp, hh, stage0)

// If a lighthouse triggered this attempt then we are still in the timer wheel and do not need to re-add
if !lighthouseTriggered {
Expand Down Expand Up @@ -465,7 +465,6 @@ func (hm *HandshakeManager) CheckAndComplete(hostinfo *HostInfo, handshakePacket
// We have a collision, but this can happen since we can't control
// the remote ID. Just log about the situation as a note.
hostinfo.logger(hm.l).Info("New host shadows existing host remoteIndex",
"remoteIndex", hostinfo.remoteIndexId,
"collision", existingRemoteIndex.vpnAddrs,
)
}
Expand All @@ -488,7 +487,6 @@ func (hm *HandshakeManager) Complete(hostinfo *HostInfo, f *Interface) {
// We have a collision, but this can happen since we can't control
// the remote ID. Just log about the situation as a note.
hostinfo.logger(hm.l).Info("New host shadows existing host remoteIndex",
"remoteIndex", hostinfo.remoteIndexId,
"collision", existingRemoteIndex.vpnAddrs,
)
}
Expand Down
1 change: 0 additions & 1 deletion inside.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,6 @@ func (f *Interface) sendNoMetrics(t header.MessageType, st header.MessageSubType
"error", err,
"udpAddr", remote,
"counter", c,
"attemptedCounter", c,
)
return
}
Expand Down
5 changes: 2 additions & 3 deletions outside.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,7 @@ func (f *Interface) handleOutsideRelayPacket(hostinfo *HostInfo, via ViaSender,
// The only way this happens is if hostmap has an index to the correct HostInfo, but the HostInfo is missing
// its internal mapping. This should never happen.
hostinfo.logger(f.l).Error("HostInfo missing remote relay index",
"vpnAddrs", hostinfo.vpnAddrs,
"remoteIndex", h.RemoteIndex,
"relayRemoteIndex", h.RemoteIndex,
)
return
}
Expand All @@ -218,8 +217,8 @@ func (f *Interface) handleOutsideRelayPacket(hostinfo *HostInfo, via ViaSender,
if err != nil {
hostinfo.logger(f.l).Info("Failed to find target host info by ip",
"relayTo", relay.PeerAddr,
"relayFrom", hostinfo.vpnAddrs[0],
"error", err,
"hostinfo.vpnAddrs", hostinfo.vpnAddrs,
)
return
}
Expand Down
55 changes: 37 additions & 18 deletions relay_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"net/netip"
"slices"
"sync/atomic"

"github.com/slackhq/nebula/cert"
Expand Down Expand Up @@ -57,14 +58,25 @@ func (rm *relayManager) GetUseRelays() bool {
// For each candidate relay it either kicks off a handshake to the relay, sends a CreateRelayRequest, retransmits
// one that may have been lost, or, once the relay is Established, forwards the in-progress
// stage 0 handshake packet for vpnIp through it.
func (rm *relayManager) StartRelays(f *Interface, vpnIp netip.Addr, hostinfo *HostInfo, stage0 []byte) {
func (rm *relayManager) StartRelays(f *Interface, vpnIp netip.Addr, hh *HandshakeHostInfo, stage0 []byte) {
hostinfo := hh.hostinfo
if !rm.GetUseRelays() || len(hostinfo.remotes.relays) == 0 {
hh.lastRelays = nil
return
}

hostinfo.logger(rm.l).Info("Attempt to relay through hosts", "relays", hostinfo.remotes.relays)
relays := hostinfo.remotes.relays
listLevel := slog.LevelDebug
prior := hh.lastRelays
if !slices.Equal(relays, prior) {
listLevel = slog.LevelInfo
hh.lastRelays = slices.Clone(relays)
}
hl := hostinfo.logger(rm.l)
hl.Log(context.Background(), listLevel, "Attempt to relay through hosts", "relays", relays)

// Send a RelayRequest to all known Relay IP's
for _, relay := range hostinfo.remotes.relays {
for _, relay := range relays {
// Don't relay through the host I'm trying to connect to
if relay == vpnIp {
continue
Expand All @@ -75,20 +87,27 @@ func (rm *relayManager) StartRelays(f *Interface, vpnIp netip.Addr, hostinfo *Ho
continue
}

// Each relay's per-attempt log fires at Info on the first time we hit it and Debug after that.
level := slog.LevelInfo
if slices.Contains(prior, relay) {
level = slog.LevelDebug
}

relayHostInfo := rm.hostmap.QueryVpnAddr(relay)
if relayHostInfo == nil || !relayHostInfo.remote.IsValid() {
hostinfo.logger(rm.l).Info("Establish tunnel to relay target", "relay", relay.String())
hl.Log(context.Background(), level, "Establish tunnel to relay target", "relay", relay.String())
f.Handshake(relay)
continue
}

// Check the relay HostInfo to see if we already established a relay through
existingRelay, ok := relayHostInfo.relayState.QueryRelayForByIp(vpnIp)
if !ok {
// No relays exist or requested yet.
if relayHostInfo.remote.IsValid() {
idx, err := AddRelay(rm.l, relayHostInfo, rm.hostmap, vpnIp, nil, TerminalType, Requested)
if err != nil {
hostinfo.logger(rm.l).Info("Failed to add relay to hostmap", "relay", relay.String(), "error", err)
hl.Info("Failed to add relay to hostmap", "relay", relay.String(), "error", err)
}

m := NebulaControl{
Expand All @@ -99,12 +118,12 @@ func (rm *relayManager) StartRelays(f *Interface, vpnIp netip.Addr, hostinfo *Ho
switch relayHostInfo.GetCert().Certificate.Version() {
case cert.Version1:
if !f.myVpnAddrs[0].Is4() {
hostinfo.logger(rm.l).Error("can not establish v1 relay with a v6 network because the relay is not running a current nebula version")
hl.Error("can not establish v1 relay with a v6 network because the relay is not running a current nebula version")
continue
}

if !vpnIp.Is4() {
hostinfo.logger(rm.l).Error("can not establish v1 relay with a v6 remote network because the relay is not running a current nebula version")
hl.Error("can not establish v1 relay with a v6 remote network because the relay is not running a current nebula version")
continue
}

Expand All @@ -116,16 +135,16 @@ func (rm *relayManager) StartRelays(f *Interface, vpnIp netip.Addr, hostinfo *Ho
m.RelayFromAddr = netAddrToProtoAddr(f.myVpnAddrs[0])
m.RelayToAddr = netAddrToProtoAddr(vpnIp)
default:
hostinfo.logger(rm.l).Error("Unknown certificate version found while creating relay")
hl.Error("Unknown certificate version found while creating relay")
continue
}

msg, err := m.Marshal()
if err != nil {
hostinfo.logger(rm.l).Error("Failed to marshal Control message to create relay", "error", err)
hl.Error("Failed to marshal Control message to create relay", "error", err)
} else {
f.SendMessageToHostInfo(header.Control, 0, relayHostInfo, msg, make([]byte, 12), make([]byte, mtu))
rm.l.Info("send CreateRelayRequest",
rm.l.Log(context.Background(), level, "send CreateRelayRequest",
"relayFrom", f.myVpnAddrs[0],
"relayTo", vpnIp,
"initiatorRelayIndex", idx,
Expand All @@ -138,14 +157,14 @@ func (rm *relayManager) StartRelays(f *Interface, vpnIp netip.Addr, hostinfo *Ho

switch existingRelay.State {
case Established:
hostinfo.logger(rm.l).Info("Send handshake via relay", "relay", relay.String())
hl.Log(context.Background(), level, "Send handshake via relay", "relay", relay.String())
f.SendVia(relayHostInfo, existingRelay, stage0, make([]byte, 12), make([]byte, mtu), false)
case Disestablished:
// Mark this relay as 'requested'
relayHostInfo.relayState.UpdateRelayForByIpState(vpnIp, Requested)
fallthrough
case Requested:
hostinfo.logger(rm.l).Info("Re-send CreateRelay request", "relay", relay.String())
hl.Log(context.Background(), level, "Re-send CreateRelay request", "relay", relay.String())
// Re-send the CreateRelay request, in case the previous one was lost.
m := NebulaControl{
Type: NebulaControl_CreateRelayRequest,
Expand All @@ -155,12 +174,12 @@ func (rm *relayManager) StartRelays(f *Interface, vpnIp netip.Addr, hostinfo *Ho
switch relayHostInfo.GetCert().Certificate.Version() {
case cert.Version1:
if !f.myVpnAddrs[0].Is4() {
hostinfo.logger(rm.l).Error("can not establish v1 relay with a v6 network because the relay is not running a current nebula version")
hl.Error("can not establish v1 relay with a v6 network because the relay is not running a current nebula version")
continue
}

if !vpnIp.Is4() {
hostinfo.logger(rm.l).Error("can not establish v1 relay with a v6 remote network because the relay is not running a current nebula version")
hl.Error("can not establish v1 relay with a v6 remote network because the relay is not running a current nebula version")
continue
}

Expand All @@ -172,16 +191,16 @@ func (rm *relayManager) StartRelays(f *Interface, vpnIp netip.Addr, hostinfo *Ho
m.RelayFromAddr = netAddrToProtoAddr(f.myVpnAddrs[0])
m.RelayToAddr = netAddrToProtoAddr(vpnIp)
default:
hostinfo.logger(rm.l).Error("Unknown certificate version found while creating relay")
hl.Error("Unknown certificate version found while creating relay")
continue
}
msg, err := m.Marshal()
if err != nil {
hostinfo.logger(rm.l).Error("Failed to marshal Control message to create relay", "error", err)
hl.Error("Failed to marshal Control message to create relay", "error", err)
} else {
// This must send over the hostinfo, not over hm.Hosts[ip]
f.SendMessageToHostInfo(header.Control, 0, relayHostInfo, msg, make([]byte, 12), make([]byte, mtu))
rm.l.Info("send CreateRelayRequest",
rm.l.Log(context.Background(), level, "send CreateRelayRequest",
"relayFrom", f.myVpnAddrs[0],
"relayTo", vpnIp,
"initiatorRelayIndex", existingRelay.LocalIndex,
Expand All @@ -192,7 +211,7 @@ func (rm *relayManager) StartRelays(f *Interface, vpnIp netip.Addr, hostinfo *Ho
// PeerRequested only occurs in Forwarding relays, not Terminal relays, and this is a Terminal relay case.
fallthrough
default:
hostinfo.logger(rm.l).Error("Relay unexpected state",
hl.Error("Relay unexpected state",
"vpnIp", vpnIp,
"state", existingRelay.State,
"relay", relay,
Expand Down
97 changes: 97 additions & 0 deletions relay_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package nebula

import (
"bytes"
"log/slog"
"net/netip"
"testing"

"github.com/gaissmai/bart"
"github.com/slackhq/nebula/test"
"github.com/stretchr/testify/assert"
)

// TestStartRelaysLogDedupe verifies that repeated attempts with the same relay set drop the log
// chatter to Debug, mirroring how the normal handshake retry loop quiets down once it's already
// announced its targets.
func TestStartRelaysLogDedupe(t *testing.T) {
vpnIp := netip.MustParseAddr("100.64.99.4")
otherRelay := netip.MustParseAddr("100.64.99.5")

newHH := func() *HandshakeHostInfo {
// Use the target's own vpnIp as the "relay" so the loop body skips it without
// touching any sender-side state. That isolates the test to the level-selection
// behavior of the top-level "Attempt to relay through hosts" log.
hostinfo := &HostInfo{
vpnAddrs: []netip.Addr{vpnIp},
localIndexId: 1,
remotes: NewRemoteList([]netip.Addr{vpnIp}, nil),
}
hostinfo.remotes.relays = []netip.Addr{vpnIp}
return &HandshakeHostInfo{hostinfo: hostinfo}
}

// Park any extra relay addresses we'll introduce mid-test in myVpnAddrsTable so the loop
// body always skips before touching f.Handshake (which would need a real handshakeManager).
addrTable := new(bart.Lite)
addrTable.Insert(netip.PrefixFrom(otherRelay, otherRelay.BitLen()))
f := &Interface{myVpnAddrsTable: addrTable}

newRM := func(buf *bytes.Buffer) *relayManager {
l := test.NewLoggerWithOutputAndLevel(buf, slog.LevelDebug)
rm := &relayManager{l: l, hostmap: newHostMap(l)}
rm.useRelays.Store(true)
return rm
}

const msg = `msg="Attempt to relay through hosts"`

t.Run("first attempt logs at Info", func(t *testing.T) {
var buf bytes.Buffer
rm := newRM(&buf)
hh := newHH()
rm.StartRelays(f, vpnIp, hh, nil)
assert.Equal(t, []netip.Addr{vpnIp}, hh.lastRelays, "lastRelays should record the relay set we just attempted")
assert.Contains(t, buf.String(), "level=INFO "+msg, "expected Info level on first attempt")
})

t.Run("repeat attempt with same relays drops to Debug", func(t *testing.T) {
var buf bytes.Buffer
rm := newRM(&buf)
hh := newHH()
rm.StartRelays(f, vpnIp, hh, nil)
first := append([]netip.Addr(nil), hh.lastRelays...)
buf.Reset()
rm.StartRelays(f, vpnIp, hh, nil)
assert.Equal(t, first, hh.lastRelays)
assert.Contains(t, buf.String(), "level=DEBUG "+msg, "expected Debug level on identical retry")
assert.NotContains(t, buf.String(), "level=INFO "+msg, "Info should not fire on identical retry")
})

t.Run("changed relay list bumps back to Info", func(t *testing.T) {
var buf bytes.Buffer
rm := newRM(&buf)
hh := newHH()
rm.StartRelays(f, vpnIp, hh, nil)
buf.Reset()

// The lighthouse handed us a new set this round.
hh.hostinfo.remotes.relays = []netip.Addr{vpnIp, otherRelay}

rm.StartRelays(f, vpnIp, hh, nil)
assert.Equal(t, []netip.Addr{vpnIp, otherRelay}, hh.lastRelays)
assert.Contains(t, buf.String(), "level=INFO "+msg, "expected Info when the relay list changes")
})

t.Run("disabled relays clears lastRelays and emits no Attempt log", func(t *testing.T) {
var buf bytes.Buffer
rm := newRM(&buf)
rm.useRelays.Store(false)
hh := newHH()
hh.lastRelays = []netip.Addr{vpnIp}

rm.StartRelays(f, vpnIp, hh, nil)
assert.Nil(t, hh.lastRelays, "with relays disabled lastRelays should be cleared")
assert.NotContains(t, buf.String(), msg, "should not log when we shortcut out")
})
}
Loading