Skip to content

Commit d9febbb

Browse files
authored
Merge pull request #7186 from yyforyongyu/fix-missing-channel-updates
Fix potential channel announcements missing
2 parents 72d97e9 + 1206174 commit d9febbb

File tree

8 files changed

+270
-184
lines changed

8 files changed

+270
-184
lines changed

discovery/gossiper.go

+119-92
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,7 @@ func (d *AuthenticatedGossiper) Stop() error {
658658

659659
func (d *AuthenticatedGossiper) stop() {
660660
log.Info("Authenticated Gossiper is stopping")
661+
defer log.Info("Authenticated Gossiper stopped")
661662

662663
d.blockEpochs.Cancel()
663664

@@ -1187,12 +1188,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
11871188
announcement.msg.MsgType(),
11881189
announcement.isRemote)
11891190

1190-
// We should only broadcast this message forward if it
1191-
// originated from us or it wasn't received as part of
1192-
// our initial historical sync.
1193-
shouldBroadcast := !announcement.isRemote ||
1194-
d.syncMgr.IsGraphSynced()
1195-
11961191
switch announcement.msg.(type) {
11971192
// Channel announcement signatures are amongst the only
11981193
// messages that we'll process serially.
@@ -1231,70 +1226,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
12311226
validationBarrier.InitJobDependencies(announcement.msg)
12321227

12331228
d.wg.Add(1)
1234-
go func() {
1235-
defer d.wg.Done()
1236-
defer validationBarrier.CompleteJob()
1237-
1238-
// If this message has an existing dependency,
1239-
// then we'll wait until that has been fully
1240-
// validated before we proceed.
1241-
err := validationBarrier.WaitForDependants(
1242-
announcement.msg,
1243-
)
1244-
if err != nil {
1245-
if !routing.IsError(
1246-
err,
1247-
routing.ErrVBarrierShuttingDown,
1248-
routing.ErrParentValidationFailed,
1249-
) {
1250-
1251-
log.Warnf("unexpected error "+
1252-
"during validation "+
1253-
"barrier shutdown: %v",
1254-
err)
1255-
}
1256-
announcement.err <- err
1257-
return
1258-
}
1259-
1260-
// Process the network announcement to
1261-
// determine if this is either a new
1262-
// announcement from our PoV or an edges to a
1263-
// prior vertex/edge we previously proceeded.
1264-
emittedAnnouncements, allowDependents := d.processNetworkAnnouncement(
1265-
announcement,
1266-
)
1267-
1268-
log.Tracef("Processed network message %s, "+
1269-
"returned len(announcements)=%v, "+
1270-
"allowDependents=%v",
1271-
announcement.msg.MsgType(),
1272-
len(emittedAnnouncements),
1273-
allowDependents)
1274-
1275-
// If this message had any dependencies, then
1276-
// we can now signal them to continue.
1277-
validationBarrier.SignalDependants(
1278-
announcement.msg, allowDependents,
1279-
)
1280-
1281-
// If the announcement was accepted, then add
1282-
// the emitted announcements to our announce
1283-
// batch to be broadcast once the trickle timer
1284-
// ticks gain.
1285-
if emittedAnnouncements != nil && shouldBroadcast {
1286-
// TODO(roasbeef): exclude peer that
1287-
// sent.
1288-
announcements.AddMsgs(
1289-
emittedAnnouncements...,
1290-
)
1291-
} else if emittedAnnouncements != nil {
1292-
log.Trace("Skipping broadcast of " +
1293-
"announcements received " +
1294-
"during initial graph sync")
1295-
}
1296-
1297-
}()
1229+
go d.handleNetworkMessages(
1230+
announcement, &announcements, validationBarrier,
1231+
)
12981232

12991233
// The trickle timer has ticked, which indicates we should
13001234
// flush to the network the pending batch of new announcements
@@ -1359,6 +1293,67 @@ func (d *AuthenticatedGossiper) networkHandler() {
13591293
}
13601294
}
13611295

1296+
// handleNetworkMessages is responsible for waiting for dependencies for a
1297+
// given network message and processing the message. Once processed, it will
1298+
// signal its dependants and add the new announcements to the announce batch.
1299+
//
1300+
// NOTE: must be run as a goroutine.
1301+
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
1302+
deDuped *deDupedAnnouncements, vb *routing.ValidationBarrier) {
1303+
1304+
defer d.wg.Done()
1305+
defer vb.CompleteJob()
1306+
1307+
// We should only broadcast this message forward if it originated from
1308+
// us or it wasn't received as part of our initial historical sync.
1309+
shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
1310+
1311+
// If this message has an existing dependency, then we'll wait until
1312+
// that has been fully validated before we proceed.
1313+
err := vb.WaitForDependants(nMsg.msg)
1314+
if err != nil {
1315+
log.Debugf("Validating network message %s got err: %v",
1316+
nMsg.msg.MsgType(), err)
1317+
1318+
if !routing.IsError(
1319+
err,
1320+
routing.ErrVBarrierShuttingDown,
1321+
routing.ErrParentValidationFailed,
1322+
) {
1323+
1324+
log.Warnf("unexpected error during validation "+
1325+
"barrier shutdown: %v", err)
1326+
}
1327+
nMsg.err <- err
1328+
1329+
return
1330+
}
1331+
1332+
// Process the network announcement to determine if this is either a
1333+
// new announcement from our PoV or an edges to a prior vertex/edge we
1334+
// previously proceeded.
1335+
newAnns, allow := d.processNetworkAnnouncement(nMsg)
1336+
1337+
log.Tracef("Processed network message %s, returned "+
1338+
"len(announcements)=%v, allowDependents=%v",
1339+
nMsg.msg.MsgType(), len(newAnns), allow)
1340+
1341+
// If this message had any dependencies, then we can now signal them to
1342+
// continue.
1343+
vb.SignalDependants(nMsg.msg, allow)
1344+
1345+
// If the announcement was accepted, then add the emitted announcements
1346+
// to our announce batch to be broadcast once the trickle timer ticks
1347+
// gain.
1348+
if newAnns != nil && shouldBroadcast {
1349+
// TODO(roasbeef): exclude peer that sent.
1350+
deDuped.AddMsgs(newAnns...)
1351+
} else if newAnns != nil {
1352+
log.Trace("Skipping broadcast of announcements received " +
1353+
"during initial graph sync")
1354+
}
1355+
}
1356+
13621357
// TODO(roasbeef): d/c peers that send updates not on our chain
13631358

