Skip to content
Merged
251 changes: 124 additions & 127 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,44 +333,30 @@ func (d *BasicDispatcher) InitializeTableSchemaStore(schemaInfo []*heartbeatpb.S
return true, nil
}

// AddBlockEventToSink writes a block event to downstream.
// Must make sure the previous events have been flushed to downstream before calling this function
func (d *BasicDispatcher) AddBlockEventToSink(event commonEvent.BlockEvent) error {
// For ddl event, we need to check whether it should be sent to downstream.
// It may be marked as not sync by filter when building the event.
if event.GetType() == commonEvent.TypeDDLEvent {
ddl := event.(*commonEvent.DDLEvent)
// If NotSync is true, it means the DDL should not be sent to downstream.
// So we just call PassBlockEventToSink to finish local bookkeeping:
// mark it passed in tableProgress and trigger flush callbacks to unblock
// dispatcher progress, without sending this DDL to sink.
// So we just call PassBlockEventToSink to update the table progress and call the postFlush func.
if ddl.NotSync {
log.Info("ignore DDL by NotSync", zap.Stringer("dispatcher", d.id), zap.Any("ddl", ddl))
return d.PassBlockEventToSink(event)
d.PassBlockEventToSink(event)
return nil
}
}
// Keep block-event write order with prior DML. For storage sink this may wait
// until prior DML are enqueued/flushed to the sink pipeline; for non-storage
// sinks it is usually a no-op.
if err := d.sink.FlushDMLBeforeBlock(event); err != nil {
return err
}

d.tableProgress.Add(event)
return d.sink.WriteBlockEvent(event)
}

// PassBlockEventToSink is used when block event handling result is "pass"
// (for example maintainer action=Pass or DDL NotSync).
//
// It intentionally does not call sink.WriteBlockEvent. Instead, it updates
// local progress as if the event had been handled, then fires PostFlush
// callbacks so wake/checkpoint logic can continue with consistent ordering.
func (d *BasicDispatcher) PassBlockEventToSink(event commonEvent.BlockEvent) error {
if err := d.sink.FlushDMLBeforeBlock(event); err != nil {
return err
}
// PassBlockEventToSink advances local progress for a block event without writing it downstream.
// Must make sure the previous events have been flushed to downstream before calling this function
func (d *BasicDispatcher) PassBlockEventToSink(event commonEvent.BlockEvent) {
d.tableProgress.Pass(event)
event.PostFlush()
return nil
}

// ensureActiveActiveTableInfo validates the table schema requirements for active-active mode.
Expand Down Expand Up @@ -757,7 +743,7 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
// 1. If the action is a write, we need to add the ddl event to the sink for writing to downstream.
// 2. If the action is a pass, we just need to pass the event
//
// For block actions (write/pass), execution may involve downstream IO because we flush prior DML first.
// For block actions (write/pass), execution may involve downstream IO.
// To avoid blocking the dispatcher status dynamic stream handler, we execute the action asynchronously
// and return await=true.
// The status path will be waked up after the action finishes.
Expand Down Expand Up @@ -828,11 +814,6 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
d.PassBlockEvent(pendingEvent, actionCommitTs, actionIsSyncPoint)
})
return true
case heartbeatpb.Action_Flush:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove this?

