
Reuse the locator connection #388

Merged · 8 commits · Apr 8, 2025

Changes from all commits

5 changes: 5 additions & 0 deletions pkg/ha/reliable_common.go
@@ -59,6 +59,11 @@ func retry(backoff int, reliable IReliable) (error, bool) {
 		return retry(backoff+1, reliable)
 	}
 
+	if errors.Is(errS, stream.StreamMetadataFailure) {
+		logs.LogInfo("[Reliable] - Fail to retrieve the %s metadata for %s. Trying to reconnect", reliable.getStreamName(), reliable.getInfo())
+		return retry(backoff+1, reliable)
+	}
+
 	var result error
 	if streamMetaData != nil {
 		logs.LogInfo("[Reliable] - The stream %s exists. Reconnecting the %s.", reliable.getStreamName(), reliable.getInfo())
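The new branch above treats a failed metadata lookup like a missing stream: log it and retry with a larger backoff. Below is a minimal standalone sketch of that retry shape. Only `stream.StreamMetadataFailure` is the library's real identifier; `queryMetadata` and the `maxBackoff` cap are made up for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

// queryMetadata is a placeholder that always fails, standing in for
// the real metadata lookup performed inside retry().
func queryMetadata() error { return stream.StreamMetadataFailure }

// retryMetadata mirrors the shape of the new branch: on a metadata
// failure, back off and try again until an assumed cap is hit.
func retryMetadata(backoff int) error {
	const maxBackoff = 3 // assumed cap, not from the PR
	err := queryMetadata()
	if errors.Is(err, stream.StreamMetadataFailure) && backoff < maxBackoff {
		time.Sleep(time.Duration(backoff) * 100 * time.Millisecond)
		return retryMetadata(backoff + 1)
	}
	return err
}

func main() {
	fmt.Println(retryMetadata(1))
}
```
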
80 changes: 36 additions & 44 deletions pkg/stream/client.go
@@ -418,56 +418,40 @@ func (c *Client) DeleteStream(streamName string) error {
 }
 
 func (c *Client) heartBeat() {
-	ticker := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat) * time.Second)
-	tickerHeartbeat := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat-2) * time.Second)
+	tickerHeartbeat := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat) * time.Second)
 
 	var heartBeatMissed int32
-	doneSendingTimeoutTicker := make(chan struct{}, 1)
-	wg := sync.WaitGroup{}
-	wg.Add(2)
 	go func() {
-		wg.Done()
-		select {
-		case <-c.doneTimeoutTicker:
-			doneSendingTimeoutTicker <- struct{}{}
-			ticker.Stop()
-			tickerHeartbeat.Stop()
-			return
-		case <-tickerHeartbeat.C:
-			if c.socket.isOpen() {
-				if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second {
-					v := atomic.AddInt32(&heartBeatMissed, 1)
-					logs.LogWarn("Missing heart beat: %d", v)
-					if v >= 2 {
-						logs.LogWarn("Too many heartbeat missing: %d", v)
-						c.Close()
-					}
-				} else {
-					atomic.StoreInt32(&heartBeatMissed, 0)
-				}
-			}
-		}
-
-	}()
-
-	go func() {
-		wg.Done()
 		for {
 			select {
-			case <-doneSendingTimeoutTicker:
-				logs.LogDebug("Stopping sending heartbeat")
-			case _, ok := <-ticker.C:
-				if !ok {
-					return
-				}
-				logs.LogDebug("Sending heart beat: %s", time.Now())
-				c.sendHeartbeat()
-
+			case <-c.doneTimeoutTicker:
+				tickerHeartbeat.Stop()
+				return
+			case <-tickerHeartbeat.C:
+				if c.socket.isOpen() {
+					logs.LogDebug("Heartbeat ticker is open, sending heartbeat")
+					c.sendHeartbeat()
+					if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second {
+						v := atomic.AddInt32(&heartBeatMissed, 1)
+						logs.LogWarn("Missing heart beat: %d", v)
+						if v >= 2 {
+							logs.LogWarn("Too many heartbeat missing: %d", v)
+							c.Close()
+						}
+					} else {
+						atomic.StoreInt32(&heartBeatMissed, 0)
+					}
+				} else {
+					logs.LogDebug("Socket Heartbeat ticker is closed. Closing ticker")
+					tickerHeartbeat.Stop()
+					return
+				}
 			}
 		}
 	}()
-
-	wg.Wait()
 }
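For reference, the refactor above replaces two tickers and two goroutines (coordinated through a WaitGroup and an extra channel) with a single ticker loop that both sends the heartbeat and counts misses. Here is a self-contained sketch of that loop; all names are local to the sketch, not the client's real fields.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// heartBeatLoop mirrors the consolidated design: one ticker, one
// goroutine, one miss counter. lastSeen reports when the peer last
// answered; send stands in for c.sendHeartbeat().
func heartBeatLoop(interval time.Duration, done <-chan struct{},
	lastSeen func() time.Time, send func()) {
	ticker := time.NewTicker(interval)
	var missed int32
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				send() // send first, then judge the peer's silence
				if time.Since(lastSeen()) > interval {
					if atomic.AddInt32(&missed, 1) >= 2 {
						fmt.Println("too many heartbeats missed, closing")
						return
					}
				} else {
					atomic.StoreInt32(&missed, 0)
				}
			}
		}
	}()
}

func main() {
	done := make(chan struct{})
	start := time.Now() // pretend the peer answered once at startup
	heartBeatLoop(50*time.Millisecond, done,
		func() time.Time { return start },
		func() { fmt.Println("heartbeat sent") })
	time.Sleep(250 * time.Millisecond) // let the loop miss twice
	close(done)
}
```
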

@@ -487,6 +471,8 @@ func (c *Client) closeHartBeat() {
 }
 
 func (c *Client) Close() error {
+
+	c.closeHartBeat()
 	for _, p := range c.coordinator.Producers() {
 		err := c.coordinator.RemoveProducerById(p.(*Producer).id, Event{
 			Command: CommandClose,
@@ -513,8 +499,6 @@ func (c *Client) Close() error {
 			logs.LogWarn("error removing consumer: %s", err)
 		}
 	}
-
-	c.closeHartBeat()
 	if c.getSocket().isOpen() {
 
 		res := c.coordinator.NewResponse(CommandClose)
@@ -535,6 +519,10 @@
 }
 
 func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (*Producer, error) {
+	return c.declarePublisher(streamName, options, nil)
+}
+
+func (c *Client) declarePublisher(streamName string, options *ProducerOptions, cleanUp func()) (*Producer, error) {
 	if options == nil {
 		options = NewProducerOptions()
 	}
@@ -590,7 +578,7 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
 		ConfirmationTimeOut: options.ConfirmationTimeOut,
 		ClientProvidedName:  options.ClientProvidedName,
 		Filter:              options.Filter,
-	})
+	}, cleanUp)
 
 	if err != nil {
 		return nil, err
@@ -836,10 +824,15 @@ func (c *Client) StoreOffset(consumerName string, streamName string, offset int6
 	writeLong(b, offset)
 	return c.socket.writeAndFlush(b.Bytes())
 }
+
 func (c *Client) DeclareSubscriber(streamName string,
 	messagesHandler MessagesHandler,
 	options *ConsumerOptions) (*Consumer, error) {
+	return c.declareSubscriber(streamName, messagesHandler, options, nil)
+}
+
+func (c *Client) declareSubscriber(streamName string,
+	messagesHandler MessagesHandler,
+	options *ConsumerOptions, cleanUp func()) (*Consumer, error) {
 	if options == nil {
 		options = NewConsumerOptions()
 	}
@@ -919,7 +912,7 @@ func (c *Client) DeclareSubscriber(streamName string,
 
 	options.client = c
 	options.streamName = streamName
-	consumer := c.coordinator.NewConsumer(messagesHandler, options)
+	consumer := c.coordinator.NewConsumer(messagesHandler, options, cleanUp)
 
 	length := 2 + 2 + 4 + 1 + 2 + len(streamName) + 2 + 2
 	if options.Offset.isOffset() ||
@@ -1004,7 +997,6 @@ func (c *Client) DeclareSubscriber(streamName string,
 		}
 		return true
 	}
-
 	go func() {
 		for {
 			select {
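The client changes follow one pattern throughout: `DeclarePublisher` and `DeclareSubscriber` keep their public signatures and delegate to unexported variants that accept an optional `cleanUp func()`, which ends up as the producer's or consumer's `onClose` hook. A compact sketch of that pattern with stand-in types; nothing here is the library's real struct layout.

```go
package main

import "fmt"

type ProducerOptions struct{}

// Producer carries the optional close hook threaded in by the
// unexported constructor path.
type Producer struct{ onClose func() }

type Client struct{}

// Exported API: unchanged signature, no cleanup hook.
func (c *Client) DeclarePublisher(streamName string, opts *ProducerOptions) (*Producer, error) {
	return c.declarePublisher(streamName, opts, nil)
}

// Unexported variant: internal callers (e.g. an environment that
// tracks clients) can pass a cleanUp callback.
func (c *Client) declarePublisher(streamName string, opts *ProducerOptions, cleanUp func()) (*Producer, error) {
	return &Producer{onClose: cleanUp}, nil
}

func (p *Producer) Close() {
	if p.onClose != nil { // nil-safe: the public path passed no callback
		p.onClose()
	}
}

func main() {
	c := &Client{}
	p, _ := c.declarePublisher("s1", nil, func() { fmt.Println("environment cleanup ran") })
	p.Close()
}
```
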
1 change: 1 addition & 0 deletions pkg/stream/constants.go
@@ -132,6 +132,7 @@ var AlreadyClosed = errors.New("Already Closed")
 var PreconditionFailed = errors.New("Precondition Failed")
 var AuthenticationFailure = errors.New("Authentication Failure")
 var StreamDoesNotExist = errors.New("Stream Does Not Exist")
+var StreamMetadataFailure = errors.New("Stream Metadata Failure")
 var StreamAlreadyExists = errors.New("Stream Already Exists")
 var VirtualHostAccessFailure = errors.New("Virtual Host Access Failure")
 var SubscriptionIdDoesNotExist = errors.New("Subscription Id Does Not Exist")
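`StreamMetadataFailure` joins the package's other sentinel errors, so callers can match it with `errors.Is` even through wrapping. A short usage sketch; the wrapped error below is fabricated for illustration.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func main() {
	// Wrap the sentinel the way a caller might when adding context.
	err := fmt.Errorf("metadata query for %q failed: %w", "my-stream", stream.StreamMetadataFailure)
	if errors.Is(err, stream.StreamMetadataFailure) {
		fmt.Println("metadata failure detected; safe to retry or reconnect")
	}
}
```
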
11 changes: 6 additions & 5 deletions pkg/stream/consumer.go
@@ -13,7 +13,7 @@ type Consumer struct {
 	ID               uint8
 	response         *Response
 	options          *ConsumerOptions
-	onClose          onInternalClose
+	onClose          func()
 	mutex            *sync.Mutex
 	chunkForConsumer chan chunkInfo
 	MessagesHandler  MessagesHandler
@@ -310,6 +310,7 @@ func (c *Client) credit(subscriptionId byte, credit int16) {
 }
 
 func (consumer *Consumer) Close() error {
+
 	if consumer.getStatus() == closed {
 		return AlreadyClosed
 	}
Expand Down Expand Up @@ -367,10 +368,10 @@ func (consumer *Consumer) close(reason Event) error {
_ = consumer.options.client.Close()
}

ch := make(chan uint8, 1)
ch <- consumer.ID
consumer.onClose(ch)
close(ch)
if consumer.onClose != nil {
consumer.onClose()
}

return nil
}

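The close hook shrinks from the channel-based `onInternalClose` to a plain, nil-guarded `func()`. A side-by-side miniature of the two styles; the types are local stand-ins that only mirror the shape of the change.

```go
package main

import "fmt"

// onInternalClose mirrors the old hook type: the callee receives the
// closing consumer's id over a channel.
type onInternalClose func(ch chan uint8)

type consumerOld struct {
	id      uint8
	onClose onInternalClose
}

func (c *consumerOld) close() {
	ch := make(chan uint8, 1)
	ch <- c.id
	c.onClose(ch) // callee must drain the channel
	close(ch)
}

type consumerNew struct {
	onClose func()
}

func (c *consumerNew) close() {
	if c.onClose != nil { // nil-safe, no channel plumbing
		c.onClose()
	}
}

func main() {
	old := &consumerOld{id: 7, onClose: func(ch chan uint8) { fmt.Println("closed id", <-ch) }}
	old.close()
	nu := &consumerNew{onClose: func() { fmt.Println("closed") }}
	nu.close()
}
```
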
6 changes: 3 additions & 3 deletions pkg/stream/consumer_sac_test.go
@@ -60,19 +60,19 @@ var _ = Describe("Streaming Single Active Consumer", func() {
 			Expect(err2).To(HaveOccurred())
 
 			/// check support for single active consumer is not enabled
-			client, err3 := testEnvironment.newReconnectClient()
+			err3 := testEnvironment.maybeReconnectLocator()
 			Expect(err3).NotTo(HaveOccurred())
 
 			// here we inject a fake response to simulate a server that
 			// does not support SAC .
 			// so we can validate it
 			// This method is not thread safe and should be used only for testing purposes
-			client.availableFeatures.brokerSingleActiveConsumerEnabled = false
+			testEnvironment.locator.client.availableFeatures.brokerSingleActiveConsumerEnabled = false
 			handleMessages := func(consumerContext ConsumerContext, message *amqp.Message) {
 
 			}
 
-			_, err = client.DeclareSubscriber(streamName, handleMessages, NewConsumerOptions().SetSingleActiveConsumer(
+			_, err = testEnvironment.locator.client.DeclareSubscriber(streamName, handleMessages, NewConsumerOptions().SetSingleActiveConsumer(
 				NewSingleActiveConsumer(func(_ string, isActive bool) OffsetSpecification {
 					return OffsetSpecification{}.Last()
 				}),
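This test now goes through the environment's shared locator instead of dialing a fresh client, which is the point of the PR. A toy sketch of the reuse idea; `maybeReconnectLocator` here only imitates the test helper's contract (do nothing if a usable connection exists, otherwise reconnect).

```go
package main

import "fmt"

type client struct{ open bool }

type environment struct{ locator *client }

// maybeReconnectLocator reuses the existing locator connection when it
// is still usable and dials only when it is missing or closed.
func (e *environment) maybeReconnectLocator() error {
	if e.locator != nil && e.locator.open {
		return nil // reuse
	}
	e.locator = &client{open: true} // reconnect
	return nil
}

func main() {
	env := &environment{}
	_ = env.maybeReconnectLocator()
	first := env.locator
	_ = env.maybeReconnectLocator()
	fmt.Println("same connection reused:", first == env.locator)
}
```
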
19 changes: 11 additions & 8 deletions pkg/stream/consumer_test.go
@@ -51,9 +51,9 @@ var _ = Describe("Streaming Consumers", func() {
 			for _, consumer := range consumers {
 				Expect(consumer.Close()).NotTo(HaveOccurred())
 			}
-
-			Expect(len(env.consumers.getCoordinators()["localhost:5552"].
-				getClientsPerContext())).To(Equal(0))
+			//time.Sleep(1 * time.Second)
+			Eventually(len(env.consumers.getCoordinators()["localhost:5552"].
+				getClientsPerContext())).ProbeEvery(100 * time.Millisecond).WithTimeout(5 * time.Second).Should(Equal(0))
 
 		})

@@ -454,7 +454,7 @@
 		It("Check already closed", func() {
 			producer, err := env.NewProducer(streamName, nil)
 			Expect(err).NotTo(HaveOccurred())
-			err = producer.BatchSend(CreateArrayMessagesForTesting(500))
+			err = producer.BatchSend(CreateArrayMessagesForTesting(1))
 			Expect(err).NotTo(HaveOccurred())
 
 			defer func(producer *Producer) {
@@ -463,17 +463,20 @@
 			}(producer)
 
 			var messagesCount int32 = 0
+			var signal = make(chan struct{})
 			consumer, err := env.NewConsumer(streamName,
 				func(consumerContext ConsumerContext, message *amqp.Message) {
 					if atomic.AddInt32(&messagesCount, 1) >= 1 {
-						err := consumerContext.Consumer.Close()
-						if err != nil {
-							return
-						}
+						time.Sleep(500 * time.Millisecond)
+						err1 := consumerContext.Consumer.Close()
+						Expect(err1).NotTo(HaveOccurred())
+						signal <- struct{}{}
 					}
 				}, NewConsumerOptions().SetOffset(OffsetSpecification{}.First()).
 					SetConsumerName("consumer_test"))
 			Expect(err).NotTo(HaveOccurred())
-
+			<-signal
 			time.Sleep(200 * time.Millisecond)
 			Expect(consumer.Close()).To(Equal(AlreadyClosed))
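One Gomega detail worth noting for the `Eventually` assertion above: Gomega only re-evaluates what it polls when it is given a function, so the closure form below is the variant that re-checks the client count on every probe. `getClientsCount` is a hypothetical stand-in for the coordinator lookup.

```go
package sketch

import (
	"testing"
	"time"

	"github.com/onsi/gomega"
)

func getClientsCount() int { return 0 } // hypothetical stand-in

func TestNoClientsLeft(t *testing.T) {
	g := gomega.NewWithT(t)
	// The closure is re-run on every probe; a bare len(...) argument
	// would be evaluated once, before polling starts.
	g.Eventually(func() int { return getClientsCount() }).
		ProbeEvery(100 * time.Millisecond).
		WithTimeout(5 * time.Second).
		Should(gomega.Equal(0))
}
```
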
6 changes: 4 additions & 2 deletions pkg/stream/coordinator.go
@@ -50,7 +50,7 @@ func NewCoordinator() *Coordinator {
 
 // producersEnvironment
 func (coordinator *Coordinator) NewProducer(
-	parameters *ProducerOptions) (*Producer, error) {
+	parameters *ProducerOptions, cleanUp func()) (*Producer, error) {
 	coordinator.mutex.Lock()
 	defer coordinator.mutex.Unlock()
 	dynSize := 10000
@@ -75,6 +75,7 @@
 		status:                open,
 		pendingSequencesQueue: NewBlockingQueue[*messageSequence](dynSize),
 		confirmMutex:          &sync.Mutex{},
+		onClose:               cleanUp,
 	}
 	coordinator.producers[lastId] = producer
 	return producer, err
@@ -174,7 +175,7 @@ func (coordinator *Coordinator) RemoveResponseByName(id string) error {
 
 // Consumer functions
 func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler,
-	parameters *ConsumerOptions) *Consumer {
+	parameters *ConsumerOptions, cleanUp func()) *Consumer {
 	coordinator.mutex.Lock()
 	defer coordinator.mutex.Unlock()
 	var lastId, _ = coordinator.getNextConsumerItem()
@@ -188,6 +189,7 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler,
 		isPromotedAsActive:   true,
 		lastAutoCommitStored: time.Now(),
 		chunkForConsumer:     make(chan chunkInfo, 100),
+		onClose:              cleanUp,
 	}
 
 	coordinator.consumers[lastId] = item
14 changes: 7 additions & 7 deletions pkg/stream/coordinator_test.go
@@ -23,7 +23,7 @@ var _ = Describe("Coordinator", func() {
 	})
 
 	It("Add/Remove Producers ", func() {
-		p, err := client.coordinator.NewProducer(nil)
+		p, err := client.coordinator.NewProducer(nil, nil)
 		Expect(err).NotTo(HaveOccurred())
 		Expect(p.id).To(Equal(uint8(0)))
 		err = client.coordinator.RemoveProducerById(p.id, Event{
@@ -46,7 +46,7 @@
 	It("massive insert/delete coordinator ", func() {
 		var producersId []uint8
 		for i := 0; i < 100; i++ {
-			p, err := client.coordinator.NewProducer(nil)
+			p, err := client.coordinator.NewProducer(nil, nil)
 			producersId = append(producersId, p.id)
 			Expect(err).NotTo(HaveOccurred())
 		}
@@ -66,7 +66,7 @@
 		// until reach 255 then start reusing the old
 		// unused ids
 		for i := 0; i < 250; i++ {
-			p, err := client.coordinator.NewProducer(nil)
+			p, err := client.coordinator.NewProducer(nil, nil)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(p.id).To(Equal(uint8(i)))
 			err = client.coordinator.RemoveProducerById(p.id, Event{
@@ -84,7 +84,7 @@
 		var producersId []uint8
 		for i := 0; i < 500; i++ {
 
-			p, err := client.coordinator.NewProducer(nil)
+			p, err := client.coordinator.NewProducer(nil, nil)
 			if i >= int(^uint8(0)) {
 				Expect(fmt.Sprintf("%s", err)).
 					To(ContainSubstring("No more items available"))
@@ -109,7 +109,7 @@
 		// raise an logError not found
 		Expect(err).To(HaveOccurred())
 
-		p, err := client.coordinator.NewProducer(nil)
+		p, err := client.coordinator.NewProducer(nil, nil)
 		Expect(err).NotTo(HaveOccurred())
 		Expect(p.id).To(Equal(v))
 	}
@@ -140,7 +140,7 @@
 	})
 
 	It("Add/Remove consumers ", func() {
-		p, err := client.coordinator.NewProducer(nil)
+		p, err := client.coordinator.NewProducer(nil, nil)
 		Expect(err).NotTo(HaveOccurred())
 		Expect(p.id).To(Equal(uint8(0)))
 		err = client.coordinator.RemoveProducerById(p.id, Event{
@@ -163,7 +163,7 @@
 	It("massive insert/delete consumers ", func() {
 		var consumersId []uint8
 		for i := 0; i < 100; i++ {
-			p := client.coordinator.NewConsumer(nil, nil)
+			p := client.coordinator.NewConsumer(nil, nil, nil)
 			consumersId = append(consumersId, p.ID)
 		}
 		Expect(client.coordinator.ConsumersCount()).To(Equal(100))