Skip to content

Commit d746a9b

Browse files
committed
fix potential data/write conflicts
1 parent 020b55c commit d746a9b

File tree

1 file changed

+6
-17
lines changed

1 file changed

+6
-17
lines changed

pulsar/internal/connection.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -418,27 +418,16 @@ func (c *connection) run() {
418418
c.log.Debugf("Connection run starting with request capacity=%d queued=%d",
419419
cap(c.incomingRequestsCh), len(c.incomingRequestsCh))
420420

421-
go func() {
422-
for {
423-
select {
424-
case <-c.closeCh:
425-
c.failLeftRequestsWhenClose()
426-
return
427-
428-
case req := <-c.incomingRequestsCh:
429-
if req == nil {
430-
return // TODO: this never gonna be happen
431-
}
432-
c.internalSendRequest(req)
433-
}
434-
}
435-
}()
436-
437421
for {
438422
select {
439423
case <-c.closeCh:
424+
c.failLeftRequestsWhenClose()
440425
return
441-
426+
case req := <-c.incomingRequestsCh:
427+
if req == nil {
428+
return // TODO: this never gonna be happen
429+
}
430+
c.internalSendRequest(req)
442431
case cmd := <-c.incomingCmdCh:
443432
c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)
444433
case data := <-c.writeRequestsCh:

0 commit comments

Comments
 (0)