d.sharedInfo.GetBlockEventExecutor().Submit(d, func() {
d.FlushBlockEvent(pendingEvent, actionCommitTs, actionIsSyncPoint)
})
return true
default:
log.Error("unsupported action type",
zap.Stringer("dispatcher", d.id),
Expand Down Expand Up @@ -884,37 +865,21 @@ func (d *BasicDispatcher) ExecuteBlockEventDDL(pendingEvent commonEvent.BlockEve
d.reportBlockedEventDone(actionCommitTs, actionIsSyncPoint)
}

// PassBlockEvent executes maintainer Action_Pass:
// It relies on PassBlockEventToSink to preserve ordering and mark the event passed.
// PassBlockEvent executes maintainer Action_Pass on a block event whose prior DMLs
// were already drained before it entered WAITING.
func (d *BasicDispatcher) PassBlockEvent(pendingEvent commonEvent.BlockEvent, actionCommitTs uint64, actionIsSyncPoint bool) {
failpoint.Inject("BlockOrWaitBeforePass", nil)
err := d.PassBlockEventToSink(pendingEvent)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between PassBlockEventToSink and the passPreparedBlockEvent function?

if err != nil {
d.HandleError(err)
return
}
d.PassBlockEventToSink(pendingEvent)
failpoint.Inject("BlockAfterPass", nil)
d.reportBlockedEventDone(actionCommitTs, actionIsSyncPoint)
}

// FlushBlockEvent executes maintainer Action_Flush:
// It only flushes prior DML before the pending block event and keeps the pending
// event for subsequent Write/Pass action.
func (d *BasicDispatcher) FlushBlockEvent(pendingEvent commonEvent.BlockEvent, actionCommitTs uint64, actionIsSyncPoint bool) {
failpoint.Inject("BlockOrWaitBeforeFlush", nil)
err := d.sink.FlushDMLBeforeBlock(pendingEvent)
if err != nil {
d.HandleError(err)
return
}
d.blockEventStatus.updateBlockStage(heartbeatpb.BlockStage_WAITING)
failpoint.Inject("BlockAfterFlush", nil)
d.reportBlockedEventDone(actionCommitTs, actionIsSyncPoint)
}

// reportBlockedEventDone sends DONE status and wakes dispatcher-status stream path
// so the next status for this dispatcher can be handled.
func (d *BasicDispatcher) reportBlockedEventDone(actionCommitTs uint64, actionIsSyncPoint bool) {
func (d *BasicDispatcher) reportBlockedEventDone(
actionCommitTs uint64,
actionIsSyncPoint bool,
) {
d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
State: &heartbeatpb.State{
Expand Down Expand Up @@ -960,91 +925,111 @@ func (d *BasicDispatcher) shouldBlock(event commonEvent.BlockEvent) bool {
return false
}

// shouldHoldBlockEvent reports whether a block event must be held back on this dispatcher.
//
// A DB/All block event is held on the table trigger dispatcher while pendingACKCount is
// still positive, i.e. while there are pending resend tasks (the counter is used because
// some DDLs set their resend task only after being written downstream).
// Holding ensures the maintainer observes all schedule-related DDLs (e.g. create table)
// and updates spanController tasks before it builds a DB/All range checker for this event.
//
// Only InfluenceType_DB/All events are held. InfluenceType_Normal does not require a
// global task snapshot to build its range checker, so it is never held.
func (d *BasicDispatcher) shouldHoldBlockEvent(event commonEvent.BlockEvent) bool {
	influenced := event.GetBlockedTables()
	// Only the table trigger dispatcher ever holds block events.
	if !d.IsTableTriggerDispatcher() {
		return false
	}
	// Nothing is pending an ACK, so there is no reason to wait.
	if d.pendingACKCount.Load() <= 0 {
		return false
	}
	// An event without blocked-table info is never held.
	if influenced == nil {
		return false
	}
	return influenced.InfluenceType != commonEvent.InfluenceTypeNormal
}

// DealWithBlockEvent handles a block event in one of two ways:
//  1. If the event is a single-table DDL, it is added to the sink to be written downstream.
//     If the DDL adds or drops tables, the dispatcher also sends a heartbeat to the maintainer.
//  2. If the event is a multi-table DDL or a sync point event, the dispatcher generates a
//     TableSpanBlockStatus message with the DDL info and sends it to the maintainer.
func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) {
if !d.shouldBlock(event) {
// Writing a block event may involve downstream IO (e.g. executing DDL), so it must not block
// the dynamic stream goroutine.
d.sharedInfo.GetBlockEventExecutor().Submit(d, func() {
noNeedAddAndDrop := event.GetNeedAddedTables() == nil && event.GetNeedDroppedTables() == nil
needsScheduleACKTracking := d.IsTableTriggerDispatcher() && !noNeedAddAndDrop
shouldBlock := d.shouldBlock(event)
shouldHoldBlocked := d.shouldHoldBlockEvent(event)
if shouldBlock && shouldHoldBlocked {
d.holdBlockEvent(event)
return
}
// Writing a block event may involve downstream IO (e.g. executing DDL), so it must not block
// the dynamic stream goroutine.
d.sharedInfo.GetBlockEventExecutor().Submit(d, func() {
noNeedAddAndDrop := event.GetNeedAddedTables() == nil && event.GetNeedDroppedTables() == nil
needsScheduleACKTracking := !shouldBlock && d.IsTableTriggerDispatcher() && !noNeedAddAndDrop
if needsScheduleACKTracking {
// If this is a table trigger dispatcher, and the DDL leads to add/drop tables,
// we track it as a pending schedule-related event until the maintainer ACKs it.
d.pendingACKCount.Add(1)
}
if shouldBlock {
failpoint.Inject("BlockOrWaitBeforeFlush", nil)
}
// Keep block-event write/pass order with prior DML.
// For storage sink this waits all previous enqueued DML events flushed.
// For non-storage sinks it is usually a no-op.
if err := d.sink.FlushDMLBeforeBlock(event); err != nil {
if needsScheduleACKTracking {
// If this is a table trigger dispatcher, and the DDL leads to add/drop tables,
// we track it as a pending schedule-related event until the maintainer ACKs it.
d.pendingACKCount.Add(1)
d.pendingACKCount.Add(-1)
}
err := d.AddBlockEventToSink(event)
if err != nil {
if needsScheduleACKTracking {
d.pendingACKCount.Add(-1)
}
d.HandleError(err)
return
}
if noNeedAddAndDrop {
return
d.HandleError(err)
return
}
if shouldBlock {
failpoint.Inject("BlockAfterFlush", nil)
d.reportBlockedEventToMaintainer(event)
return
}
err := d.AddBlockEventToSink(event)
if err != nil {
if needsScheduleACKTracking {
d.pendingACKCount.Add(-1)
}
d.HandleError(err)
return
}
if noNeedAddAndDrop {
return
}

message := &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
State: &heartbeatpb.State{
IsBlocked: false,
BlockTs: event.GetCommitTs(),
NeedDroppedTables: event.GetNeedDroppedTables().ToPB(),
NeedAddedTables: commonEvent.ToTablesPB(event.GetNeedAddedTables()),
IsSyncPoint: false, // sync point event must should block
Stage: heartbeatpb.BlockStage_NONE,
},
Mode: d.GetMode(),
}
identifier := BlockEventIdentifier{
CommitTs: event.GetCommitTs(),
IsSyncPoint: false,
}
message := &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
State: &heartbeatpb.State{
IsBlocked: false,
BlockTs: event.GetCommitTs(),
NeedDroppedTables: event.GetNeedDroppedTables().ToPB(),
NeedAddedTables: commonEvent.ToTablesPB(event.GetNeedAddedTables()),
IsSyncPoint: false, // sync point event must should block
Stage: heartbeatpb.BlockStage_NONE,
},
Mode: d.GetMode(),
}
identifier := BlockEventIdentifier{
CommitTs: event.GetCommitTs(),
IsSyncPoint: false,
}

if event.GetNeedAddedTables() != nil {
// When the ddl need add tables, we need the maintainer to block the forwarding of checkpointTs
// Because the new add table should join the calculation of checkpointTs
// So the forwarding of checkpointTs should be blocked until the new dispatcher is created.
// While there is a time gap between dispatcher send the block status and
// maintainer begin to create dispatcher(and block the forwaring checkpoint)
// in order to avoid the checkpointTs forward unexceptedly,
// we need to block the checkpoint forwarding in this dispatcher until receive the ack from maintainer.
//
// |----> block checkpointTs forwaring of this dispatcher ------>|-----> forwarding checkpointTs normally
// | send block stauts send ack |
// dispatcher -------------------> maintainer ----------------> dispatcher
// |
// |----------> Block CheckpointTs Forwarding and create new dispatcher
// Thus, we add the event to tableProgress again, and call event postFunc when the ack is received from maintainer.
event.ClearPostFlushFunc()
d.tableProgress.Add(event)
d.resendTaskMap.Set(identifier, newResendTask(message, d, event.PostFlush))
} else {
d.resendTaskMap.Set(identifier, newResendTask(message, d, nil))
}
d.sharedInfo.blockStatusesChan <- message
})
} else {
// Hold DB/All block events on the table trigger dispatcher until there are no pending
// resend tasks(by pendingACKCount, because some ddl's resend task set is after write downstream).
// This ensures maintainer observes all schedule-related DDLs (e.g. create table)
// and updates spanController tasks before it builds a DB/All range checker for this event.
//
// Note: We only hold InfluenceType_DB/All. InfluenceType_Normal does not require a global
// task snapshot to build its range checker.
blockedTables := event.GetBlockedTables()
if d.IsTableTriggerDispatcher() &&
d.pendingACKCount.Load() > 0 &&
blockedTables != nil &&
blockedTables.InfluenceType != commonEvent.InfluenceTypeNormal {
d.holdBlockEvent(event)
return
if event.GetNeedAddedTables() != nil {
// When the DDL needs to add tables, the maintainer must block the forwarding of checkpointTs,
// because the newly added table has to join the calculation of checkpointTs.
// So the forwarding of checkpointTs should be blocked until the new dispatcher is created.
// However, there is a time gap between the dispatcher sending the block status and
// the maintainer beginning to create the dispatcher (and blocking checkpoint forwarding).
// To prevent checkpointTs from advancing unexpectedly during that gap,
// this dispatcher blocks its own checkpoint forwarding until it receives the ack from the maintainer.
//
// |----> block checkpointTs forwarding of this dispatcher ------>|-----> forwarding checkpointTs normally
// | send block status send ack |
// dispatcher -------------------> maintainer ----------------> dispatcher
// |
// |----------> Block CheckpointTs Forwarding and create new dispatcher
// Thus, we add the event to tableProgress again, and call the event's PostFlush func when the ack is received from the maintainer.
event.ClearPostFlushFunc()
d.tableProgress.Add(event)
d.resendTaskMap.Set(identifier, newResendTask(message, d, event.PostFlush))
} else {
d.resendTaskMap.Set(identifier, newResendTask(message, d, nil))
}
d.reportBlockedEventToMaintainer(event)
}
d.sharedInfo.blockStatusesChan <- message
})

// dealing with events which update schema ids
// Only rename table and rename tables may update schema ids(rename db1.table1 to db2.table2)
Expand Down Expand Up @@ -1102,7 +1087,7 @@ func (d *BasicDispatcher) tryDealWithHeldBlockEvent() {
// Thus, we ensure DB/All block events can generate correct range checkers.
if d.pendingACKCount.Load() == 0 {
if holding := d.popHoldingBlockEvent(); holding != nil {
d.reportBlockedEventToMaintainer(holding)
d.flushBlockedEventAndReportToMaintainer(holding)
}
} else if d.pendingACKCount.Load() < 0 {
d.HandleError(errors.ErrDispatcherFailed.GenWithStackByArgs(
Expand Down Expand Up @@ -1170,6 +1155,18 @@ func (d *BasicDispatcher) reportBlockedEventToMaintainer(event commonEvent.Block
d.sharedInfo.blockStatusesChan <- message
}

// flushBlockedEventAndReportToMaintainer submits a task to the block event executor that
// flushes the DML preceding the given block event and then reports the blocked event
// to the maintainer.
//
// If sink.FlushDMLBeforeBlock fails, the error is passed to HandleError and the event
// is not reported. The "BlockOrWaitBeforeFlush" and "BlockAfterFlush" failpoints
// bracket the flush step.
func (d *BasicDispatcher) flushBlockedEventAndReportToMaintainer(event commonEvent.BlockEvent) {
	executor := d.sharedInfo.GetBlockEventExecutor()
	executor.Submit(d, func() {
		failpoint.Inject("BlockOrWaitBeforeFlush", nil)
		err := d.sink.FlushDMLBeforeBlock(event)
		if err != nil {
			d.HandleError(err)
			return
		}
		failpoint.Inject("BlockAfterFlush", nil)
		// Only report once the prior DML has been flushed successfully.
		d.reportBlockedEventToMaintainer(event)
	})
}

// GetBlockEventStatus returns the current in-flight *blocking* barrier state for bootstrap.
//
// We only report statuses for events that actually block the event stream (multi-table DDLs, split-span DDLs,
Expand Down
Loading