Skip to content

Commit 40d8931

Browse files
Reuse the locator connection (#388)
- Reuse the locator connection. In this PR, the locator connection is always open ( like most other clients ) and can be reused. It is useful when API like `storeOffset` or `queryOffest` are often called. The `maybeReconnectLocator` function reconnects the locator if/when needed ( in case of network problem|) - Remove one heartbeat Ticket. - Fix an edge case during reconnection when the [metadata fails](https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/388/files#diff-329120e7df50ce65620d4fdf4ce9644909588f2369570cd94ff289b77f3aca2fR62-R66) - closes #387 - Fix a race condition for producer and consumer during the internal close. The internal close now has a [cleanUp function](https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/388/files#diff-da66976f2cef8e369ba583aa16d59f1ea62f43c81eb4586e9d18badc4ed3a867L370-R373) --------- Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Alberto Moretti <[email protected]>
1 parent ff3daec commit 40d8931

15 files changed

+216
-210
lines changed

pkg/ha/reliable_common.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ func retry(backoff int, reliable IReliable) (error, bool) {
5959
return retry(backoff+1, reliable)
6060
}
6161

62+
if errors.Is(errS, stream.StreamMetadataFailure) {
63+
logs.LogInfo("[Reliable] - Fail to retrieve the %s metadata for %s. Trying to reconnect", reliable.getStreamName(), reliable.getInfo())
64+
return retry(backoff+1, reliable)
65+
}
66+
6267
var result error
6368
if streamMetaData != nil {
6469
logs.LogInfo("[Reliable] - The stream %s exists. Reconnecting the %s.", reliable.getStreamName(), reliable.getInfo())

pkg/stream/client.go

Lines changed: 36 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -418,56 +418,40 @@ func (c *Client) DeleteStream(streamName string) error {
418418
}
419419

420420
func (c *Client) heartBeat() {
421-
ticker := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat) * time.Second)
422-
tickerHeartbeat := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat-2) * time.Second)
421+
tickerHeartbeat := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat) * time.Second)
423422

424423
var heartBeatMissed int32
425-
doneSendingTimeoutTicker := make(chan struct{}, 1)
426-
wg := sync.WaitGroup{}
427-
wg.Add(2)
428-
go func() {
429-
wg.Done()
430-
select {
431-
case <-c.doneTimeoutTicker:
432-
doneSendingTimeoutTicker <- struct{}{}
433-
ticker.Stop()
434-
tickerHeartbeat.Stop()
435-
return
436-
case <-tickerHeartbeat.C:
437-
if c.socket.isOpen() {
438-
if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second {
439-
v := atomic.AddInt32(&heartBeatMissed, 1)
440-
logs.LogWarn("Missing heart beat: %d", v)
441-
if v >= 2 {
442-
logs.LogWarn("Too many heartbeat missing: %d", v)
443-
c.Close()
444-
}
445-
} else {
446-
atomic.StoreInt32(&heartBeatMissed, 0)
447-
}
448-
}
449-
}
450-
451-
}()
452424

453425
go func() {
454-
wg.Done()
455426
for {
456427
select {
457-
case <-doneSendingTimeoutTicker:
458-
logs.LogDebug("Stopping sending heartbeat")
428+
case <-c.doneTimeoutTicker:
429+
tickerHeartbeat.Stop()
459430
return
460-
case _, ok := <-ticker.C:
461-
if !ok {
431+
case <-tickerHeartbeat.C:
432+
if c.socket.isOpen() {
433+
logs.LogDebug("Heartbeat ticker is open, sending heartbeat")
434+
c.sendHeartbeat()
435+
if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second {
436+
v := atomic.AddInt32(&heartBeatMissed, 1)
437+
logs.LogWarn("Missing heart beat: %d", v)
438+
if v >= 2 {
439+
logs.LogWarn("Too many heartbeat missing: %d", v)
440+
c.Close()
441+
}
442+
} else {
443+
atomic.StoreInt32(&heartBeatMissed, 0)
444+
}
445+
} else {
446+
logs.LogDebug("Socket Heartbeat ticker is closed. Closing ticker")
447+
tickerHeartbeat.Stop()
462448
return
463449
}
464-
logs.LogDebug("Sending heart beat: %s", time.Now())
465-
c.sendHeartbeat()
450+
466451
}
467452
}
468-
}()
469453

470-
wg.Wait()
454+
}()
471455

