-
Notifications
You must be signed in to change notification settings - Fork 296
Open
Labels
type/enhancementThe issue or PR belongs to an enhancement.The issue or PR belongs to an enhancement.
Description
The bootstrapWorker stores the tableInfo by the physicalTableID from dml event. It sends duplicate BOOTSTRAP events for the partitioned table because the table information is the same.
tiflow/pkg/sink/codec/bootstraper.go
Lines 105 to 124 in 7dda3e9
| func (b *bootstrapWorker) addEvent( | |
| ctx context.Context, | |
| key model.TopicPartitionKey, | |
| row *model.RowChangedEvent, | |
| ) error { | |
| table, ok := b.activeTables.Load(row.GetTableID()) | |
| if !ok { | |
| tb := newTableStatistic(key, row) | |
| b.activeTables.Store(tb.id, tb) | |
| // Send bootstrap message immediately when a new table is added | |
| err := b.sendBootstrapMsg(ctx, tb) | |
| if err != nil { | |
| return errors.Trace(err) | |
| } | |
| } else { | |
| // If the table is already in the activeTables, update its status. | |
| table.(*tableStatistic).update(row, key.TotalPartition) | |
| } | |
| return nil | |
| } |
tiflow/pkg/sink/codec/bootstraper.go
Lines 126 to 150 in 7dda3e9
| // sendBootstrapMsg sends a bootstrap message if the table meets the condition | |
| // 1. The time since last bootstrap message sent is larger than sendBootstrapInterval | |
| // 2. The received row event count since last bootstrap message sent is larger than sendBootstrapInMsgCount | |
| // Note: It is a blocking method, it will block if the outCh is full. | |
| func (b *bootstrapWorker) sendBootstrapMsg(ctx context.Context, table *tableStatistic) error { | |
| if !table.shouldSendBootstrapMsg( | |
| b.sendBootstrapInterval, | |
| b.sendBootstrapInMsgCount) { | |
| return nil | |
| } | |
| table.reset() | |
| tableInfo := table.tableInfo.Load().(*model.TableInfo) | |
| events, err := b.generateEvents(table.topic, table.totalPartition.Load(), tableInfo) | |
| if err != nil { | |
| return errors.Trace(err) | |
| } | |
| for _, e := range events { | |
| select { | |
| case <-ctx.Done(): | |
| return ctx.Err() | |
| case b.outCh <- e: | |
| } | |
| } | |
| return nil | |
| } |
Metadata
Metadata
Assignees
Labels
type/enhancementThe issue or PR belongs to an enhancement.The issue or PR belongs to an enhancement.