Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,22 +250,24 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
return 0, 0, inErr
}

defer func() {
closeErr := writer.Close(ctx)
if inErr != nil {
log.Error("failed to close writer", zap.Error(closeErr),
zap.Int("workerID", d.id),
zap.Any("table", task.tableInfo.TableName),
zap.String("namespace", d.changeFeedID.Namespace),
zap.String("changefeed", d.changeFeedID.ID))
if inErr == nil {
inErr = closeErr
}
}
}()
if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil {
return 0, 0, inErr
}
<<<<<<< HEAD
=======
// We have to wait the writer to close to complete the upload
// If failed to close writer, some DMLs may not be upload successfully
if inErr = writer.Close(ctx); inErr != nil {
log.Error("failed to close writer", zap.Error(inErr),
zap.Int("workerID", d.id),
zap.Any("table", task.tableInfo.TableName),
zap.String("namespace", d.changeFeedID.Namespace),
zap.String("changefeed", d.changeFeedID.ID))
return 0, 0, inErr
}

d.metricFlushDuration.Observe(time.Since(start).Seconds())
>>>>>>> 1ea739d924 (sink(ticdc): fix a bug that may cause data loss while closing Writer failed (#12437))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This block of code contains git conflict markers (<<<<<<<, =======, >>>>>>>) which will cause a compilation failure. Additionally, it references an undefined variable start and an undefined field d.metricFlushDuration, which are also compilation errors.

The core logic to handle the error from writer.Close() is correct, but these issues must be resolved. The suggested code below removes the conflict markers and the problematic metric observation, leaving only the necessary bug fix.

                if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil {
                        return 0, 0, inErr
                }

                // We have to wait for the writer to close to complete the upload.
                // If it fails to close, some DMLs may not be uploaded successfully.
                if inErr = writer.Close(ctx); inErr != nil {
                        log.Error("failed to close writer", zap.Error(inErr),
                                zap.Int("workerID", d.id),
                                zap.Any("table", task.tableInfo.TableName),
                                zap.String("namespace", d.changeFeedID.Namespace),
                                zap.String("changefeed", d.changeFeedID.ID))
                        return 0, 0, inErr
                }

return rowsCnt, bytesCnt, nil
}); err != nil {
return err
Expand Down
Loading