13641359
// InitSyncState is called by outside sub-systems when a connection is
@@ -1824,10 +1819,6 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
18241819
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
18251820
nMsg *networkMsg) ([]networkMsg, bool) {
18261821

1827-
log.Debugf("Processing network message: peer=%v, source=%x, msg=%s, "+
1828-
"is_remote=%v", nMsg.peer, nMsg.source.SerializeCompressed(),
1829-
nMsg.msg.MsgType(), nMsg.isRemote)
1830-
18311822
// If this is a remote update, we set the scheduler option to lazily
18321823
// add it to the graph.
18331824
var schedulerOp []batch.SchedulerOption
@@ -1947,7 +1938,7 @@ func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
19471938
}
19481939
if err != nil {
19491940
log.Debugf("Unable to retrieve channel=%v from graph: "+
1950-
"%v", err)
1941+
"%v", chanInfo.ChannelID, err)
19511942
return false
19521943
}
19531944

@@ -2145,6 +2136,9 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
21452136

21462137
timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
21472138

2139+
log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
2140+
"node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
2141+
21482142
// We'll quickly ask the router if it already has a newer update for
21492143
// this node so we can skip validating signatures if not required.
21502144
if d.cfg.Router.IsStaleNode(nodeAnn.NodeID, timestamp) {
@@ -2199,6 +2193,10 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
21992193

22002194
nMsg.err <- nil
22012195
// TODO(roasbeef): get rid of the above
2196+
2197+
log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
2198+
"node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
2199+
22022200
return announcements, true
22032201
}
22042202

