Skip to content

Commit 30f2b1c

Browse files
authored
Merge pull request #2740 from wpaulino/gossip-sync-manager
discovery: introduce gossiper syncManager subsystem
2 parents b96da69 + ca01695 commit 30f2b1c

14 files changed

+2835
-1001
lines changed

config.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ type config struct {
248248
Color string `long:"color" description:"The color of the node in hex format (i.e. '#3399FF'). Used to customize node appearance in intelligence services"`
249249
MinChanSize int64 `long:"minchansize" description:"The smallest channel size (in satoshis) that we should accept. Incoming channels smaller than this will be rejected"`
250250

251-
NoChanUpdates bool `long:"nochanupdates" description:"If specified, lnd will not request real-time channel updates from connected peers. This option should be used by routing nodes to save bandwidth."`
251+
NumGraphSyncPeers int `long:"numgraphsyncpeers" description:"The number of peers that we should receive new graph updates from. This option can be tuned to save bandwidth for light clients or routing nodes."`
252252

253253
RejectPush bool `long:"rejectpush" description:"If true, lnd will not accept channel opening requests with non-zero push amounts. This should prevent accidental pushes to merchant nodes."`
254254

@@ -335,6 +335,7 @@ func loadConfig() (*config, error) {
335335
Alias: defaultAlias,
336336
Color: defaultColor,
337337
MinChanSize: int64(minChanFundingSize),
338+
NumGraphSyncPeers: defaultMinPeers,
338339
Tor: &torConfig{
339340
SOCKS: defaultTorSOCKS,
340341
DNS: defaultTorDNS,

discovery/gossiper.go

+72-113
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/lightningnetwork/lnd/lnwire"
2121
"github.com/lightningnetwork/lnd/multimutex"
2222
"github.com/lightningnetwork/lnd/routing"
23+
"github.com/lightningnetwork/lnd/ticker"
2324
)
2425

2526
var (
@@ -75,7 +76,7 @@ type Config struct {
7576
Router routing.ChannelGraphSource
7677

7778
// ChanSeries is an interfaces that provides access to a time series
78-
// view of the current known channel graph. Each gossipSyncer enabled
79+
// view of the current known channel graph. Each GossipSyncer enabled
7980
// peer will utilize this in order to create and respond to channel
8081
// graph time series queries.
8182
ChanSeries ChannelGraphTimeSeries
@@ -143,6 +144,28 @@ type Config struct {
143144
// TODO(roasbeef): extract ann crafting + sign from fundingMgr into
144145
// here?
145146
AnnSigner lnwallet.MessageSigner
147+
148+
// NumActiveSyncers is the number of peers for which we should have
149+
// active syncers with. After reaching NumActiveSyncers, any future
150+
// gossip syncers will be passive.
151+
NumActiveSyncers int
152+
153+
// RotateTicker is a ticker responsible for notifying the SyncManager
154+
// when it should rotate its active syncers. A single active syncer with
155+
// a chansSynced state will be exchanged for a passive syncer in order
156+
// to ensure we don't keep syncing with the same peers.
157+
RotateTicker ticker.Ticker
158+
159+
// HistoricalSyncTicker is a ticker responsible for notifying the
160+
// syncManager when it should attempt a historical sync with a gossip
161+
// sync peer.
162+
HistoricalSyncTicker ticker.Ticker
163+
164+
// ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
165+
// syncManager when it should attempt to start the next pending
166+
// activeSyncer due to the current one not completing its state machine
167+
// within the timeout.
168+
ActiveSyncerTimeoutTicker ticker.Ticker
146169
}
147170

148171
// AuthenticatedGossiper is a subsystem which is responsible for receiving
@@ -212,13 +235,14 @@ type AuthenticatedGossiper struct {
212235
rejectMtx sync.RWMutex
213236
recentRejects map[uint64]struct{}
214237

215-
// peerSyncers keeps track of all the gossip syncers we're maintain for
216-
// peers that understand this mode of operation. When we go to send out
217-
// new updates, for all peers in the map, we'll send the messages
218-
// directly to their gossiper, rather than broadcasting them. With this
219-
// change, we ensure we filter out all updates properly.
220-
syncerMtx sync.RWMutex
221-
peerSyncers map[routing.Vertex]*gossipSyncer
238+
// syncMgr is a subsystem responsible for managing the gossip syncers
239+
// for peers currently connected. When a new peer is connected, the
240+
// manager will create its accompanying gossip syncer and determine
241+
// whether it should have an activeSync or passiveSync sync type based
242+
// on how many other gossip syncers are currently active. Any activeSync
243+
// gossip syncers are started in a round-robin manner to ensure we're
244+
// not syncing with multiple peers at the same time.
245+
syncMgr *SyncManager
222246

223247
// reliableSender is a subsystem responsible for handling reliable
224248
// message send requests to peers. This should only be used for channels
@@ -243,7 +267,14 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
243267
prematureChannelUpdates: make(map[uint64][]*networkMsg),
244268
channelMtx: multimutex.NewMutex(),
245269
recentRejects: make(map[uint64]struct{}),
246-
peerSyncers: make(map[routing.Vertex]*gossipSyncer),
270+
syncMgr: newSyncManager(&SyncManagerCfg{
271+
ChainHash: cfg.ChainHash,
272+
ChanSeries: cfg.ChanSeries,
273+
RotateTicker: cfg.RotateTicker,
274+
HistoricalSyncTicker: cfg.HistoricalSyncTicker,
275+
ActiveSyncerTimeoutTicker: cfg.ActiveSyncerTimeoutTicker,
276+
NumActiveSyncers: cfg.NumActiveSyncers,
277+
}),
247278
}
248279

249280
gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
@@ -419,6 +450,8 @@ func (d *AuthenticatedGossiper) Start() error {
419450
return err
420451
}
421452

453+
d.syncMgr.Start()
454+
422455
d.wg.Add(1)
423456
go d.networkHandler()
424457

@@ -435,11 +468,7 @@ func (d *AuthenticatedGossiper) Stop() {
435468

436469
d.blockEpochs.Cancel()
437470

438-
d.syncerMtx.RLock()
439-
for _, syncer := range d.peerSyncers {
440-
syncer.Stop()
441-
}
442-
d.syncerMtx.RUnlock()
471+
d.syncMgr.Stop()
443472

444473
close(d.quit)
445474
d.wg.Wait()
@@ -463,20 +492,20 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
463492
errChan := make(chan error, 1)
464493

465494
// For messages in the known set of channel series queries, we'll
466-
// dispatch the message directly to the gossipSyncer, and skip the main
495+
// dispatch the message directly to the GossipSyncer, and skip the main
467496
// processing loop.
468497
switch m := msg.(type) {
469498
case *lnwire.QueryShortChanIDs,
470499
*lnwire.QueryChannelRange,
471500
*lnwire.ReplyChannelRange,
472501
*lnwire.ReplyShortChanIDsEnd:
473502

474-
syncer, err := d.findGossipSyncer(peer.IdentityKey())
475-
if err != nil {
476-
log.Warnf("Unable to find gossip syncer for "+
477-
"peer=%x: %v", peer.PubKey(), err)
503+
syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
504+
if !ok {
505+
log.Warnf("Gossip syncer for peer=%x not found",
506+
peer.PubKey())
478507

479-
errChan <- err
508+
errChan <- ErrGossipSyncerNotFound
480509
return errChan
481510
}
482511

@@ -488,24 +517,22 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
488517
return errChan
489518

490519
// If a peer is updating its current update horizon, then we'll dispatch
491-
// that directly to the proper gossipSyncer.
520+
// that directly to the proper GossipSyncer.
492521
case *lnwire.GossipTimestampRange:
493-
syncer, err := d.findGossipSyncer(peer.IdentityKey())
494-
if err != nil {
495-
log.Warnf("Unable to find gossip syncer for "+
496-
"peer=%x: %v", peer.PubKey(), err)
522+
syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
523+
if !ok {
524+
log.Warnf("Gossip syncer for peer=%x not found",
525+
peer.PubKey())
497526

498-
errChan <- err
527+
errChan <- ErrGossipSyncerNotFound
499528
return errChan
500529
}
501530

502531
// If we've found the message target, then we'll dispatch the
503532
// message directly to it.
504-
err = syncer.ApplyGossipFilter(m)
505-
if err != nil {
506-
log.Warnf("unable to apply gossip "+
507-
"filter for peer=%x: %v",
508-
peer.PubKey(), err)
533+
if err := syncer.ApplyGossipFilter(m); err != nil {
534+
log.Warnf("Unable to apply gossip filter for peer=%x: "+
535+
"%v", peer.PubKey(), err)
509536

510537
errChan <- err
511538
return errChan
@@ -590,10 +617,10 @@ type msgWithSenders struct {
590617
}
591618

592619
// mergeSyncerMap is used to merge the set of senders of a particular message
593-
// with peers that we have an active gossipSyncer with. We do this to ensure
620+
// with peers that we have an active GossipSyncer with. We do this to ensure
594621
// that we don't broadcast messages to any peers that we have active gossip
595622
// syncers for.
596-
func (m *msgWithSenders) mergeSyncerMap(syncers map[routing.Vertex]*gossipSyncer) {
623+
func (m *msgWithSenders) mergeSyncerMap(syncers map[routing.Vertex]*GossipSyncer) {
597624
for peerPub := range syncers {
598625
m.senders[peerPub] = struct{}{}
599626
}
@@ -812,28 +839,6 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders {
812839
return msgs
813840
}
814841

815-
// findGossipSyncer is a utility method used by the gossiper to locate the
816-
// gossip syncer for an inbound message so we can properly dispatch the
817-
// incoming message. If a gossip syncer isn't found, then one will be created
818-
// for the target peer.
819-
func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (
820-
*gossipSyncer, error) {
821-
822-
target := routing.NewVertex(pub)
823-
824-
// First, we'll try to find an existing gossiper for this peer.
825-
d.syncerMtx.RLock()
826-
syncer, ok := d.peerSyncers[target]
827-
d.syncerMtx.RUnlock()
828-
829-
// If one exists, then we'll return it directly.
830-
if ok {
831-
return syncer, nil
832-
}
833-
834-
return nil, ErrGossipSyncerNotFound
835-
}
836-
837842
// networkHandler is the primary goroutine that drives this service. The roles
838843
// of this goroutine includes answering queries related to the state of the
839844
// network, syncing up newly connected peers, and also periodically
@@ -1028,12 +1033,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
10281033
// For the set of peers that have an active gossip
10291034
// syncers, we'll collect their pubkeys so we can avoid
10301035
// sending them the full message blast below.
1031-
d.syncerMtx.RLock()
1032-
syncerPeers := make(map[routing.Vertex]*gossipSyncer)
1033-
for peerPub, syncer := range d.peerSyncers {
1034-
syncerPeers[peerPub] = syncer
1035-
}
1036-
d.syncerMtx.RUnlock()
1036+
syncerPeers := d.syncMgr.GossipSyncers()
10371037

10381038
log.Infof("Broadcasting batch of %v new announcements",
10391039
len(announcementBatch))
@@ -1088,62 +1088,16 @@ func (d *AuthenticatedGossiper) networkHandler() {
10881088
// InitSyncState is called by outside sub-systems when a connection is
10891089
// established to a new peer that understands how to perform channel range
10901090
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
1091-
// needed to handle new queries. The recvUpdates bool indicates if we should
1092-
// continue to receive real-time updates from the remote peer once we've synced
1093-
// channel state.
1094-
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer,
1095-
recvUpdates bool) {
1096-
1097-
d.syncerMtx.Lock()
1098-
defer d.syncerMtx.Unlock()
1099-
1100-
// If we already have a syncer, then we'll exit early as we don't want
1101-
// to override it.
1102-
nodeID := routing.Vertex(syncPeer.PubKey())
1103-
if _, ok := d.peerSyncers[nodeID]; ok {
1104-
return
1105-
}
1106-
1107-
log.Infof("Creating new gossipSyncer for peer=%x", nodeID[:])
1108-
1109-
encoding := lnwire.EncodingSortedPlain
1110-
syncer := newGossiperSyncer(gossipSyncerCfg{
1111-
chainHash: d.cfg.ChainHash,
1112-
syncChanUpdates: recvUpdates,
1113-
channelSeries: d.cfg.ChanSeries,
1114-
encodingType: encoding,
1115-
chunkSize: encodingTypeToChunkSize[encoding],
1116-
sendToPeer: func(msgs ...lnwire.Message) error {
1117-
return syncPeer.SendMessageLazy(false, msgs...)
1118-
},
1119-
})
1120-
copy(syncer.peerPub[:], nodeID[:])
1121-
d.peerSyncers[nodeID] = syncer
1122-
1123-
syncer.Start()
1091+
// needed to handle new queries.
1092+
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
1093+
d.syncMgr.InitSyncState(syncPeer)
11241094
}
11251095

11261096
// PruneSyncState is called by outside sub-systems once a peer that we were
11271097
// previously connected to has been disconnected. In this case we can stop the
1128-
// existing gossipSyncer assigned to the peer and free up resources.
1129-
func (d *AuthenticatedGossiper) PruneSyncState(peer *btcec.PublicKey) {
1130-
d.syncerMtx.Lock()
1131-
defer d.syncerMtx.Unlock()
1132-
1133-
log.Infof("Removing gossipSyncer for peer=%x",
1134-
peer.SerializeCompressed())
1135-
1136-
vertex := routing.NewVertex(peer)
1137-
syncer, ok := d.peerSyncers[vertex]
1138-
if !ok {
1139-
return
1140-
}
1141-
1142-
syncer.Stop()
1143-
1144-
delete(d.peerSyncers, vertex)
1145-
1146-
return
1098+
// existing GossipSyncer assigned to the peer and free up resources.
1099+
func (d *AuthenticatedGossiper) PruneSyncState(peer routing.Vertex) {
1100+
d.syncMgr.PruneSyncState(peer)
11471101
}
11481102

11491103
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
@@ -2514,3 +2468,8 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
25142468

25152469
return chanAnn, chanUpdate, err
25162470
}
2471+
2472+
// SyncManager returns the gossiper's SyncManager instance.
2473+
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
2474+
return d.syncMgr
2475+
}

discovery/gossiper_test.go

+25-16
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/lightningnetwork/lnd/lntest"
2828
"github.com/lightningnetwork/lnd/lnwire"
2929
"github.com/lightningnetwork/lnd/routing"
30+
"github.com/lightningnetwork/lnd/ticker"
3031
)
3132

3233
var (
@@ -713,12 +714,16 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
713714
c := make(chan struct{})
714715
return c
715716
},
716-
Router: router,
717-
TrickleDelay: trickleDelay,
718-
RetransmitDelay: retransmitDelay,
719-
ProofMatureDelta: proofMatureDelta,
720-
WaitingProofStore: waitingProofStore,
721-
MessageStore: newMockMessageStore(),
717+
Router: router,
718+
TrickleDelay: trickleDelay,
719+
RetransmitDelay: retransmitDelay,
720+
ProofMatureDelta: proofMatureDelta,
721+
WaitingProofStore: waitingProofStore,
722+
MessageStore: newMockMessageStore(),
723+
RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval),
724+
HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
725+
ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout),
726+
NumActiveSyncers: 3,
722727
}, nodeKeyPub1)
723728

724729
if err := gossiper.Start(); err != nil {
@@ -1447,16 +1452,20 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
14471452
// the message to the peer.
14481453
ctx.gossiper.Stop()
14491454
gossiper := New(Config{
1450-
Notifier: ctx.gossiper.cfg.Notifier,
1451-
Broadcast: ctx.gossiper.cfg.Broadcast,
1452-
NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline,
1453-
NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline,
1454-
Router: ctx.gossiper.cfg.Router,
1455-
TrickleDelay: trickleDelay,
1456-
RetransmitDelay: retransmitDelay,
1457-
ProofMatureDelta: proofMatureDelta,
1458-
WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore,
1459-
MessageStore: ctx.gossiper.cfg.MessageStore,
1455+
Notifier: ctx.gossiper.cfg.Notifier,
1456+
Broadcast: ctx.gossiper.cfg.Broadcast,
1457+
NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline,
1458+
NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline,
1459+
Router: ctx.gossiper.cfg.Router,
1460+
TrickleDelay: trickleDelay,
1461+
RetransmitDelay: retransmitDelay,
1462+
ProofMatureDelta: proofMatureDelta,
1463+
WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore,
1464+
MessageStore: ctx.gossiper.cfg.MessageStore,
1465+
RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval),
1466+
HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
1467+
ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout),
1468+
NumActiveSyncers: 3,
14601469
}, ctx.gossiper.selfKey)
14611470
if err != nil {
14621471
t.Fatalf("unable to recreate gossiper: %v", err)

0 commit comments

Comments
 (0)