Skip to content

Commit c039132

Browse files
yurahaidhiimjako
andauthored
Fix panic when response failed by timeout (#396)
* Fix panic when response failed by timeout * Remove unnecessary new line in pkg/stream/server_frame.go Co-authored-by: Alberto Moretti <[email protected]> * Fix handling error if response not found in coordinator --------- Co-authored-by: Alberto Moretti <[email protected]>
1 parent aad983b commit c039132

File tree

3 files changed

+71
-16
lines changed

3 files changed

+71
-16
lines changed

pkg/stream/client_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,4 +213,21 @@ var _ = Describe("Streaming testEnvironment", func() {
213213
Expect(res).To(BeNil())
214214
})
215215

216+
It("Client.handleGenericResponse handles timeout and missing response gracefully", func() {
217+
cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout)
218+
219+
// Simulate timeout: create a response and remove it immediately
220+
res := cli.coordinator.NewResponse(commandDeclarePublisher, "Simulated Test")
221+
err := cli.coordinator.RemoveResponseById(res.correlationid)
222+
Expect(err).To(BeNil())
223+
224+
// Simulate receiving a response for the removed correlation ID
225+
readerProtocol := &ReaderProtocol{
226+
CorrelationId: uint32(res.correlationid),
227+
ResponseCode: responseCodeStreamNotAvailable,
228+
}
229+
cli.handleGenericResponse(readerProtocol, bufio.NewReader(bytes.NewBuffer([]byte{})))
230+
231+
})
232+
216233
})

pkg/stream/coordinator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error) {
221221
if err != nil {
222222
return nil, err
223223
}
224-
return v.(*Response), err
224+
return v.(*Response), nil
225225
}
226226

227227
func (coordinator *Coordinator) ConsumersCount() int {

pkg/stream/server_frame.go

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func (c *Client) handleResponse() {
146146
}
147147
}
148148

