@@ -52,6 +52,9 @@ const (
5252 // defaultSendResolvedTsInterval is used to control whether to send a resolvedTs event to the dispatcher when its scan is skipped.
5353 defaultSendResolvedTsInterval = time .Second * 2
5454 defaultRefreshMinSentResolvedTsInterval = time .Second * 1
55+ defaultSyncPointCheckpointCapMultiplier = 2
56+ defaultSyncPointLagSuppressThreshold = 20 * time .Minute
57+ defaultSyncPointLagResumeThreshold = 15 * time .Minute
5558)
5659
5760// eventBroker gets events from the eventStore and sends them to the dispatchers.
@@ -92,8 +95,11 @@ type eventBroker struct {
9295 // metricsCollector handles all metrics collection and reporting
9396 metricsCollector * metricsCollector
9497
95- scanRateLimiter * rate.Limiter
96- scanLimitInBytes uint64
98+ scanRateLimiter * rate.Limiter
99+ scanLimitInBytes uint64
100+ syncPointCheckpointCapMultiplier uint64
101+ syncPointLagSuppressThreshold time.Duration
102+ syncPointLagResumeThreshold time.Duration
97103}
98104
99105func newEventBroker (
@@ -113,10 +119,26 @@ func newEventBroker(
113119 sendMessageWorkerCount := config .DefaultBasicEventHandlerConcurrency
114120 scanWorkerCount := config .DefaultBasicEventHandlerConcurrency * 4
115121
116- scanTaskQueueSize := config .GetGlobalServerConfig ().Debug .EventService .ScanTaskQueueSize / scanWorkerCount
122+ eventServiceConfig := config .GetGlobalServerConfig ().Debug .EventService
123+ scanTaskQueueSize := eventServiceConfig .ScanTaskQueueSize / scanWorkerCount
117124 sendMessageQueueSize := basicChannelSize * 4
118125
119- scanLimitInBytes := config .GetGlobalServerConfig ().Debug .EventService .ScanLimitInBytes
126+ scanLimitInBytes := eventServiceConfig .ScanLimitInBytes
127+ syncPointCheckpointCapMultiplier := eventServiceConfig .SyncPointCheckpointCapMultiplier
128+ if syncPointCheckpointCapMultiplier <= 0 {
129+ syncPointCheckpointCapMultiplier = defaultSyncPointCheckpointCapMultiplier
130+ }
131+ syncPointLagSuppressThreshold := eventServiceConfig .SyncPointLagSuppressThreshold
132+ if syncPointLagSuppressThreshold <= 0 {
133+ syncPointLagSuppressThreshold = defaultSyncPointLagSuppressThreshold
134+ }
135+ syncPointLagResumeThreshold := eventServiceConfig .SyncPointLagResumeThreshold
136+ if syncPointLagResumeThreshold <= 0 {
137+ syncPointLagResumeThreshold = defaultSyncPointLagResumeThreshold
138+ }
139+ if syncPointLagResumeThreshold > syncPointLagSuppressThreshold {
140+ syncPointLagResumeThreshold = syncPointLagSuppressThreshold
141+ }
120142
121143 g , ctx := errgroup .WithContext (ctx )
122144 ctx , cancel := context .WithCancel (ctx )
@@ -125,22 +147,25 @@ func newEventBroker(
125147 // For now, since there is only one upstream, using the default pdClock is sufficient.
126148 pdClock := appcontext.GetService [pdutil.Clock ](appcontext .DefaultPDClock )
127149 c := & eventBroker {
128- tidbClusterID : id ,
129- eventStore : eventStore ,
130- pdClock : pdClock ,
131- mounter : event .NewMounter (tz , integrity ),
132- schemaStore : schemaStore ,
133- changefeedMap : sync.Map {},
134- dispatchers : sync.Map {},
135- tableTriggerDispatchers : sync.Map {},
136- msgSender : mc ,
137- taskChan : make ([]chan scanTask , scanWorkerCount ),
138- messageCh : make ([]chan * wrapEvent , sendMessageWorkerCount ),
139- redoMessageCh : make ([]chan * wrapEvent , sendMessageWorkerCount ),
140- cancel : cancel ,
141- g : g ,
142- scanRateLimiter : rate .NewLimiter (rate .Limit (scanLimitInBytes ), scanLimitInBytes ),
143- scanLimitInBytes : uint64 (scanLimitInBytes ),
150+ tidbClusterID : id ,
151+ eventStore : eventStore ,
152+ pdClock : pdClock ,
153+ mounter : event .NewMounter (tz , integrity ),
154+ schemaStore : schemaStore ,
155+ changefeedMap : sync.Map {},
156+ dispatchers : sync.Map {},
157+ tableTriggerDispatchers : sync.Map {},
158+ msgSender : mc ,
159+ taskChan : make ([]chan scanTask , scanWorkerCount ),
160+ messageCh : make ([]chan * wrapEvent , sendMessageWorkerCount ),
161+ redoMessageCh : make ([]chan * wrapEvent , sendMessageWorkerCount ),
162+ cancel : cancel ,
163+ g : g ,
164+ scanRateLimiter : rate .NewLimiter (rate .Limit (scanLimitInBytes ), scanLimitInBytes ),
165+ scanLimitInBytes : uint64 (scanLimitInBytes ),
166+ syncPointCheckpointCapMultiplier : uint64 (syncPointCheckpointCapMultiplier ),
167+ syncPointLagSuppressThreshold : syncPointLagSuppressThreshold ,
168+ syncPointLagResumeThreshold : syncPointLagResumeThreshold ,
144169 }
145170
146171 // Initialize metrics collector
@@ -185,7 +210,12 @@ func newEventBroker(
185210 return c .refreshMinSentResolvedTs (ctx )
186211 })
187212
188- log .Info ("new event broker created" , zap .Uint64 ("id" , id ), zap .Uint64 ("scanLimitInBytes" , c .scanLimitInBytes ))
213+ log .Info ("new event broker created" ,
214+ zap .Uint64 ("id" , id ),
215+ zap .Uint64 ("scanLimitInBytes" , c .scanLimitInBytes ),
216+ zap .Uint64 ("syncPointCheckpointCapMultiplier" , c .syncPointCheckpointCapMultiplier ),
217+ zap .Duration ("syncPointLagSuppressThreshold" , c .syncPointLagSuppressThreshold ),
218+ zap .Duration ("syncPointLagResumeThreshold" , c .syncPointLagResumeThreshold ))
189219 return c
190220}
191221
@@ -355,13 +385,17 @@ func (c *eventBroker) tickTableTriggerDispatchers(ctx context.Context) error {
355385 return true
356386 }
357387 stat .receivedResolvedTs .Store (endTs )
388+ boundedEndTs := c .capCommitTsEndByCheckpoint (stat , endTs )
358389 for _ , e := range ddlEvents {
390+ if e .FinishedTs > boundedEndTs {
391+ break
392+ }
359393 ep := & e
360394 c .sendDDL (ctx , remoteID , ep , stat )
361395 }
362- if endTs > startTs {
396+ if boundedEndTs > startTs {
363397 // After all the events are sent, we send the watermark to the dispatcher.
364- c .sendResolvedTs (stat , endTs )
398+ c .sendResolvedTs (stat , boundedEndTs )
365399 } else {
366400 // If there is no new ddl event, we still need to send a signal resolved-ts event to keep downstream responsive,
367401 // but do not advance the watermark here.
@@ -406,6 +440,33 @@ func (c *eventBroker) logUninitializedDispatchers(ctx context.Context) error {
406440 }
407441}
408442
443+ func (c * eventBroker ) capCommitTsEndByCheckpoint (task scanTask , commitTsEnd uint64 ) uint64 {
444+ if ! task .enableSyncPoint || task .syncPointInterval <= 0 || c .syncPointCheckpointCapMultiplier == 0 {
445+ return commitTsEnd
446+ }
447+ checkpointTs := task .checkpointTs .Load ()
448+ if checkpointTs == 0 {
449+ return commitTsEnd
450+ }
451+
452+ capDuration := time .Duration (c .syncPointCheckpointCapMultiplier ) * task .syncPointInterval
453+ checkpointCapTs := oracle .GoTimeToTS (oracle .GetTimeFromTS (checkpointTs ).Add (capDuration ))
454+ if checkpointCapTs >= commitTsEnd {
455+ return commitTsEnd
456+ }
457+
458+ log .Debug ("scan range commitTsEnd capped by checkpoint bound" ,
459+ zap .Stringer ("changefeedID" , task .changefeedStat .changefeedID ),
460+ zap .Stringer ("dispatcherID" , task .id ),
461+ zap .Uint64 ("oldCommitTsEnd" , commitTsEnd ),
462+ zap .Uint64 ("newCommitTsEnd" , checkpointCapTs ),
463+ zap .Uint64 ("checkpointTs" , checkpointTs ),
464+ zap .Duration ("syncPointInterval" , task .syncPointInterval ),
465+ zap .Uint64 ("multiplier" , c .syncPointCheckpointCapMultiplier ))
466+ metrics .EventServiceScanCappedByCheckpointCount .WithLabelValues (task .changefeedStat .changefeedID .String ()).Inc ()
467+ return checkpointCapTs
468+ }
469+
409470// getScanTaskDataRange determines the valid data range for scanning a given task.
410471// It checks various conditions (dispatcher status, DDL state, max commit ts of dml event)
411472// to decide whether scanning is needed and returns the appropriate time range.
@@ -450,6 +511,7 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang
450511 )
451512 }
452513 }
514+ dataRange .CommitTsEnd = c .capCommitTsEndByCheckpoint (task , dataRange .CommitTsEnd )
453515
454516 if dataRange .CommitTsEnd <= dataRange .CommitTsStart && hasPendingDDLEventInCurrentRange {
455517 // Global scan window base can be pinned by other lagging dispatchers.
@@ -461,6 +523,7 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang
461523 }
462524 localScanMaxTs := oracle .GoTimeToTS (oracle .GetTimeFromTS (dataRange .CommitTsStart ).Add (interval ))
463525 dataRange .CommitTsEnd = min (commitTsEndBeforeWindow , localScanMaxTs )
526+ dataRange .CommitTsEnd = c .capCommitTsEndByCheckpoint (task , dataRange .CommitTsEnd )
464527 if dataRange .CommitTsEnd > dataRange .CommitTsStart {
465528 log .Info ("scan window local advance due to pending ddl" ,
466529 zap .Stringer ("changefeedID" , task .changefeedStat .changefeedID ),
@@ -587,22 +650,74 @@ func (c *eventBroker) hasSyncPointEventsBeforeTs(ts uint64, d *dispatcherStat) b
587650 return d .enableSyncPoint && ts > d .nextSyncPoint .Load ()
588651}
589652
653+ func syncPointLagDuration (sentResolvedTs , checkpointTs uint64 ) time.Duration {
654+ if sentResolvedTs <= checkpointTs {
655+ return 0
656+ }
657+ return oracle .GetTimeFromTS (sentResolvedTs ).Sub (oracle .GetTimeFromTS (checkpointTs ))
658+ }
659+
660+ func (c * eventBroker ) shouldSuppressSyncPointEmission (d * dispatcherStat ) bool {
661+ if d == nil || c .syncPointLagSuppressThreshold <= 0 {
662+ return false
663+ }
664+
665+ sentResolvedTs := d .sentResolvedTs .Load ()
666+ checkpointTs := d .checkpointTs .Load ()
667+ lag := syncPointLagDuration (sentResolvedTs , checkpointTs )
668+ metrics .EventServiceSyncPointLagGaugeVec .WithLabelValues (d .changefeedStat .changefeedID .String ()).Set (lag .Seconds ())
669+
670+ if d .syncPointSendSuppressed .Load () {
671+ if lag <= c .syncPointLagResumeThreshold {
672+ if d .syncPointSendSuppressed .CompareAndSwap (true , false ) {
673+ log .Info ("syncpoint emission resumed" ,
674+ zap .Stringer ("changefeedID" , d .changefeedStat .changefeedID ),
675+ zap .Stringer ("dispatcherID" , d .id ),
676+ zap .Uint64 ("sentResolvedTs" , sentResolvedTs ),
677+ zap .Uint64 ("checkpointTs" , checkpointTs ),
678+ zap .Duration ("lag" , lag ),
679+ zap .Duration ("resumeThreshold" , c .syncPointLagResumeThreshold ))
680+ }
681+ return false
682+ }
683+ return true
684+ }
685+
686+ if lag > c .syncPointLagSuppressThreshold {
687+ if d .syncPointSendSuppressed .CompareAndSwap (false , true ) {
688+ log .Info ("syncpoint emission suppressed due to lag" ,
689+ zap .Stringer ("changefeedID" , d .changefeedStat .changefeedID ),
690+ zap .Stringer ("dispatcherID" , d .id ),
691+ zap .Uint64 ("sentResolvedTs" , sentResolvedTs ),
692+ zap .Uint64 ("checkpointTs" , checkpointTs ),
693+ zap .Duration ("lag" , lag ),
694+ zap .Duration ("suppressThreshold" , c .syncPointLagSuppressThreshold ),
695+ zap .Duration ("resumeThreshold" , c .syncPointLagResumeThreshold ))
696+ }
697+ return true
698+ }
699+ return false
700+ }
701+
590702// emitSyncPointEventIfNeeded emits a sync point event if the current ts is greater than the next sync point, and updates the next sync point.
591703// We need call this function every time we send a event(whether dml/ddl/resolvedTs),
592704// thus to ensure the sync point event is in correct order for each dispatcher.
593705func (c * eventBroker ) emitSyncPointEventIfNeeded (ts uint64 , d * dispatcherStat , remoteID node.ID ) {
594706 for d .enableSyncPoint && ts > d .nextSyncPoint .Load () {
595707 commitTs := d .nextSyncPoint .Load ()
596708 d .nextSyncPoint .Store (oracle .GoTimeToTS (oracle .GetTimeFromTS (commitTs ).Add (d .syncPointInterval )))
709+ if c .shouldSuppressSyncPointEmission (d ) {
710+ metrics .EventServiceSyncPointSuppressedCount .WithLabelValues (d .changefeedStat .changefeedID .String ()).Inc ()
711+ continue
712+ }
597713
598714 e := event .NewSyncPointEvent (d .id , commitTs , d .seq .Add (1 ), d .epoch )
715+ syncPointEvent := newWrapSyncPointEvent (remoteID , e )
716+ c .getMessageCh (d .messageWorkerIndex , common .IsRedoMode (d .info .GetMode ())) <- syncPointEvent
599717 log .Debug ("send syncpoint event to dispatcher" ,
600718 zap .Stringer ("changefeedID" , d .changefeedStat .changefeedID ),
601719 zap .Stringer ("dispatcherID" , d .id ), zap .Int64 ("tableID" , d .info .GetTableSpan ().GetTableID ()),
602720 zap .Uint64 ("commitTs" , e .GetCommitTs ()), zap .Uint64 ("seq" , e .GetSeq ()))
603-
604- syncPointEvent := newWrapSyncPointEvent (remoteID , e )
605- c .getMessageCh (d .messageWorkerIndex , common .IsRedoMode (d .info .GetMode ())) <- syncPointEvent
606721 }
607722}
608723
0 commit comments