Skip to content

Commit 072edd5

Browse files
authored
Fix re-entrant GetOrHandshake issues (#1044)
1 parent beb5f6b commit 072edd5

8 files changed

+74
-36
lines changed

connection_manager.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const (
2323
swapPrimary trafficDecision = 3
2424
migrateRelays trafficDecision = 4
2525
tryRehandshake trafficDecision = 5
26+
sendTestPacket trafficDecision = 6
2627
)
2728

2829
type connectionManager struct {
@@ -176,7 +177,7 @@ func (n *connectionManager) Run(ctx context.Context) {
176177
}
177178

178179
func (n *connectionManager) doTrafficCheck(localIndex uint32, p, nb, out []byte, now time.Time) {
179-
decision, hostinfo, primary := n.makeTrafficDecision(localIndex, p, nb, out, now)
180+
decision, hostinfo, primary := n.makeTrafficDecision(localIndex, now)
180181

181182
switch decision {
182183
case deleteTunnel:
@@ -197,6 +198,9 @@ func (n *connectionManager) doTrafficCheck(localIndex uint32, p, nb, out []byte,
197198

198199
case tryRehandshake:
199200
n.tryRehandshake(hostinfo)
201+
202+
case sendTestPacket:
203+
n.intf.SendMessageToHostInfo(header.Test, header.TestRequest, hostinfo, p, nb, out)
200204
}
201205

202206
n.resetRelayTrafficCheck(hostinfo)
@@ -289,7 +293,7 @@ func (n *connectionManager) migrateRelayUsed(oldhostinfo, newhostinfo *HostInfo)
289293
}
290294
}
291295

292-
func (n *connectionManager) makeTrafficDecision(localIndex uint32, p, nb, out []byte, now time.Time) (trafficDecision, *HostInfo, *HostInfo) {
296+
func (n *connectionManager) makeTrafficDecision(localIndex uint32, now time.Time) (trafficDecision, *HostInfo, *HostInfo) {
293297
n.hostMap.RLock()
294298
defer n.hostMap.RUnlock()
295299

@@ -356,6 +360,7 @@ func (n *connectionManager) makeTrafficDecision(localIndex uint32, p, nb, out []
356360
return deleteTunnel, hostinfo, nil
357361
}
358362

363+
decision := doNothing
359364
if hostinfo != nil && hostinfo.ConnectionState != nil && mainHostInfo {
360365
if !outTraffic {
361366
// If we aren't sending or receiving traffic then its an unused tunnel and we don't to test the tunnel.
@@ -380,7 +385,7 @@ func (n *connectionManager) makeTrafficDecision(localIndex uint32, p, nb, out []
380385
}
381386

382387
// Send a test packet to trigger an authenticated tunnel test, this should suss out any lingering tunnel issues
383-
n.intf.SendMessageToHostInfo(header.Test, header.TestRequest, hostinfo, p, nb, out)
388+
decision = sendTestPacket
384389

385390
} else {
386391
if n.l.Level >= logrus.DebugLevel {
@@ -390,7 +395,7 @@ func (n *connectionManager) makeTrafficDecision(localIndex uint32, p, nb, out []
390395

391396
n.pendingDeletion[hostinfo.localIndexId] = struct{}{}
392397
n.trafficTimer.Add(hostinfo.localIndexId, n.pendingDeletionInterval)
393-
return doNothing, nil, nil
398+
return decision, hostinfo, nil
394399
}
395400

396401
func (n *connectionManager) shouldSwapPrimary(current, primary *HostInfo) bool {

connection_manager_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ var vpnIp iputil.VpnIp
2121

2222
func newTestLighthouse() *LightHouse {
2323
lh := &LightHouse{
24-
l: test.NewLogger(),
25-
addrMap: map[iputil.VpnIp]*RemoteList{},
24+
l: test.NewLogger(),
25+
addrMap: map[iputil.VpnIp]*RemoteList{},
26+
queryChan: make(chan iputil.VpnIp, 10),
2627
}
2728
lighthouses := map[iputil.VpnIp]struct{}{}
2829
staticList := map[iputil.VpnIp]struct{}{}

examples/config.yml

+4
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,10 @@ logging:
289289
# A 100ms interval with the default 10 retries will give a handshake 5.5 seconds to resolve before timing out
290290
#try_interval: 100ms
291291
#retries: 20
292+
293+
# query_buffer is the size of the buffer channel for querying lighthouses
294+
#query_buffer: 64
295+
292296
# trigger_buffer is the size of the buffer channel for quickly sending handshakes
293297
# after receiving the response for lighthouse queries
294298
#trigger_buffer: 64

handshake_manager.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ func (hm *HandshakeManager) handleOutbound(vpnIp iputil.VpnIp, lighthouseTrigger
230230
// If we only have 1 remote it is highly likely our query raced with the other host registered within the lighthouse
231231
// Our vpnIp here has a tunnel with a lighthouse but has yet to send a host update packet there so we only know about
232232
// the learned public ip for them. Query again to short circuit the promotion counter
233-
hm.lightHouse.QueryServer(vpnIp, hm.f)
233+
hm.lightHouse.QueryServer(vpnIp)
234234
}
235235

236236
// Send the handshake to all known ips, stage 2 takes care of assigning the hostinfo.remote based on the first to reply
@@ -374,13 +374,13 @@ func (hm *HandshakeManager) GetOrHandshake(vpnIp iputil.VpnIp, cacheCb func(*Han
374374
// StartHandshake will ensure a handshake is currently being attempted for the provided vpn ip
375375
func (hm *HandshakeManager) StartHandshake(vpnIp iputil.VpnIp, cacheCb func(*HandshakeHostInfo)) *HostInfo {
376376
hm.Lock()
377+
defer hm.Unlock()
377378

378379
if hh, ok := hm.vpnIps[vpnIp]; ok {
379380
// We are already trying to handshake with this vpn ip
380381
if cacheCb != nil {
381382
cacheCb(hh)
382383
}
383-
hm.Unlock()
384384
return hh.hostinfo
385385
}
386386

@@ -421,8 +421,7 @@ func (hm *HandshakeManager) StartHandshake(vpnIp iputil.VpnIp, cacheCb func(*Han
421421
}
422422
}
423423

424-
hm.Unlock()
425-
hm.lightHouse.QueryServer(vpnIp, hm.f)
424+
hm.lightHouse.QueryServer(vpnIp)
426425
return hostinfo
427426
}
428427

hostmap.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ func (i *HostInfo) TryPromoteBest(preferredRanges []*net.IPNet, ifce *Interface)
561561
}
562562

563563
i.nextLHQuery.Store(now + ifce.reQueryWait.Load())
564-
ifce.lightHouse.QueryServer(i.vpnIp, ifce)
564+
ifce.lightHouse.QueryServer(i.vpnIp)
565565
}
566566
}
567567

inside.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ func (f *Interface) sendNoMetrics(t header.MessageType, st header.MessageSubType
288288
if t != header.CloseTunnel && hostinfo.lastRebindCount != f.rebindCount {
289289
//NOTE: there is an update hole if a tunnel isn't used and exactly 256 rebinds occur before the tunnel is
290290
// finally used again. This tunnel would eventually be torn down and recreated if this action didn't help.
291-
f.lightHouse.QueryServer(hostinfo.vpnIp, f)
291+
f.lightHouse.QueryServer(hostinfo.vpnIp)
292292
hostinfo.lastRebindCount = f.rebindCount
293293
if f.l.Level >= logrus.DebugLevel {
294294
f.l.WithField("vpnIp", hostinfo.vpnIp).Debug("Lighthouse update triggered for punch due to rebind counter")

lighthouse.go

+52-23
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ type LightHouse struct {
7474
// IP's of relays that can be used by peers to access me
7575
relaysForMe atomic.Pointer[[]iputil.VpnIp]
7676

77+
queryChan chan iputil.VpnIp
78+
7779
calculatedRemotes atomic.Pointer[cidr.Tree4[[]*calculatedRemote]] // Maps VpnIp to []*calculatedRemote
7880

7981
metrics *MessageMetrics
@@ -110,6 +112,7 @@ func NewLightHouseFromConfig(ctx context.Context, l *logrus.Logger, c *config.C,
110112
nebulaPort: nebulaPort,
111113
punchConn: pc,
112114
punchy: p,
115+
queryChan: make(chan iputil.VpnIp, c.GetUint32("handshakes.query_buffer", 64)),
113116
l: l,
114117
}
115118
lighthouses := make(map[iputil.VpnIp]struct{})
@@ -139,6 +142,8 @@ func NewLightHouseFromConfig(ctx context.Context, l *logrus.Logger, c *config.C,
139142
}
140143
})
141144

145+
h.startQueryWorker()
146+
142147
return &h, nil
143148
}
144149

@@ -443,9 +448,9 @@ func (lh *LightHouse) loadStaticMap(c *config.C, tunCidr *net.IPNet, staticList
443448
return nil
444449
}
445450

446-
func (lh *LightHouse) Query(ip iputil.VpnIp, f EncWriter) *RemoteList {
451+
func (lh *LightHouse) Query(ip iputil.VpnIp) *RemoteList {
447452
if !lh.IsLighthouseIP(ip) {
448-
lh.QueryServer(ip, f)
453+
lh.QueryServer(ip)
449454
}
450455
lh.RLock()
451456
if v, ok := lh.addrMap[ip]; ok {
@@ -456,30 +461,14 @@ func (lh *LightHouse) Query(ip iputil.VpnIp, f EncWriter) *RemoteList {
456461
return nil
457462
}
458463

459-
// This is asynchronous so no reply should be expected
460-
func (lh *LightHouse) QueryServer(ip iputil.VpnIp, f EncWriter) {
461-
if lh.amLighthouse {
462-
return
463-
}
464-
465-
if lh.IsLighthouseIP(ip) {
466-
return
467-
}
468-
469-
// Send a query to the lighthouses and hope for the best next time
470-
query, err := NewLhQueryByInt(ip).Marshal()
471-
if err != nil {
472-
lh.l.WithError(err).WithField("vpnIp", ip).Error("Failed to marshal lighthouse query payload")
464+
// QueryServer is asynchronous so no reply should be expected
465+
func (lh *LightHouse) QueryServer(ip iputil.VpnIp) {
466+
// Don't put lighthouse ips in the query channel because we can't query lighthouses about lighthouses
467+
if lh.amLighthouse || lh.IsLighthouseIP(ip) {
473468
return
474469
}
475470

476-
lighthouses := lh.GetLighthouses()
477-
lh.metricTx(NebulaMeta_HostQuery, int64(len(lighthouses)))
478-
nb := make([]byte, 12, 12)
479-
out := make([]byte, mtu)
480-
for n := range lighthouses {
481-
f.SendMessageToVpnIp(header.LightHouse, 0, n, query, nb, out)
482-
}
471+
lh.queryChan <- ip
483472
}
484473

485474
func (lh *LightHouse) QueryCache(ip iputil.VpnIp) *RemoteList {
@@ -752,6 +741,46 @@ func NewUDPAddrFromLH6(ipp *Ip6AndPort) *udp.Addr {
752741
return udp.NewAddr(lhIp6ToIp(ipp), uint16(ipp.Port))
753742
}
754743

744+
func (lh *LightHouse) startQueryWorker() {
745+
if lh.amLighthouse {
746+
return
747+
}
748+
749+
go func() {
750+
nb := make([]byte, 12, 12)
751+
out := make([]byte, mtu)
752+
753+
for {
754+
select {
755+
case <-lh.ctx.Done():
756+
return
757+
case ip := <-lh.queryChan:
758+
lh.innerQueryServer(ip, nb, out)
759+
}
760+
}
761+
}()
762+
}
763+
764+
func (lh *LightHouse) innerQueryServer(ip iputil.VpnIp, nb, out []byte) {
765+
if lh.IsLighthouseIP(ip) {
766+
return
767+
}
768+
769+
// Send a query to the lighthouses and hope for the best next time
770+
query, err := NewLhQueryByInt(ip).Marshal()
771+
if err != nil {
772+
lh.l.WithError(err).WithField("vpnIp", ip).Error("Failed to marshal lighthouse query payload")
773+
return
774+
}
775+
776+
lighthouses := lh.GetLighthouses()
777+
lh.metricTx(NebulaMeta_HostQuery, int64(len(lighthouses)))
778+
779+
for n := range lighthouses {
780+
lh.ifce.SendMessageToVpnIp(header.LightHouse, 0, n, query, nb, out)
781+
}
782+
}
783+
755784
func (lh *LightHouse) StartUpdateWorker() {
756785
interval := lh.GetUpdateInterval()
757786
if lh.amLighthouse || interval == 0 {

ssh.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ func sshQueryLighthouse(ifce *Interface, fs interface{}, a []string, w sshd.Stri
518518
}
519519

520520
var cm *CacheMap
521-
rl := ifce.lightHouse.Query(vpnIp, ifce)
521+
rl := ifce.lightHouse.Query(vpnIp)
522522
if rl != nil {
523523
cm = rl.CopyCache()
524524
}

0 commit comments

Comments
 (0)