Open
Description
Expected behavior
After pendingItem.done()
is called, pendingItem.buffer
that are holding in connection.writeRequestsCh
should not be written to network by conneciton.internalWriteData()/conneciton.cnx.Write()
.
Actual behavior
- When we send message, we pass
pendingItem.buffer
toconnection.writeRequestsCh
; connection.writeRequestsCh
is a buffered channel(writeRequestsCh: make(chan Buffer, 256)
), which means the buffer will be written to network later/asnyc;- If
pendingItem
timeout happen beforependingItem.buffer
being sent to network,pendingItem.buffer
will be put back to the pool and realloced, butconnection.writeRequestsCh
still hold the reference of the OLD buffer; - Now, if we run to
conneciton.internalWriteData()/conneciton.cnx.Write()
, data race will happen, we intent to send the OLDpendingItem.buffer
, but the buffer holds the data of a NEWpendingItem
; - Because timeout is longer than sending, it hasn't happened in reality, but it is theoretically possible.
Steps to reproduce
Code review:
// writeData will pass `pendingItem.buffer` to `connection.writeRequestsCh`
func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64, callbacks []interface{}) {
select {
case <-p.ctx.Done():
for _, cb := range callbacks {
if sr, ok := cb.(*sendRequest); ok {
sr.done(nil, ErrProducerClosed)
}
}
return
default:
now := time.Now()
p.pendingQueue.Put(&pendingItem{
createdAt: now,
sentAt: now,
buffer: buffer,
sequenceID: sequenceID,
sendRequests: callbacks,
})
p._getConn().WriteData(buffer)
}
}
// `connection.writeRequestsCh` is a buffer channel
func newConnection(opts connectionOptions) *connection {
cnx := &connection{
// This channel is used to pass data from producers to the connection
// go routine. It can become contended or blocking if we have multiple
// partition produces writing on a single connection. In general it's
// good to keep this above the number of partition producers assigned
// to a single connection.
writeRequestsCh: make(chan Buffer, 256),
}
return cnx
}
// `pendingItem.buffer` is held in `connection.writeRequestsCh` and will be sent in another goroutine
func (c *connection) WriteData(data Buffer) {
select {
case c.writeRequestsCh <- data:
// Channel is not full
return
default:
// Channel full, fallback to probe if connection is closed
}
for {
select {
case c.writeRequestsCh <- data:
// Successfully wrote on the channel
return
case <-time.After(100 * time.Millisecond):
// The channel is either:
// 1. blocked, in which case we need to wait until we have space
// 2. the connection is already closed, then we need to bail out
c.log.Debug("Couldn't write on connection channel immediately")
if c.getState() != connectionReady {
c.log.Debug("Connection was already closed")
return
}
}
}
}
// `pendingItem.buffer` will be written to network by c.internalWriteData(data)
func (c *connection) run() {
pingSendTicker := time.NewTicker(c.keepAliveInterval)
pingCheckTicker := time.NewTicker(c.keepAliveInterval)
defer func() {
// stop tickers
pingSendTicker.Stop()
pingCheckTicker.Stop()
// all the accesses to the pendingReqs should be happened in this run loop thread,
// including the final cleanup, to avoid the issue
// https://github.com/apache/pulsar-client-go/issues/239
c.failPendingRequests(errConnectionClosed)
c.Close()
}()
// All reads come from the reader goroutine
go c.reader.readFromConnection()
go c.runPingCheck(pingCheckTicker)
c.log.Debugf("Connection run starting with request capacity=%d queued=%d",
cap(c.incomingRequestsCh), len(c.incomingRequestsCh))
for {
select {
case <-c.closeCh:
c.failLeftRequestsWhenClose()
return
case req := <-c.incomingRequestsCh:
if req == nil {
return // TODO: this never gonna be happen
}
c.internalSendRequest(req)
case cmd := <-c.incomingCmdCh:
c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)
case data := <-c.writeRequestsCh:
if data == nil {
return
}
c.internalWriteData(data)
case <-pingSendTicker.C:
c.sendPing()
}
}
}
// when pendingItem is done, its buffer will be put back to the pool and realloced later
func (i *pendingItem) done(err error) {
if i.isDone {
return
}
i.isDone = true
buffersPool.Put(i.buffer)
if i.flushCallback != nil {
i.flushCallback(err)
}
}
// when `pendingItem.done()` is called before `c.internalWriteData(data)`, data race will happen
System configuration
Pulsar version: x.y
Metadata
Metadata
Assignees
Labels
No labels
Activity