Skip to content

Commit 26f68da

Browse files
authored
Merge pull request #1785 from Roasbeef/ensure-peer-quit
peer+server: ensure the peer is always able to quit even mid msgStream application
2 parents 5cf911a + f2db187 commit 26f68da

11 files changed

+104
-26
lines changed

Gopkg.lock

+3-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484

8585
[[constraint]]
8686
name = "golang.org/x/crypto"
87-
revision = "49796115aa4b964c318aad4f3084fdb41e9aa067"
87+
revision = "614d502a4dac94afa3a6ce146bd1736da82514c6"
8888

8989
[[constraint]]
9090
name = "golang.org/x/net"

discovery/gossiper.go

+5
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,11 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
481481

482482
select {
483483
case d.networkMsgs <- nMsg:
484+
485+
// If the peer that sent us this error is quitting, then we don't need
486+
// to send back an error and can return immediately.
487+
case <-peer.QuitSignal():
488+
return nil
484489
case <-d.quit:
485490
nMsg.err <- ErrGossiperShuttingDown
486491
}

discovery/gossiper_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -2181,3 +2181,6 @@ func (p *mockPeer) PubKey() [33]byte {
21812181
return pubkey
21822182
}
21832183
func (p *mockPeer) Address() net.Addr { return nil }
2184+
func (p *mockPeer) QuitSignal() <-chan struct{} {
2185+
return p.quit
2186+
}

fundingmanager.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -2771,7 +2771,9 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
27712771
// waitUntilChannelOpen is designed to prevent other lnd subsystems from
27722772
// sending new update messages to a channel before the channel is fully
27732773
// opened.
2774-
func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) {
2774+
func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID,
2775+
quit <-chan struct{}) error {
2776+
27752777
f.barrierMtx.RLock()
27762778
barrier, ok := f.newChanBarriers[targetChan]
27772779
f.barrierMtx.RUnlock()
@@ -2781,12 +2783,17 @@ func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) {
27812783

27822784
select {
27832785
case <-barrier:
2784-
case <-f.quit: // TODO(roasbeef): add timer?
2785-
break
2786+
case <-quit:
2787+
return ErrFundingManagerShuttingDown
2788+
case <-f.quit:
2789+
return ErrFundingManagerShuttingDown
27862790
}
27872791

27882792
fndgLog.Tracef("barrier for ChanID(%v) closed", targetChan)
2793+
return nil
27892794
}
2795+
2796+
return nil
27902797
}
27912798

27922799
// processFundingError sends a message to the fundingManager allowing it to

fundingmanager_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,10 @@ func (n *testNode) WipeChannel(_ *wire.OutPoint) error {
174174
return nil
175175
}
176176

