Skip to content

Commit

Permalink
revert pendingItem.done() to its old position
Browse files Browse the repository at this point in the history
  • Loading branch information
gunli committed Mar 7, 2025
1 parent d005752 commit 2e7753b
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,21 +848,6 @@ type pendingItem struct {
flushCallback func(err error)
}

func (i *pendingItem) done(err error) {
if i.isDone {
return
}
i.isDone = true
buffersPool.Put(i.buffer)
if i.flushCallback != nil {
i.flushCallback(err)
}

if i.cancel != nil {
i.cancel()
}
}

func (p *partitionProducer) internalFlushCurrentBatch() {
if p.batchBuilder == nil {
// batch is not enabled
Expand Down Expand Up @@ -1752,6 +1737,21 @@ type flushRequest struct {
err error
}

func (i *pendingItem) done(err error) {
if i.isDone {
return
}
i.isDone = true
buffersPool.Put(i.buffer)
if i.flushCallback != nil {
i.flushCallback(err)
}

if i.cancel != nil {
i.cancel()
}
}

// _setConn sets the internal connection field of this partition producer atomically.
// Note: should only be called by this partition producer when a new connection is available.
func (p *partitionProducer) _setConn(conn internal.Connection) {
Expand Down

0 comments on commit 2e7753b

Please sign in to comment.