Skip to content

Commit 31dbf96

Browse files
authored
Consumer improvements for 1.5 (#371)
* Refactor the consumer to deal with some edge cases whit the metadata update * Aggregate the confirmation responses to be faster during the unconfirmed extraction * refactor the message sequence struct to remove the reference to the original message ( it could increase the memory usage in some cases) --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 4082710 commit 31dbf96

19 files changed

+217
-274
lines changed

.github/workflows/build_and_test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ jobs:
5454
check-latest: true
5555
- run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }}
5656
- uses: actions/checkout@main
57-
- uses: codecov/codecov-action@v4
57+
- uses: codecov/codecov-action@v5
5858
with:
5959
fail_ci_if_error: false # optional (default = false)
6060
files: ./coverage.txt

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,15 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
4848
### Overview
4949

5050
Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-server/tree/master/deps/rabbitmq_stream)
51+
The client contains all features to interact with the RabbitMQ Stream Queues. </br>
52+
53+
The main structure is the `Environment` that contains the `Producer` and `Consumer` interfaces. </br>
54+
55+
`Producer` and `Consumer` are the main interfaces to interact with the RabbitMQ Stream Queues. </br>
56+
They don't support the auto-reconnect in case of disconnection but have the events to detect it.</br>
57+
58+
The client provides the `ReliableProducer` and `ReliableConsumer` that support the auto-reconnect in case of disconnection.</br>
59+
See also the [Reliable Producer and Reliable Consumer](#reliable-producer-and-reliable-consumer) section.
5160

5261
### Installing
5362

pkg/amqp/types.go

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -409,8 +409,6 @@ type Message struct {
409409
//receiver *Receiver // Receiver the message was received from
410410
settled bool // whether transfer was settled by sender
411411

412-
// doneSignal is a channel that indicate when a message is considered acted upon by downstream handler
413-
doneSignal chan struct{}
414412
}
415413

416414
// AMQP10 is an AMQP 1.0 message with the necessary fields to work with the
@@ -501,16 +499,7 @@ func (amqp *AMQP10) GetAMQPValue() interface{} {
501499
// more complex usages.
502500
func newMessage(data []byte) *Message {
503501
return &Message{
504-
Data: [][]byte{data},
505-
doneSignal: make(chan struct{}),
506-
}
507-
}
508-
509-
// done closes the internal doneSignal channel to let the receiver know that this message has been acted upon
510-
func (m *Message) done() {
511-
// TODO: move initialization in ctor and use ctor everywhere?
512-
if m.doneSignal != nil {
513-
close(m.doneSignal)
502+
Data: [][]byte{data},
514503
}
515504
}
516505

@@ -523,15 +512,6 @@ func (m *Message) GetData() []byte {
523512
return m.Data[0]
524513
}
525514

526-
// Ignore notifies the amqp message pump that the message has been handled
527-
// without any disposition. It frees the amqp receiver to get the next message
528-
// this is implicitly done after calling message dispositions (Accept/Release/Reject/Modify)
529-
func (m *Message) Ignore() {
530-
if m.shouldSendDisposition() {
531-
m.done()
532-
}
533-
}
534-
535515
// MarshalBinary encodes the message into binary form.
536516
func (m *Message) MarshalBinary() ([]byte, error) {
537517
buf := new(buffer)

pkg/ha/ha_consumer.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ func NewReliableConsumer(env *stream.Environment, streamName string,
8888
logs.LogDebug("[Reliable] - creating %s", res.getInfo())
8989
err := res.newConsumer()
9090
if err == nil {
91-
9291
res.setStatus(StatusOpen)
9392
}
9493
logs.LogDebug("[Reliable] - created %s", res.getInfo())

pkg/stream/aggregation_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@ var _ = Describe("Compression algorithms", func() {
1818
}
1919

2020
message := &messageSequence{
21-
messageBytes: messagePayload,
22-
unCompressedSize: len(messagePayload),
23-
publishingId: 0,
21+
messageBytes: messagePayload,
22+
publishingId: 0,
2423
}
2524

2625
entries = &subEntries{

pkg/stream/blocking_queue.go

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ import (
1010
var ErrBlockingQueueStopped = errors.New("blocking queue stopped")
1111

1212
type BlockingQueue[T any] struct {
13-
queue chan T
14-
status int32
13+
queue chan T
14+
status int32
15+
lastUpdate int64
1516
}
1617

1718
// NewBlockingQueue initializes a new BlockingQueue with the given capacity
@@ -27,28 +28,13 @@ func (bq *BlockingQueue[T]) Enqueue(item T) error {
2728
if bq.IsStopped() {
2829
return ErrBlockingQueueStopped
2930
}
31+
atomic.StoreInt64(&bq.lastUpdate, time.Now().UnixNano())
3032
bq.queue <- item
31-
3233
return nil
3334
}
3435

35-
// Dequeue removes an item from the queue with a timeout
36-
func (bq *BlockingQueue[T]) Dequeue(timeout time.Duration) T {
37-
if bq.IsStopped() {
38-
var zeroValue T // Zero value of type T
39-
return zeroValue
40-
}
41-
select {
42-
case item, ok := <-bq.queue:
43-
if !ok {
44-
var zeroValue T // Zero value of type T
45-
return zeroValue
46-
}
47-
return item
48-
case <-time.After(timeout):
49-
var zeroValue T // Zero value of type T
50-
return zeroValue
51-
}
36+
func (bq *BlockingQueue[T]) GetChannel() chan T {
37+
return bq.queue
5238
}
5339

5440
func (bq *BlockingQueue[T]) Size() int {

pkg/stream/client.go

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ type Client struct {
6464
saslConfiguration *SaslConfiguration
6565

6666
mutex *sync.Mutex
67-
metadataListener metadataListener
6867
lastHeartBeat HeartBeat
6968
socketCallTimeout time.Duration
7069
availableFeatures *availableFeatures
@@ -112,14 +111,10 @@ func newClient(connectionName string, broker *Broker,
112111
}
113112

114113
func (c *Client) getSocket() *socket {
115-
//c.mutex.Lock()
116-
//defer c.mutex.Unlock()
117114
return &c.socket
118115
}
119116

120117
func (c *Client) setSocketConnection(connection net.Conn) {
121-
//c.mutex.Lock()
122-
//defer c.mutex.Unlock()
123118
c.socket.connection = connection
124119
c.socket.writer = bufio.NewWriter(connection)
125120
}
@@ -432,7 +427,7 @@ func (c *Client) heartBeat() {
432427
tickerHeartbeat.Stop()
433428
return
434429
case <-tickerHeartbeat.C:
435-
for c.socket.isOpen() {
430+
if c.socket.isOpen() {
436431
if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second {
437432
v := atomic.AddInt32(&heartBeatMissed, 1)
438433
logs.LogWarn("Missing heart beat: %d", v)
@@ -512,10 +507,6 @@ func (c *Client) Close() error {
512507
}
513508
}
514509

515-
if c.metadataListener != nil {
516-
close(c.metadataListener)
517-
c.metadataListener = nil
518-
}
519510
c.closeHartBeat()
520511
if c.getSocket().isOpen() {
521512

@@ -747,6 +738,7 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) {
747738

748739
streamMetadata := streamsMetadata.Get(stream)
749740
if streamMetadata.responseCode != responseCodeOk {
741+
750742
return nil, lookErrorCode(streamMetadata.responseCode)
751743
}
752744

@@ -883,6 +875,10 @@ func (c *Client) DeclareSubscriber(streamName string,
883875
return nil, fmt.Errorf("message count before storage must be bigger than one")
884876
}
885877

878+
if messagesHandler == nil {
879+
return nil, fmt.Errorf("messages Handler must be set")
880+
}
881+
886882
if options.Offset.isLastConsumed() {
887883
lastOffset, err := c.queryOffset(options.ConsumerName, streamName)
888884
switch {
@@ -992,12 +988,8 @@ func (c *Client) DeclareSubscriber(streamName string,
992988
go func() {
993989
for {
994990
select {
995-
case code := <-consumer.response.code:
996-
if code.id == closeChannel {
997-
return
998-
}
999991

1000-
case chunk, ok := <-consumer.response.chunkForConsumer:
992+
case chunk, ok := <-consumer.chunkForConsumer:
1001993
if !ok {
1002994
return
1003995
}

pkg/stream/constants.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ const (
8888
///
8989
defaultSocketCallTimeout = 10 * time.Second
9090

91-
defaultHeartbeat = 60 * time.Second
91+
defaultHeartbeat = 60 * time.Second
92+
defaultMaxFrameSize = 1048574
9293

9394
//
9495
LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f"
@@ -119,6 +120,7 @@ const (
119120
LeaderLocatorBalanced = "balanced"
120121
LeaderLocatorClientLocal = "client-local"
121122
DeletePublisher = "deletePublisher"
123+
UnSubscribe = "unSubscribe"
122124

123125
StreamTcpPort = "5552"
124126

pkg/stream/consumer.go

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ import (
1010
)
1111

1212
type Consumer struct {
13-
ID uint8
14-
response *Response
15-
options *ConsumerOptions
16-
onClose onInternalClose
17-
mutex *sync.Mutex
18-
MessagesHandler MessagesHandler
13+
ID uint8
14+
response *Response
15+
options *ConsumerOptions
16+
onClose onInternalClose
17+
mutex *sync.Mutex
18+
chunkForConsumer chan chunkInfo
19+
MessagesHandler MessagesHandler
1920
// different form ConsumerOptions.offset. ConsumerOptions.offset is just the configuration
2021
// and won't change. currentOffset is the status of the offset
2122
currentOffset int64
@@ -312,60 +313,73 @@ func (consumer *Consumer) Close() error {
312313
if consumer.getStatus() == closed {
313314
return AlreadyClosed
314315
}
315-
consumer.cacheStoreOffset()
316+
return consumer.close(Event{
317+
Command: CommandUnsubscribe,
318+
StreamName: consumer.GetStreamName(),
319+
Name: consumer.GetName(),
320+
Reason: UnSubscribe,
321+
Err: nil,
322+
})
323+
}
316324

317-
consumer.setStatus(closed)
318-
_, errGet := consumer.options.client.coordinator.GetConsumerById(consumer.ID)
319-
if errGet != nil {
325+
func (consumer *Consumer) close(reason Event) error {
326+
327+
if consumer.options == nil {
328+
// the config is usually set. this check is just to avoid panic and to make some test
329+
// easier to write
330+
logs.LogDebug("consumer options is nil, the close will be ignored")
320331
return nil
321332
}
322333

323-
length := 2 + 2 + 4 + 1
324-
resp := consumer.options.client.coordinator.NewResponse(CommandUnsubscribe)
325-
correlationId := resp.correlationid
326-
var b = bytes.NewBuffer(make([]byte, 0, length+4))
327-
writeProtocolHeader(b, length, CommandUnsubscribe,
328-
correlationId)
334+
consumer.cacheStoreOffset()
335+
consumer.setStatus(closed)
329336

330-
writeByte(b, consumer.ID)
331-
err := consumer.options.client.handleWrite(b.Bytes(), resp)
332-
if err.Err != nil && err.isTimeout {
333-
return err.Err
337+
if closeHandler := consumer.GetCloseHandler(); closeHandler != nil {
338+
closeHandler <- reason
339+
close(consumer.closeHandler)
340+
consumer.closeHandler = nil
334341
}
335342

336-
errC := consumer.options.client.coordinator.RemoveConsumerById(consumer.ID, Event{
337-
Command: CommandUnsubscribe,
338-
StreamName: consumer.GetStreamName(),
339-
Name: consumer.GetName(),
340-
Reason: "unSubscribe",
341-
Err: nil,
342-
})
343-
344-
if errC != nil {
345-
logs.LogWarn("Error during remove consumer id:%s", errC)
343+
close(consumer.chunkForConsumer)
346344

345+
if consumer.response.data != nil {
346+
close(consumer.response.data)
347+
consumer.response.data = nil
347348
}
348349

349-
if consumer.options.client.coordinator.ConsumersCount() == 0 {
350-
err := consumer.options.client.Close()
351-
if err != nil {
352-
return err
350+
if reason.Reason == UnSubscribe {
351+
length := 2 + 2 + 4 + 1
352+
resp := consumer.options.client.coordinator.NewResponse(CommandUnsubscribe)
353+
correlationId := resp.correlationid
354+
var b = bytes.NewBuffer(make([]byte, 0, length+4))
355+
writeProtocolHeader(b, length, CommandUnsubscribe,
356+
correlationId)
357+
358+
writeByte(b, consumer.ID)
359+
err := consumer.options.client.handleWrite(b.Bytes(), resp)
360+
if err.Err != nil && err.isTimeout {
361+
logs.LogWarn("error during consumer unsubscribe:%s", err.Err)
353362
}
354363
}
364+
_, _ = consumer.options.client.coordinator.ExtractConsumerById(consumer.ID)
365+
366+
if consumer.options != nil && consumer.options.client.coordinator.ConsumersCount() == 0 {
367+
_ = consumer.options.client.Close()
368+
}
355369

356370
ch := make(chan uint8, 1)
357371
ch <- consumer.ID
358372
consumer.onClose(ch)
359373
close(ch)
360-
return err.Err
374+
return nil
361375
}
362376

363377
func (consumer *Consumer) cacheStoreOffset() {
364378
if consumer.options.autocommit {
365379
consumer.mutex.Lock()
366380
consumer.lastAutoCommitStored = time.Now()
367381
consumer.messageCountBeforeStorage = 0
368-
consumer.mutex.Unlock() // updateLastStoredOffset() in internalStoreOffset() also locks mutex, so not using defer for unlock
382+
consumer.mutex.Unlock() // updateLastStoredOffset() in internalStoreOffset() also locks mutexMessageMap, so not using defer for unlock
369383

370384
err := consumer.internalStoreOffset()
371385
if err != nil {

pkg/stream/consumer_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ var _ = Describe("Streaming Consumers", func() {
123123
Eventually(func() int32 {
124124
return atomic.LoadInt32(&commandIdRecv)
125125
}, 5*time.Second).Should(Equal(int32(CommandUnsubscribe)),
126-
"command received should be CommandMetadataUpdate ")
126+
"command received should be unSubscribe ")
127127

128128
Expect(err).NotTo(HaveOccurred())
129129
})
@@ -641,6 +641,13 @@ var _ = Describe("Streaming Consumers", func() {
641641
NewAutoCommitStrategy().SetFlushInterval(10*time.Millisecond)))
642642
Expect(err).To(HaveOccurred())
643643

644+
// message handler must be set
645+
_, err = env.NewConsumer(streamName,
646+
nil, &ConsumerOptions{
647+
Offset: OffsetSpecification{},
648+
})
649+
Expect(err).To(HaveOccurred())
650+
644651
})
645652

646653
It("Sub Batch consumer with different publishers GZIP and Not", func() {

0 commit comments

Comments
 (0)