Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a2e63f9
chore: custom-balancer
saksham-datazip May 18, 2026
8450f96
chore: added rebalancing detection
saksham-datazip May 20, 2026
f75bf6e
Merge branch 'staging' into feat/kafka-driver-change
saksham-datazip May 20, 2026
507d220
chore: added-rebalancing-detection
saksham-datazip May 20, 2026
4b2fd9e
chore: self-reviewed-1
saksham-datazip May 21, 2026
8ea52d2
chore: resolved-lint-error
saksham-datazip May 21, 2026
5bb0008
chore: refractored-warmupConsumerGroup-function
saksham-datazip May 22, 2026
9e30f71
chore: improved-comment
saksham-datazip May 22, 2026
bcecada
chore: resolved-2
saksham-datazip May 22, 2026
4b55a71
Merge branch 'staging' into feat/kafka-driver-change
saksham-datazip May 22, 2026
54f54bb
chore: added-retry-logic
saksham-datazip May 22, 2026
2824e5a
Merge branch 'staging' into feat/kafka-driver-change
saksham-datazip May 24, 2026
a4831e7
chore: added-logs-handled-infinite-loop
saksham-datazip May 24, 2026
a18d75e
chore: added-custom-balancer
saksham-datazip May 28, 2026
dfd7b31
chore: resolved-lint-error
saksham-datazip May 28, 2026
3fa479b
chore: refractor-code
saksham-datazip May 28, 2026
6dce626
chore: refractor-code-2
saksham-datazip May 29, 2026
f6b1e1f
Merge branch 'staging' into feat/kafka-driver-change
saksham-datazip May 29, 2026
184ab58
chore: review-resolved-2
saksham-datazip Jun 2, 2026
2bf5a59
chore: lint-error-resolved
saksham-datazip Jun 2, 2026
ff0e5b2
chore: go-mod-tidy
saksham-datazip Jun 2, 2026
7d89ee1
Merge branch 'staging' into feat/kafka-driver-change
saksham-datazip Jun 4, 2026
8a1ba24
Merge branch 'staging' into feat/kafka-driver-change
saksham-datazip Jun 5, 2026
28f87ff
Merge branch 'staging' into feat/kafka-driver-change
saksham-datazip Jun 5, 2026
b80c8ff
Merge branch 'staging' into feat/kafka-driver-change
saksham-datazip Jun 5, 2026
f2012cb
chore: resolved-comments-3
saksham-datazip Jun 6, 2026
964e19b
chore: skipped-GroupIDNotFound
saksham-datazip Jun 8, 2026
1a574e8
Merge branch 'staging' into feat/kafka-driver-change
saksham-datazip Jun 9, 2026
f280872
Merge branch 'staging' into feat/kafka-driver-change
saksham-datazip Jun 13, 2026
83106a9
Merge branch 'staging' into feat/kafka-driver-change
saksham-datazip Jun 15, 2026
d0cfcf9
chore: refractored-code
saksham-datazip Jun 15, 2026
2bca2c6
Merge branch 'staging' into feat/kafka-driver-change
hash-data Jun 19, 2026
02667db
chore: reverted-lastMessages-log
saksham-datazip Jun 19, 2026
4bbb21c
Merge branch 'staging' into feat/kafka-driver-change
hash-data Jun 19, 2026
ac4978e
chore: added warn log
saksham-datazip Jun 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions drivers/db2/go.mod
Comment thread
ImDoubD-datazip marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,13 @@ require (
github.com/parquet-go/parquet-go v0.25.0 // indirect
github.com/paulmach/orb v0.12.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pierrec/lz4/v4 v4.1.26 // indirect
Comment thread
ImDoubD-datazip marked this conversation as resolved.
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rs/xid v1.6.0 // indirect
github.com/rs/zerolog v1.34.0 // indirect
github.com/sagikazarmark/locafero v0.8.0 // indirect
github.com/segmentio/kafka-go v0.4.49 // indirect
github.com/shirou/gopsutil/v4 v4.26.3 // indirect
github.com/sirupsen/logrus v1.9.4 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
Expand All @@ -122,6 +121,8 @@ require (
github.com/testcontainers/testcontainers-go v0.42.0 // indirect
github.com/tklauser/go-sysconf v0.3.16 // indirect
github.com/tklauser/numcpus v0.11.0 // indirect
github.com/twmb/franz-go v1.21.1 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.13.1 // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
github.com/xitongsys/parquet-go v1.6.2 // indirect
github.com/xitongsys/parquet-go-source v0.0.0-20241021075129-b732d2ac9c9b // indirect
Expand Down
9 changes: 4 additions & 5 deletions drivers/kafka/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ require (
github.com/apache/arrow-go/v18 v18.2.0
github.com/datazip-inc/olake v0.0.0-00010101000000-000000000000
github.com/linkedin/goavro/v2 v2.15.0
github.com/segmentio/kafka-go v0.4.49
github.com/stretchr/testify v1.11.1
github.com/twmb/franz-go v1.21.1
github.com/twmb/franz-go/pkg/kadm v1.18.0
)

require (
Expand Down Expand Up @@ -104,7 +105,7 @@ require (
github.com/parquet-go/parquet-go v0.25.0 // indirect
github.com/paulmach/orb v0.12.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pierrec/lz4/v4 v4.1.26 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
Expand All @@ -123,10 +124,8 @@ require (
github.com/testcontainers/testcontainers-go v0.42.0 // indirect
github.com/tklauser/go-sysconf v0.3.16 // indirect
github.com/tklauser/numcpus v0.11.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.13.1 // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/xitongsys/parquet-go v1.6.2 // indirect
github.com/xitongsys/parquet-go-source v0.0.0-20241021075129-b732d2ac9c9b // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
Expand Down
96 changes: 67 additions & 29 deletions drivers/kafka/internal/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/datazip-inc/olake/constants"
"github.com/datazip-inc/olake/drivers/abstract"
kafkapkg "github.com/datazip-inc/olake/pkg/kafka"
"github.com/datazip-inc/olake/types"
"github.com/datazip-inc/olake/utils"
"github.com/datazip-inc/olake/utils/logger"
"github.com/datazip-inc/olake/utils/typeutils"
"github.com/linkedin/goavro/v2"
"github.com/segmentio/kafka-go"
"github.com/twmb/franz-go/pkg/kgo"
)

// TODO: Add 2PC support for Kafka (difficulty: hard)
Expand Down Expand Up @@ -50,25 +52,34 @@ func (k *Kafka) PreCDC(ctx context.Context, streams []types.StreamInterface) err

// create a reader manager for kafka
k.readerManager = kafkapkg.NewReaderManager(kafkapkg.ReaderConfig{
BootstrapServers: k.config.BootstrapServers,
MaxThreads: k.config.MaxThreads,
ConsumerGroupID: k.consumerGroupID,
Dialer: k.dialer,
AdminClient: k.adminClient,
ThreadsEqualTotalPartitions: k.config.ThreadsEqualTotalPartitions,
})
Comment thread
ImDoubD-datazip marked this conversation as resolved.
return k.readerManager.CreateReaders(ctx, streams, k.consumerGroupID)

// remove stale consumers before creating new readers
if err := k.readerManager.RemoveExistingConsumers(ctx, k.client); err != nil {
Comment thread
vikaxsh marked this conversation as resolved.
Comment thread
vikaxsh marked this conversation as resolved.
return fmt.Errorf("failed to remove existing consumers: %s", err)
Comment thread
hash-data marked this conversation as resolved.
}

// create new readers and wait for partition assignment
return k.readerManager.CreateReaders(ctx, streams)
}

func (k *Kafka) StreamChanges(ctx context.Context, readerID int, metadataStates map[string]any, processFn abstract.CDCMsgFn) (any, error) {
// get reader
reader := k.readerManager.GetReader(readerID)
if reader == nil {
return nil, fmt.Errorf("reader not found for readerID %d", readerID)
// Restart the reader to create a fresh franz-go client for each StreamChanges attempt.
// franz-go keeps uncommitted offsets in memory, so restarting clears that state and
// ensures retries resume from the last committed offset.
// Note: Since a static instance ID is used, this restart does not trigger a consumer group rebalance.
reader, err := k.readerManager.RestartReader(readerID)
if err != nil {
return nil, fmt.Errorf("failed to restart reader %d: %s", readerID, err)
}

// track processing state
lastMessages := make(map[types.PartitionKey]kafka.Message)
lastMessages := make(map[types.PartitionKey]*kgo.Record)
// maintain completed partitions and observed partitions to track loop termination (for the current reader)
completedPartitions := make(map[types.PartitionKey]struct{}) // completed partitions by the current reader
observedPartitions := make(map[types.PartitionKey]struct{}) // cached partitions which are observed by the current reader
Expand All @@ -79,7 +90,7 @@ func (k *Kafka) StreamChanges(ctx context.Context, readerID int, metadataStates
}
}()

err := k.processKafkaMessages(ctx, reader, func(record types.KafkaRecord) (bool, error) {
err = k.processKafkaMessages(ctx, reader, func(record types.KafkaRecord) (bool, error) {
// get current partition metadata and key
currentPartitionKey := types.PartitionKey{Topic: record.Message.Topic, Partition: record.Message.Partition}
currentPartitionMeta, exists := k.readerManager.GetPartitionIndex(fmt.Sprintf("%s:%d", record.Message.Topic, record.Message.Partition))
Expand All @@ -91,7 +102,7 @@ func (k *Kafka) StreamChanges(ctx context.Context, readerID int, metadataStates
if record.Data != nil {
err := processFn(ctx, abstract.CDCChange{
Stream: currentPartitionMeta.Stream,
Timestamp: record.Message.Time,
Timestamp: record.Message.Timestamp,
Kind: "create",
Data: record.Data,
})
Expand Down Expand Up @@ -133,14 +144,14 @@ func (k *Kafka) PostCDC(ctx context.Context, readerIdx int) error {
}

// Type assert and validate messages
lastMessages, isValid := lastMessagesMeta.(map[types.PartitionKey]kafka.Message)
lastMessages, isValid := lastMessagesMeta.(map[types.PartitionKey]*kgo.Record)
Comment thread
hash-data marked this conversation as resolved.
if !isValid || len(lastMessages) == 0 {
logger.Infof("reader %s has no accumulated offsets to commit", readerID)
return nil
}

// Prepare messages for commit and track affected streams
messages := make([]kafka.Message, 0, len(lastMessages))
messages := make([]*kgo.Record, 0, len(lastMessages))
syncedStreams := make(map[string]types.StreamInterface)

for partitionKey, message := range lastMessages {
Expand All @@ -160,7 +171,10 @@ func (k *Kafka) PostCDC(ctx context.Context, readerIdx int) error {
return fmt.Errorf("reader %s not found for commit", readerID)
}

if err := reader.CommitMessages(ctx, messages...); err != nil {
_, generationID := reader.GroupMetadata()
Comment thread
ImDoubD-datazip marked this conversation as resolved.
logger.Debugf("reader %s post cdc: generation id: %d", readerID, generationID)

if err := reader.CommitRecords(ctx, messages...); err != nil {
return fmt.Errorf("commit failed for reader %s: %s", readerID, err)
}

Expand All @@ -181,29 +195,54 @@ func (k *Kafka) PostCDC(ctx context.Context, readerIdx int) error {
}
}

// for processing messages from a Kafka reader.
func (k *Kafka) processKafkaMessages(ctx context.Context, reader *kafka.Reader, stopProcessFn func(record types.KafkaRecord) (bool, error)) error {
// processKafkaMessages processes messages from a Kafka reader
// until stopProcessFn signals stop, a rebalance is detected, or the poll times out (reader caught up).
func (k *Kafka) processKafkaMessages(ctx context.Context, reader *kgo.Client, stopProcessFn func(record types.KafkaRecord) (bool, error)) error {
Comment thread
ImDoubD-datazip marked this conversation as resolved.
var iter *kgo.FetchesRecordIter

for {
message, err := reader.FetchMessage(ctx)
if err != nil {
return fmt.Errorf("error reading message in Kafka CDC sync: %s", err)
// checked before every poll and every record so a rebalance signal is never delayed by full batch processing.
if stopProcessing, err := k.readerManager.FetchExitState(); stopProcessing {
Comment thread
hash-data marked this conversation as resolved.
Outdated
return err
}

var (
key string
data map[string]interface{}
)
// iter being nil triggers first poll.
// iter.Done() being true triggers next polls when the current batch is fully consumed.
if iter == nil || iter.Done() {
pollCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
Comment thread
vikaxsh marked this conversation as resolved.
Outdated
// poll for new messages
fetches := reader.PollFetches(pollCtx)
pollCtxErr := pollCtx.Err()
cancel()

// no new messages for 10s means reader has caught up; exit cleanly without error.
if errors.Is(pollCtxErr, context.DeadlineExceeded) {
logger.Warnf("poll context deadline exceeded: %s", pollCtxErr)
return nil
}

// any fetch error (including parent ctx cancellation) is non-retryable.
// For more info, go through the documentation: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#Fetches.Errors
if errs := fetches.Errors(); len(errs) > 0 {
return fmt.Errorf("%w: error reading message in Kafka CDC sync: %s", constants.ErrNonRetryable, errs[0].Err)
Comment thread
vikaxsh marked this conversation as resolved.
Outdated
}

// parse message value and key
data, key, err = k.parseKafkaData(message)
// wrap batch into iterator
iter = fetches.RecordIter()
// check exit state again before processing the batch
continue
}

message := iter.Next()
Comment thread
hash-data marked this conversation as resolved.
Outdated
data, key, err := k.parseKafkaData(message)
if err != nil {
logger.Warnf("failed to parse message of topic: %s, partition: %d, offset %d, error: %s", message.Topic, message.Partition, message.Offset, err)
} else if data != nil {
// data map will be nil (in cases like null and unparseable message values) so nil check is required
data[Partition] = message.Partition
data[Offset] = message.Offset
data[Key] = key
data[KafkaTimestamp], err = typeutils.ReformatDate(message.Time, true)
data[KafkaTimestamp], err = typeutils.ReformatDate(message.Timestamp, true)
if err != nil {
return fmt.Errorf("failed to reformat date: %s", err)
}
Expand All @@ -214,13 +253,12 @@ func (k *Kafka) processKafkaMessages(ctx context.Context, reader *kafka.Reader,
return err
}
if stopProcessing {
break
return nil
}
}
return nil
}

func (k *Kafka) parseKafkaData(message kafka.Message) (map[string]interface{}, string, error) {
func (k *Kafka) parseKafkaData(message *kgo.Record) (map[string]interface{}, string, error) {
// helper to parse data bytes (value or key)
parseData := func(data []byte) (interface{}, error) {
// if data is not in confluent wire format, it is assumed to be standard json currently
Expand Down Expand Up @@ -272,7 +310,7 @@ func (k *Kafka) parseKafkaData(message kafka.Message) (map[string]interface{}, s
parsedKey, err := parseData(message.Key)
if err != nil {
// standard fallback: raw key as string
logger.Warnf("failed to parse key at offset %d: %s, using raw string", message.Offset, err)
logger.Warnf("failed to parse key for topic=%s partition=%d offset=%d: %s, using raw string", message.Topic, message.Partition, message.Offset, err)
keyValue = string(message.Key)
} else {
switch v := parsedKey.(type) {
Expand Down
Loading
Loading