472456
}
473457

@@ -487,6 +471,8 @@ func (c *Client) closeHartBeat() {
487471
}
488472

489473
func (c *Client) Close() error {
474+
475+
c.closeHartBeat()
490476
for _, p := range c.coordinator.Producers() {
491477
err := c.coordinator.RemoveProducerById(p.(*Producer).id, Event{
492478
Command: CommandClose,
@@ -513,8 +499,6 @@ func (c *Client) Close() error {
513499
logs.LogWarn("error removing consumer: %s", err)
514500
}
515501
}
516-
517-
c.closeHartBeat()
518502
if c.getSocket().isOpen() {
519503

520504
res := c.coordinator.NewResponse(CommandClose)
@@ -535,6 +519,10 @@ func (c *Client) Close() error {
535519
}
536520

537521
func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (*Producer, error) {
522+
return c.declarePublisher(streamName, options, nil)
523+
}
524+
525+
func (c *Client) declarePublisher(streamName string, options *ProducerOptions, cleanUp func()) (*Producer, error) {
538526
if options == nil {
539527
options = NewProducerOptions()
540528
}
@@ -590,7 +578,7 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
590578
ConfirmationTimeOut: options.ConfirmationTimeOut,
591579
ClientProvidedName: options.ClientProvidedName,
592580
Filter: options.Filter,
593-
})
581+
}, cleanUp)
594582

595583
if err != nil {
596584
return nil, err
@@ -836,10 +824,15 @@ func (c *Client) StoreOffset(consumerName string, streamName string, offset int6
836824
writeLong(b, offset)
837825
return c.socket.writeAndFlush(b.Bytes())
838826
}
839-
840827
func (c *Client) DeclareSubscriber(streamName string,
841828
messagesHandler MessagesHandler,
842829
options *ConsumerOptions) (*Consumer, error) {
830+
return c.declareSubscriber(streamName, messagesHandler, options, nil)
831+
}
832+
833+
func (c *Client) declareSubscriber(streamName string,
834+
messagesHandler MessagesHandler,
835+
options *ConsumerOptions, cleanUp func()) (*Consumer, error) {
843836
if options == nil {
844837
options = NewConsumerOptions()
845838
}
@@ -919,7 +912,7 @@ func (c *Client) DeclareSubscriber(streamName string,
919912

920913
options.client = c
921914
options.streamName = streamName
922-
consumer := c.coordinator.NewConsumer(messagesHandler, options)
915+
consumer := c.coordinator.NewConsumer(messagesHandler, options, cleanUp)
923916

924917
length := 2 + 2 + 4 + 1 + 2 + len(streamName) + 2 + 2
925918
if options.Offset.isOffset() ||
@@ -1004,7 +997,6 @@ func (c *Client) DeclareSubscriber(streamName string,
1004997
}
1005998
return true
1006999
}
1007-
10081000
go func() {
10091001
for {
10101002
select {

pkg/stream/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ var AlreadyClosed = errors.New("Already Closed")
132132
var PreconditionFailed = errors.New("Precondition Failed")
133133
var AuthenticationFailure = errors.New("Authentication Failure")
134134
var StreamDoesNotExist = errors.New("Stream Does Not Exist")
135+
var StreamMetadataFailure = errors.New("Stream Metadata Failure")
135136
var StreamAlreadyExists = errors.New("Stream Already Exists")
136137
var VirtualHostAccessFailure = errors.New("Virtual Host Access Failure")
137138
var SubscriptionIdDoesNotExist = errors.New("Subscription Id Does Not Exist")

pkg/stream/consumer.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ type Consumer struct {
1313
ID uint8
1414
response *Response
1515
options *ConsumerOptions
16-
onClose onInternalClose
16+
onClose func()
1717
mutex *sync.Mutex
1818
chunkForConsumer chan chunkInfo
1919
MessagesHandler MessagesHandler
@@ -310,6 +310,7 @@ func (c *Client) credit(subscriptionId byte, credit int16) {
310310
}
311311

312312
func (consumer *Consumer) Close() error {
313+
313314
if consumer.getStatus() == closed {
314315
return AlreadyClosed
315316
}
@@ -367,10 +368,10 @@ func (consumer *Consumer) close(reason Event) error {
367368
_ = consumer.options.client.Close()
368369
}
369370

370-
ch := make(chan uint8, 1)
371-
ch <- consumer.ID
372-
consumer.onClose(ch)
373-
close(ch)
371+
if consumer.onClose != nil {
372+
consumer.onClose()
373+
}
374+
374375
return nil
375376
}
376377

pkg/stream/consumer_sac_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,19 +60,19 @@ var _ = Describe("Streaming Single Active Consumer", func() {
6060
Expect(err2).To(HaveOccurred())
6161

6262
/// check support for single active consumer is not enabled
63-
client, err3 := testEnvironment.newReconnectClient()
63+
err3 := testEnvironment.maybeReconnectLocator()
6464
Expect(err3).NotTo(HaveOccurred())
6565

6666
// here we inject a fake response to simulate a server that
6767
// does not support SAC .
6868
// so we can validate it
6969
// This method is not thread safe and should be used only for testing purposes
70-
client.availableFeatures.brokerSingleActiveConsumerEnabled = false
70+
testEnvironment.locator.client.availableFeatures.brokerSingleActiveConsumerEnabled = false
7171
handleMessages := func(consumerContext ConsumerContext, message *amqp.Message) {
7272

7373
}
7474

75-
_, err = client.DeclareSubscriber(streamName, handleMessages, NewConsumerOptions().SetSingleActiveConsumer(
75+
_, err = testEnvironment.locator.client.DeclareSubscriber(streamName, handleMessages, NewConsumerOptions().SetSingleActiveConsumer(
7676
NewSingleActiveConsumer(func(_ string, isActive bool) OffsetSpecification {
7777
return OffsetSpecification{}.Last()
7878
}),

pkg/stream/consumer_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ var _ = Describe("Streaming Consumers", func() {
5151
for _, consumer := range consumers {
5252
Expect(consumer.Close()).NotTo(HaveOccurred())
5353
}
54-
55-
Expect(len(env.consumers.getCoordinators()["localhost:5552"].
56-
getClientsPerContext())).To(Equal(0))
54+
//time.Sleep(1 * time.Second)
55+
Eventually(len(env.consumers.getCoordinators()["localhost:5552"].
56+
getClientsPerContext())).ProbeEvery(100 * time.Millisecond).WithTimeout(5 * time.Second).Should(Equal(0))
5757

5858
})
5959

@@ -454,7 +454,7 @@ var _ = Describe("Streaming Consumers", func() {
454454
It("Check already closed", func() {
455455
producer, err := env.NewProducer(streamName, nil)
456456
Expect(err).NotTo(HaveOccurred())
457-
err = producer.BatchSend(CreateArrayMessagesForTesting(500))
457+
err = producer.BatchSend(CreateArrayMessagesForTesting(1))
458458
Expect(err).NotTo(HaveOccurred())
459459

460460
defer func(producer *Producer) {
@@ -463,17 +463,20 @@ var _ = Describe("Streaming Consumers", func() {
463463
}(producer)
464464

465465
var messagesCount int32 = 0
466+
var signal = make(chan struct{})
466467
consumer, err := env.NewConsumer(streamName,
467468
func(consumerContext ConsumerContext, message *amqp.Message) {
468469
if atomic.AddInt32(&messagesCount, 1) >= 1 {
469-
err := consumerContext.Consumer.Close()
470-
if err != nil {
471-
return
472-
}
470+
time.Sleep(500 * time.Millisecond)
471+
err1 := consumerContext.Consumer.Close()
472+
Expect(err1).NotTo(HaveOccurred())
473+
signal <- struct{}{}
473474
}
474475
}, NewConsumerOptions().SetOffset(OffsetSpecification{}.First()).
475476
SetConsumerName("consumer_test"))
476477
Expect(err).NotTo(HaveOccurred())
478+
479+
<-signal
477480
time.Sleep(200 * time.Millisecond)
478481
Expect(consumer.Close()).To(Equal(AlreadyClosed))
479482

pkg/stream/coordinator.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func NewCoordinator() *Coordinator {
5050

5151
// producersEnvironment
5252
func (coordinator *Coordinator) NewProducer(
53-
parameters *ProducerOptions) (*Producer, error) {
53+
parameters *ProducerOptions, cleanUp func()) (*Producer, error) {
5454
coordinator.mutex.Lock()
5555
defer coordinator.mutex.Unlock()
5656
dynSize := 10000
@@ -75,6 +75,7 @@ func (coordinator *Coordinator) NewProducer(
7575
status: open,
7676
pendingSequencesQueue: NewBlockingQueue[*messageSequence](dynSize),
7777
confirmMutex: &sync.Mutex{},
78+
onClose: cleanUp,
7879
}
7980
coordinator.producers[lastId] = producer
8081
return producer, err
@@ -174,7 +175,7 @@ func (coordinator *Coordinator) RemoveResponseByName(id string) error {
174175

175176
// Consumer functions
176177
func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler,
177-
parameters *ConsumerOptions) *Consumer {
178+
parameters *ConsumerOptions, cleanUp func()) *Consumer {
178179
coordinator.mutex.Lock()
179180
defer coordinator.mutex.Unlock()
180181
var lastId, _ = coordinator.getNextConsumerItem()
@@ -188,6 +189,7 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler,
188189
isPromotedAsActive: true,
189190
lastAutoCommitStored: time.Now(),
190191
chunkForConsumer: make(chan chunkInfo, 100),
192+
onClose: cleanUp,
191193
}
192194

193195
coordinator.consumers[lastId] = item

pkg/stream/coordinator_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ var _ = Describe("Coordinator", func() {
2323
})
2424

2525
It("Add/Remove Producers ", func() {
26-
p, err := client.coordinator.NewProducer(nil)
26+
p, err := client.coordinator.NewProducer(nil, nil)
2727
Expect(err).NotTo(HaveOccurred())
2828
Expect(p.id).To(Equal(uint8(0)))
2929
err = client.coordinator.RemoveProducerById(p.id, Event{
@@ -46,7 +46,7 @@ var _ = Describe("Coordinator", func() {
4646
It("massive insert/delete coordinator ", func() {
4747
var producersId []uint8
4848
for i := 0; i < 100; i++ {
49-
p, err := client.coordinator.NewProducer(nil)
49+
p, err := client.coordinator.NewProducer(nil, nil)
5050
producersId = append(producersId, p.id)
5151
Expect(err).NotTo(HaveOccurred())
5252
}
@@ -66,7 +66,7 @@ var _ = Describe("Coordinator", func() {
6666
// until reach 255 then start reusing the old
6767
// unused ids
6868
for i := 0; i < 250; i++ {
69-
p, err := client.coordinator.NewProducer(nil)
69+
p, err := client.coordinator.NewProducer(nil, nil)
7070
Expect(err).NotTo(HaveOccurred())
7171
Expect(p.id).To(Equal(uint8(i)))
7272
err = client.coordinator.RemoveProducerById(p.id, Event{
@@ -84,7 +84,7 @@ var _ = Describe("Coordinator", func() {
8484
var producersId []uint8
8585
for i := 0; i < 500; i++ {
8686

87-
p, err := client.coordinator.NewProducer(nil)
87+
p, err := client.coordinator.NewProducer(nil, nil)
8888
if i >= int(^uint8(0)) {
8989
Expect(fmt.Sprintf("%s", err)).
9090
To(ContainSubstring("No more items available"))
@@ -109,7 +109,7 @@ var _ = Describe("Coordinator", func() {
109109
// raise an logError not found
110110
Expect(err).To(HaveOccurred())
111111

112-
p, err := client.coordinator.NewProducer(nil)
112+
p, err := client.coordinator.NewProducer(nil, nil)
113113
Expect(err).NotTo(HaveOccurred())
114114
Expect(p.id).To(Equal(v))
115115
}
@@ -140,7 +140,7 @@ var _ = Describe("Coordinator", func() {
140140
})
141141

142142
It("Add/Remove consumers ", func() {
143-
p, err := client.coordinator.NewProducer(nil)
143+
p, err := client.coordinator.NewProducer(nil, nil)
144144
Expect(err).NotTo(HaveOccurred())
145145
Expect(p.id).To(Equal(uint8(0)))
146146
err = client.coordinator.RemoveProducerById(p.id, Event{
@@ -163,7 +163,7 @@ var _ = Describe("Coordinator", func() {
163163
It("massive insert/delete consumers ", func() {
164164
var consumersId []uint8
165165
for i := 0; i < 100; i++ {
166-
p := client.coordinator.NewConsumer(nil, nil)
166+
p := client.coordinator.NewConsumer(nil, nil, nil)
167167
consumersId = append(consumersId, p.ID)
168168
}
169169
Expect(client.coordinator.ConsumersCount()).To(Equal(100))

0 commit comments

Comments
 (0)