PubSub Kafka: Respect Subscribe context #3363

Merged
merged 2 commits on Feb 28, 2024
Changes from all commits
27 changes: 12 additions & 15 deletions bindings/kafka/kafka.go

@@ -100,29 +100,26 @@ func (b *Binding) Read(ctx context.Context, handler bindings.Handler) error {
 		return nil
 	}
 
-	handlerConfig := kafka.SubscriptionHandlerConfig{
-		IsBulkSubscribe: false,
-		Handler:         adaptHandler(handler),
-	}
-	for _, t := range b.topics {
-		b.kafka.AddTopicHandler(t, handlerConfig)
-	}
+	ctx, cancel := context.WithCancel(ctx)
 
 	b.wg.Add(1)
 	go func() {
+		defer b.wg.Done()
 		// Wait for context cancelation or closure.
 		select {
 		case <-ctx.Done():
 		case <-b.closeCh:
 		}
-
-		// Remove the topic handlers.
-		for _, t := range b.topics {
-			b.kafka.RemoveTopicHandler(t)
-		}
-		b.wg.Done()
+		cancel()
 	}()
-
-	return b.kafka.Subscribe(ctx)
+
+	handlerConfig := kafka.SubscriptionHandlerConfig{
+		IsBulkSubscribe: false,
+		Handler:         adaptHandler(handler),
+	}
+
+	b.kafka.Subscribe(ctx, handlerConfig, b.topics...)
+
+	return nil
 }
 
 func adaptHandler(handler bindings.Handler) kafka.EventHandler {
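The reworked Read no longer registers per-topic handlers and tears them down afterward; it derives a cancellable context and has a single watcher goroutine cancel it when either the caller's context ends or the binding is closed, which is what makes the subscription respect its context. Below is a minimal, runnable sketch of that lifecycle pattern; the component type and subscribe callback are simplified stand-ins for illustration, not the actual dapr types.

// Sketch of the watcher-goroutine pattern used by the new Read.
// Assumption: `component` and `subscribe` are hypothetical stand-ins; the
// real code calls b.kafka.Subscribe(ctx, handlerConfig, b.topics...) instead.
package main

import (
	"context"
	"fmt"
	"sync"
)

type component struct {
	wg      sync.WaitGroup
	closeCh chan struct{}
}

func (c *component) read(ctx context.Context, subscribe func(context.Context)) {
	// Derive a context that this component can cancel on Close.
	ctx, cancel := context.WithCancel(ctx)

	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		// Wait for caller cancellation or component closure.
		select {
		case <-ctx.Done():
		case <-c.closeCh:
		}
		cancel() // stops the subscription either way
	}()

	subscribe(ctx) // non-blocking; consumption runs until ctx is cancelled
}

func main() {
	c := &component{closeCh: make(chan struct{})}
	done := make(chan struct{})
	c.read(context.Background(), func(ctx context.Context) {
		go func() {
			<-ctx.Done()
			fmt.Println("subscription stopped")
			close(done)
		}()
	})
	close(c.closeCh) // closing the component cancels the derived context
	<-done
	c.wg.Wait()
}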
148 changes: 2 additions & 146 deletions common/component/kafka/consumer.go

@@ -14,12 +14,10 @@ limitations under the License.
 package kafka
 
 import (
-	"context"
 	"errors"
 	"fmt"
 	"strconv"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/IBM/sarama"
@@ -29,15 +27,8 @@ import (
 )
 
 type consumer struct {
-	k             *Kafka
-	ready         chan bool
-	running       chan struct{}
-	stopped       atomic.Bool
-	once          sync.Once
-	mutex         sync.Mutex
-	skipConsume   bool
-	consumeCtx    context.Context
-	consumeCancel context.CancelFunc
+	k     *Kafka
+	mutex sync.Mutex
 }
 
 func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
@@ -233,27 +224,9 @@ func (consumer *consumer) Cleanup(sarama.ConsumerGroupSession) error {
 }
 
 func (consumer *consumer) Setup(sarama.ConsumerGroupSession) error {
-	consumer.once.Do(func() {
-		close(consumer.ready)
-	})
-
 	return nil
 }
 
-// AddTopicHandler adds a handler and configuration for a topic
-func (k *Kafka) AddTopicHandler(topic string, handlerConfig SubscriptionHandlerConfig) {
-	k.subscribeLock.Lock()
-	k.subscribeTopics[topic] = handlerConfig
-	k.subscribeLock.Unlock()
-}
-
-// RemoveTopicHandler removes a topic handler
-func (k *Kafka) RemoveTopicHandler(topic string) {
-	k.subscribeLock.Lock()
-	delete(k.subscribeTopics, topic)
-	k.subscribeLock.Unlock()
-}
-
 // checkBulkSubscribe checks if a bulk handler and config are correctly registered for provided topic
 func (k *Kafka) checkBulkSubscribe(topic string) bool {
 	if bulkHandlerConfig, ok := k.subscribeTopics[topic]; ok &&
@@ -275,120 +248,3 @@ func (k *Kafka) GetTopicHandlerConfig(topic string) (SubscriptionHandlerConfig,
 	return SubscriptionHandlerConfig{},
 		fmt.Errorf("any handler for messages of topic %s not found", topic)
 }
-
-// Subscribe to topic in the Kafka cluster, in a background goroutine
-func (k *Kafka) Subscribe(ctx context.Context) error {
-	if k.consumerGroup == "" {
-		return errors.New("kafka: consumerGroup must be set to subscribe")
-	}
-
-	k.subscribeLock.Lock()
-	defer k.subscribeLock.Unlock()
-
-	topics := k.subscribeTopics.TopicList()
-	if len(topics) == 0 {
-		// Nothing to subscribe to
-		return nil
-	}
-	k.consumer.skipConsume = true
-
-	ctxCreateFn := func() {
-		consumeCtx, cancel := context.WithCancel(context.Background())
-
-		k.consumer.consumeCtx = consumeCtx
-		k.consumer.consumeCancel = cancel
-
-		k.consumer.skipConsume = false
-	}
-
-	if k.cg == nil {
-		cg, err := sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config)
-		if err != nil {
-			return err
-		}
-
-		k.cg = cg
-
-		ready := make(chan bool)
-		k.consumer = consumer{
-			k:       k,
-			ready:   ready,
-			running: make(chan struct{}),
-		}
-
-		ctxCreateFn()
-
-		go func() {
-			k.logger.Debugf("Subscribed and listening to topics: %s", topics)
-
-			for {
-				// If the context was cancelled, as is the case when handling SIGINT and SIGTERM below, then this pops
-				// us out of the consume loop
-				if ctx.Err() != nil {
-					k.logger.Info("Consume context cancelled")
-					break
-				}
-
-				k.logger.Debugf("Starting loop to consume.")
-
-				if k.consumer.skipConsume {
-					continue
-				}
-
-				topics = k.subscribeTopics.TopicList()
-
-				// Consume the requested topics
-				bo := backoff.WithContext(backoff.NewConstantBackOff(k.consumeRetryInterval), ctx)
-				innerErr := retry.NotifyRecover(func() error {
-					if ctxErr := ctx.Err(); ctxErr != nil {
-						return backoff.Permanent(ctxErr)
-					}
-					return k.cg.Consume(k.consumer.consumeCtx, topics, &(k.consumer))
-				}, bo, func(err error, t time.Duration) {
-					k.logger.Errorf("Error consuming %v. Retrying...: %v", topics, err)
-				}, func() {
-					k.logger.Infof("Recovered consuming %v", topics)
-				})
-				if innerErr != nil && !errors.Is(innerErr, context.Canceled) {
-					k.logger.Errorf("Permanent error consuming %v: %v", topics, innerErr)
-				}
-			}
-
-			k.logger.Debugf("Closing ConsumerGroup for topics: %v", topics)
-			err := k.cg.Close()
-			if err != nil {
-				k.logger.Errorf("Error closing consumer group: %v", err)
-			}
-
-			// Ensure running channel is only closed once.
-			if k.consumer.stopped.CompareAndSwap(false, true) {
-				close(k.consumer.running)
-			}
-		}()
-
-		<-ready
-	} else {
-		// The consumer group is already created and consuming topics. This means a new subscription is being added
-		k.consumer.consumeCancel()
-		ctxCreateFn()
-	}
-
-	return nil
-}
-
-// Close down consumer group resources, refresh once.
-func (k *Kafka) closeSubscriptionResources() {
-	if k.cg != nil {
-		err := k.cg.Close()
-		if err != nil {
-			k.logger.Errorf("Error closing consumer group: %v", err)
-		}
-
-		k.consumer.once.Do(func() {
-			// Wait for shutdown to be complete
-			<-k.consumer.running
-			close(k.consumer.ready)
-			k.consumer.once = sync.Once{}
-		})
-	}
-}
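With topics and the handler config now passed to Subscribe directly, the deleted machinery above (the AddTopicHandler/RemoveTopicHandler registry and the consumer's ready/running/stopped/skipConsume bookkeeping) has nothing left to do: a consume loop whose lifetime is owned by the caller's context needs no side-channel flags. The PR's actual replacement implementation is not part of this diff; the following is only a sketch of that context-scoped loop shape, using the real sarama API.

package kafka

import (
	"context"
	"errors"
	"log"

	"github.com/IBM/sarama"
)

// consumeLoop runs consume sessions until ctx is cancelled. Consume returns
// on every rebalance, so it must be called in a loop; the context, not a
// skipConsume flag, decides when the loop ends.
func consumeLoop(ctx context.Context, cg sarama.ConsumerGroup, topics []string, h sarama.ConsumerGroupHandler) {
	for {
		if err := cg.Consume(ctx, topics, h); err != nil &&
			!errors.Is(err, sarama.ErrClosedConsumerGroup) {
			log.Printf("error consuming %v: %v", topics, err)
		}
		if ctx.Err() != nil {
			return
		}
	}
}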
71 changes: 48 additions & 23 deletions common/component/kafka/kafka.go

@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/IBM/sarama"
@@ -34,19 +35,24 @@ import (
 
 // Kafka allows reading/writing to a Kafka consumer group.
 type Kafka struct {
-	producer      sarama.SyncProducer
-	consumerGroup string
-	brokers       []string
-	logger        logger.Logger
-	authType      string
-	saslUsername  string
-	saslPassword  string
-	initialOffset int64
+	producer      sarama.SyncProducer
+	consumerGroup string
+	brokers       []string
+	logger        logger.Logger
+	authType      string
+	saslUsername  string
+	saslPassword  string
+	initialOffset int64
+	config        *sarama.Config
 
 	cg              sarama.ConsumerGroup
 	consumer        consumer
-	config          *sarama.Config
 	subscribeTopics TopicHandlerConfig
 	subscribeLock   sync.Mutex
+	consumerCancel  context.CancelFunc
+	consumerWG      sync.WaitGroup
+	closeCh         chan struct{}
+	closed          atomic.Bool
+	wg              sync.WaitGroup
 
 	// schema registry settings
 	srClient srclient.ISchemaRegistryClient
@@ -106,7 +112,7 @@ func NewKafka(logger logger.Logger) *Kafka {
 	return &Kafka{
 		logger:          logger,
 		subscribeTopics: make(TopicHandlerConfig),
-		subscribeLock:   sync.Mutex{},
+		closeCh:         make(chan struct{}),
 	}
 }
@@ -184,11 +190,11 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
 
 	// Default retry configuration is used if no
 	// backOff properties are set.
-	if err := retry.DecodeConfigWithPrefix(
+	if rerr := retry.DecodeConfigWithPrefix(
 		&k.backOffConfig,
 		metadata,
-		"backOff"); err != nil {
-		return err
+		"backOff"); rerr != nil {
+		return rerr
 	}
 	k.consumeRetryEnabled = meta.ConsumeRetryEnabled
 	k.consumeRetryInterval = meta.ConsumeRetryInterval
@@ -207,22 +213,41 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
 	}
 	k.logger.Debug("Kafka message bus initialization complete")
 
+	k.cg, err = sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config)
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
 
-func (k *Kafka) Close() (err error) {
-	k.closeSubscriptionResources()
+func (k *Kafka) Close() error {
+	defer k.wg.Wait()
+	defer k.consumerWG.Wait()
 
-	if k.producer != nil {
-		err = k.producer.Close()
-		k.producer = nil
-	}
+	errs := make([]error, 2)
+	if k.closed.CompareAndSwap(false, true) {
+		close(k.closeCh)
 
-	if k.internalContext != nil {
-		k.internalContextCancel()
-	}
+		if k.producer != nil {
+			errs[0] = k.producer.Close()
+			k.producer = nil
+		}
+
+		if k.internalContext != nil {
+			k.internalContextCancel()
+		}
+
+		k.subscribeLock.Lock()
+		if k.consumerCancel != nil {
+			k.consumerCancel()
+		}
+		k.subscribeLock.Unlock()
+
+		errs[1] = k.cg.Close()
+	}
 
-	return err
+	return errors.Join(errs...)
 }
 
 func getSchemaSubject(topic string) string {
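The new Close is built from three pieces: an atomic.Bool CompareAndSwap so only the first call releases resources, a closed channel (closeCh) as a shutdown broadcast to background goroutines, and errors.Join (Go 1.20+) to fold the producer and consumer-group close errors into one. Below is a runnable sketch of the same pattern; the resource type and the two nil placeholder errors are hypothetical stand-ins, not the dapr code.

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

type resource struct {
	closed  atomic.Bool
	closeCh chan struct{}
	wg      sync.WaitGroup
}

func newResource() *resource {
	r := &resource{closeCh: make(chan struct{})}
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		<-r.closeCh // close(closeCh) unblocks every receiver at once
	}()
	return r
}

func (r *resource) Close() error {
	// Wait for background goroutines after the shutdown signal is sent.
	defer r.wg.Wait()

	errs := make([]error, 2)
	if r.closed.CompareAndSwap(false, true) {
		// Only the first Close gets here; later calls wait and return nil.
		close(r.closeCh)
		errs[0] = nil // stand-in for producer.Close()
		errs[1] = nil // stand-in for consumerGroup.Close()
	}
	return errors.Join(errs...) // nil when every element is nil
}

func main() {
	r := newResource()
	fmt.Println(r.Close(), r.Close()) // second call is a safe no-op
}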