Skip to content

Commit dfd7b31

Browse files
chore: resolved-lint-error
1 parent a18d75e commit dfd7b31

5 files changed

Lines changed: 19 additions & 25 deletions

File tree

drivers/kafka/internal/cdc.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func (k *Kafka) StreamChanges(ctx context.Context, readerID int, metadataStates
9191

9292
err = k.processKafkaMessages(ctx, reader, func(record types.KafkaRecord) (bool, error) {
9393
// get current partition metadata and key
94-
currentPartitionKey := types.PartitionKey{Topic: record.Message.Topic, Partition: int(record.Message.Partition)}
94+
currentPartitionKey := types.PartitionKey{Topic: record.Message.Topic, Partition: record.Message.Partition}
9595
currentPartitionMeta, exists := k.readerManager.GetPartitionIndex(fmt.Sprintf("%s:%d", record.Message.Topic, record.Message.Partition))
9696
if !exists {
9797
return false, fmt.Errorf("missing partition index for topic %s partition %d", record.Message.Topic, record.Message.Partition)
@@ -197,19 +197,19 @@ func (k *Kafka) PostCDC(ctx context.Context, readerIdx int) error {
197197
// for processing messages from a Kafka reader.
198198
func (k *Kafka) processKafkaMessages(ctx context.Context, reader *kgo.Client, stopProcessFn func(record types.KafkaRecord) (bool, error)) error {
199199
for {
200+
// Rebalance/exit checks must run even when PollFetches returns an empty batch;
201+
// otherwise a reader reassigned to empty partitions never enters the record loop.
202+
if stopProcessing, err := k.readerManager.FetchExitState(); stopProcessing {
203+
return err
204+
}
205+
200206
pollCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
201207
messages := reader.PollFetches(pollCtx)
202208
pollCtxErr := pollCtx.Err()
203209
cancel()
204210

205211
pollFetchErrors := messages.Errors()
206212

207-
// Rebalance/exit checks must run even when PollFetches returns an empty batch;
208-
// otherwise a reader reassigned to empty partitions never enters the record loop.
209-
if stopProcessing, err := k.readerManager.FetchExitState(); stopProcessing {
210-
return err
211-
}
212-
213213
// return early if poll context is deadline exceeded
214214
if errors.Is(pollCtxErr, context.DeadlineExceeded) {
215215
logger.Warnf("poll context deadline exceeded: %v", pollCtxErr)

drivers/kafka/internal/kafka.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ func (k *Kafka) getReaderAssignedPartitions(ctx context.Context, readerIndex int
425425

426426
for _, topic := range assignment.Topics {
427427
for _, partition := range topic.Partitions {
428-
assigned = append(assigned, types.PartitionKey{Topic: topic.Topic, Partition: int(partition)})
428+
assigned = append(assigned, types.PartitionKey{Topic: topic.Topic, Partition: partition})
429429
}
430430
}
431431
}

pkg/kafka/balancer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (b *CustomGroupBalancer) Balance(consumerBalancer *kgo.ConsumerBalancer, pa
5959
for topic, partitionCount := range partitionsPerTopic {
6060
for partition := int32(0); partition < partitionCount; partition++ {
6161
if _, ok := b.partitionIndex[fmt.Sprintf("%s:%d", topic, partition)]; ok {
62-
activePartitions = append(activePartitions, types.PartitionKey{Topic: topic, Partition: int(partition)})
62+
activePartitions = append(activePartitions, types.PartitionKey{Topic: topic, Partition: partition})
6363
}
6464
}
6565
}
@@ -70,7 +70,7 @@ func (b *CustomGroupBalancer) Balance(consumerBalancer *kgo.ConsumerBalancer, pa
7070
})
7171

7272
for currentIndex, activePartition := range activePartitions {
73-
plan.AddPartition(&members[currentIndex%consumerCount], activePartition.Topic, int32(activePartition.Partition))
73+
plan.AddPartition(&members[currentIndex%consumerCount], activePartition.Topic, activePartition.Partition)
7474
}
7575
return plan
7676
}

pkg/kafka/reader.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ func (r *ReaderManager) CreateReaders(ctx context.Context, streams []types.Strea
6161
return r.waitForConsumerGroupJoin(consumerGroupID)
6262
}
6363

64-
// GetReaders returns the created readers
64+
// GetReader returns the created readers
6565
func (r *ReaderManager) GetReader(readerID int) *kgo.Client {
6666
return r.readers[readerID].reader
6767
}
6868

69-
// GetReaders returns the created readers
69+
// GetReaderCount returns the created readers count
7070
func (r *ReaderManager) GetReaderCount() int {
7171
return len(r.readers)
7272
}
@@ -125,7 +125,7 @@ func (r *ReaderManager) SetPartitions(ctx context.Context, stream types.StreamIn
125125
continue
126126
}
127127

128-
committedOffset, hasCommittedOffset := committedTopicOffsets[int(partition.Partition)]
128+
committedOffset, hasCommittedOffset := committedTopicOffsets[partition.Partition]
129129

130130
// check if the partition has any messages at all, if not then skip
131131
if startOffset.Offset >= endOffset.Offset {
@@ -141,7 +141,7 @@ func (r *ReaderManager) SetPartitions(ctx context.Context, stream types.StreamIn
141141

142142
r.partitionIndex[fmt.Sprintf("%s:%d", topic, partition.Partition)] = types.PartitionMetaData{
143143
Stream: stream,
144-
PartitionID: int(partition.Partition),
144+
PartitionID: partition.Partition,
145145
EndOffset: endOffset.Offset,
146146
}
147147
}
@@ -163,19 +163,19 @@ func (r *ReaderManager) GetTopicMetadata(ctx context.Context, topic string) (*ka
163163
}
164164

165165
// FetchCommittedOffsets fetches committed offsets for a topic
166-
func (r *ReaderManager) FetchCommittedOffsets(ctx context.Context, topic string, partitions map[int32]kadm.PartitionDetail) (map[int]int64, error) {
166+
func (r *ReaderManager) FetchCommittedOffsets(ctx context.Context, topic string, partitions map[int32]kadm.PartitionDetail) (map[int32]int64, error) {
167167
offsets, err := r.config.AdminClient.FetchOffsets(ctx, r.config.ConsumerGroupID)
168168
if err != nil {
169169
return nil, fmt.Errorf("could not fetch committed offsets for group %s", r.config.ConsumerGroupID)
170170
}
171171

172-
committedTopicOffsets := make(map[int]int64)
172+
committedTopicOffsets := make(map[int32]int64)
173173
for _, partitionDetail := range partitions {
174174
offset, exists := offsets.Lookup(topic, partitionDetail.Partition)
175175
if !exists {
176176
continue
177177
}
178-
committedTopicOffsets[int(partitionDetail.Partition)] = offset.At
178+
committedTopicOffsets[partitionDetail.Partition] = offset.At
179179
}
180180
return committedTopicOffsets, nil
181181
}
@@ -334,11 +334,6 @@ func (r *ReaderManager) CreateReader(streams []types.StreamInterface, consumerGr
334334
return reader, nil
335335
}
336336

337-
// GenerationID returns the consumer-group generation stored after CreateReaders join wait.
338-
func (r *ReaderManager) GenerationID() int32 {
339-
return r.generationID.Load()
340-
}
341-
342337
// RebalanceDetected is true when the client's group generation differs from the stored baseline.
343338
func (r *ReaderManager) RebalanceDetected(client *kgo.Client) bool {
344339
_, generationID := client.GroupMetadata()
@@ -383,7 +378,6 @@ func (r *ReaderManager) waitForConsumerGroupJoin(consumerGroupID string) error {
383378
} else if expectedGenerationID < 0 {
384379
expectedGenerationID = generationID
385380
}
386-
387381
}
388382

389383
if allReadersReady && expectedGenerationID >= 0 {

types/kafka_types.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@ const (
1717
type PartitionMetaData struct {
1818
ReaderID string
1919
Stream StreamInterface
20-
PartitionID int
20+
PartitionID int32
2121
EndOffset int64
2222
}
2323

2424
// PartitionKey represents a unique key for a Kafka partition and topic
2525
type PartitionKey struct {
2626
Topic string
27-
Partition int
27+
Partition int32
2828
}
2929

3030
// KafkaRecord represents a record (data + message) from a Kafka partition

0 commit comments

Comments
 (0)