Skip to content

Commit a4831e7

Browse files
chore: added-logs-handled-infinite-loop
1 parent 2824e5a commit a4831e7

2 files changed

Lines changed: 31 additions & 23 deletions

File tree

drivers/kafka/internal/cdc.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ func (k *Kafka) PostCDC(ctx context.Context, readerIdx int) error {
183183
if reader == nil {
184184
return fmt.Errorf("reader %s not found for commit", readerID)
185185
}
186+
_, generationID := reader.GroupMetadata()
187+
logger.Debugf("reader %s post cdc: generation id: %d", readerID, generationID)
186188

187189
if err := reader.CommitRecords(ctx, messages...); err != nil {
188190
return fmt.Errorf("commit failed for reader %s: %s", readerID, err)
@@ -208,25 +210,28 @@ func (k *Kafka) PostCDC(ctx context.Context, readerIdx int) error {
208210
// for processing messages from a Kafka reader.
209211
func (k *Kafka) processKafkaMessages(ctx context.Context, reader *kgo.Client, stopProcessFn func(record types.KafkaRecord) (bool, error)) error {
210212
for {
211-
messages := reader.PollFetches(ctx)
213+
pollCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
214+
messages := reader.PollFetches(pollCtx)
215+
pollCtxErr := pollCtx.Err()
216+
cancel()
212217
errs := messages.Errors()
213-
if len(errs) > 0 {
218+
219+
// Rebalance/exit checks must run even when PollFetches returns an empty batch;
220+
// otherwise a reader reassigned to empty partitions never enters the record loop and so never observes the exit state.
221+
if stopProcessing, err := k.readerManager.FetchExitState(); stopProcessing {
222+
return err
223+
}
224+
225+
if len(errs) > 0 && pollCtxErr == nil {
214226
return fmt.Errorf("%v: error reading message in Kafka CDC sync: %v", constants.ErrNonRetryable, errs[0].Err)
215227
}
216228

217229
records := messages.RecordIter()
218230

219231
for !records.Done() {
220-
// Discover/schema sampling uses standalone partition consumers without ReaderManager.
221-
if k.readerManager != nil {
222-
if k.readerManager.ShouldStopProcessing() {
223-
logger.Infof("stopping kafka CDC processing due to consumer group rebalance")
224-
return nil
225-
}
226-
if err := k.readerManager.ErrForExitMode(); err != nil {
227-
logger.Errorf("kafka consumer lost partition ownership and CDC processing can no longer continue safely: %s ", err)
228-
return err
229-
}
232+
// Re-check rebalance state before processing each record to stop immediately after partition revocation.
233+
if stopProcessing, err := k.readerManager.FetchExitState(); stopProcessing {
234+
return err
230235
}
231236
message := records.Next()
232237

pkg/kafka/reader.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -361,21 +361,24 @@ func (r *ReaderManager) RebalanceDetected(client *kgo.Client) bool {
361361
return generationID >= 0 && generationID != r.generationID.Load()
362362
}
363363

364-
// ShouldStopProcessing reports a consumer-group rebalance (assign/revoke during CDC).
365-
// The fetch loop must exit with nil — not an error — so abstract layer does not retry.
366-
func (r *ReaderManager) ShouldStopProcessing() bool {
367-
return r.exitMode.Load() == gracefulExit
368-
}
364+
// FetchExitState reports whether CDC processing should stop after PollFetches.
365+
// exitMode is updated by consumer group rebalance callbacks before PollFetches returns.
366+
func (r *ReaderManager) FetchExitState() (stop bool, err error) {
367+
// A nil ReaderManager is expected in discover mode, so a nil receiver reports that processing should continue.
368+
if r == nil {
369+
return false, nil
370+
}
369371

370-
// ErrForExitMode returns an error only for nonRetryableExit (e.g. partitions lost).
371-
func (r *ReaderManager) ErrForExitMode() error {
372372
switch r.exitMode.Load() {
373-
case normalProcessing, gracefulExit:
374-
return nil
373+
case normalProcessing:
374+
return false, nil
375+
case gracefulExit:
376+
logger.Infof("stopping kafka CDC processing gracefully due to consumer group rebalance")
377+
return true, nil
375378
case nonRetryableExit:
376-
return fmt.Errorf("%v: kafka sync aborted", constants.ErrNonRetryable)
379+
return true, fmt.Errorf("%v: kafka sync aborted due to partition loss during consumer group rebalance", constants.ErrNonRetryable)
377380
default:
378-
return fmt.Errorf("%v: kafka sync aborted", constants.ErrNonRetryable)
381+
return true, fmt.Errorf("%v: kafka sync aborted: unexpected exit mode", constants.ErrNonRetryable)
379382
}
380383
}
381384

0 commit comments

Comments
 (0)