@@ -2207,6 +2205,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
22072205
ann *lnwire.ChannelAnnouncement,
22082206
ops []batch.SchedulerOption) ([]networkMsg, bool) {
22092207

2208+
log.Debugf("Processing ChannelAnnouncement: peer=%v, short_chan_id=%v",
2209+
nMsg.peer, ann.ShortChannelID.ToUint64())
2210+
22102211
// We'll ignore any channel announcements that target any chain other
22112212
// than the set of chains we know of.
22122213
if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) {
@@ -2327,6 +2328,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
23272328
}
23282329
}
23292330

2331+
log.Debugf("Adding edge for short_chan_id: %v",
2332+
ann.ShortChannelID.ToUint64())
2333+
23302334
// We will add the edge to the channel router. If the nodes present in
23312335
// this channel are not present in the database, a partial node will be
23322336
// added to represent each node while we wait for a node announcement.
@@ -2338,6 +2342,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
23382342
d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
23392343
err := d.cfg.Router.AddEdge(edge, ops...)
23402344
if err != nil {
2345+
log.Debugf("Router rejected edge for short_chan_id(%v): %v",
2346+
ann.ShortChannelID.ToUint64(), err)
2347+
23412348
defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
23422349

23432350
// If the edge was rejected due to already being known, then it
@@ -2359,19 +2366,20 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
23592366
return nil, false
23602367
}
23612368

2369+
log.Debugf("Extracted %v announcements from rejected "+
2370+
"msgs", len(anns))
2371+
23622372
// If while processing this rejected edge, we realized
23632373
// there's a set of announcements we could extract,
23642374
// then we'll return those directly.
2365-
if len(anns) != 0 {
2366-
nMsg.err <- nil
2367-
return anns, true
2368-
}
2375+
//
2376+
// NOTE: since this is an ErrIgnored, we can return
2377+
// true here to signal "allow" to its dependants.
2378+
nMsg.err <- nil
23692379

2370-
// Otherwise, this is just a regular rejected edge.
2371-
log.Debugf("Router rejected channel edge: %v", err)
2380+
return anns, true
23722381
} else {
2373-
log.Debugf("Router rejected channel edge: %v", err)
2374-
2382+
// Otherwise, this is just a regular rejected edge.
23752383
key := newRejectCacheKey(
23762384
ann.ShortChannelID.ToUint64(),
23772385
sourceToPub(nMsg.source),
@@ -2386,6 +2394,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
23862394
// If err is nil, release the lock immediately.
23872395
d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
23882396

2397+
log.Debugf("Finish adding edge for short_chan_id: %v",
2398+
ann.ShortChannelID.ToUint64())
2399+
23892400
// If we earlier received any ChannelUpdates for this channel, we can
23902401
// now process them, as the channel is added to the graph.
23912402
shortChanID := ann.ShortChannelID.ToUint64()
@@ -2456,6 +2467,10 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
24562467
}
24572468

24582469
nMsg.err <- nil
2470+
2471+
log.Debugf("Processed ChannelAnnouncement: peer=%v, short_chan_id=%v",
2472+
nMsg.peer, ann.ShortChannelID.ToUint64())
2473+
24592474
return announcements, true
24602475
}
24612476

@@ -2464,6 +2479,9 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
24642479
upd *lnwire.ChannelUpdate,
24652480
ops []batch.SchedulerOption) ([]networkMsg, bool) {
24662481

2482+
log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
2483+
nMsg.peer, upd.ShortChannelID.ToUint64())
2484+
24672485
// We'll ignore any channel updates that target any chain other than
24682486
// the set of chains we know of.
24692487
if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
@@ -2523,10 +2541,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
25232541
graphScid, timestamp, upd.ChannelFlags,
25242542
) {
25252543

2526-
log.Debugf("Ignored stale edge policy: peer=%v, source=%x, "+
2527-
"msg=%s, is_remote=%v", nMsg.peer,
2528-
nMsg.source.SerializeCompressed(), nMsg.msg.MsgType(),
2529-
nMsg.isRemote,
2544+
log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
2545+
"peer=%v, source=%x, msg=%s, is_remote=%v", shortChanID,
2546+
nMsg.peer, nMsg.source.SerializeCompressed(),
2547+
nMsg.msg.MsgType(), nMsg.isRemote,
25302548
)
25312549

25322550
nMsg.err <- nil
@@ -2649,6 +2667,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
26492667
edgeToUpdate = e2
26502668
}
26512669

