
Enhancing xkafka.Consumer for Ordered Async Message Processing #30

@ajatprabha


Proposal Overview

This proposal changes how xkafka.Consumer behaves in asynchronous mode so that callers can control the order in which messages are processed.


Problem Statement

The current async mode (xkafka.Concurrency) offers no way to control when processing of a message starts. This matters whenever messages must be processed in a specific order for the application to behave correctly. For example, if messages that share a Key must be handled in order, a message should not start processing while an earlier message with the same Key is still being processed. Async mode currently provides no such synchronization.

Proposed Solution

Introduce a semaphore-like mechanism that the asynchronous consumer consults before each message, to decide whether to start processing it now or wait.

Proposed API Changes

  1. Define a new structure MessageMetadata and an interface ConsumerSemaphore:

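    // MessageMetadata identifies the message a lock is requested for.
    type MessageMetadata struct {
    	Key       []byte
    	Partition int32
    	Offset    int64
    }
    
    // ConsumerSemaphore decides when the async consumer may start processing a message.
    type ConsumerSemaphore interface {
    	// AcquireLock blocks until processing of the message may start.
    	AcquireLock(*MessageMetadata)
    	// ReleaseLock is called once processing of the message has finished.
    	ReleaseLock(*MessageMetadata)
    }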
  2. Integrate the semaphore into the Consumer struct and use it in the async message processing flow. Key changes include:

    • Adding a semaphore field to the Consumer struct.
    • Adjusting the async message processing routine to acquire and release locks based on message metadata.

    diff --git a/xkafka/consumer.go b/xkafka/consumer.go
    index d744033..72e99ba 100644
    --- a/xkafka/consumer.go
    +++ b/xkafka/consumer.go
    @@ -18,6 +18,7 @@ type Consumer struct {
     	handler     Handler
     	middlewares []middleware
     	config      options
    +	sema        ConsumerSemaphore
     }
    
    // NewConsumer creates a new Consumer instance.
    @@ -48,6 +49,7 @@ func NewConsumer(name string, handler Handler, opts ...Option) (*Consumer, error
     		config:  cfg,
     		kafka:   consumer,
     		handler: handler,
    +		sema:    cfg.sema,
     	}, nil
     }
    
    @@ -168,20 +170,20 @@ func (c *Consumer) runAsync(ctx context.Context) error {
    
     			msg := newMessage(c.name, km)
    
    -			st.Go(func() stream.Callback {
    -				err := c.handler.Handle(ctx, msg)
    -				if ferr := c.config.errorHandler(err); ferr != nil {
    -					cancel(ferr)
    -
    -					return func() {}
    +			if c.sema != nil {
    +				mm := &MessageMetadata{
    +					Key:       msg.Key,
    +					Partition: msg.Partition,
    +					Offset:    msg.Offset,
     				}
    
    -				return func() {
    -					if err := c.storeMessage(msg); err != nil {
    -						cancel(err)
    -					}
    -				}
    -			})
    +				c.sema.AcquireLock(mm)
    +				st.Go(c.taskWithLock(ctx, cancel, msg, mm))
    +
    +				continue
    +			}
    +
    +			st.Go(c.task(ctx, cancel, msg))
     		}
     	}
     }
    @@ -239,3 +241,41 @@ func (c *Consumer) Close() {
     
     	_ = c.kafka.Close()
     }
    +
    +func (c *Consumer) taskWithLock(ctx context.Context, cancel context.CancelCauseFunc, msg *Message, mm *MessageMetadata) stream.Task {
    +	return func() stream.Callback {
    +		err := c.handler.Handle(ctx, msg)
    +		if ferr := c.config.errorHandler(err); ferr != nil {
    +			cancel(ferr)
    +
    +			return func() {
    +				c.sema.ReleaseLock(mm)
    +			}
    +		}
    +
    +		return func() {
    +			if err := c.storeMessage(msg); err != nil {
    +				cancel(err)
    +			}
    +
    +			c.sema.ReleaseLock(mm)
    +		}
    +	}
    +}
    +
    +func (c *Consumer) task(ctx context.Context, cancel context.CancelCauseFunc, msg *Message) stream.Task {
    +	return func() stream.Callback {
    +		err := c.handler.Handle(ctx, msg)
    +		if ferr := c.config.errorHandler(err); ferr != nil {
    +			cancel(ferr)
    +
    +			return func() {}
    +		}
    +
    +		return func() {
    +			if err := c.storeMessage(msg); err != nil {
    +				cancel(err)
    +			}
    +		}
    +	}
    +}
    diff --git a/xkafka/options.go b/xkafka/options.go
    index f7e147d..a5f10e8 100644
    --- a/xkafka/options.go
    +++ b/xkafka/options.go
    @@ -84,6 +84,7 @@ type options struct {
     	pollTimeout     time.Duration
     	concurrency     int
     	manualCommit    bool
    +	sema            ConsumerSemaphore
     
     	// producer options
     	producerFn producerFunc

Expected Benefits

With this change, applications using xkafka.Consumer in async mode can enforce the processing order they need, for example by serializing messages that share a Key, which improves their reliability and correctness.

Labels

enhancement (New feature or request)