
feat: kafka driver change #958

Open
saksham-datazip wants to merge 31 commits into staging from feat/kafka-driver-change

Conversation

@saksham-datazip

@saksham-datazip saksham-datazip commented May 20, 2026

Collaborator

Description

Migrated the Kafka consumer implementation from Segment Kafka-Go to Franz-Go to leverage improved consumer group management, rebalance handling, and lifecycle APIs provided by Franz-Go.

As part of this migration:

  • Added support for static membership using instance.id to improve retry and reconnect behavior. This helps avoid unnecessary rebalances during transient failures or consumer restarts when the same instance rejoins the group.

  • Implemented rebalance detection using Franz-Go consumer group callbacks along with generation ID tracking stored in consumer metadata. During sync execution, the active generation is continuously validated against the latest assigned generation to detect stale consumers or lost partition ownership.

  • Added graceful shutdown handling on successful rebalance detection. Instead of continuing to process records with outdated assignments, the sync now exits cleanly to prevent duplicate processing and stale partition consumption during consumer group transitions.
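
A minimal sketch of the generation ID check described above, using only the standard library. The `generationTracker` type and its methods are hypothetical names for illustration and are not the types used in this PR; the sketch assumes the generation is stored once after the initial assignment and compared against whatever the client reports later.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// generationTracker is a hypothetical helper, not the PR's actual type.
// It remembers the consumer group generation observed when the sync started.
type generationTracker struct {
	expected atomic.Int32
}

// newGenerationTracker stores the generation seen after the initial assignment.
func newGenerationTracker(initial int32) *generationTracker {
	tracker := &generationTracker{}
	tracker.expected.Store(initial)
	return tracker
}

// rebalanceDetected reports whether the generation currently reported by the
// client differs from the one recorded at startup.
func (g *generationTracker) rebalanceDetected(current int32) bool {
	return current != g.expected.Load()
}

func main() {
	tracker := newGenerationTracker(7)
	fmt.Println(tracker.rebalanceDetected(7)) // false: same generation
	fmt.Println(tracker.rebalanceDetected(8)) // true: the group has rebalanced
}
```

A sync loop could call a check like this before each poll and exit cleanly when it returns true.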

Fixes #794

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

  • Tested retry scenarios by intentionally returning errors from StreamChanges. Verified that retries worked correctly and did not trigger unnecessary consumer group rebalances due to static membership support using instance.id.
  • Stopped the sync during active processing and triggered a consumer group rebalance. Verified that the sync detected the rebalance successfully and exited gracefully without processing duplicate data.

Screenshots or Recordings

N/A

Documentation

  • Documentation Link: [link to README, olake.io/docs, or olake-docs]
  • N/A (bug fix, refactor, or test changes only)

Related PR's (If Any):

@saksham-datazip saksham-datazip changed the title Feat/kafka driver change feat/kafka driver change May 20, 2026
@saksham-datazip saksham-datazip changed the title feat/kafka driver change feat/ kafka driver change May 20, 2026
@saksham-datazip saksham-datazip changed the title feat/ kafka driver change feat: kafka driver change May 20, 2026
Comment thread pkg/kafka/reader.go
Comment thread drivers/kafka/internal/cdc.go

@hash-data hash-data left a comment

Collaborator

Nice Code

Comment thread pkg/kafka/reader.go Outdated
joinCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
for {
if joinCtx.Err() != nil {

Collaborator

joinCtx.Err() can be non-nil for other reasons as well (for example, cancellation of the parent context), not only a timeout.


// remove stale consumers before creating new readers
if err := k.readerManager.RemoveExistingConsumers(ctx, k.client); err != nil {
return fmt.Errorf("failed to remove existing consumers: %s", err)

Collaborator

Can we remove this?

Collaborator Author

I was not able to consistently reproduce this issue on my machine under normal conditions. However, after increasing the session timeout to 2 minutes, commenting out the RemoveExistingConsumers logic, and adding debug logging for investigation, I observed that extra consumers were accumulating in the consumer group during rebalancing.

One possible outcome of this situation is that all partitions get assigned to the pre-existing consumers, leaving the newly created consumer with no partitions. In that case, PollFetches eventually times out because there are no assigned partitions to fetch from, causing the sync to exit without processing any records. However, this is only one possible explanation and not a confirmed root cause. There may be other unexpected behaviors or outcomes resulting from the presence of these extra consumers as well.

To further validate the behavior, I re-enabled the RemoveExistingConsumers logic and added logging inside the function to track how many consumers were being removed. Following the same reproduction steps, the logs showed that the function was working as expected and successfully removing the stale consumers.
Reference: https://datazip.atlassian.net/wiki/x/AQCsI

Comment thread drivers/kafka/internal/cdc.go Outdated
continue
}

message := iter.Next()

Collaborator

How many records will there be in a batch?
Also, there was a case where a message can be larger than 1 MB.

Collaborator

Check the default timeout.

Collaborator Author

This batch size behavior already existed in Segment, where we had a 10 MB limit, so I kept the same limit in franz-go as well.

What this means is that once the broker has accumulated data up to this limit, it will return the batch. If a single message itself is larger than 10 MB, Kafka will still return that message in a single batch, ignoring the configured fetch limit for that request. I verified this by testing with a 61 MB message.

Regarding how these settings work:

  • MinFetchSize: Specifies the minimum amount of data the broker should try to accumulate before returning a fetch response.
  • MaxFetchSize: Sets an upper limit on the amount of data returned in a fetch response. However, if the first available message exceeds this limit, Kafka will still return that message in a single batch.
  • FetchMaxWaitMs: Specifies how long the broker should wait for data to become available. If the minimum fetch size is not reached within this timeout, the broker returns whatever data is available (or an empty response).

Regarding the default poll timeout we discussed earlier, that behavior is not supported in franz-go. Unlike the Java consumer, franz-go does not enforce a maximum time between polls because heartbeats are handled independently in the background.

Reference: twmb/franz-go#140
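
The size limit rule above can be sketched as a simplified model. This is not broker or franz-go code: `takeBatch` is a hypothetical function that works on plain message sizes, whereas a real broker works on record batches per partition. It only illustrates why a 61 MB message still comes back with a 10 MB limit.

```go
package main

import "fmt"

// takeBatch is a simplified model (not broker code) of the fetch size limit
// described above: messages are added in order until maxBytes would be
// exceeded, but the first message is always returned so that the consumer can
// make progress even when that message alone is larger than the limit.
func takeBatch(sizes []int, maxBytes int) []int {
	var batch []int
	total := 0
	for i, size := range sizes {
		if i > 0 && total+size > maxBytes {
			break
		}
		batch = append(batch, size)
		total += size
	}
	return batch
}

func main() {
	const mb = 1 << 20
	fmt.Println(len(takeBatch([]int{4 * mb, 4 * mb, 4 * mb}, 10*mb))) // 2
	fmt.Println(len(takeBatch([]int{61 * mb, 1 * mb}, 10*mb)))        // 1
}
```

In franz-go, the corresponding client options are, to my knowledge, `kgo.FetchMinBytes`, `kgo.FetchMaxBytes`, and `kgo.FetchMaxWait`.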

Comment thread pkg/kafka/balancer.go
func (b *CustomGroupBalancer) UserData() ([]byte, error) {
return nil, nil
// IsCooperative returns false to indicate that the balancer is not cooperative.
func (b *CustomGroupBalancer) IsCooperative() bool {

Collaborator

Add a comment explaining why we are not using cooperative mode.

@saksham-datazip saksham-datazip Jun 13, 2026

Collaborator Author

In Kafka, when IsCooperative is false, any rebalance causes all consumers to have their partitions revoked before they receive new assignments. Franz-go, however, exposes three rebalance callbacks. During experimentation, I observed that OnPartitionsAssigned is triggered for all consumers on every rebalance, regardless of whether any partitions are assigned to that particular consumer, even when cooperative mode is set to true. Because of this behavior, either approach works for our use case, as the callback is still invoked during rebalances. So I don't think a comment is needed here, since this comes down to that odd behavior on the franz-go side.

Comment thread pkg/kafka/balancer.go Outdated

// number of consumers to use
- consumerIDCount := min(b.requiredConsumerIDs, len(members))
+ consumerCount := min(b.requiredConsumerIDs, len(members))

Collaborator

Why is this statement needed?

@saksham-datazip saksham-datazip Jun 14, 2026

Collaborator Author

Yes, it is not required. It was already in staging, so I had not removed it; I have removed it now.

Comment thread pkg/kafka/reader.go
// Exit gracefully when a rebalance is detected via assign/revoke callbacks.
onRebalance := func(_ context.Context, client *kgo.Client, _ map[string][]int32) {
if r.RebalanceDetected(client) {
r.exitMode.Store(gracefulExit)

Collaborator

This will create a problem in MO.

@saksham-datazip saksham-datazip Jun 13, 2026

Collaborator Author

We discussed this; no change is needed now, because the rebalance detection callbacks are only valid in StreamChanges, not in preCDC.

Comment thread pkg/kafka/reader.go Outdated
Comment on lines +380 to +384
// generation id -1 means not yet joined
// mismatch means readers are on different generations, partition assignment not yet completed
if currentReaderGenerationID < 0 || (expectedGenerationID >= 0 && expectedGenerationID != currentReaderGenerationID) {
allReadersJoined = false
break

Collaborator

I need to go through this again to understand it.

Collaborator


// waitForPartitionAssignment blocks until Kafka completes partition assignment
// for all readers in the consumer group.
func (r *ReaderManager) waitForPartitionAssignment(ctx context.Context) error {
	joinCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
	defer cancel()
	for {
		select {
		case <-joinCtx.Done():
			return fmt.Errorf("timed out waiting for partition assignment on consumer group %s: %s", r.config.ConsumerGroupID, joinCtx.Err())
		case <-time.After(2 * time.Second):
			var (
				allReadersJoined           = true
				expectedGenerationID int32 = -1
			)
			for _, kafkaReader := range r.readers {
				_, currentReaderGenerationID := kafkaReader.reader.GroupMetadata()

				// generation id -1 means not yet joined
				// mismatch means readers are on different generations, partition assignment not yet completed
				if currentReaderGenerationID < 0 || (expectedGenerationID >= 0 && expectedGenerationID != currentReaderGenerationID) {
					allReadersJoined = false
					break
				}
				if expectedGenerationID < 0 {
					expectedGenerationID = currentReaderGenerationID
				}
			}

			if allReadersJoined {
				r.generationID.Store(expectedGenerationID)
				// brief wait to let partition assignment fully propagate before fetching starts.
				time.Sleep(2 * time.Second)
				logger.Infof("consumer group %s stable: all readers assigned, generation id: %d", r.config.ConsumerGroupID, expectedGenerationID)
				return nil
			}
		}
	}
}
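
The per-reader check inside the loop can also be read as a pure function, which may make the condition easier to follow. The sketch below is an assumption for illustration only: `commonGeneration` is a hypothetical name, it takes plain generation IDs instead of readers, and unlike the loop above it treats an empty reader list as "not joined".

```go
package main

import "fmt"

// commonGeneration is a hypothetical, side-effect-free version of the check in
// the loop above. It returns the shared generation ID and true only when every
// reader has joined (ID >= 0) and all readers report the same ID.
func commonGeneration(generationIDs []int32) (int32, bool) {
	var expected int32 = -1
	for _, id := range generationIDs {
		if id < 0 {
			return -1, false // this reader has not joined the group yet
		}
		if expected >= 0 && id != expected {
			return -1, false // readers disagree, assignment still in progress
		}
		expected = id
	}
	return expected, expected >= 0
}

func main() {
	fmt.Println(commonGeneration([]int32{3, 3, 3}))  // 3 true
	fmt.Println(commonGeneration([]int32{3, -1, 3})) // -1 false
	fmt.Println(commonGeneration([]int32{3, 4}))     // -1 false
}
```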

@saksham-datazip saksham-datazip Jun 14, 2026

Collaborator Author

Previously, on the first run it checked every reader's readerId first and then waited 500 ms. With this change it waits 500 ms first and then starts checking every reader's readerId. Since we already use this pattern in other parts of the code and it is not a big change, I have changed it.

Comment thread drivers/kafka/internal/cdc.go Outdated
// get current partition metadata and key
currentPartitionKey := types.PartitionKey{Topic: record.Message.Topic, Partition: record.Message.Partition}
- currentPartitionMeta, exists := k.readerManager.GetPartitionIndex(fmt.Sprintf("%s:%d", record.Message.Topic, record.Message.Partition))
+ currentPartitionMeta, exists := k.readerManager.GetPartitionIndex(kafkapkg.PartitionIndexKey(record.Message.Topic, record.Message.Partition))

Collaborator

The function name does not seem to convey what it actually does.

Collaborator Author

changed

Comment thread drivers/kafka/internal/cdc.go Outdated
if err != nil {
return fmt.Errorf("error reading message in Kafka CDC sync: %s", err)
// checked before every poll and every record so a rebalance signal is never delayed by full batch processing.
if stopProcessing, err := k.readerManager.FetchExitState(); stopProcessing {

@hash-data hash-data Jun 12, 2026

Collaborator

we should use context here as well

Collaborator Author

Changed


// Type assert and validate messages
- lastMessages, isValid := lastMessagesMeta.(map[types.PartitionKey]kafka.Message)
+ lastMessages, isValid := lastMessagesMeta.(map[types.PartitionKey]*kgo.Record)

Collaborator

Shouldn't it fail?

Collaborator Author

Changed
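
Assuming the question refers to the type assertion above, a sketch of failing on a mismatch instead of silently continuing could look like this. `partitionKey`, `record`, and `lastMessagesFrom` are hypothetical stand-ins for `types.PartitionKey`, `*kgo.Record`, and the surrounding function.

```go
package main

import "fmt"

// partitionKey and record are hypothetical stand-ins for types.PartitionKey and *kgo.Record.
type partitionKey struct {
	Topic     string
	Partition int32
}

type record struct{ Offset int64 }

// lastMessagesFrom returns an error instead of silently continuing when the
// stored metadata does not have the expected type.
func lastMessagesFrom(meta any) (map[partitionKey]*record, error) {
	lastMessages, ok := meta.(map[partitionKey]*record)
	if !ok {
		return nil, fmt.Errorf("unexpected last-messages metadata type %T", meta)
	}
	return lastMessages, nil
}

func main() {
	_, err := lastMessagesFrom("not a map")
	fmt.Println(err) // unexpected last-messages metadata type string
}
```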

@saksham-datazip

Collaborator Author

Nice Code

Thanks


4 participants