Skip to content

Commit 612b9ed

Browse files
committed
dispatcher: limit resend message count
Signed-off-by: dongmen <414110582@qq.com>
1 parent a5fce16 commit 612b9ed

File tree

2 files changed

+96
-0
lines changed

2 files changed

+96
-0
lines changed

downstreamadapter/dispatcher/basic_dispatcher.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ import (
3535
"go.uber.org/zap/zapcore"
3636
)
3737

38+
// Limits used to throttle DONE reports sent in response to outdated actions.
const (
	// outdatedActionDoneReportInterval is the minimum time that must elapse
	// between two DONE reports for the same outdated action.
	outdatedActionDoneReportInterval = time.Second
	// outdatedActionDoneReportMaxEntries is the number of tracked report
	// timestamps at which the tracking map gets pruned.
	outdatedActionDoneReportMaxEntries = 256
)
42+
3843
// DispatcherService defines the interface for providing dispatcher information and basic event handling.
3944
type DispatcherService interface {
4045
GetId() common.DispatcherID
@@ -217,6 +222,11 @@ type BasicDispatcher struct {
217222

218223
BootstrapState bootstrapState
219224

225+
// outdatedActionDoneAt throttles DONE reports for the same outdated action to avoid
226+
// flooding blockStatusesChan when maintainer resends stale actions repeatedly.
227+
outdatedActionDoneMu sync.Mutex
228+
outdatedActionDoneAt map[BlockEventIdentifier]time.Time
229+
220230
// tableModeCompatibilityChecked indicates whether we have already validated the newest
221231
// table schema is compatible with the current replication mode configuration.
222232
// It is reset only in the initial case or when a DDL event is received, so that the compatibility is checked again.
@@ -256,12 +266,39 @@ func NewBasicDispatcher(
256266
creationPDTs: currentPDTs,
257267
mode: mode,
258268
BootstrapState: BootstrapFinished,
269+
outdatedActionDoneAt: make(map[BlockEventIdentifier]time.Time),
259270
}
260271
dispatcher.resolvedTs.Store(startTs)
261272

262273
return dispatcher
263274
}
264275

276+
func (d *BasicDispatcher) shouldReportOutdatedActionDone(identifier BlockEventIdentifier, now time.Time) bool {
277+
d.outdatedActionDoneMu.Lock()
278+
defer d.outdatedActionDoneMu.Unlock()
279+
280+
if last, ok := d.outdatedActionDoneAt[identifier]; ok && now.Sub(last) < outdatedActionDoneReportInterval {
281+
return false
282+
}
283+
284+
if len(d.outdatedActionDoneAt) >= outdatedActionDoneReportMaxEntries {
285+
expireBefore := now.Add(-2 * outdatedActionDoneReportInterval)
286+
for id, ts := range d.outdatedActionDoneAt {
287+
if ts.Before(expireBefore) {
288+
delete(d.outdatedActionDoneAt, id)
289+
}
290+
}
291+
if len(d.outdatedActionDoneAt) >= outdatedActionDoneReportMaxEntries {
292+
for id := range d.outdatedActionDoneAt {
293+
delete(d.outdatedActionDoneAt, id)
294+
}
295+
}
296+
}
297+
298+
d.outdatedActionDoneAt[identifier] = now
299+
return true
300+
}
301+
265302
// AddDMLEventsToSink filters events for special tables, registers batch wake
266303
// callbacks, and returns true when at least one event remains to be written to
267304
// the downstream sink.
@@ -836,6 +873,13 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
836873
}
837874

838875
// Step3: whether or not the message is outdated, we need to return a message showing that we have finished the event.
876+
identifier := BlockEventIdentifier{
877+
CommitTs: action.CommitTs,
878+
IsSyncPoint: action.IsSyncPoint,
879+
}
880+
if !d.shouldReportOutdatedActionDone(identifier, time.Now()) {
881+
return false
882+
}
839883
d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{
840884
ID: d.id.ToPB(),
841885
State: &heartbeatpb.State{

downstreamadapter/dispatcher/event_dispatcher_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,58 @@ func TestDispatcherHandleEvents(t *testing.T) {
429429
t.Run("cloud storage wake callback after batch enqueue", verifyDMLWakeCallbackStorageAfterBatchEnqueue)
430430
}
431431

432+
func TestOutdatedActionDoneIsThrottled(t *testing.T) {
433+
tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID())
434+
require.NoError(t, err)
435+
testSink := newDispatcherTestSink(t, common.MysqlSinkType)
436+
dispatcher := newDispatcherForTest(testSink.Sink(), tableSpan)
437+
nodeID := node.NewID()
438+
439+
block := dispatcher.HandleEvents([]DispatcherEvent{
440+
NewDispatcherEvent(&nodeID, commonEvent.ResolvedEvent{ResolvedTs: 10}),
441+
}, func() {})
442+
require.False(t, block)
443+
require.Equal(t, uint64(10), dispatcher.GetResolvedTs())
444+
445+
staleStatus := &heartbeatpb.DispatcherStatus{
446+
Action: &heartbeatpb.DispatcherAction{
447+
Action: heartbeatpb.Action_Pass,
448+
CommitTs: 5,
449+
IsSyncPoint: false,
450+
},
451+
}
452+
453+
await := dispatcher.HandleDispatcherStatus(staleStatus)
454+
require.False(t, await)
455+
select {
456+
case msg := <-dispatcher.GetBlockStatusesChan():
457+
require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage)
458+
require.Equal(t, uint64(5), msg.State.BlockTs)
459+
case <-time.After(time.Second):
460+
require.FailNow(t, "expected stale action DONE")
461+
}
462+
463+
await = dispatcher.HandleDispatcherStatus(staleStatus)
464+
require.False(t, await)
465+
select {
466+
case msg := <-dispatcher.GetBlockStatusesChan():
467+
require.FailNow(t, "unexpected duplicate stale action DONE", "msg=%v", msg)
468+
case <-time.After(100 * time.Millisecond):
469+
}
470+
471+
time.Sleep(outdatedActionDoneReportInterval + 50*time.Millisecond)
472+
473+
await = dispatcher.HandleDispatcherStatus(staleStatus)
474+
require.False(t, await)
475+
select {
476+
case msg := <-dispatcher.GetBlockStatusesChan():
477+
require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage)
478+
require.Equal(t, uint64(5), msg.State.BlockTs)
479+
case <-time.After(time.Second):
480+
require.FailNow(t, "expected stale action DONE after throttle interval")
481+
}
482+
}
483+
432484
func TestBlockingDDLFlushBeforeWaitingAndWriteDoesNotFlushAgain(t *testing.T) {
433485
keyspaceID := getTestingKeyspaceID()
434486
tableSpan := getUncompleteTableSpan()

0 commit comments

Comments
 (0)