Skip to content
Merged
270 changes: 140 additions & 130 deletions downstreamadapter/dispatcher/basic_dispatcher.go

Large diffs are not rendered by default.

121 changes: 121 additions & 0 deletions downstreamadapter/dispatcher/event_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,100 @@ func TestDispatcherHandleEvents(t *testing.T) {
t.Run("cloud storage wake callback after batch enqueue", verifyDMLWakeCallbackStorageAfterBatchEnqueue)
}

// TestBlockingDDLFlushBeforeWaitingAndWriteDoesNotFlushAgain verifies that a
// blocking DDL flushes pre-barrier DML to the sink exactly once: the
// dispatcher flushes locally before reporting the WAITING stage, and the
// subsequent Write action must not trigger a second FlushDMLBeforeBlock call.
func TestBlockingDDLFlushBeforeWaitingAndWriteDoesNotFlushAgain(t *testing.T) {
	keyspaceID := getTestingKeyspaceID()
	tableSpan := getUncompleteTableSpan()
	tableSpan.KeyspaceID = keyspaceID
	mockSink := newDispatcherTestSink(t, common.MysqlSinkType)

	// Count flush calls, and pause the first flush so the test can observe
	// dispatcher state while the flush is still in progress.
	var flushCalls atomic.Int32
	flushStarted := make(chan struct{}, 1)
	flushRelease := make(chan struct{})
	mockSink.SetFlushBeforeBlockHook(func(event commonEvent.BlockEvent) error {
		// Only intercept the DDL under test (commitTs == 10).
		if event.GetCommitTs() != 10 {
			return nil
		}
		if flushCalls.Add(1) == 1 {
			// Signal that the first flush started (non-blocking send).
			select {
			case flushStarted <- struct{}{}:
			default:
			}
			// Hold the flush until the test releases it.
			<-flushRelease
		}
		return nil
	})

	dispatcher := newDispatcherForTest(mockSink.Sink(), tableSpan)
	nodeID := node.NewID()
	ddlEvent := &commonEvent.DDLEvent{
		FinishedTs: 10,
		StartTs:    10,
		BlockedTables: &commonEvent.InfluencedTables{
			InfluenceType: commonEvent.InfluenceTypeNormal,
			TableIDs:      []int64{1},
		},
	}

	// A blocking DDL must make HandleEvents report block=true.
	block := dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, ddlEvent)}, func() {})
	require.True(t, block)

	// Wait for the local flush to begin.
	select {
	case <-flushStarted:
	case <-time.After(time.Second):
		require.FailNow(t, "expected local flush to start for blocking DDL")
	}

	// While the flush is in flight the block-event status must still be empty...
	pendingEvent, blockStage := dispatcher.blockEventStatus.getEventAndStage()
	require.Nil(t, pendingEvent)
	require.Equal(t, heartbeatpb.BlockStage_NONE, blockStage)

	// ...and no block status may be reported upstream yet.
	select {
	case msg := <-dispatcher.GetBlockStatusesChan():
		require.FailNow(t, "unexpected block status before local flush finishes", "received=%v", msg)
	case <-time.After(200 * time.Millisecond):
	}

	// Let the flush finish.
	close(flushRelease)

	// Once the flush completes, the dispatcher reports the WAITING stage.
	select {
	case msg := <-dispatcher.GetBlockStatusesChan():
		require.True(t, msg.State.IsBlocked)
		require.Equal(t, uint64(10), msg.State.BlockTs)
		require.Equal(t, heartbeatpb.BlockStage_WAITING, msg.State.Stage)
	case <-time.After(time.Second):
		require.FailNow(t, "expected blocking DDL to enter WAITING after local flush")
	}

	// The DDL is now the pending block event and exactly one flush has run.
	pendingEvent, blockStage = dispatcher.blockEventStatus.getEventAndStage()
	require.Same(t, ddlEvent, pendingEvent)
	require.Equal(t, heartbeatpb.BlockStage_WAITING, blockStage)
	require.Equal(t, int32(1), flushCalls.Load())

	// Deliver the Write action for the same commitTs.
	await := dispatcher.HandleDispatcherStatus(&heartbeatpb.DispatcherStatus{
		Action: &heartbeatpb.DispatcherAction{
			Action:      heartbeatpb.Action_Write,
			CommitTs:    ddlEvent.FinishedTs,
			IsSyncPoint: false,
		},
	})
	require.True(t, await)

	// After the write, the dispatcher reports DONE for the block event.
	select {
	case msg := <-dispatcher.GetBlockStatusesChan():
		require.True(t, msg.State.IsBlocked)
		require.Equal(t, uint64(10), msg.State.BlockTs)
		require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage)
	case <-time.After(time.Second):
		require.FailNow(t, "expected DONE after write action")
	}

	// Eventually the block-event status is cleared, and -- the point of this
	// test -- FlushDMLBeforeBlock was still called only once.
	require.Eventually(t, func() bool {
		pendingEvent, blockStage = dispatcher.blockEventStatus.getEventAndStage()
		return pendingEvent == nil && blockStage == heartbeatpb.BlockStage_NONE
	}, time.Second, 10*time.Millisecond)
	require.Equal(t, int32(1), flushCalls.Load())
}

