Skip to content

Consumer improvements for 1.5 #371

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
check-latest: true
- run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }}
- uses: actions/checkout@main
- uses: codecov/codecov-action@v4
- uses: codecov/codecov-action@v5
with:
fail_ci_if_error: false # optional (default = false)
files: ./coverage.txt
Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
### Overview

Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-server/tree/master/deps/rabbitmq_stream)
The client contains all the features needed to interact with RabbitMQ Stream Queues. <br/>

The main structure is the `Environment`, which provides the `Producer` and `Consumer` interfaces. <br/>

`Producer` and `Consumer` are the main interfaces used to interact with RabbitMQ Stream Queues. <br/>
They do not support auto-reconnect in case of disconnection, but they expose events to detect it. <br/>

The client provides the `ReliableProducer` and `ReliableConsumer`, which support auto-reconnect in case of disconnection. <br/>
See also the [Reliable Producer and Reliable Consumer](#reliable-producer-and-reliable-consumer) section.

### Installing

Expand Down
22 changes: 1 addition & 21 deletions pkg/amqp/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,6 @@ type Message struct {
//receiver *Receiver // Receiver the message was received from
settled bool // whether transfer was settled by sender

// doneSignal is a channel that indicate when a message is considered acted upon by downstream handler
doneSignal chan struct{}
}

// AMQP10 is an AMQP 1.0 message with the necessary fields to work with the
Expand Down Expand Up @@ -501,16 +499,7 @@ func (amqp *AMQP10) GetAMQPValue() interface{} {
// more complex usages.
func newMessage(data []byte) *Message {
return &Message{
Data: [][]byte{data},
doneSignal: make(chan struct{}),
}
}

// done closes the internal doneSignal channel to let the receiver know that this message has been acted upon
func (m *Message) done() {
// TODO: move initialization in ctor and use ctor everywhere?
if m.doneSignal != nil {
close(m.doneSignal)
Data: [][]byte{data},
}
}

Expand All @@ -523,15 +512,6 @@ func (m *Message) GetData() []byte {
return m.Data[0]
}

// Ignore notifies the amqp message pump that the message has been handled
// without any disposition. It frees the amqp receiver to get the next message
// this is implicitly done after calling message dispositions (Accept/Release/Reject/Modify)
func (m *Message) Ignore() {
if m.shouldSendDisposition() {
m.done()
}
}

// MarshalBinary encodes the message into binary form.
func (m *Message) MarshalBinary() ([]byte, error) {
buf := new(buffer)
Expand Down
1 change: 0 additions & 1 deletion pkg/ha/ha_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func NewReliableConsumer(env *stream.Environment, streamName string,
logs.LogDebug("[Reliable] - creating %s", res.getInfo())
err := res.newConsumer()
if err == nil {

res.setStatus(StatusOpen)
}
logs.LogDebug("[Reliable] - created %s", res.getInfo())
Expand Down
5 changes: 2 additions & 3 deletions pkg/stream/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ var _ = Describe("Compression algorithms", func() {
}

message := &messageSequence{
messageBytes: messagePayload,
unCompressedSize: len(messagePayload),
publishingId: 0,
messageBytes: messagePayload,
publishingId: 0,
}

entries = &subEntries{
Expand Down
26 changes: 6 additions & 20 deletions pkg/stream/blocking_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
var ErrBlockingQueueStopped = errors.New("blocking queue stopped")

type BlockingQueue[T any] struct {
queue chan T
status int32
queue chan T
status int32
lastUpdate int64
}

// NewBlockingQueue initializes a new BlockingQueue with the given capacity
Expand All @@ -27,28 +28,13 @@ func (bq *BlockingQueue[T]) Enqueue(item T) error {
if bq.IsStopped() {
return ErrBlockingQueueStopped
}
atomic.StoreInt64(&bq.lastUpdate, time.Now().UnixNano())
bq.queue <- item

return nil
}

// Dequeue removes an item from the queue with a timeout
func (bq *BlockingQueue[T]) Dequeue(timeout time.Duration) T {
if bq.IsStopped() {
var zeroValue T // Zero value of type T
return zeroValue
}
select {
case item, ok := <-bq.queue:
if !ok {
var zeroValue T // Zero value of type T
return zeroValue
}
return item
case <-time.After(timeout):
var zeroValue T // Zero value of type T
return zeroValue
}
func (bq *BlockingQueue[T]) GetChannel() chan T {
return bq.queue
}

func (bq *BlockingQueue[T]) Size() int {
Expand Down
22 changes: 7 additions & 15 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ type Client struct {
saslConfiguration *SaslConfiguration

mutex *sync.Mutex
metadataListener metadataListener
lastHeartBeat HeartBeat
socketCallTimeout time.Duration
availableFeatures *availableFeatures
Expand Down Expand Up @@ -112,14 +111,10 @@ func newClient(connectionName string, broker *Broker,
}

func (c *Client) getSocket() *socket {
//c.mutex.Lock()
//defer c.mutex.Unlock()
return &c.socket
}

func (c *Client) setSocketConnection(connection net.Conn) {
//c.mutex.Lock()
//defer c.mutex.Unlock()
c.socket.connection = connection
c.socket.writer = bufio.NewWriter(connection)
}
Expand Down Expand Up @@ -432,7 +427,7 @@ func (c *Client) heartBeat() {
tickerHeartbeat.Stop()
return
case <-tickerHeartbeat.C:
for c.socket.isOpen() {
if c.socket.isOpen() {
if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second {
v := atomic.AddInt32(&heartBeatMissed, 1)
logs.LogWarn("Missing heart beat: %d", v)
Expand Down Expand Up @@ -512,10 +507,6 @@ func (c *Client) Close() error {
}
}

if c.metadataListener != nil {
close(c.metadataListener)
c.metadataListener = nil
}
c.closeHartBeat()
if c.getSocket().isOpen() {

Expand Down Expand Up @@ -747,6 +738,7 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) {

streamMetadata := streamsMetadata.Get(stream)
if streamMetadata.responseCode != responseCodeOk {

return nil, lookErrorCode(streamMetadata.responseCode)
}

Expand Down Expand Up @@ -883,6 +875,10 @@ func (c *Client) DeclareSubscriber(streamName string,
return nil, fmt.Errorf("message count before storage must be bigger than one")
}

if messagesHandler == nil {
return nil, fmt.Errorf("messages Handler must be set")
}

if options.Offset.isLastConsumed() {
lastOffset, err := c.queryOffset(options.ConsumerName, streamName)
switch {
Expand Down Expand Up @@ -992,12 +988,8 @@ func (c *Client) DeclareSubscriber(streamName string,
go func() {
for {
select {
case code := <-consumer.response.code:
if code.id == closeChannel {
return
}

case chunk, ok := <-consumer.response.chunkForConsumer:
case chunk, ok := <-consumer.chunkForConsumer:
if !ok {
return
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ const (
///
defaultSocketCallTimeout = 10 * time.Second

defaultHeartbeat = 60 * time.Second
defaultHeartbeat = 60 * time.Second
defaultMaxFrameSize = 1048574

//
LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f"
Expand Down Expand Up @@ -119,6 +120,7 @@ const (
LeaderLocatorBalanced = "balanced"
LeaderLocatorClientLocal = "client-local"
DeletePublisher = "deletePublisher"
UnSubscribe = "unSubscribe"

StreamTcpPort = "5552"

Expand Down
86 changes: 50 additions & 36 deletions pkg/stream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
)

type Consumer struct {
ID uint8
response *Response
options *ConsumerOptions
onClose onInternalClose
mutex *sync.Mutex
MessagesHandler MessagesHandler
ID uint8
response *Response
options *ConsumerOptions
onClose onInternalClose
mutex *sync.Mutex
chunkForConsumer chan chunkInfo
MessagesHandler MessagesHandler
// different form ConsumerOptions.offset. ConsumerOptions.offset is just the configuration
// and won't change. currentOffset is the status of the offset
currentOffset int64
Expand Down Expand Up @@ -312,60 +313,73 @@ func (consumer *Consumer) Close() error {
if consumer.getStatus() == closed {
return AlreadyClosed
}
consumer.cacheStoreOffset()
return consumer.close(Event{
Command: CommandUnsubscribe,
StreamName: consumer.GetStreamName(),
Name: consumer.GetName(),
Reason: UnSubscribe,
Err: nil,
})
}

consumer.setStatus(closed)
_, errGet := consumer.options.client.coordinator.GetConsumerById(consumer.ID)
if errGet != nil {
func (consumer *Consumer) close(reason Event) error {

if consumer.options == nil {
// the config is usually set. this check is just to avoid panic and to make some test
// easier to write
logs.LogDebug("consumer options is nil, the close will be ignored")
return nil
}

length := 2 + 2 + 4 + 1
resp := consumer.options.client.coordinator.NewResponse(CommandUnsubscribe)
correlationId := resp.correlationid
var b = bytes.NewBuffer(make([]byte, 0, length+4))
writeProtocolHeader(b, length, CommandUnsubscribe,
correlationId)
consumer.cacheStoreOffset()
consumer.setStatus(closed)

writeByte(b, consumer.ID)
err := consumer.options.client.handleWrite(b.Bytes(), resp)
if err.Err != nil && err.isTimeout {
return err.Err
if closeHandler := consumer.GetCloseHandler(); closeHandler != nil {
closeHandler <- reason
close(consumer.closeHandler)
consumer.closeHandler = nil
}

errC := consumer.options.client.coordinator.RemoveConsumerById(consumer.ID, Event{
Command: CommandUnsubscribe,
StreamName: consumer.GetStreamName(),
Name: consumer.GetName(),
Reason: "unSubscribe",
Err: nil,
})

if errC != nil {
logs.LogWarn("Error during remove consumer id:%s", errC)
close(consumer.chunkForConsumer)

if consumer.response.data != nil {
close(consumer.response.data)
consumer.response.data = nil
}

if consumer.options.client.coordinator.ConsumersCount() == 0 {
err := consumer.options.client.Close()
if err != nil {
return err
if reason.Reason == UnSubscribe {
length := 2 + 2 + 4 + 1
resp := consumer.options.client.coordinator.NewResponse(CommandUnsubscribe)
correlationId := resp.correlationid
var b = bytes.NewBuffer(make([]byte, 0, length+4))
writeProtocolHeader(b, length, CommandUnsubscribe,
correlationId)

writeByte(b, consumer.ID)
err := consumer.options.client.handleWrite(b.Bytes(), resp)
if err.Err != nil && err.isTimeout {
logs.LogWarn("error during consumer unsubscribe:%s", err.Err)
}
}
_, _ = consumer.options.client.coordinator.ExtractConsumerById(consumer.ID)

if consumer.options != nil && consumer.options.client.coordinator.ConsumersCount() == 0 {
_ = consumer.options.client.Close()
}

ch := make(chan uint8, 1)
ch <- consumer.ID
consumer.onClose(ch)
close(ch)
return err.Err
return nil
}

func (consumer *Consumer) cacheStoreOffset() {
if consumer.options.autocommit {
consumer.mutex.Lock()
consumer.lastAutoCommitStored = time.Now()
consumer.messageCountBeforeStorage = 0
consumer.mutex.Unlock() // updateLastStoredOffset() in internalStoreOffset() also locks mutex, so not using defer for unlock
consumer.mutex.Unlock() // updateLastStoredOffset() in internalStoreOffset() also locks mutexMessageMap, so not using defer for unlock

err := consumer.internalStoreOffset()
if err != nil {
Expand Down
9 changes: 8 additions & 1 deletion pkg/stream/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ var _ = Describe("Streaming Consumers", func() {
Eventually(func() int32 {
return atomic.LoadInt32(&commandIdRecv)
}, 5*time.Second).Should(Equal(int32(CommandUnsubscribe)),
"command received should be CommandMetadataUpdate ")
"command received should be unSubscribe ")

Expect(err).NotTo(HaveOccurred())
})
Expand Down Expand Up @@ -641,6 +641,13 @@ var _ = Describe("Streaming Consumers", func() {
NewAutoCommitStrategy().SetFlushInterval(10*time.Millisecond)))
Expect(err).To(HaveOccurred())

// message handler must be set
_, err = env.NewConsumer(streamName,
nil, &ConsumerOptions{
Offset: OffsetSpecification{},
})
Expect(err).To(HaveOccurred())

})

It("Sub Batch consumer with different publishers GZIP and Not", func() {
Expand Down
Loading
Loading