2670+
log.Debugf("Validating ChannelUpdate: channel=%v, from node=%x, has "+
2671+
"edge=%v", chanInfo.ChannelID, pubKey.SerializeCompressed(),
2672+
edgeToUpdate != nil)
2673+
26522674
// Validate the channel announcement with the expected public key and
26532675
// channel capacity. In the case of an invalid channel update, we'll
26542676
// return an error to the caller and exit early.
@@ -2743,7 +2765,8 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
27432765
routing.ErrVBarrierShuttingDown,
27442766
) {
27452767

2746-
log.Debug(err)
2768+
log.Debugf("Update edge for short_chan_id(%v) got: %v",
2769+
shortChanID, err)
27472770
} else {
27482771
// Since we know the stored SCID in the graph, we'll
27492772
// cache that SCID.
@@ -2753,7 +2776,8 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
27532776
)
27542777
_, _ = d.recentRejects.Put(key, &cachedReject{})
27552778

2756-
log.Error(err)
2779+
log.Errorf("Update edge for short_chan_id(%v) got: %v",
2780+
shortChanID, err)
27572781
}
27582782

27592783
nMsg.err <- err
@@ -2801,8 +2825,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
28012825
)
28022826

28032827
log.Debugf("The message %v has no AuthProof, sending the "+
2804-
"update to remote peer %x", upd.MsgType(),
2805-
remotePubKey)
2828+
"update to remote peer %x", upd.MsgType(), remotePubKey)
28062829

28072830
// Now we'll attempt to send the channel update message
28082831
// reliably to the remote peer in the background, so that we
@@ -2832,6 +2855,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
28322855
}
28332856

28342857
nMsg.err <- nil
2858+
2859+
log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
2860+
"timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
2861+
timestamp)
28352862
return announcements, true
28362863
}
28372864

@@ -2848,7 +2875,7 @@ func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
28482875
prefix = "remote"
28492876
}
28502877

2851-
log.Infof("Received new %v channel announcement for %v", prefix,
2878+
log.Infof("Received new %v announcement signature for %v", prefix,
28522879
ann.ShortChannelID)
28532880

28542881
// By the specification, channel announcement proofs should be sent

discovery/reliable_sender.go

+3
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ func (s *reliableSender) Start() error {
8484
// Stop halts the reliable sender from sending messages to peers.
8585
func (s *reliableSender) Stop() {
8686
s.stop.Do(func() {
87+
log.Debugf("reliableSender is stopping")
88+
defer log.Debugf("reliableSender stopped")
89+
8790
close(s.quit)
8891
s.wg.Wait()
8992
})

discovery/sync_manager.go

+3
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ func (m *SyncManager) Start() {
183183
// Stop stops the SyncManager from performing its duties.
184184
func (m *SyncManager) Stop() {
185185
m.stop.Do(func() {
186+
log.Debugf("SyncManager is stopping")
187+
defer log.Debugf("SyncManager stopped")
188+
186189
close(m.quit)
187190
m.wg.Wait()
188191

discovery/syncer.go

+3
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,9 @@ func (g *GossipSyncer) Start() {
450450
// exited.
451451
func (g *GossipSyncer) Stop() {
452452
g.stopped.Do(func() {
453+
log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
454+
defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])
455+
453456
close(g.quit)
454457
g.wg.Wait()
455458
})

0 commit comments

Comments
 (0)