// test that an uncomplete table span can correctly handle the DDL events
func TestUncompeleteTableSpanDispatcherHandleEvents(t *testing.T) {
count.Swap(0)
Expand Down Expand Up @@ -1131,6 +1225,19 @@ func TestHoldBlockEventUntilNoResendTasks(t *testing.T) {
keyspaceID := getTestingKeyspaceID()
ddlTableSpan := common.KeyspaceDDLSpan(keyspaceID)
mockSink := newDispatcherTestSink(t, common.MysqlSinkType)
flushStarted := make(chan struct{}, 1)
flushRelease := make(chan struct{})
mockSink.SetFlushBeforeBlockHook(func(event commonEvent.BlockEvent) error {
if event.GetCommitTs() != 20 {
return nil
}
select {
case flushStarted <- struct{}{}:
default:
}
<-flushRelease
return nil
})
dispatcher := newDispatcherForTest(mockSink.Sink(), ddlTableSpan)

nodeID := node.NewID()
Expand Down Expand Up @@ -1188,6 +1295,20 @@ func TestHoldBlockEventUntilNoResendTasks(t *testing.T) {
},
})

select {
case <-flushStarted:
case <-time.After(time.Second):
require.FailNow(t, "expected deferred DB-level flush to start")
}

select {
case msg := <-dispatcher.GetBlockStatusesChan():
require.FailNow(t, "unexpected block status before local flush finishes", "received=%v", msg)
case <-time.After(200 * time.Millisecond):
}

close(flushRelease)

select {
case msg := <-dispatcher.GetBlockStatusesChan():
require.True(t, msg.State.IsBlocked)
Expand Down
19 changes: 18 additions & 1 deletion downstreamadapter/dispatcher/mock_sink_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type dispatcherTestSink struct {
mu sync.Mutex
dmls []*commonEvent.DMLEvent
isNormal atomic.Bool

flushMu sync.Mutex
flushBeforeBlockHook func(commonEvent.BlockEvent) error
}

func newDispatcherTestSink(t *testing.T, sinkType common.SinkType) *dispatcherTestSink {
Expand All @@ -58,7 +61,15 @@ func newDispatcherTestSink(t *testing.T, sinkType common.SinkType) *dispatcherTe
event.PostFlush()
return nil
}).AnyTimes()
testSink.sink.EXPECT().FlushDMLBeforeBlock(gomock.Any()).Return(nil).AnyTimes()
testSink.sink.EXPECT().FlushDMLBeforeBlock(gomock.Any()).DoAndReturn(func(event commonEvent.BlockEvent) error {
testSink.flushMu.Lock()
hook := testSink.flushBeforeBlockHook
testSink.flushMu.Unlock()
if hook != nil {
return hook(event)
}
return nil
}).AnyTimes()
testSink.sink.EXPECT().AddCheckpointTs(gomock.Any()).AnyTimes()
testSink.sink.EXPECT().SetTableSchemaStore(gomock.Any()).AnyTimes()
testSink.sink.EXPECT().Close(gomock.Any()).AnyTimes()
Expand All @@ -74,6 +85,12 @@ func (s *dispatcherTestSink) SetIsNormal(isNormal bool) {
s.isNormal.Store(isNormal)
}

