Skip to content

Detect zombie consumer #390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/ha/ha_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func (c *ReliableConsumer) GetStatusAsString() string {
func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) {
go func() {
event := <-channelClose
if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) {
if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) || strings.EqualFold(event.Reason, stream.ZombieConsumer) {
c.setStatus(StatusReconnecting)
logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", c.getInfo())
logs.LogWarn("[Reliable] - %s closed unexpectedly %s.. Reconnecting..", c.getInfo(), event.Reason)
c.bootstrap = false
err, reconnected := retry(1, c)
if err != nil {
Expand Down
47 changes: 35 additions & 12 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,16 +487,19 @@ func (c *Client) Close() error {
}
}

for _, cs := range c.coordinator.consumers {
err := c.coordinator.RemoveConsumerById(cs.(*Consumer).ID, Event{
Command: CommandClose,
StreamName: cs.(*Consumer).GetStreamName(),
Name: cs.(*Consumer).GetName(),
Reason: SocketClosed,
Err: nil,
})
if err != nil {
logs.LogWarn("error removing consumer: %s", err)
for _, cs := range c.coordinator.GetConsumers() {
if cs != nil {
err := c.coordinator.RemoveConsumerById(cs.(*Consumer).ID, Event{
Command: CommandClose,
StreamName: cs.(*Consumer).GetStreamName(),
Name: cs.(*Consumer).GetName(),
Reason: SocketClosed,
Err: nil,
})

if err != nil {
logs.LogWarn("error removing consumer: %s", err)
}
}
}
if c.getSocket().isOpen() {
Expand Down Expand Up @@ -1019,10 +1022,30 @@ func (c *Client) declareSubscriber(streamName string,
}
}

case <-time.After(consumer.options.autoCommitStrategy.flushInterval):
consumer.cacheStoreOffset()
case <-time.After(1_000 * time.Millisecond):
if consumer.options.autocommit && time.Since(consumer.getLastAutoCommitStored()) >= consumer.options.autoCommitStrategy.flushInterval {
consumer.cacheStoreOffset()
}

// This is a rare edge case: the consumer is no longer active
// but is still in the list of consumers.
// It can happen during reconnection with load-balancing.
// The problem was found with a chaos test that randomly killed the load-balancer and the node
// the client should be connected to.
if consumer.isZombie() {
logs.LogWarn("Detected zombie consumer for stream %s, closing", streamName)
consumer.close(Event{
Command: CommandUnsubscribe,
StreamName: consumer.GetStreamName(),
Name: consumer.GetName(),
Reason: ZombieConsumer,
Err: nil,
})
return
}
}
}

}()
return consumer, err.Err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ const (
defaultConfirmationTimeOut = 10 * time.Second
//

ZombieConsumer = "zombie-consumer"
SocketClosed = "socket client closed"
MetaDataUpdate = "metadata Data update"
LeaderLocatorBalanced = "balanced"
Expand Down
10 changes: 8 additions & 2 deletions pkg/stream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ func (consumer *Consumer) getStatus() int {
return consumer.status
}

// isZombie reports whether the consumer still considers itself open while
// the socket of its client is no longer open. The check is made while
// holding the consumer mutex.
func (consumer *Consumer) isZombie() bool {
	consumer.mutex.Lock()
	defer consumer.mutex.Unlock()

	// A consumer that is not open cannot be a zombie; the socket is only
	// inspected for consumers that are still marked open.
	if consumer.status != open {
		return false
	}
	return !consumer.options.client.socket.isOpen()
}

func (consumer *Consumer) GetStreamName() string {
if consumer.options == nil {
return ""
Expand Down Expand Up @@ -341,9 +347,9 @@ func (consumer *Consumer) close(reason Event) error {
consumer.closeHandler = nil
}

close(consumer.chunkForConsumer)

if consumer.response.data != nil {
close(consumer.chunkForConsumer)

close(consumer.response.data)
consumer.response.data = nil
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/stream/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,14 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event)
return consumer.close(reason)

}
func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error {
// GetConsumers returns a snapshot of the consumers registered with the
// coordinator, keyed the same way as the internal map.
//
// The snapshot is built while holding the coordinator mutex and is
// independent of the internal map, so callers can range over it after the
// lock is released (and remove consumers from the coordinator while doing
// so) without touching the coordinator's map outside the lock. Returning the
// internal map directly would make the lock ineffective, because a map value
// is a reference to the shared underlying data.
func (coordinator *Coordinator) GetConsumers() map[interface{}]interface{} {
	coordinator.mutex.Lock()
	defer coordinator.mutex.Unlock()

	snapshot := make(map[interface{}]interface{}, len(coordinator.consumers))
	for id, consumer := range coordinator.consumers {
		snapshot[id] = consumer
	}
	return snapshot
}

func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error {
producer, err := coordinator.ExtractProducerById(id)
if err != nil {
return err
Expand Down
1 change: 0 additions & 1 deletion pkg/stream/producer_unconfirmed.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func newUnConfirmed(maxSize int) *unConfirmed {
func (u *unConfirmed) addFromSequences(messages []*messageSequence, producerID uint8) {

if u.size() > u.maxSize {
logs.LogDebug("unConfirmed size: %d reached, producer blocked", u.maxSize)
u.blockSignal.L.Lock()
u.blockSignal.Wait()
u.blockSignal.L.Unlock()
Expand Down
Loading