Skip to content

Commit 4b2fd9e

Browse files
chore: self-reviewed-1
1 parent 507d220 commit 4b2fd9e

7 files changed

Lines changed: 203 additions & 222 deletions

File tree

drivers/kafka/go.mod

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,6 @@ require (
1010
github.com/apache/arrow-go/v18 v18.2.0
1111
github.com/datazip-inc/olake v0.0.0-00010101000000-000000000000
1212
github.com/linkedin/goavro/v2 v2.15.0
13-
github.com/segmentio/kafka-go v0.4.49
1413
github.com/stretchr/testify v1.11.1
1514
github.com/twmb/franz-go v1.21.1
1615
github.com/twmb/franz-go/pkg/kadm v1.18.0

drivers/kafka/internal/cdc.go

Lines changed: 10 additions & 59 deletions
Original file line number | Diff line number | Diff line change
@@ -16,9 +16,7 @@ import (
1616
"github.com/datazip-inc/olake/utils/logger"
1717
"github.com/datazip-inc/olake/utils/typeutils"
1818
"github.com/linkedin/goavro/v2"
19-
"github.com/twmb/franz-go/pkg/kadm"
2019
"github.com/twmb/franz-go/pkg/kgo"
21-
"github.com/twmb/franz-go/pkg/kmsg"
2220
)
2321

2422
// TODO: Add 2PC support for Kafka (difficulty: hard)
@@ -52,21 +50,19 @@ func (k *Kafka) PreCDC(ctx context.Context, streams []types.StreamInterface) err
5250
k.streams = streams
5351
logger.Infof("configured consumer group id: %s", k.consumerGroupID)
5452

55-
// remove existing consumers before creating new readers
56-
err := k.RemoveExistingConsumers(ctx)
57-
if err != nil {
58-
return fmt.Errorf("failed to remove existing consumers: %w", err)
59-
}
60-
6153
// create a reader manager for kafka
6254
k.readerManager = kafkapkg.NewReaderManager(kafkapkg.ReaderConfig{
63-
BootstrapServers: k.config.BootstrapServers,
6455
MaxThreads: k.config.MaxThreads,
6556
ConsumerGroupID: k.consumerGroupID,
6657
Dialer: k.dialer,
67-
Client: k.client,
58+
Admin: k.admin,
6859
ThreadsEqualTotalPartitions: k.config.ThreadsEqualTotalPartitions,
6960
})
61+
// remove stale consumers before creating new readers
62+
if err := k.readerManager.RemoveExistingConsumers(ctx, k.client); err != nil {
63+
return fmt.Errorf("failed to remove existing consumers: %v", err)
64+
}
65+
// create new readers
7066
return k.readerManager.CreateReaders(ctx, streams, k.consumerGroupID)
7167
}
7268

