diff --git a/pkg/ha/reliable_common.go b/pkg/ha/reliable_common.go index 2e0fb8b6..217038e2 100644 --- a/pkg/ha/reliable_common.go +++ b/pkg/ha/reliable_common.go @@ -59,6 +59,11 @@ func retry(backoff int, reliable IReliable) (error, bool) { return retry(backoff+1, reliable) } + if errors.Is(errS, stream.StreamMetadataFailure) { + logs.LogInfo("[Reliable] - Fail to retrieve the %s metadata for %s. Trying to reconnect", reliable.getStreamName(), reliable.getInfo()) + return retry(backoff+1, reliable) + } + var result error if streamMetaData != nil { logs.LogInfo("[Reliable] - The stream %s exists. Reconnecting the %s.", reliable.getStreamName(), reliable.getInfo()) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index bf676cf4..9220a550 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -418,56 +418,40 @@ func (c *Client) DeleteStream(streamName string) error { } func (c *Client) heartBeat() { - ticker := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat) * time.Second) - tickerHeartbeat := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat-2) * time.Second) + tickerHeartbeat := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat) * time.Second) var heartBeatMissed int32 - doneSendingTimeoutTicker := make(chan struct{}, 1) - wg := sync.WaitGroup{} - wg.Add(2) - go func() { - wg.Done() - select { - case <-c.doneTimeoutTicker: - doneSendingTimeoutTicker <- struct{}{} - ticker.Stop() - tickerHeartbeat.Stop() - return - case <-tickerHeartbeat.C: - if c.socket.isOpen() { - if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second { - v := atomic.AddInt32(&heartBeatMissed, 1) - logs.LogWarn("Missing heart beat: %d", v) - if v >= 2 { - logs.LogWarn("Too many heartbeat missing: %d", v) - c.Close() - } - } else { - atomic.StoreInt32(&heartBeatMissed, 0) - } - } - } - - }() go func() { - wg.Done() for { select { - case <-doneSendingTimeoutTicker: - logs.LogDebug("Stopping sending 
heartbeat") + case <-c.doneTimeoutTicker: + tickerHeartbeat.Stop() return - case _, ok := <-ticker.C: - if !ok { + case <-tickerHeartbeat.C: + if c.socket.isOpen() { + logs.LogDebug("Heartbeat ticker is open, sending heartbeat") + c.sendHeartbeat() + if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second { + v := atomic.AddInt32(&heartBeatMissed, 1) + logs.LogWarn("Missing heart beat: %d", v) + if v >= 2 { + logs.LogWarn("Too many heartbeat missing: %d", v) + c.Close() + } + } else { + atomic.StoreInt32(&heartBeatMissed, 0) + } + } else { + logs.LogDebug("Socket Heartbeat ticker is closed. Closing ticker") + tickerHeartbeat.Stop() return } - logs.LogDebug("Sending heart beat: %s", time.Now()) - c.sendHeartbeat() + } } - }() - wg.Wait() + }() } @@ -487,6 +471,8 @@ func (c *Client) closeHartBeat() { } func (c *Client) Close() error { + + c.closeHartBeat() for _, p := range c.coordinator.Producers() { err := c.coordinator.RemoveProducerById(p.(*Producer).id, Event{ Command: CommandClose, @@ -513,8 +499,6 @@ func (c *Client) Close() error { logs.LogWarn("error removing consumer: %s", err) } } - - c.closeHartBeat() if c.getSocket().isOpen() { res := c.coordinator.NewResponse(CommandClose) @@ -535,6 +519,10 @@ func (c *Client) Close() error { } func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (*Producer, error) { + return c.declarePublisher(streamName, options, nil) +} + +func (c *Client) declarePublisher(streamName string, options *ProducerOptions, cleanUp func()) (*Producer, error) { if options == nil { options = NewProducerOptions() } @@ -590,7 +578,7 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) ( ConfirmationTimeOut: options.ConfirmationTimeOut, ClientProvidedName: options.ClientProvidedName, Filter: options.Filter, - }) + }, cleanUp) if err != nil { return nil, err @@ -836,10 +824,15 @@ func (c *Client) StoreOffset(consumerName string, streamName string, 
offset int6 writeLong(b, offset) return c.socket.writeAndFlush(b.Bytes()) } - func (c *Client) DeclareSubscriber(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error) { + return c.declareSubscriber(streamName, messagesHandler, options, nil) +} + +func (c *Client) declareSubscriber(streamName string, + messagesHandler MessagesHandler, + options *ConsumerOptions, cleanUp func()) (*Consumer, error) { if options == nil { options = NewConsumerOptions() } @@ -919,7 +912,7 @@ func (c *Client) DeclareSubscriber(streamName string, options.client = c options.streamName = streamName - consumer := c.coordinator.NewConsumer(messagesHandler, options) + consumer := c.coordinator.NewConsumer(messagesHandler, options, cleanUp) length := 2 + 2 + 4 + 1 + 2 + len(streamName) + 2 + 2 if options.Offset.isOffset() || @@ -1004,7 +997,6 @@ func (c *Client) DeclareSubscriber(streamName string, } return true } - go func() { for { select { diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index 938bf687..0ef3eef9 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -132,6 +132,7 @@ var AlreadyClosed = errors.New("Already Closed") var PreconditionFailed = errors.New("Precondition Failed") var AuthenticationFailure = errors.New("Authentication Failure") var StreamDoesNotExist = errors.New("Stream Does Not Exist") +var StreamMetadataFailure = errors.New("Stream Metadata Failure") var StreamAlreadyExists = errors.New("Stream Already Exists") var VirtualHostAccessFailure = errors.New("Virtual Host Access Failure") var SubscriptionIdDoesNotExist = errors.New("Subscription Id Does Not Exist") diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 9add7319..1c94d857 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -13,7 +13,7 @@ type Consumer struct { ID uint8 response *Response options *ConsumerOptions - onClose onInternalClose + onClose func() mutex *sync.Mutex chunkForConsumer chan chunkInfo 
MessagesHandler MessagesHandler @@ -310,6 +310,7 @@ func (c *Client) credit(subscriptionId byte, credit int16) { } func (consumer *Consumer) Close() error { + if consumer.getStatus() == closed { return AlreadyClosed } @@ -367,10 +368,10 @@ func (consumer *Consumer) close(reason Event) error { _ = consumer.options.client.Close() } - ch := make(chan uint8, 1) - ch <- consumer.ID - consumer.onClose(ch) - close(ch) + if consumer.onClose != nil { + consumer.onClose() + } + return nil } diff --git a/pkg/stream/consumer_sac_test.go b/pkg/stream/consumer_sac_test.go index cb7cda83..ac44add9 100644 --- a/pkg/stream/consumer_sac_test.go +++ b/pkg/stream/consumer_sac_test.go @@ -60,19 +60,19 @@ var _ = Describe("Streaming Single Active Consumer", func() { Expect(err2).To(HaveOccurred()) /// check support for single active consumer is not enabled - client, err3 := testEnvironment.newReconnectClient() + err3 := testEnvironment.maybeReconnectLocator() Expect(err3).NotTo(HaveOccurred()) // here we inject a fake response to simulate a server that // does not support SAC . 
// so we can validate it // This method is not thread safe and should be used only for testing purposes - client.availableFeatures.brokerSingleActiveConsumerEnabled = false + testEnvironment.locator.client.availableFeatures.brokerSingleActiveConsumerEnabled = false handleMessages := func(consumerContext ConsumerContext, message *amqp.Message) { } - _, err = client.DeclareSubscriber(streamName, handleMessages, NewConsumerOptions().SetSingleActiveConsumer( + _, err = testEnvironment.locator.client.DeclareSubscriber(streamName, handleMessages, NewConsumerOptions().SetSingleActiveConsumer( NewSingleActiveConsumer(func(_ string, isActive bool) OffsetSpecification { return OffsetSpecification{}.Last() }), diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index e9dc6a43..6ffd203c 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -51,9 +51,9 @@ var _ = Describe("Streaming Consumers", func() { for _, consumer := range consumers { Expect(consumer.Close()).NotTo(HaveOccurred()) } - - Expect(len(env.consumers.getCoordinators()["localhost:5552"]. - getClientsPerContext())).To(Equal(0)) + //time.Sleep(1 * time.Second) + Eventually(len(env.consumers.getCoordinators()["localhost:5552"]. 
+ getClientsPerContext())).ProbeEvery(100 * time.Millisecond).WithTimeout(5 * time.Second).Should(Equal(0)) }) @@ -454,7 +454,7 @@ var _ = Describe("Streaming Consumers", func() { It("Check already closed", func() { producer, err := env.NewProducer(streamName, nil) Expect(err).NotTo(HaveOccurred()) - err = producer.BatchSend(CreateArrayMessagesForTesting(500)) + err = producer.BatchSend(CreateArrayMessagesForTesting(1)) Expect(err).NotTo(HaveOccurred()) defer func(producer *Producer) { @@ -463,17 +463,20 @@ var _ = Describe("Streaming Consumers", func() { }(producer) var messagesCount int32 = 0 + var signal = make(chan struct{}) consumer, err := env.NewConsumer(streamName, func(consumerContext ConsumerContext, message *amqp.Message) { if atomic.AddInt32(&messagesCount, 1) >= 1 { - err := consumerContext.Consumer.Close() - if err != nil { - return - } + time.Sleep(500 * time.Millisecond) + err1 := consumerContext.Consumer.Close() + Expect(err1).NotTo(HaveOccurred()) + signal <- struct{}{} } }, NewConsumerOptions().SetOffset(OffsetSpecification{}.First()). 
SetConsumerName("consumer_test")) Expect(err).NotTo(HaveOccurred()) + + <-signal time.Sleep(200 * time.Millisecond) Expect(consumer.Close()).To(Equal(AlreadyClosed)) diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 1060a66c..37c6fcb3 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -50,7 +50,7 @@ func NewCoordinator() *Coordinator { // producersEnvironment func (coordinator *Coordinator) NewProducer( - parameters *ProducerOptions) (*Producer, error) { + parameters *ProducerOptions, cleanUp func()) (*Producer, error) { coordinator.mutex.Lock() defer coordinator.mutex.Unlock() dynSize := 10000 @@ -75,6 +75,7 @@ func (coordinator *Coordinator) NewProducer( status: open, pendingSequencesQueue: NewBlockingQueue[*messageSequence](dynSize), confirmMutex: &sync.Mutex{}, + onClose: cleanUp, } coordinator.producers[lastId] = producer return producer, err @@ -174,7 +175,7 @@ func (coordinator *Coordinator) RemoveResponseByName(id string) error { // Consumer functions func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, - parameters *ConsumerOptions) *Consumer { + parameters *ConsumerOptions, cleanUp func()) *Consumer { coordinator.mutex.Lock() defer coordinator.mutex.Unlock() var lastId, _ = coordinator.getNextConsumerItem() @@ -188,6 +189,7 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, isPromotedAsActive: true, lastAutoCommitStored: time.Now(), chunkForConsumer: make(chan chunkInfo, 100), + onClose: cleanUp, } coordinator.consumers[lastId] = item diff --git a/pkg/stream/coordinator_test.go b/pkg/stream/coordinator_test.go index 322de58a..89e17c81 100644 --- a/pkg/stream/coordinator_test.go +++ b/pkg/stream/coordinator_test.go @@ -23,7 +23,7 @@ var _ = Describe("Coordinator", func() { }) It("Add/Remove Producers ", func() { - p, err := client.coordinator.NewProducer(nil) + p, err := client.coordinator.NewProducer(nil, nil) Expect(err).NotTo(HaveOccurred()) 
Expect(p.id).To(Equal(uint8(0))) err = client.coordinator.RemoveProducerById(p.id, Event{ @@ -46,7 +46,7 @@ var _ = Describe("Coordinator", func() { It("massive insert/delete coordinator ", func() { var producersId []uint8 for i := 0; i < 100; i++ { - p, err := client.coordinator.NewProducer(nil) + p, err := client.coordinator.NewProducer(nil, nil) producersId = append(producersId, p.id) Expect(err).NotTo(HaveOccurred()) } @@ -66,7 +66,7 @@ var _ = Describe("Coordinator", func() { // until reach 255 then start reusing the old // unused ids for i := 0; i < 250; i++ { - p, err := client.coordinator.NewProducer(nil) + p, err := client.coordinator.NewProducer(nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(p.id).To(Equal(uint8(i))) err = client.coordinator.RemoveProducerById(p.id, Event{ @@ -84,7 +84,7 @@ var _ = Describe("Coordinator", func() { var producersId []uint8 for i := 0; i < 500; i++ { - p, err := client.coordinator.NewProducer(nil) + p, err := client.coordinator.NewProducer(nil, nil) if i >= int(^uint8(0)) { Expect(fmt.Sprintf("%s", err)). 
To(ContainSubstring("No more items available")) @@ -109,7 +109,7 @@ var _ = Describe("Coordinator", func() { // raise an logError not found Expect(err).To(HaveOccurred()) - p, err := client.coordinator.NewProducer(nil) + p, err := client.coordinator.NewProducer(nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(p.id).To(Equal(v)) } @@ -140,7 +140,7 @@ var _ = Describe("Coordinator", func() { }) It("Add/Remove consumers ", func() { - p, err := client.coordinator.NewProducer(nil) + p, err := client.coordinator.NewProducer(nil, nil) Expect(err).NotTo(HaveOccurred()) Expect(p.id).To(Equal(uint8(0))) err = client.coordinator.RemoveProducerById(p.id, Event{ @@ -163,7 +163,7 @@ var _ = Describe("Coordinator", func() { It("massive insert/delete consumers ", func() { var consumersId []uint8 for i := 0; i < 100; i++ { - p := client.coordinator.NewConsumer(nil, nil) + p := client.coordinator.NewConsumer(nil, nil, nil) consumersId = append(consumersId, p.ID) } Expect(client.coordinator.ConsumersCount()).To(Equal(100)) diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 907ba8da..3d777a5e 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -14,10 +14,23 @@ import ( "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" ) +type locator struct { + client *Client + mutex sync.Mutex +} + +func newLocator(client *Client) *locator { + return &locator{ + client: client, + mutex: sync.Mutex{}, + } +} + type Environment struct { producers *producersEnvironment consumers *consumersEnvironment options *EnvironmentOptions + locator *locator closed bool } @@ -109,126 +122,115 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) { producers: newProducers(options.MaxProducersPerClient), consumers: newConsumerEnvironment(options.MaxConsumersPerClient), closed: false, + locator: newLocator(client), }, connectionError } -func (env *Environment) newReconnectClient() (*Client, error) { +func (env *Environment) 
maybeReconnectLocator() error { + env.locator.mutex.Lock() + defer env.locator.mutex.Unlock() + if env.locator.client != nil && env.locator.client.socket.isOpen() { + return nil + } + broker := env.options.ConnectionParameters[0] - client := newClient("go-stream-locator", broker, env.options.TCPParameters, + c := newClient("go-stream-locator", broker, env.options.TCPParameters, env.options.SaslConfiguration, env.options.RPCTimeout) - err := client.connect() + env.locator.client = c + err := c.connect() tentatives := 1 for err != nil { sleepTime := rand.Intn(5000) + (tentatives * 1000) - brokerUri := fmt.Sprintf("%s://%s:***@%s:%s/%s", client.broker.Scheme, client.broker.User, client.broker.Host, client.broker.Port, client.broker.Vhost) + brokerUri := fmt.Sprintf("%s://%s:***@%s:%s/%s", c.broker.Scheme, c.broker.User, c.broker.Host, c.broker.Port, c.broker.Vhost) logs.LogError("Can't connect the locator client, error:%s, retry in %d milliseconds, broker: ", err, sleepTime, brokerUri) time.Sleep(time.Duration(sleepTime) * time.Millisecond) r := rand.New(rand.NewSource(time.Now().Unix())) n := r.Intn(len(env.options.ConnectionParameters)) - client = newClient("stream-locator", env.options.ConnectionParameters[n], env.options.TCPParameters, + c1 := newClient("stream-locator", env.options.ConnectionParameters[n], env.options.TCPParameters, env.options.SaslConfiguration, env.options.RPCTimeout) tentatives = tentatives + 1 - err = client.connect() + env.locator.client = c1 + err = c1.connect() } - return client, client.connect() + return env.locator.client.connect() } func (env *Environment) DeclareStream(streamName string, options *StreamOptions) error { - client, err := env.newReconnectClient() - defer func(client *Client) { - _ = client.Close() - }(client) + err := env.maybeReconnectLocator() if err != nil { return err } - if err := client.DeclareStream(streamName, options); err != nil && err != StreamAlreadyExists { + if err := 
env.locator.client.DeclareStream(streamName, options); err != nil && err != StreamAlreadyExists { return err } return nil } func (env *Environment) DeleteStream(streamName string) error { - client, err := env.newReconnectClient() - defer func(client *Client) { - _ = client.Close() - }(client) + err := env.maybeReconnectLocator() if err != nil { return err } - return client.DeleteStream(streamName) + return env.locator.client.DeleteStream(streamName) } func (env *Environment) NewProducer(streamName string, producerOptions *ProducerOptions) (*Producer, error) { - client, err := env.newReconnectClient() - defer func(client *Client) { - _ = client.Close() - }(client) + err := env.maybeReconnectLocator() + if err != nil { return nil, err } - return env.producers.newProducer(client, streamName, producerOptions, env.options.AddressResolver, env.options.RPCTimeout) + return env.producers.newProducer(env.locator.client, streamName, producerOptions, env.options.AddressResolver, env.options.RPCTimeout) } func (env *Environment) StreamExists(streamName string) (bool, error) { - client, err := env.newReconnectClient() - defer func(client *Client) { - _ = client.Close() - }(client) + err := env.maybeReconnectLocator() if err != nil { return false, err } - return client.StreamExists(streamName), nil + return env.locator.client.StreamExists(streamName), nil } func (env *Environment) QueryOffset(consumerName string, streamName string) (int64, error) { - client, err := env.newReconnectClient() - defer func(client *Client) { - _ = client.Close() - }(client) + err := env.maybeReconnectLocator() if err != nil { return 0, err } - return client.queryOffset(consumerName, streamName) + return env.locator.client.queryOffset(consumerName, streamName) } // QuerySequence gets the last id stored for a producer // you can also see producer.GetLastPublishingId() that is the easier way to get the last-id func (env *Environment) QuerySequence(publisherReference string, streamName string) (int64, 
error) { - client, err := env.newReconnectClient() - defer func(client *Client) { - _ = client.Close() - }(client) + err := env.maybeReconnectLocator() if err != nil { return 0, err } - return client.queryPublisherSequence(publisherReference, streamName) + return env.locator.client.queryPublisherSequence(publisherReference, streamName) } func (env *Environment) StreamStats(streamName string) (*StreamStats, error) { - client, err := env.newReconnectClient() - defer func(client *Client) { - _ = client.Close() - }(client) + err := env.maybeReconnectLocator() if err != nil { return nil, err } - return client.StreamStats(streamName) + return env.locator.client.StreamStats(streamName) } func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error) { - client, err := env.newReconnectClient() - defer func(client *Client) { - _ = client.Close() - }(client) + err := env.maybeReconnectLocator() if err != nil { return nil, err } - streamsMetadata := client.metaData(streamName) + streamsMetadata := env.locator.client.metaData(streamName) + if streamsMetadata == nil { + return nil, StreamMetadataFailure + } streamMetadata := streamsMetadata.Get(streamName) if streamMetadata.responseCode != responseCodeOk { return nil, lookErrorCode(streamMetadata.responseCode) @@ -236,7 +238,7 @@ func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, erro tentatives := 0 for streamMetadata == nil || streamMetadata.Leader == nil && tentatives < 3 { - streamsMetadata = client.metaData(streamName) + streamsMetadata = env.locator.client.metaData(streamName) streamMetadata = streamsMetadata.Get(streamName) tentatives++ time.Sleep(100 * time.Millisecond) @@ -253,15 +255,12 @@ func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, erro func (env *Environment) NewConsumer(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error) { - client, err := env.newReconnectClient() - defer func(client *Client) { 
- _ = client.Close() - }(client) + err := env.maybeReconnectLocator() if err != nil { return nil, err } - return env.consumers.NewSubscriber(client, streamName, messagesHandler, options, env.options.AddressResolver, env.options.RPCTimeout) + return env.consumers.NewSubscriber(env.locator.client, streamName, messagesHandler, options, env.options.AddressResolver, env.options.RPCTimeout) } func (env *Environment) NewSuperStreamProducer(superStream string, superStreamProducerOptions *SuperStreamProducerOptions) (*SuperStreamProducer, error) { @@ -275,6 +274,9 @@ func (env *Environment) NewSuperStreamProducer(superStream string, superStreamPr func (env *Environment) Close() error { _ = env.producers.close() _ = env.consumers.close() + if env.locator.client != nil { + _ = env.locator.client.Close() + } env.closed = true return nil } @@ -546,8 +548,7 @@ func (c *Client) maybeCleanConsumers(streamName string) { c.mutex.Unlock() } -func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string, - options *ProducerOptions, rpcTimeout time.Duration) (*Producer, error) { +func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string, options *ProducerOptions, rpcTimeout time.Duration, cleanUp func()) (*Producer, error) { cc.mutex.Lock() defer cc.mutex.Unlock() var clientResult *Client @@ -591,7 +592,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP time.Sleep(1 * time.Second) } - producer, err := clientResult.DeclarePublisher(streamName, options) + producer, err := clientResult.declarePublisher(streamName, options, cleanUp) if err != nil { return nil, err @@ -610,7 +611,7 @@ func (cc *environmentCoordinator) newClientForProducer(connectionName string, le func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Broker, tcpParameters *TCPParameters, 
saslConfiguration *SaslConfiguration, streamName string, messagesHandler MessagesHandler, - options *ConsumerOptions, rpcTimeout time.Duration) (*Consumer, error) { + options *ConsumerOptions, rpcTimeout time.Duration, cleanUp func()) (*Consumer, error) { cc.mutex.Lock() defer cc.mutex.Unlock() var clientResult *Client @@ -634,8 +635,7 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro return nil, err } - subscriber, err := clientResult.DeclareSubscriber(streamName, messagesHandler, options) - + subscriber, err := clientResult.declareSubscriber(streamName, messagesHandler, options, cleanUp) if err != nil { return nil, err } @@ -701,16 +701,16 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st } leader.cloneFrom(clientLocator.broker, resolver) - producer, err := ps.producersCoordinator[coordinatorKey].newProducer(leader, clientLocator.tcpParameters, - clientLocator.saslConfiguration, streamName, options, rpcTimeOut) - if err != nil { - return nil, err - } - producer.onClose = func(ch <-chan uint8) { + cleanUp := func() { for _, coordinator := range ps.producersCoordinator { coordinator.maybeCleanClients() } } + producer, err := ps.producersCoordinator[coordinatorKey].newProducer(leader, clientLocator.tcpParameters, + clientLocator.saslConfiguration, streamName, options, rpcTimeOut, cleanUp) + if err != nil { + return nil, err + } return producer, err } @@ -768,18 +768,20 @@ func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName if consumerOptions != nil && consumerOptions.ClientProvidedName != "" { clientProvidedName = consumerOptions.ClientProvidedName } + cleanUp := func() { + for _, coordinator := range ps.consumersCoordinator { + coordinator.maybeCleanClients() + } + } + consumer, err := ps.consumersCoordinator[coordinatorKey]. 
newConsumer(clientProvidedName, consumerBroker, clientLocator.tcpParameters, clientLocator.saslConfiguration, - streamName, messagesHandler, consumerOptions, rpcTimeout) + streamName, messagesHandler, consumerOptions, rpcTimeout, cleanUp) if err != nil { return nil, err } - consumer.onClose = func(ch <-chan uint8) { - for _, coordinator := range ps.consumersCoordinator { - coordinator.maybeCleanClients() - } - } + return consumer, err } @@ -801,39 +803,30 @@ func (ps *consumersEnvironment) getCoordinators() map[string]*environmentCoordin // Super stream func (env *Environment) DeclareSuperStream(superStreamName string, options SuperStreamOptions) error { - client, err := env.newReconnectClient() - defer func(client *Client) { - _ = client.Close() - }(client) + err := env.maybeReconnectLocator() if err != nil { return err } - if err := client.DeclareSuperStream(superStreamName, options); err != nil && !errors.Is(err, StreamAlreadyExists) { + if err := env.locator.client.DeclareSuperStream(superStreamName, options); err != nil && !errors.Is(err, StreamAlreadyExists) { return err } return nil } func (env *Environment) DeleteSuperStream(superStreamName string) error { - client, err := env.newReconnectClient() - defer func(client *Client) { - _ = client.Close() - }(client) + err := env.maybeReconnectLocator() if err != nil { return err } - return client.DeleteSuperStream(superStreamName) + return env.locator.client.DeleteSuperStream(superStreamName) } func (env *Environment) QueryPartitions(superStreamName string) ([]string, error) { - client, err := env.newReconnectClient() - defer func(client *Client) { - _ = client.Close() - }(client) + err := env.maybeReconnectLocator() if err != nil { return nil, err } - return client.QueryPartitions(superStreamName) + return env.locator.client.QueryPartitions(superStreamName) } // StoreOffset stores the offset for a consumer for a stream @@ -844,25 +837,19 @@ func (env *Environment) QueryPartitions(superStreamName string) 
([]string, error // StoreOffset does not return any application error, if the stream does not exist or the consumer does not exist // the error is logged in the server func (env *Environment) StoreOffset(consumerName string, streamName string, offset int64) error { - client, err := env.newReconnectClient() - defer func(client *Client) { - _ = client.Close() - }(client) + err := env.maybeReconnectLocator() if err != nil { return err } - return client.StoreOffset(consumerName, streamName, offset) + return env.locator.client.StoreOffset(consumerName, streamName, offset) } func (env *Environment) QueryRoute(superStream string, routingKey string) ([]string, error) { - client, err := env.newReconnectClient() - defer func(client *Client) { - _ = client.Close() - }(client) + err := env.maybeReconnectLocator() if err != nil { return nil, err } - return client.queryRoute(superStream, routingKey) + return env.locator.client.queryRoute(superStream, routingKey) } func (env *Environment) NewSuperStreamConsumer(superStream string, messagesHandler MessagesHandler, options *SuperStreamConsumerOptions) (*SuperStreamConsumer, error) { diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index 359caa62..3520a3d7 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -490,10 +490,30 @@ var _ = Describe("Environment test", func() { Expect(env.Close()).NotTo(HaveOccurred()) }) - Describe("Query Offset should not return any error in case of stream does not exist", func() { + // PR:https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/388 + Describe("QueryOffset DeclareStream StoreOffset should reconnect the locator", func() { env, err := NewEnvironment(NewEnvironmentOptions()) Expect(err).NotTo(HaveOccurred()) - Expect(env.StoreOffset("my_consumer", "stream_doesnt_exist", 123)).NotTo(HaveOccurred()) + streamName := uuid.New().String() + // here we force the client closing + Expect(env.locator.client.Close()).NotTo(HaveOccurred()) + 
Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) + Expect(env.locator.client.socket.isOpen()).To(BeTrue()) + const consumerName = "my_consumer_1" + // here we force the client closing + Expect(env.locator.client.Close()).NotTo(HaveOccurred()) + Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred()) + Expect(env.locator.client.socket.isOpen()).To(BeTrue()) + // here we force the client closing + Expect(env.locator.client.Close()).NotTo(HaveOccurred()) + off, err := env.QueryOffset(consumerName, streamName) + Expect(err).NotTo(HaveOccurred()) + Expect(env.locator.client.socket.isOpen()).To(BeTrue()) + Expect(off).To(Equal(int64(123))) + // here we force the client closing + Expect(env.locator.client.Close()).NotTo(HaveOccurred()) + Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) + Expect(env.locator.client.socket.isOpen()).To(BeTrue()) Expect(env.Close()).NotTo(HaveOccurred()) }) diff --git a/pkg/stream/filtering_test.go b/pkg/stream/filtering_test.go index 803c9b3b..cc3b8dc7 100644 --- a/pkg/stream/filtering_test.go +++ b/pkg/stream/filtering_test.go @@ -46,7 +46,7 @@ var _ = Describe("Streaming Filtering", func() { It("Validate Producer/Consume Filtering not supported", func() { - client, err := testEnvironment.newReconnectClient() + err := testEnvironment.maybeReconnectLocator() Expect(err).NotTo(HaveOccurred()) @@ -54,9 +54,9 @@ var _ = Describe("Streaming Filtering", func() { // does not support broker filtering. 
// so we can validate it // This method is not thread safe and should be used only for testing purposes - client.availableFeatures.brokerFilterEnabled = false + testEnvironment.locator.client.availableFeatures.brokerFilterEnabled = false - _, err = client.DeclarePublisher(testProducerStream, NewProducerOptions().SetFilter( + _, err = testEnvironment.locator.client.DeclarePublisher(testProducerStream, NewProducerOptions().SetFilter( NewProducerFilter(func(message message.StreamMessage) string { return fmt.Sprintf("%s", message.GetApplicationProperties()["ID"]) }), @@ -71,7 +71,7 @@ var _ = Describe("Streaming Filtering", func() { handleMessages := func(consumerContext ConsumerContext, message *amqp.Message) { } - _, err = client.DeclareSubscriber(testProducerStream, handleMessages, NewConsumerOptions().SetFilter(filter)) + _, err = testEnvironment.locator.client.DeclareSubscriber(testProducerStream, handleMessages, NewConsumerOptions().SetFilter(filter)) Expect(err).To(Equal(FilterNotSupported)) }) diff --git a/pkg/stream/listeners.go b/pkg/stream/listeners.go index f0750226..98bfd962 100644 --- a/pkg/stream/listeners.go +++ b/pkg/stream/listeners.go @@ -8,7 +8,5 @@ type Event struct { Err error } -type onInternalClose func(ch <-chan uint8) - type ChannelClose = <-chan Event type ChannelPublishConfirm chan []*ConfirmationStatus diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index e7441c85..60c72486 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -70,7 +70,7 @@ type messageSequence struct { type Producer struct { id uint8 options *ProducerOptions - onClose onInternalClose + onClose func() unConfirmed *unConfirmed sequence int64 mutex *sync.RWMutex @@ -668,10 +668,7 @@ func (producer *Producer) close(reason Event) error { } if producer.onClose != nil { - ch := make(chan uint8, 1) - ch <- producer.id - producer.onClose(ch) - close(ch) + producer.onClose() } return nil diff --git a/pkg/stream/super_stream_producer_test.go 
b/pkg/stream/super_stream_producer_test.go index dcc98d5a..3c79eb89 100644 --- a/pkg/stream/super_stream_producer_test.go +++ b/pkg/stream/super_stream_producer_test.go @@ -317,22 +317,22 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() SetBalancedLeaderLocator(). SetMaxLengthBytes(ByteCapacity{}.KB(1024))), ).NotTo(HaveOccurred()) - client, err := env.newReconnectClient() + err = env.maybeReconnectLocator() Expect(err).NotTo(HaveOccurred()) - Expect(client).NotTo(BeNil()) - route, err := client.queryRoute(superStream, "italy") + Expect(env.locator.client).NotTo(BeNil()) + route, err := env.locator.client.queryRoute(superStream, "italy") Expect(err).NotTo(HaveOccurred()) Expect(route).NotTo(BeNil()) Expect(route).To(HaveLen(1)) Expect(route[0]).To(Equal(fmt.Sprintf("%s-italy", superStream))) - route, err = client.queryRoute(superStream, "spain") + route, err = env.locator.client.queryRoute(superStream, "spain") Expect(err).NotTo(HaveOccurred()) Expect(route).NotTo(BeNil()) Expect(route).To(HaveLen(1)) Expect(route[0]).To(Equal(fmt.Sprintf("%s-spain", superStream))) - route, err = client.queryRoute(superStream, "france") + route, err = env.locator.client.queryRoute(superStream, "france") Expect(err).NotTo(HaveOccurred()) Expect(route).NotTo(BeNil()) Expect(route).To(HaveLen(1)) @@ -340,11 +340,11 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() // here we test the case where the key is not found // the client should return an empty list - route, err = client.queryRoute(superStream, "NOT_EXIST") + route, err = env.locator.client.queryRoute(superStream, "NOT_EXIST") Expect(err).NotTo(HaveOccurred()) Expect(route).To(Equal([]string{})) - Expect(client.Close()).NotTo(HaveOccurred()) + Expect(env.locator.client.Close()).NotTo(HaveOccurred()) Expect(env.DeleteSuperStream(superStream)).NotTo(HaveOccurred()) Expect(env.Close()).NotTo(HaveOccurred()) }) @@ -352,13 +352,13 @@ var _ = Describe("Super Stream 
Producer", Label("super-stream-producer"), func() It("should return stream not found query route", func() { env, err := NewEnvironment(nil) Expect(err).NotTo(HaveOccurred()) - client, err := env.newReconnectClient() + err = env.maybeReconnectLocator() Expect(err).NotTo(HaveOccurred()) - Expect(client).NotTo(BeNil()) - route, err := client.queryRoute("not-found", "italy") + Expect(env.locator.client).NotTo(BeNil()) + route, err := env.locator.client.queryRoute("not-found", "italy") Expect(err).To(HaveOccurred()) Expect(route).To(BeNil()) - Expect(client.Close()).NotTo(HaveOccurred()) + Expect(env.locator.client.Close()).NotTo(HaveOccurred()) Expect(env.Close()).NotTo(HaveOccurred()) }) diff --git a/pkg/stream/super_stream_test.go b/pkg/stream/super_stream_test.go index 6c052d90..7181abed 100644 --- a/pkg/stream/super_stream_test.go +++ b/pkg/stream/super_stream_test.go @@ -50,71 +50,71 @@ var _ = Describe("Super Stream Client", Label("super-stream"), func() { }) It("Validate Super Stream Creation", Label("super-stream"), func() { - client, err := testEnvironment.newReconnectClient() + err := testEnvironment.maybeReconnectLocator() Expect(err).NotTo(HaveOccurred()) // empty name - err = client.DeclareSuperStream("", nil) + err = testEnvironment.locator.client.DeclareSuperStream("", nil) Expect(err).To(HaveOccurred()) // empty name with spaces - err = client.DeclareSuperStream(" ", nil) + err = testEnvironment.locator.client.DeclareSuperStream(" ", nil) Expect(err).To(HaveOccurred()) // partition nil and empty - err = client.DeclareSuperStream("valid name", nil) + err = testEnvironment.locator.client.DeclareSuperStream("valid name", nil) Expect(err).To(HaveOccurred()) // binding keys nil and empty - err = client.DeclareSuperStream("valid name", + err = testEnvironment.locator.client.DeclareSuperStream("valid name", newTestSuperStreamOption([]string{"some name"}, nil, nil)) Expect(err).To(HaveOccurred()) // partition empty - err = client.DeclareSuperStream("valid
name", newTestSuperStreamOption([]string{""}, []string{"some key"}, nil)) + err = testEnvironment.locator.client.DeclareSuperStream("valid name", newTestSuperStreamOption([]string{""}, []string{"some key"}, nil)) Expect(err).To(HaveOccurred()) // partition key empty - err = client.DeclareSuperStream("valid name", newTestSuperStreamOption([]string{" "}, []string{"some key"}, nil)) + err = testEnvironment.locator.client.DeclareSuperStream("valid name", newTestSuperStreamOption([]string{" "}, []string{"some key"}, nil)) Expect(err).To(HaveOccurred()) // bindings key empty - err = client.DeclareSuperStream("valid name", newTestSuperStreamOption([]string{"valid "}, []string{""}, nil)) + err = testEnvironment.locator.client.DeclareSuperStream("valid name", newTestSuperStreamOption([]string{"valid "}, []string{""}, nil)) Expect(err).To(HaveOccurred()) - Expect(client.Close()).NotTo(HaveOccurred()) + Expect(testEnvironment.locator.client.Close()).NotTo(HaveOccurred()) }) It("Create Super stream two times and delete it with client", Label("super-stream"), func() { - client, err := testEnvironment.newReconnectClient() + err := testEnvironment.maybeReconnectLocator() Expect(err).NotTo(HaveOccurred()) - err = client.DeclareSuperStream("go-my_super_stream_with_2_partitions", newTestSuperStreamOption([]string{"go-partition_0", "go-partition_1"}, []string{"0", "1"}, map[string]string{"queue-leader-locator": "least-leaders"})) + err = testEnvironment.locator.client.DeclareSuperStream("go-my_super_stream_with_2_partitions", newTestSuperStreamOption([]string{"go-partition_0", "go-partition_1"}, []string{"0", "1"}, map[string]string{"queue-leader-locator": "least-leaders"})) Expect(err).NotTo(HaveOccurred()) - err = client.DeclareSuperStream("go-my_super_stream_with_2_partitions", + err2 := testEnvironment.locator.client.DeclareSuperStream("go-my_super_stream_with_2_partitions", newTestSuperStreamOption([]string{"go-partition_0", "go-partition_1"}, []string{"0", "1"},
map[string]string{"queue-leader-locator": "least-leaders"})) - Expect(err).To(Equal(StreamAlreadyExists)) + Expect(err2).To(Equal(StreamAlreadyExists)) - err = client.DeleteSuperStream("go-my_super_stream_with_2_partitions") + err = testEnvironment.DeleteSuperStream("go-my_super_stream_with_2_partitions") Expect(err).NotTo(HaveOccurred()) - err = client.DeleteSuperStream("go-my_super_stream_with_2_partitions") + err = testEnvironment.DeleteSuperStream("go-my_super_stream_with_2_partitions") Expect(err).To(Equal(StreamDoesNotExist)) - Expect(client.Close()).NotTo(HaveOccurred()) + Expect(testEnvironment.Close()).NotTo(HaveOccurred()) }) It("Query Partitions With client/environment", Label("super-stream"), func() { - client, err := testEnvironment.newReconnectClient() + err := testEnvironment.maybeReconnectLocator() Expect(err).NotTo(HaveOccurred()) - err = client.DeclareSuperStream("go-my_super_stream_with_query_partitions", + err = testEnvironment.DeclareSuperStream("go-my_super_stream_with_query_partitions", NewPartitionsOptions(3)) Expect(err).NotTo(HaveOccurred()) - partitions, err := client.QueryPartitions("go-my_super_stream_with_query_partitions") + partitions, err := testEnvironment.QueryPartitions("go-my_super_stream_with_query_partitions") Expect(err).NotTo(HaveOccurred()) Expect(partitions).To(HaveLen(3)) @@ -130,8 +130,8 @@ var _ = Describe("Super Stream Client", Label("super-stream"), func() { Expect(partition).To(MatchRegexp("go-my_super_stream_with_query_partitions-\\d")) } - Expect(client.DeleteSuperStream("go-my_super_stream_with_query_partitions")).NotTo(HaveOccurred()) - Expect(client.Close()).NotTo(HaveOccurred()) + Expect(testEnvironment.DeleteSuperStream("go-my_super_stream_with_query_partitions")).NotTo(HaveOccurred()) + Expect(testEnvironment.Close()).NotTo(HaveOccurred()) })