File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change 77 "fmt"
88 "log/slog"
99 "regexp"
10+ "slices"
1011 "sort"
1112 "sync"
1213 "sync/atomic"
@@ -579,6 +580,24 @@ func (s *Sink) updateTopicSubscription(ctx context.Context, metadataTimeout time
579580 return nil
580581 }
581582
583+ // Calling SubscribeTopics always triggers a consumer group rejoin in librdkafka,
584+ // even if the subscription is identical. As this function runs periodically
585+ // (NamespaceRefetch) on every sink-worker replica, re-subscribing unconditionally
586+ // keeps the consumer group in a perpetual rebalance, which blocks offset commits
587+ // (REBALANCE_IN_PROGRESS) and stalls consumption. Only re-subscribe on change.
588+ subscription , err := s .config .Consumer .Subscription ()
589+ if err != nil {
590+ return fmt .Errorf ("failed to get current topic subscription: %w" , err )
591+ }
592+
593+ slices .Sort (topics )
594+
595+ if slices .Equal (topics , slices .Sorted (slices .Values (subscription ))) {
596+ logger .Debug ("topic subscription is unchanged, skipping re-subscribe" , "topics" , topics )
597+
598+ return nil
599+ }
600+
582601 err = s .config .Consumer .SubscribeTopics (topics , s .rebalance )
583602 if err != nil {
584603 return fmt .Errorf ("failed to subscribe to topics: %s" , err )
You can’t perform that action at this time.
0 commit comments