diff --git a/pkg/ha/ha_consumer.go b/pkg/ha/ha_consumer.go index 4ceec9ea..0148d0ba 100644 --- a/pkg/ha/ha_consumer.go +++ b/pkg/ha/ha_consumer.go @@ -46,9 +46,9 @@ func (c *ReliableConsumer) GetStatusAsString() string { func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) { go func() { event := <-channelClose - if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) { + if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) || strings.EqualFold(event.Reason, stream.ZombieConsumer) { c.setStatus(StatusReconnecting) - logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", c.getInfo()) + logs.LogWarn("[Reliable] - %s closed unexpectedly %s.. Reconnecting..", c.getInfo(), event.Reason) c.bootstrap = false err, reconnected := retry(1, c) if err != nil { diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 9220a550..9b25ae54 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -487,16 +487,19 @@ func (c *Client) Close() error { } } - for _, cs := range c.coordinator.consumers { - err := c.coordinator.RemoveConsumerById(cs.(*Consumer).ID, Event{ - Command: CommandClose, - StreamName: cs.(*Consumer).GetStreamName(), - Name: cs.(*Consumer).GetName(), - Reason: SocketClosed, - Err: nil, - }) - if err != nil { - logs.LogWarn("error removing consumer: %s", err) + for _, cs := range c.coordinator.GetConsumers() { + if cs != nil { + err := c.coordinator.RemoveConsumerById(cs.(*Consumer).ID, Event{ + Command: CommandClose, + StreamName: cs.(*Consumer).GetStreamName(), + Name: cs.(*Consumer).GetName(), + Reason: SocketClosed, + Err: nil, + }) + + if err != nil { + logs.LogWarn("error removing consumer: %s", err) + } } } if c.getSocket().isOpen() { @@ -1019,10 +1022,30 @@ func (c *Client) declareSubscriber(streamName string, } } - case <-time.After(consumer.options.autoCommitStrategy.flushInterval): - 
consumer.cacheStoreOffset() + case <-time.After(1_000 * time.Millisecond): + if consumer.options.autocommit && time.Since(consumer.getLastAutoCommitStored()) >= consumer.options.autoCommitStrategy.flushInterval { + consumer.cacheStoreOffset() + } + + // This is a very edge case where the consumer is not active anymore + // but the consumer is still in the list of consumers + // It can happen during the reconnection with load-balancing + // found this problem with a chaos test where randomly killing the load-balancer and node where + // the client should be connected + if consumer.isZombie() { + logs.LogWarn("Detected zombie consumer for stream %s, closing", streamName) + consumer.close(Event{ + Command: CommandUnsubscribe, + StreamName: consumer.GetStreamName(), + Name: consumer.GetName(), + Reason: ZombieConsumer, + Err: nil, + }) + return + } } } + }() return consumer, err.Err } diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index 82c3ac38..3442fe66 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -114,6 +114,7 @@ const ( defaultConfirmationTimeOut = 10 * time.Second // + ZombieConsumer = "zombie-consumer" SocketClosed = "socket client closed" MetaDataUpdate = "metadata Data update" LeaderLocatorBalanced = "balanced" diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 1c94d857..f04bdbbe 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -54,6 +54,12 @@ func (consumer *Consumer) getStatus() int { return consumer.status } +func (consumer *Consumer) isZombie() bool { + consumer.mutex.Lock() + defer consumer.mutex.Unlock() + return consumer.status == open && !consumer.options.client.socket.isOpen() +} + func (consumer *Consumer) GetStreamName() string { if consumer.options == nil { return "" } @@ -341,9 +347,9 @@ func (consumer *Consumer) close(reason Event) error { consumer.closeHandler = nil } - close(consumer.chunkForConsumer) - if consumer.response.data != nil { + close(consumer.chunkForConsumer) + 
close(consumer.response.data) consumer.response.data = nil } diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 37c6fcb3..18d952ea 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -89,8 +89,14 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) return consumer.close(reason) } -func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error { +func (coordinator *Coordinator) GetConsumers() map[interface{}]interface{} { + coordinator.mutex.Lock() + defer coordinator.mutex.Unlock() + return coordinator.consumers +} + +func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error { producer, err := coordinator.ExtractProducerById(id) if err != nil { return err diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index cad6c27d..3b2e0a10 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -34,7 +34,6 @@ func newUnConfirmed(maxSize int) *unConfirmed { func (u *unConfirmed) addFromSequences(messages []*messageSequence, producerID uint8) { if u.size() > u.maxSize { - logs.LogDebug("unConfirmed size: %d reached, producer blocked", u.maxSize) u.blockSignal.L.Lock() u.blockSignal.Wait() u.blockSignal.L.Unlock()