// SetFlushBeforeBlockHook installs (or replaces) the hook that the mocked
// FlushDMLBeforeBlock call invokes. Passing nil removes the hook, restoring
// the default behavior of returning nil.
func (s *dispatcherTestSink) SetFlushBeforeBlockHook(hook func(commonEvent.BlockEvent) error) {
	s.flushMu.Lock()
	s.flushBeforeBlockHook = hook
	s.flushMu.Unlock()
}

func (s *dispatcherTestSink) GetDMLs() []*commonEvent.DMLEvent {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
9 changes: 5 additions & 4 deletions heartbeatpb/heartbeat.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions heartbeatpb/heartbeat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ enum Action {
// Pass means influenced dispatchers mark this barrier as passed after the
// writer has finished Write for the same (commitTs, isSyncPoint).
Pass = 1;
// Flush means influenced dispatchers flush all DML before this barrier and
// keep the pending barrier event for the later Write/Pass steps.
// It is required for storage sink when split-table is enabled: one table may have multiple
// dispatchers on different nodes, and every dispatcher must flush pre-barrier
// DML first to prevent it from being reordered after the writer executes DDL.
Flush = 2;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this cause compatibility issues during the upgrade process?

}

message DispatcherAction {
Expand Down
97 changes: 22 additions & 75 deletions maintainer/barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type Barrier struct {
spanController *span.Controller
operatorController *operator.Controller
splitTableEnabled bool
flushEnabled bool
// mode identifies which replication pipeline this barrier belongs to
// (common.DefaultMode or common.RedoMode). Barrier state, resend messages,
// and logs must stay in the same mode.
Expand All @@ -55,27 +54,13 @@ func NewBarrier(spanController *span.Controller,
splitTableEnabled bool,
bootstrapRespMap map[node.ID]*heartbeatpb.MaintainerBootstrapResponse,
mode int64,
) *Barrier {
return NewBarrierWithFlush(spanController, operatorController, splitTableEnabled, true, bootstrapRespMap, mode)
}

// NewBarrierWithFlush creates a barrier with an explicit flush phase switch.
// flushEnabled must be decided before restoring bootstrap events so restored
// events use the right phase machine from the beginning.
func NewBarrierWithFlush(spanController *span.Controller,
operatorController *operator.Controller,
splitTableEnabled bool,
flushEnabled bool,
bootstrapRespMap map[node.ID]*heartbeatpb.MaintainerBootstrapResponse,
mode int64,
) *Barrier {
barrier := Barrier{
blockedEvents: NewBlockEventMap(),
pendingEvents: newPendingScheduleEventMap(),
spanController: spanController,
operatorController: operatorController,
splitTableEnabled: splitTableEnabled,
flushEnabled: flushEnabled,
mode: mode,
}
barrier.handleBootstrapResponse(bootstrapRespMap)
Expand Down Expand Up @@ -188,8 +173,6 @@ func (b *Barrier) handleBootstrapResponse(bootstrapRespMap map[node.ID]*heartbea
event, ok := b.blockedEvents.Get(key)
if !ok {
event = NewBlockEvent(common.NewChangefeedIDFromPB(resp.ChangefeedID), common.NewDispatcherIDFromPB(span.ID), b.spanController, b.operatorController, blockState, b.splitTableEnabled, b.mode)
event.flushEnabled = b.flushEnabled
event.flushDispatcherAdvanced = !b.flushEnabled
b.blockedEvents.Set(key, event)
}
switch blockState.Stage {
Expand Down Expand Up @@ -320,58 +303,29 @@ func (b *Barrier) handleEventDone(changefeedID common.ChangeFeedID, dispatcherID
return nil
}

// We should only see DONE after the event has been selected.
if !event.selected.Load() {
return event
}

// Phase 1 (Flush, storage split-table only): all influenced dispatchers must flush pre-barrier DML first.
// This prevents pre-DDL data from overtaking the barrier write when a table is
// split into multiple dispatchers across nodes.
// DONE from all influenced dispatchers advances flushDispatcherAdvanced = true.
if !event.flushDispatcherAdvanced {
event.markDispatcherEventDone(dispatcherID)
if !event.allDispatcherReported() {
return event
}

event.flushDispatcherAdvanced = true
event.rangeChecker.Reset()
event.reportedDispatchers = make(map[common.DispatcherID]struct{})
event.lastResendTime = time.Now().Add(-20 * time.Second)
return event
}

// Phase 2 (Write): only writerDispatcher executes Action_Write.
// DONE from writerDispatcher advances writerDispatcherAdvanced = true.
if !event.writerDispatcherAdvanced {
if event.writerDispatcher != dispatcherID {
// Ignore stale DONE from non-writer dispatchers while waiting writer Action_Write.
return event
}

// there is a block event and the dispatcher write or pass action already
// which means we have sent pass or write action to it
// the writer already synced ddl to downstream
if event.writerDispatcher == dispatcherID {
if event.needSchedule {
// We should schedule only after writer finished Action_Write.
// Otherwise truncate/create like ddl may expose new table dml before old table cleanup.
// we need do schedule when writerDispatcherAdvanced
// Otherwise, if we do schedule when just selected = true, then ask dispatcher execute ddl
// when meeting truncate table,
// there is possible that dml for the new table will arrive before truncate ddl executed.
// that will lead to data loss
scheduled := b.tryScheduleEvent(event)
if !scheduled {
// Not scheduled yet, keep waiting and resend later.
// not scheduled yet, just return, wait for next resend
return event
}
} else {
// the pass action will be sent periodically in resend logic if not acked
event.writerDispatcherAdvanced = true
event.lastResendTime = time.Now().Add(-20 * time.Second)
}

// Start Phase 3 from a clean coverage window.
event.rangeChecker.Reset()
event.reportedDispatchers = make(map[common.DispatcherID]struct{})
event.lastResendTime = time.Now().Add(-20 * time.Second)
return event
}

// Phase 3 (Pass): all influenced dispatchers report DONE for Action_Pass,
// then checkEventFinish removes the barrier from blockedEvents.
// checkpoint ts is advanced, clear the map, so do not need to resend message anymore
event.markDispatcherEventDone(dispatcherID)
b.checkEventFinish(event)
return event
Expand Down Expand Up @@ -436,9 +390,8 @@ func (b *Barrier) handleBlockState(changefeedID common.ChangeFeedID,
// the block event, and check whether we need to send write action
event.markDispatcherEventDone(dispatcherID)
status, targetID := event.checkEventAction(dispatcherID)
if event.selected.Load() && event.needSchedule {
// scheduling is only required for ddl that changes tables. enqueue once the
// barrier is selected, regardless of whether the action is sent immediately.
if status != nil && event.needSchedule {
// scheduling is only required for ddl that changes tables, enqueue the event
b.pendingEvents.add(event)
}
return event, status, targetID, true
Expand Down Expand Up @@ -477,8 +430,6 @@ func (b *Barrier) getOrInsertNewEvent(changefeedID common.ChangeFeedID, dispatch
event, ok := b.blockedEvents.Get(key)
if !ok {
event = NewBlockEvent(changefeedID, dispatcherID, b.spanController, b.operatorController, blockState, b.splitTableEnabled, b.mode)
event.flushEnabled = b.flushEnabled
event.flushDispatcherAdvanced = !b.flushEnabled
b.blockedEvents.Set(key, event)
}
return event
Expand All @@ -487,21 +438,17 @@ func (b *Barrier) getOrInsertNewEvent(changefeedID common.ChangeFeedID, dispatch
// check whether the event is get all the done message from dispatchers
// if so, remove the event from blockedTs, not need to resend message anymore
func (b *Barrier) checkEventFinish(be *BarrierEvent) {
if !be.selected.Load() {
return
}
if !be.flushDispatcherAdvanced || !be.writerDispatcherAdvanced {
return
}
if !be.allDispatcherReported() {
return
}

log.Info("all dispatchers reported event done, remove event",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("committs", be.commitTs),
zap.Int64("mode", b.mode))
b.blockedEvents.Delete(getEventKey(be.commitTs, be.isSyncPoint))
if be.selected.Load() {
log.Info("all dispatchers reported event done, remove event",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("committs", be.commitTs),
zap.Int64("mode", b.mode))
// already selected a dispatcher to write, now all dispatchers reported the block event
b.blockedEvents.Delete(getEventKey(be.commitTs, be.isSyncPoint))
}
}

func (b *Barrier) tryScheduleEvent(event *BarrierEvent) bool {
Expand Down
Loading
Loading