Skip to content

Commit 28a5936

Browse files
Roasbeefcfromknecht
authored andcommitted
peer: ensure readHandler doesn't block on AddMsg to msgStream
In this commit, we add a quit channel to the AddMsg method of the msgStream struct. Before this commit, if the queue was full, the readHandler would block and be unable to exit. We remedy this by leveraging the existing quit channel of the peer as an additional select case within the AddMsg method.
1 parent a5b9279 commit 28a5936

File tree

1 file changed

+6
-4
lines changed

1 file changed

+6
-4
lines changed

peer.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -804,14 +804,16 @@ func (ms *msgStream) msgConsumer() {
804804

805805
// AddMsg adds a new message to the msgStream. This function is safe for
806806
// concurrent access.
807-
func (ms *msgStream) AddMsg(msg lnwire.Message) {
807+
func (ms *msgStream) AddMsg(msg lnwire.Message, quit chan struct{}) {
808808
// First, we'll attempt to receive from the producerSema struct. This
809809
// acts as a sempahore to prevent us from indefinitely buffering
810810
// incoming items from the wire. Either the msg queue isn't full, and
811811
// we'll not block, or the queue is full, and we'll block until either
812812
// we're signalled to quit, or a slot is freed up.
813813
select {
814814
case <-ms.producerSema:
815+
case <-quit:
816+
return
815817
case <-ms.quit:
816818
return
817819
}
@@ -1020,7 +1022,7 @@ out:
10201022
// forward the error to all channels with this peer.
10211023
case msg.ChanID == lnwire.ConnectionWideID:
10221024
for chanID, chanStream := range chanMsgStreams {
1023-
chanStream.AddMsg(nextMsg)
1025+
chanStream.AddMsg(nextMsg, p.quit)
10241026

10251027
// Also marked this channel as failed,
10261028
// so we won't try to restart it on
@@ -1082,7 +1084,7 @@ out:
10821084
*lnwire.ReplyChannelRange,
10831085
*lnwire.ReplyShortChanIDsEnd:
10841086

1085-
discStream.AddMsg(msg)
1087+
discStream.AddMsg(msg, p.quit)
10861088

10871089
default:
10881090
peerLog.Errorf("unknown message %v received from peer "+
@@ -1105,7 +1107,7 @@ out:
11051107

11061108
// With the stream obtained, add the message to the
11071109
// stream so we can continue processing message.
1108-
chanStream.AddMsg(nextMsg)
1110+
chanStream.AddMsg(nextMsg, p.quit)
11091111
}
11101112

11111113
idleTimer.Reset(idleTimeout)

0 commit comments

Comments
 (0)