177+
func (n *testNode) QuitSignal() <-chan struct{} {
178+
return n.shutdownChannel
179+
}
180+
177181
func (n *testNode) AddNewChannel(channel *lnwallet.LightningChannel,
178182
quit <-chan struct{}) error {
179183

htlcswitch/link_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -1459,6 +1459,10 @@ type mockPeer struct {
14591459
quit chan struct{}
14601460
}
14611461

1462+
func (m *mockPeer) QuitSignal() <-chan struct{} {
1463+
return m.quit
1464+
}
1465+
14621466
var _ lnpeer.Peer = (*mockPeer)(nil)
14631467

14641468
func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error {

htlcswitch/mock.go

+4
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,10 @@ func (s *mockServer) Start() error {
238238
return nil
239239
}
240240

241+
func (s *mockServer) QuitSignal() <-chan struct{} {
242+
return s.quit
243+
}
244+
241245
// mockHopIterator represents the test version of hop iterator which instead
242246
// of encrypting the path in onion blob just stores the path as a list of hops.
243247
type mockHopIterator struct {

lnpeer/peer.go

+6
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,10 @@ type Peer interface {
3333

3434
// Address returns the network address of the remote peer.
3535
Address() net.Addr
36+
37+
// QuitSignal is a method that should return a channel which will be
38+
// sent upon or closed once the backing peer exits. This allows callers
39+
// using the interface to cancel any processing in the event the backing
40+
// implementation exits.
41+
QuitSignal() <-chan struct{}
3642
}

peer.go

+57-20
Original file line numberDiff line numberDiff line change
@@ -315,10 +315,20 @@ func (p *peer) Start() error {
315315
return nil
316316
}
317317

318+
// QuitSignal is a method that should return a channel which will be sent upon
319+
// or closed once the backing peer exits. This allows callers using the
320+
// interface to cancel any processing in the event the backing implementation
321+
// exits.
322+
//
323+
// NOTE: Part of the lnpeer.Peer interface.
324+
func (p *peer) QuitSignal() <-chan struct{} {
325+
return p.quit
326+
}
327+
318328
// loadActiveChannels creates indexes within the peer for tracking all active
319329
// channels returned by the database.
320330
func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
321-
var activeChans []wire.OutPoint
331+
var activePublicChans []wire.OutPoint
322332
for _, dbChan := range chans {
323333
lnChan, err := lnwallet.NewLightningChannel(
324334
p.server.cc.signer, p.server.witnessBeacon, dbChan,
@@ -431,14 +441,19 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
431441
p.activeChannels[chanID] = lnChan
432442
p.activeChanMtx.Unlock()
433443

434-
activeChans = append(activeChans, *chanPoint)
444+
// Only if the channel is public do we need to collect it for
445+
// sending out a new enable update.
446+
chanIsPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0
447+
if chanIsPublic {
448+
activePublicChans = append(activePublicChans, *chanPoint)
449+
}
435450
}
436451

437-
// As a final measure we launch a goroutine that will ensure the
438-
// channels are not currently disabled, as that will make us skip it
439-
// during path finding.
452+
// As a final measure we launch a goroutine that will ensure the newly
453+
// loaded public channels are not currently disabled, as that will make
454+
// us skip it during path finding.
440455
go func() {
441-
for _, chanPoint := range activeChans {
456+
for _, chanPoint := range activePublicChans {
442457
// Set the channel disabled=false by sending out a new
443458
// ChannelUpdate. If this channel is already active,
444459
// the update won't be sent.
@@ -738,6 +753,7 @@ func (ms *msgStream) Stop() {
738753
func (ms *msgStream) msgConsumer() {
739754
defer ms.wg.Done()
740755
defer peerLog.Tracef(ms.stopMsg)
756+
defer atomic.StoreInt32(&ms.streamShutdown, 1)
741757

742758
peerLog.Tracef(ms.startMsg)
743759

@@ -752,9 +768,10 @@ func (ms *msgStream) msgConsumer() {
752768
// Otherwise, we'll check the message queue for any new
753769
// items.
754770
select {
771+
case <-ms.peer.quit:
772+
return
755773
case <-ms.quit:
756774
ms.msgCond.L.Unlock()
757-
atomic.StoreInt32(&ms.streamShutdown, 1)
758775
return
759776
default:
760777
}
@@ -777,8 +794,9 @@ func (ms *msgStream) msgConsumer() {
777794
// grow indefinitely.
778795
select {
779796
case ms.producerSema <- struct{}{}:
797+
case <-ms.peer.quit:
798+
return
780799
case <-ms.quit:
781-
atomic.StoreInt32(&ms.streamShutdown, 1)
782800
return
783801
}
784802
}
@@ -837,13 +855,29 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream {
837855
// to the other side, they immediately send a
838856
// channel update message, but we haven't yet
839857
// sent the channel to the channelManager.
840-
p.server.fundingMgr.waitUntilChannelOpen(cid)
858+
err := p.server.fundingMgr.waitUntilChannelOpen(
859+
cid, p.quit,
860+
)
861+
if err != nil {
862+
// If we have a non-nil error, then the
863+
// funding manager is shutting down, s
864+
// we can exit here without attempting
865+
// to deliver the message.
866+
return
867+
}
841868
}
842869

843-
// TODO(roasbeef): only wait if not chan sync
870+
// In order to avoid unnecessarily delivering message
871+
// as the peer is exiting, we'll check quickly to see
872+
// if we need to exit.
873+
select {
874+
case <-p.quit:
875+
return
876+
default:
877+
}
844878

845-
// Dispatch the commitment update message to the proper active
846-
// goroutine dedicated to this channel.
879+
// Dispatch the commitment update message to the proper
880+
// active goroutine dedicated to this channel.
847881
if chanLink == nil {
848882
link, err := p.server.htlcSwitch.GetLink(cid)
849883
if err != nil {
@@ -854,6 +888,15 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream {
854888
chanLink = link
855889
}
856890

891+
// In order to avoid unnecessarily delivering message
892+
// as the peer is exiting, we'll check quickly to see
893+
// if we need to exit.
894+
select {
895+
case <-p.quit:
896+
return
897+
default:
898+
}
899+
857900
chanLink.HandleChannelUpdate(msg)
858901
},
859902
)
@@ -878,6 +921,7 @@ func newDiscMsgStream(p *peer) *msgStream {
878921
//
879922
// NOTE: This method MUST be run as a goroutine.
880923
func (p *peer) readHandler() {
924+
defer p.wg.Done()
881925

882926
// We'll stop the timer after a new messages is received, and also
883927
// reset it after we process the next message.
@@ -1056,6 +1100,7 @@ out:
10561100
chanStream = newChanMsgStream(p, targetChan)
10571101
chanMsgStreams[targetChan] = chanStream
10581102
chanStream.Start()
1103+
defer chanStream.Stop()
10591104
}
10601105

10611106
// With the stream obtained, add the message to the
@@ -1066,16 +1111,8 @@ out:
10661111
idleTimer.Reset(idleTimeout)
10671112
}
10681113

1069-
p.wg.Done()
1070-
10711114
p.Disconnect(errors.New("read handler closed"))
10721115

1073-
for cid, chanStream := range chanMsgStreams {
1074-
chanStream.Stop()
1075-
1076-
delete(chanMsgStreams, cid)
1077-
}
1078-
10791116
peerLog.Tracef("readHandler for peer %v done", p)
10801117
}
10811118

server.go

+7
Original file line numberDiff line numberDiff line change
@@ -3090,6 +3090,13 @@ func (s *server) watchChannelStatus() {
30903090
// the status of closed channels around.
30913091
newStatus := make(map[wire.OutPoint]activeStatus)
30923092
for _, c := range channels {
3093+
// We'll skip any private channels, as they
3094+
// aren't used for routing within the network
3095+
// by other nodes.
3096+
if c.ChannelFlags&lnwire.FFAnnounceChannel == 0 {
3097+
continue
3098+
}
3099+
30933100
chanID := lnwire.NewChanIDFromOutPoint(
30943101
&c.FundingOutpoint)
30953102

0 commit comments

Comments
 (0)