Performance improvement on unconfirmed map #374

Merged: 13 commits, Jan 20, 2025
10 changes: 1 addition & 9 deletions Makefile
@@ -32,16 +32,8 @@ $(STATICCHECK):
check: $(STATICCHECK)
$(STATICCHECK) ./pkg/stream

GOMOCK ?= $(GOBIN)/mockgen
GOMOCK_VERSION ?= v1.6.0
$(GOMOCK):
go install github.com/golang/mock/mockgen@$(GOMOCK_VERSION)

.PHONY: gomock
gomock: $(GOMOCK)

NUM_PROCS ?= 2
TEST_TIMEOUT ?= 2m
TEST_TIMEOUT ?= 3m
test: vet fmt check
go run -mod=mod github.com/onsi/ginkgo/v2/ginkgo -r --procs=$(NUM_PROCS) --compilers=$(NUM_PROCS) \
--randomize-all --randomize-suites \
4 changes: 1 addition & 3 deletions pkg/ha/ha_publisher.go
@@ -63,7 +63,6 @@ type ReliableProducer struct {
producerOptions *stream.ProducerOptions
count int32
confirmMessageHandler ConfirmMessageHandler
channelPublishConfirm stream.ChannelPublishConfirm
mutex *sync.Mutex
mutexStatus *sync.Mutex
status int
@@ -108,8 +107,7 @@ func (p *ReliableProducer) newProducer() error {

return err
}
p.channelPublishConfirm = producer.NotifyPublishConfirmation()
p.handlePublishConfirm(p.channelPublishConfirm)
p.handlePublishConfirm(producer.NotifyPublishConfirmation())
channelNotifyClose := producer.NotifyClose()
p.handleNotifyClose(channelNotifyClose)
p.producer = producer
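
Context for the simplification above: the channel returned by NotifyPublishConfirmation is normally drained in a dedicated goroutine, which is what handlePublishConfirm does. A minimal sketch of that pattern; the consumeConfirmations helper is illustrative, not part of the library, and the IsConfirmed accessor on ConfirmationStatus is assumed (GetErrorCode is shown later in this diff).

package example

import (
	"log"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

// consumeConfirmations is an illustrative helper (not part of the library):
// it subscribes to the producer's confirmation channel and logs each result.
func consumeConfirmations(producer *stream.Producer) {
	confirmations := producer.NotifyPublishConfirmation()
	go func() {
		for batch := range confirmations {
			for _, confirmation := range batch {
				if confirmation.IsConfirmed() {
					log.Println("message confirmed")
				} else {
					log.Printf("message not confirmed, error code %d", confirmation.GetErrorCode())
				}
			}
		}
	}()
}
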
31 changes: 13 additions & 18 deletions pkg/stream/blocking_queue.go
@@ -2,17 +2,17 @@ package stream

import (
"errors"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
"sync/atomic"
"time"

"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
)

var ErrBlockingQueueStopped = errors.New("blocking queue stopped")

type BlockingQueue[T any] struct {
queue chan T
status int32
lastUpdate int64
queue chan T
status int32
}

// NewBlockingQueue initializes a new BlockingQueue with the given capacity
@@ -28,7 +28,6 @@ func (bq *BlockingQueue[T]) Enqueue(item T) error {
if bq.IsStopped() {
return ErrBlockingQueueStopped
}
atomic.StoreInt64(&bq.lastUpdate, time.Now().UnixNano())
bq.queue <- item
return nil
}
@@ -50,26 +49,22 @@ func (bq *BlockingQueue[T]) IsEmpty() bool {
// Stop is different from Close in that it allows the
// existing items to be processed.
// Drain the queue to be sure there are no pending messages
func (bq *BlockingQueue[T]) Stop() {
func (bq *BlockingQueue[T]) Stop() []T {
atomic.StoreInt32(&bq.status, 1)
// drain the queue. To be sure there are no pending messages
// in the queue.
// it does not matter if we lose some messages here
// since there is the unConfirmed map to handle the messages
isActive := true
for isActive {
// in the queue and return to the caller the remaining pending messages
msgInQueue := make([]T, 0, len(bq.queue))
outer:
for {
select {
case <-bq.queue:
// do nothing
case msg := <-bq.queue:
msgInQueue = append(msgInQueue, msg)
case <-time.After(10 * time.Millisecond):
isActive = false
return
default:
isActive = false
return
break outer
}
}
logs.LogDebug("BlockingQueue stopped")
return msgInQueue
}

func (bq *BlockingQueue[T]) Close() {
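
Stop now returns the items that were still queued instead of discarding them, so the caller can report them as unconfirmed. The drain is a non-blocking loop with a labelled break; a standalone sketch of that idiom, simplified to the receive and default cases and independent of the library types:

package main

import "fmt"

// drain empties a buffered channel without blocking and returns whatever
// was still queued, mirroring the labelled break in Stop above.
func drain[T any](queue chan T) []T {
	pending := make([]T, 0, len(queue))
outer:
	for {
		select {
		case item := <-queue:
			pending = append(pending, item)
		default:
			break outer // channel is empty, nothing left to collect
		}
	}
	return pending
}

func main() {
	q := make(chan int, 8)
	q <- 1
	q <- 2
	fmt.Println(drain(q)) // prints [1 2]
}
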
8 changes: 5 additions & 3 deletions pkg/stream/constants.go
@@ -81,9 +81,8 @@ const (
responseCodeNoOffset = uint16(19)

/// responses out of protocol
closeChannel = uint16(60)
connectionCloseError = uint16(61)
timeoutError = uint16(62)
timeoutError = uint16(62)
entityClosed = uint16(63)

///
defaultSocketCallTimeout = 10 * time.Second
@@ -194,6 +193,9 @@ func lookErrorCode(errorCode uint16) error {
return AuthenticationFailureLoopbackError
case timeoutError:
return ConfirmationTimoutError
case entityClosed:
return AlreadyClosed

default:
{
logs.LogWarn("Error not handled %d", errorCode)
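
The new entityClosed code is attached to confirmations for messages that were never sent because the producer (or its pending queue) was already closed, and lookErrorCode maps it to AlreadyClosed. A hedged sketch of how an application might single those messages out for republishing; it assumes the IsConfirmed, GetError and GetMessage accessors on ConfirmationStatus, following the same pattern as GetErrorCode:

package example

import (
	"errors"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

// collectUnsent picks, out of a batch of confirmations, the messages that
// came back with the AlreadyClosed error (entityClosed), i.e. messages the
// producer never put on the wire and that are safe to republish later.
func collectUnsent(batch []*stream.ConfirmationStatus) []message.StreamMessage {
	var toRepublish []message.StreamMessage
	for _, confirmation := range batch {
		if !confirmation.IsConfirmed() && errors.Is(confirmation.GetError(), stream.AlreadyClosed) {
			toRepublish = append(toRepublish, confirmation.GetMessage())
		}
	}
	return toRepublish
}
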
101 changes: 74 additions & 27 deletions pkg/stream/producer.go
@@ -4,11 +4,12 @@ import (
"bufio"
"bytes"
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
"sync"
"sync/atomic"
"time"

"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
)

type ConfirmationStatus struct {
@@ -50,7 +51,17 @@ func (cs *ConfirmationStatus) GetErrorCode() uint16 {
return cs.errorCode
}

func (cs *ConfirmationStatus) updateStatus(errorCode uint16, confirmed bool) {
cs.confirmed = confirmed
if confirmed {
return
}
cs.errorCode = errorCode
cs.err = lookErrorCode(errorCode)
}

type messageSequence struct {
sourceMsg message.StreamMessage
messageBytes []byte
publishingId int64
filterValue string
@@ -193,18 +204,10 @@ func NewProducerOptions() *ProducerOptions {
}
}

func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus {
return producer.unConfirmed.getAll()
}

func (po *ProducerOptions) isSubEntriesBatching() bool {
return po.SubEntrySize > 1
}

func (producer *Producer) lenUnConfirmed() int {
return producer.unConfirmed.size()
}

// NotifyPublishConfirmation returns a channel that receives the confirmation status of the messages sent by the producer.
func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm {
ch := make(chan []*ConfirmationStatus, 1)
@@ -292,13 +295,17 @@ func (producer *Producer) processPendingSequencesQueue() {
var lastError error

if producer.pendingSequencesQueue.IsStopped() {
// add also the last message to sequenceToSend
// otherwise it will be lost
sequenceToSend = append(sequenceToSend, msg)
break
}
// There is something in the queue. Check that the buffer is still smaller than maxFrame
totalBufferToSend += len(msg.messageBytes)
if totalBufferToSend > maxFrame {
// if the totalBufferToSend is greater than the requestedMaxFrameSize
// the producer sends the messages and resets the buffer
producer.unConfirmed.addFromSequences(sequenceToSend, producer.GetID())
lastError = producer.internalBatchSend(sequenceToSend)
sequenceToSend = sequenceToSend[:0]
totalBufferToSend = initBufferPublishSize
@@ -310,6 +317,7 @@
// the messages during the checks of the buffer. In this case
if producer.pendingSequencesQueue.IsEmpty() || len(sequenceToSend) >= producer.options.BatchSize {
if len(sequenceToSend) > 0 {
producer.unConfirmed.addFromSequences(sequenceToSend, producer.GetID())
lastError = producer.internalBatchSend(sequenceToSend)
sequenceToSend = sequenceToSend[:0]
totalBufferToSend += initBufferPublishSize
@@ -323,13 +331,36 @@
// just in case there are still messages in the buffer;
// whether or not they were sent, they are reported back to the caller
if len(sequenceToSend) > 0 {
_ = producer.internalBatchSend(sequenceToSend)
producer.markUnsentAsUnconfirmed(sequenceToSend)
}

}()
logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id)
}

func (producer *Producer) markUnsentAsUnconfirmed(sequences []*messageSequence) {
if len(sequences) == 0 {
return
}

// Report as unconfirmed, with the "entityClosed" error, the messages
// from the pendingSequencesQueue that were never sent.
confirms := make([]*ConfirmationStatus, 0, len(sequences))
for _, ps := range sequences {
cs := &ConfirmationStatus{
inserted: time.Now(),
message: ps.sourceMsg,
producerID: producer.GetID(),
publishingId: ps.publishingId,
confirmed: false,
}
cs.updateStatus(entityClosed, false)
confirms = append(confirms, cs)
}
producer.sendConfirmationStatus(confirms)
}

func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 {
sequence := message.GetPublishingId()
// in case of sub entry the deduplication is disabled
@@ -349,10 +380,12 @@ func (producer *Producer) fromMessageToMessageSequence(streamMessage message.Str
if producer.options.IsFilterEnabled() {
filterValue = producer.options.Filter.FilterValue(streamMessage)
}
msqSeq := &messageSequence{}
msqSeq.messageBytes = marshalBinary
msqSeq.publishingId = seq
msqSeq.filterValue = filterValue
msqSeq := &messageSequence{
sourceMsg: streamMessage,
messageBytes: marshalBinary,
publishingId: seq,
filterValue: filterValue,
}
return msqSeq, nil
}

@@ -366,9 +399,13 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error {
if err != nil {
return err
}
producer.unConfirmed.addFromSequence(messageSeq, &streamMessage, producer.GetID())
if producer.getStatus() == closed {
producer.markUnsentAsUnconfirmed([]*messageSequence{messageSeq})
return fmt.Errorf("producer id: %d closed", producer.id)
}

if len(messageSeq.messageBytes) > defaultMaxFrameSize {
producer.unConfirmed.addFromSequences([]*messageSequence{messageSeq}, producer.GetID())
tooLarge := producer.unConfirmed.extractWithError(messageSeq.publishingId, responseCodeFrameTooLarge)
producer.sendConfirmationStatus([]*ConfirmationStatus{tooLarge})
return FrameTooLarge
Expand All @@ -377,7 +414,7 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error {
// see the processPendingSequencesQueue function
err = producer.pendingSequencesQueue.Enqueue(messageSeq)
if err != nil {
return fmt.Errorf("error during enqueue message: %s. Message will be in timed. Producer id: %d ", err, producer.id)
return fmt.Errorf("error during enqueue message: %s pending queue closed. Producer id: %d ", err, producer.id)
}
return nil
}
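
A brief usage sketch of the Send path above: immediate failures (producer already closed, frame too large, pending queue stopped) surface as the returned error, while the outcome of accepted messages arrives asynchronously on the confirmation channel. Producer construction is omitted, and amqp.NewMessage from pkg/amqp is assumed as the message constructor:

package example

import (
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

// sendOne enqueues a single message for publishing. A nil return only means
// the message was accepted into the pending queue; the confirmation (or a
// failure such as entityClosed) is reported later via NotifyPublishConfirmation.
func sendOne(producer *stream.Producer, body []byte) error {
	if err := producer.Send(amqp.NewMessage(body)); err != nil {
		// e.g. "producer id: N closed": in that case the message was not
		// enqueued and has already been reported as unconfirmed.
		return err
	}
	return nil
}
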
@@ -389,32 +426,40 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error {
// returns an error if the message could not be sent for marshal problems or if the buffer is too large
func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error {
maxFrame := defaultMaxFrameSize
var messagesSequence = make([]*messageSequence, 0)
var messagesSequences = make([]*messageSequence, 0, len(batchMessages))
totalBufferToSend := 0

for _, batchMessage := range batchMessages {
messageSeq, err := producer.fromMessageToMessageSequence(batchMessage)
if err != nil {
return err
}
producer.unConfirmed.addFromSequence(messageSeq, &batchMessage, producer.GetID())

totalBufferToSend += len(messageSeq.messageBytes)
messagesSequence = append(messagesSequence, messageSeq)
messagesSequences = append(messagesSequences, messageSeq)
}

if producer.getStatus() == closed {
producer.markUnsentAsUnconfirmed(messagesSequences)
return fmt.Errorf("producer id: %d closed", producer.id)
}

if len(messagesSequences) > 0 {
producer.unConfirmed.addFromSequences(messagesSequences, producer.GetID())
}
//

if totalBufferToSend+initBufferPublishSize > maxFrame {
// if the totalBufferToSend is greater than the requestedMaxFrameSize
// all the messages are marked as unconfirmed

for _, msg := range messagesSequence {
for _, msg := range messagesSequences {
m := producer.unConfirmed.extractWithError(msg.publishingId, responseCodeFrameTooLarge)
producer.sendConfirmationStatus([]*ConfirmationStatus{m})
}
return FrameTooLarge
}

return producer.internalBatchSend(messagesSequence)
return producer.internalBatchSend(messagesSequences)
}

func (producer *Producer) GetID() uint8 {
@@ -605,7 +650,6 @@ func (producer *Producer) close(reason Event) error {
logs.LogDebug("producer options is nil, the close will be ignored")
return nil
}
_, _ = producer.options.client.coordinator.ExtractProducerById(producer.id)

if !producer.options.client.socket.isOpen() {
return fmt.Errorf("tcp connection is closed")
@@ -616,6 +660,8 @@
_ = producer.options.client.deletePublisher(producer.id)
}

_, _ = producer.options.client.coordinator.ExtractProducerById(producer.id)

if producer.options.client.coordinator.ProducersCount() == 0 {
_ = producer.options.client.Close()
}
@@ -635,7 +681,8 @@ func (producer *Producer) stopAndWaitPendingSequencesQueue() {

// Stop the pendingSequencesQueue, so the producer can't send messages anymore
// but the producer can still handle the inflight messages
producer.pendingSequencesQueue.Stop()
pendingSequences := producer.pendingSequencesQueue.Stop()
producer.markUnsentAsUnconfirmed(pendingSequences)

// Stop the confirmationTimeoutTicker. It will flush the unconfirmed messages
producer.confirmationTimeoutTicker.Stop()
@@ -657,9 +704,9 @@ func (producer *Producer) waitForInflightMessages() {

tentatives := 0

for (producer.lenUnConfirmed() > 0) && tentatives < 5 {
for (producer.unConfirmed.size() > 0) && tentatives < 5 {
logs.LogInfo("wait inflight messages - unconfirmed len: %d - retry: %d",
producer.lenUnConfirmed(), tentatives)
producer.unConfirmed.size(), tentatives)
producer.flushUnConfirmedMessages()
time.Sleep(time.Duration(500) * time.Millisecond)
tentatives++
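
The close path now stops the pending queue, reports anything still queued as unconfirmed, and then waits for in-flight messages with a bounded retry loop (five tentatives, 500 ms apart). A generic sketch of that wait-until-drained idiom; the names are illustrative only:

package main

import (
	"fmt"
	"time"
)

// waitDrained polls size() until it reports zero or maxRetries is reached,
// calling flush() between attempts, mirroring waitForInflightMessages above.
func waitDrained(size func() int, flush func(), maxRetries int, pause time.Duration) bool {
	for tentative := 0; size() > 0 && tentative < maxRetries; tentative++ {
		fmt.Printf("waiting: %d in flight (retry %d)\n", size(), tentative)
		flush()
		time.Sleep(pause)
	}
	return size() == 0
}

func main() {
	remaining := 3
	ok := waitDrained(
		func() int { return remaining },
		func() { remaining-- }, // stand-in for flushUnConfirmedMessages
		5, 10*time.Millisecond,
	)
	fmt.Println("drained:", ok) // drained: true
}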