@@ -136,7 +132,7 @@ func (k *Kafka) StreamChanges(ctx context.Context, readerID int, metadataStates
136132
completedPartitions[currentPartitionKey] = struct{}{}
137133

138134
// check for all other assigned partitions to see if they are also completed
139-
shouldExit, err := k.checkPartitionCompletion(ctx, readerID, reader, completedPartitions, observedPartitions)
135+
shouldExit, err := k.checkPartitionCompletion(ctx, readerID, completedPartitions, observedPartitions)
140136
if err != nil || shouldExit {
141137
return shouldExit, err
142138
}
@@ -220,7 +216,7 @@ func (k *Kafka) processKafkaMessages(ctx context.Context, reader *kgo.Client, st
220216
messages := reader.PollFetches(ctx)
221217
errs := messages.Errors()
222218
if len(errs) > 0 {
223-
return fmt.Errorf("%w: error reading message in Kafka CDC sync: %w", constants.ErrNonRetryable, errs[0].Err)
219+
return fmt.Errorf("%v: error reading message in Kafka CDC sync: %v", constants.ErrNonRetryable, errs[0].Err)
224220
}
225221

226222
records := messages.RecordIter()
@@ -241,6 +237,7 @@ func (k *Kafka) processKafkaMessages(ctx context.Context, reader *kgo.Client, st
241237
key string
242238
data map[string]interface{}
243239
)
240+
244241
// parse message value and key
245242
data, key, err = k.parseKafkaData(message)
246243
if err != nil {
@@ -319,7 +316,7 @@ func (k *Kafka) parseKafkaData(message *kgo.Record) (map[string]interface{}, str
319316
parsedKey, err := parseData(message.Key)
320317
if err != nil {
321318
// standard fallback: raw key as string
322-
logger.Warnf("failed to parse key at offset %d: %s, using raw string", message.Offset, err)
319+
logger.Warnf("failed to parse key for topic=%s partition=%d offset=%d: %s, using raw string", message.Topic, message.Partition, message.Offset, err)
323320
keyValue = string(message.Key)
324321
} else {
325322
switch v := parsedKey.(type) {
@@ -368,49 +365,3 @@ func decodeAvroMessage(data []byte, codec *goavro.Codec) (interface{}, error) {
368365
}
369366
return nativeDatum, nil
370367
}
371-
372-
// RemoveExistingConsumers force removes all existing consumers from the consumer group.
373-
func (k *Kafka) RemoveExistingConsumers(ctx context.Context) error {
374-
admin := kadm.NewClient(k.client)
375-
376-
groups, err := admin.DescribeGroups(ctx, k.consumerGroupID)
377-
if err != nil {
378-
return fmt.Errorf("describe groups failed: %w", err)
379-
}
380-
381-
group, ok := groups[k.consumerGroupID]
382-
if !ok || len(group.Members) == 0 {
383-
return nil
384-
}
385-
386-
if group.Err != nil {
387-
return fmt.Errorf("describe groups error: %w", group.Err)
388-
}
389-
390-
req := kmsg.NewPtrLeaveGroupRequest()
391-
req.Group = k.consumerGroupID
392-
393-
for _, member := range group.Members {
394-
req.Members = append(req.Members, kmsg.LeaveGroupRequestMember{
395-
MemberID: member.MemberID,
396-
InstanceID: func() *string {
397-
if member.InstanceID == nil {
398-
return nil
399-
}
400-
v := *member.InstanceID
401-
return &v
402-
}(),
403-
})
404-
}
405-
406-
resp, err := req.RequestWith(ctx, k.client)
407-
if err != nil {
408-
return fmt.Errorf("leave group request failed: %w", err)
409-
}
410-
411-
if resp.ErrorCode != 0 {
412-
return fmt.Errorf("leave group error code: %d", resp.ErrorCode)
413-
}
414-
415-
return nil
416-
}

drivers/kafka/internal/kafka.go

Lines changed: 34 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,6 @@ import (
77
"fmt"
88
"regexp"
99
"slices"
10-
"strings"
1110
"sync"
1211
"time"
1312

@@ -48,6 +47,7 @@ type Kafka struct {
4847
readerManager *kafkapkg.ReaderManager
4948
checkpointMessage sync.Map // last message for each reader w.r.t. partition to be used for checkpointing
5049
schemaRegistryClient *kafkapkg.SchemaRegistryClient
50+
admin *kadm.Client
5151
}
5252

5353
func (k *Kafka) GetConfigRef() abstract.Config {
@@ -100,9 +100,11 @@ func (k *Kafka) Setup(ctx context.Context) error {
100100
}
101101

102102
k.client = client
103+
k.admin = kadm.NewClient(client)
103104

104105
// Test connectivity by fetching metadata
105-
if err := client.Ping(ctx); err != nil {
106+
err = client.Ping(ctx)
107+
if err != nil {
106108
return fmt.Errorf("failed to ping kafka brokers: %s", err)
107109
}
108110

@@ -125,16 +127,22 @@ func (k *Kafka) Setup(ctx context.Context) error {
125127
}
126128

127129
func (k *Kafka) Close() error {
130+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
131+
defer cancel()
132+
133+
if k.readerManager != nil {
134+
k.readerManager.RemoveExistingConsumers(ctx, k.client)
135+
}
136+
128137
if k.client != nil {
129138
k.client.Close()
130139
}
131-
k.readerManager.Close()
132140
return nil
133141
}
134142

135143
func (k *Kafka) GetStreamNames(ctx context.Context) ([]string, error) {
136144
logger.Infof("Starting discover for Kafka")
137-
metadata, err := kadm.NewClient(k.client).ListTopics(ctx)
145+
metadata, err := k.admin.ListTopics(ctx)
138146
if err != nil {
139147
return nil, fmt.Errorf("failed to list topics: %s", err)
140148
}
@@ -160,9 +168,8 @@ func (k *Kafka) ProduceSchema(ctx context.Context, streamName string) (*types.St
160168

161169
// create reader manager for schema discovery
162170
readerManager := kafkapkg.NewReaderManager(kafkapkg.ReaderConfig{
163-
BootstrapServers: k.config.BootstrapServers,
164-
Dialer: k.dialer,
165-
Client: k.client,
171+
Dialer: k.dialer,
172+
Admin: k.admin,
166173
})
167174

168175
// get the topic metadata
@@ -171,21 +178,19 @@ func (k *Kafka) ProduceSchema(ctx context.Context, streamName string) (*types.St
171178
return nil, fmt.Errorf("failed to fetch topic metadata for topic %s: %s", streamName, err)
172179
}
173180

174-
admin := kadm.NewClient(k.client)
175-
176181
// get offsets for all partitions
177-
startOffsets, err := admin.ListStartOffsets(ctx, streamName)
182+
startOffsets, err := k.admin.ListStartOffsets(ctx, streamName)
178183
if err != nil {
179184
return nil, fmt.Errorf("failed to list start offsets for topic %s: %s", streamName, err)
180185
}
181186

182-
endOffsets, err := admin.ListEndOffsets(ctx, streamName)
187+
endOffsets, err := k.admin.ListEndOffsets(ctx, streamName)
183188
if err != nil {
184189
return nil, fmt.Errorf("failed to list end offsets for topic %s: %s", streamName, err)
185190
}
186191

187192
if topicDetail.Err != nil {
188-
return nil, fmt.Errorf("topic metadata for %s: %w", streamName, topicDetail.Err)
193+
return nil, fmt.Errorf("topic metadata for %s: %v", streamName, topicDetail.Err)
189194
}
190195

191196
partitionList := topicDetail.Partitions.Sorted()
@@ -194,7 +199,7 @@ func (k *Kafka) ProduceSchema(ctx context.Context, streamName string) (*types.St
194199
// get messages from partitions for schema discovery
195200
err = utils.Concurrent(ctx, partitionList, len(partitionList), func(ctx context.Context, partitionDetail kadm.PartitionDetail, _ int) error {
196201
if partitionDetail.Err != nil {
197-
return fmt.Errorf("partition %d: %w", partitionDetail.Partition, partitionDetail.Err)
202+
return fmt.Errorf("partition %d: %v", partitionDetail.Partition, partitionDetail.Err)
198203
}
199204

200205
startOffset, exists := startOffsets.Lookup(streamName, partitionDetail.Partition)
@@ -232,7 +237,7 @@ func (k *Kafka) ProduceSchema(ctx context.Context, streamName string) (*types.St
232237
fetchCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
233238
defer cancel()
234239

235-
return k.processKafkaMessages(fetchCtx, reader, func(record types.KafkaRecord) (bool, error) {
240+
_ = k.processKafkaMessages(fetchCtx, reader, func(record types.KafkaRecord) (bool, error) {
236241
messageCount++
237242
if record.Data != nil {
238243
mu.Lock()
@@ -248,6 +253,7 @@ func (k *Kafka) ProduceSchema(ctx context.Context, streamName string) (*types.St
248253
shouldExit := messageCount >= 10000 || record.Message.Offset >= endOffset.Offset-1
249254
return shouldExit, nil
250255
})
256+
return nil
251257
})
252258
if err != nil {
253259
return nil, fmt.Errorf("failed to fetch schema for topic %s: %s", streamName, err)
@@ -363,7 +369,7 @@ func (k *Kafka) buildTLSConfig() (*tls.Config, error) {
363369
}
364370

365371
// checkPartitionCompletion checks if a partition is complete and handles loop termination
366-
func (k *Kafka) checkPartitionCompletion(ctx context.Context, readerID int, reader *kgo.Client, completedPartitions, observedPartitions map[types.PartitionKey]struct{}) (bool, error) {
372+
func (k *Kafka) checkPartitionCompletion(ctx context.Context, readerID int, completedPartitions, observedPartitions map[types.PartitionKey]struct{}) (bool, error) {
367373
// cache observed partitions
368374
if len(observedPartitions) == 0 {
369375
// Ensure we have all assigned partitions tracked
@@ -377,53 +383,37 @@ func (k *Kafka) checkPartitionCompletion(ctx context.Context, readerID int, read
377383
observedPartitions[assignedPk] = struct{}{}
378384
}
379385
}
380-
// DescribeGroups member matching can miss the live consumer; fall back to what this client is actually consuming.
381-
if len(observedPartitions) == 0 && reader != nil {
382-
for topic, parts := range reader.UncommittedOffsets() {
383-
for part := range parts {
384-
pk := types.PartitionKey{Topic: topic, Partition: int(part)}
385-
if _, exists := k.readerManager.GetPartitionIndex(fmt.Sprintf("%s:%d", pk.Topic, pk.Partition)); exists {
386-
observedPartitions[pk] = struct{}{}
387-
}
388-
}
389-
}
390-
}
391386
}
392387

393-
// Require a non-empty observed set so 0==0 is not treated as "done" before we know assignments.
394-
if len(observedPartitions) == 0 {
395-
return false, nil
396-
}
388+
// exit when all partitions are done
397389
return len(completedPartitions) == len(observedPartitions), nil
398390
}
399391

400392
// getReaderAssignedPartitions queries the consumer group and returns topic/partition pairs
401-
// assigned to the reader identified by readerIndex. We match on the per-reader ClientID.
393+
// assigned to the reader identified by readerIndex. We match on the per-reader readerID.
402394
func (k *Kafka) getReaderAssignedPartitions(ctx context.Context, readerIndex int) ([]types.PartitionKey, error) {
403-
readerID, clientID := k.readerManager.GetReaderIDAndClientID(readerIndex)
404-
if clientID == "" {
405-
return nil, fmt.Errorf("clientID not found for reader %s", readerID)
395+
readerID, _ := k.readerManager.GetReaderIDAndClientID(readerIndex)
396+
if readerID == "" {
397+
return nil, fmt.Errorf("readerID not found for reader index %d", readerIndex)
406398
}
407399

408-
admin := kadm.NewClient(k.client)
409-
410-
resp, err := admin.DescribeGroups(ctx, k.consumerGroupID)
400+
response, err := k.admin.DescribeGroups(ctx, k.consumerGroupID)
411401
if err != nil {
412-
return nil, fmt.Errorf("DescribeGroups failed: %w", err)
402+
return nil, fmt.Errorf("DescribeGroups failed: %s", err)
413403
}
414404

415-
if err := resp.Error(); err != nil {
416-
return nil, fmt.Errorf("DescribeGroups response error: %w", err)
405+
if err := response.Error(); err != nil {
406+
return nil, fmt.Errorf("DescribeGroups response error: %s", err)
417407
}
418408

419409
var assigned []types.PartitionKey
420-
for _, group := range resp {
410+
for _, group := range response {
421411
if group.Group != k.consumerGroupID || group.Err != nil {
422412
continue
423413
}
424414
for _, member := range group.Members {
425-
// try to match the client we created: primary on ClientID, fallback to MemberID or suffix match
426-
if member.ClientID != clientID && member.MemberID != clientID && !strings.Contains(member.ClientID, readerID) && !strings.Contains(member.MemberID, readerID) {
415+
// try to match the reader we created: primary on readerID
416+
if member.InstanceID == nil || *member.InstanceID != readerID {
427417
continue
428418
}
429419

@@ -434,10 +424,7 @@ func (k *Kafka) getReaderAssignedPartitions(ctx context.Context, readerIndex int
434424

435425
for _, topic := range assignment.Topics {
436426
for _, partition := range topic.Partitions {
437-
assigned = append(assigned, types.PartitionKey{
438-
Topic: topic.Topic,
439-
Partition: int(partition),
440-
})
427+
assigned = append(assigned, types.PartitionKey{Topic: topic.Topic, Partition: int(partition)})
441428
}
442429
}
443430
}

0 commit comments

Comments
 (0)