149-
func (c *Client) handleSaslHandshakeResponse(streamingRes *ReaderProtocol, r *bufio.Reader) interface{} {
149+
func (c *Client) handleSaslHandshakeResponse(streamingRes *ReaderProtocol, r *bufio.Reader) {
150150
streamingRes.CorrelationId, _ = readUInt(r)
151151
streamingRes.ResponseCode = uShortExtractResponseCode(readUShort(r))
152152
mechanismsCount, _ := readUInt(r)
@@ -158,12 +158,11 @@ func (c *Client) handleSaslHandshakeResponse(streamingRes *ReaderProtocol, r *bu
158158

159159
res, err := c.coordinator.GetResponseById(streamingRes.CorrelationId)
160160
if err != nil {
161-
// TODO handle response
162-
return err
161+
logErrorCommand(err, "handleSaslHandshakeResponse")
162+
return
163163
}
164-
res.data <- mechanisms
165164

166-
return mechanisms
165+
res.data <- mechanisms
167166
}
168167

169168
func (c *Client) handlePeerProperties(readProtocol *ReaderProtocol, r *bufio.Reader) {
@@ -178,7 +177,11 @@ func (c *Client) handlePeerProperties(readProtocol *ReaderProtocol, r *bufio.Rea
178177
serverProperties[key] = value
179178
}
180179
res, err := c.coordinator.GetResponseById(readProtocol.CorrelationId)
181-
logErrorCommand(err, "handlePeerProperties")
180+
if err != nil {
181+
logErrorCommand(err, "handlePeerProperties")
182+
return
183+
}
184+
182185
res.code <- Code{id: readProtocol.ResponseCode}
183186
res.data <- serverProperties
184187

@@ -210,7 +213,11 @@ func (c *Client) handleGenericResponse(readProtocol *ReaderProtocol, r *bufio.Re
210213
readProtocol.CorrelationId, _ = readUInt(r)
211214
readProtocol.ResponseCode = uShortExtractResponseCode(readUShort(r))
212215
res, err := c.coordinator.GetResponseById(readProtocol.CorrelationId)
213-
logErrorCommand(err, "handleGenericResponse")
216+
if err != nil {
217+
logErrorCommand(err, "handleGenericResponse")
218+
return
219+
}
220+
214221
res.code <- Code{id: readProtocol.ResponseCode}
215222
}
216223

@@ -237,7 +244,11 @@ func (c *Client) commandOpen(readProtocol *ReaderProtocol, r *bufio.Reader) {
237244
}
238245

239246
res, err := c.coordinator.GetResponseById(readProtocol.CorrelationId)
240-
logErrorCommand(err, "commandOpen")
247+
if err != nil {
248+
logErrorCommand(err, "commandOpen")
249+
return
250+
}
251+
241252
res.code <- Code{id: readProtocol.ResponseCode}
242253
res.data <- clientProperties
243254

@@ -277,7 +288,11 @@ func (c *Client) queryPublisherSequenceFrameHandler(readProtocol *ReaderProtocol
277288
readProtocol.ResponseCode = uShortExtractResponseCode(readUShort(r))
278289
sequence := readInt64(r)
279290
res, err := c.coordinator.GetResponseById(readProtocol.CorrelationId)
280-
logErrorCommand(err, "queryPublisherSequenceFrameHandler")
291+
if err != nil {
292+
logErrorCommand(err, "queryPublisherSequenceFrameHandler")
293+
return
294+
}
295+
281296
res.code <- Code{id: readProtocol.ResponseCode}
282297
res.data <- sequence
283298
}
@@ -458,7 +473,11 @@ func (c *Client) queryOffsetFrameHandler(readProtocol *ReaderProtocol,
458473
c.handleGenericResponse(readProtocol, r)
459474
offset := readInt64(r)
460475
res, err := c.coordinator.GetResponseById(readProtocol.CorrelationId)
461-
logErrorCommand(err, "queryOffsetFrameHandler")
476+
if err != nil {
477+
logErrorCommand(err, "queryOffsetFrameHandler")
478+
return
479+
}
480+
462481
res.data <- offset
463482
}
464483

@@ -516,7 +535,11 @@ func (c *Client) streamStatusFrameHandler(readProtocol *ReaderProtocol,
516535
streamStatus[key] = value
517536
}
518537
res, err := c.coordinator.GetResponseById(readProtocol.CorrelationId)
519-
logErrorCommand(err, "streamStatusFrameHandler")
538+
if err != nil {
539+
logErrorCommand(err, "streamStatusFrameHandler")
540+
return
541+
}
542+
520543
res.code <- Code{id: readProtocol.ResponseCode}
521544
res.data <- streamStatus
522545

@@ -553,7 +576,10 @@ func (c *Client) metadataFrameHandler(readProtocol *ReaderProtocol,
553576
}
554577

555578
res, err := c.coordinator.GetResponseById(readProtocol.CorrelationId)
556-
logErrorCommand(err, "metadataFrameHandler")
579+
if err != nil {
580+
logErrorCommand(err, "metadataFrameHandler")
581+
return
582+
}
557583

558584
res.code <- Code{id: readProtocol.ResponseCode}
559585
res.data <- streamsMetadata
@@ -612,7 +638,11 @@ func (c *Client) handleQueryPartitions(readProtocol *ReaderProtocol, r *bufio.Re
612638
partitions = append(partitions, partition)
613639
}
614640
res, err := c.coordinator.GetResponseById(readProtocol.CorrelationId)
615-
logErrorCommand(err, "handleQueryPartitions")
641+
if err != nil {
642+
logErrorCommand(err, "handleQueryPartitions")
643+
return
644+
}
645+
616646
res.code <- Code{id: readProtocol.ResponseCode}
617647
res.data <- partitions
618648
}
@@ -629,7 +659,11 @@ func (c *Client) handleQueryRoute(readProtocol *ReaderProtocol, r *bufio.Reader)
629659
}
630660

631661
res, err := c.coordinator.GetResponseById(readProtocol.CorrelationId)
632-
logErrorCommand(err, "handleQueryRoute")
662+
if err != nil {
663+
logErrorCommand(err, "handleQueryRoute")
664+
return
665+
}
666+
633667
res.code <- Code{id: readProtocol.ResponseCode}
634668
res.data <- routes
635669
}
@@ -646,7 +680,11 @@ func (c *Client) handleExchangeVersionResponse(readProtocol *ReaderProtocol, r *
646680
commands = append(commands, newCommandVersionResponse(minVersion, maxVersion, commandKey))
647681
}
648682
res, err := c.coordinator.GetResponseById(readProtocol.CorrelationId)
649-
logErrorCommand(err, "handleExchangeVersionResponse")
683+
if err != nil {
684+
logErrorCommand(err, "handleExchangeVersionResponse")
685+
return
686+
}
687+
650688
res.code <- Code{id: readProtocol.ResponseCode}
651689
res.data <- commands
652690
}

0 commit comments

Comments
 (0)