@@ -497,9 +497,21 @@ func (c *eventBroker) nudgeSyncPointCommitIfNeeded(d *dispatcherStat) bool {
497497 if ! c .shouldNudgeSyncPointCommit (d ) {
498498 return false
499499 }
500+ commitTs := d .nextSyncPoint .Load ()
501+ watermark := d .sentResolvedTs .Load ()
502+ preparingTs := d .changefeedStat .getSyncPointPreparingTs ()
503+ inFlightTs := d .changefeedStat .syncPointInFlightTs .Load ()
504+ log .Debug ("nudge syncpoint commit with resolved event" ,
505+ zap .Stringer ("changefeedID" , d .changefeedStat .changefeedID ),
506+ zap .Stringer ("dispatcherID" , d .id ),
507+ zap .Uint64 ("watermark" , watermark ),
508+ zap .Uint64 ("nextSyncPointTs" , commitTs ),
509+ zap .Uint64 ("preparingTs" , preparingTs ),
510+ zap .Uint64 ("inFlightTs" , inFlightTs ),
511+ zap .Bool ("inCommitStage" , d .changefeedStat .isSyncPointInCommitStage (commitTs )))
500512 // Resend resolved-ts at current watermark to trigger syncpoint emission in commit stage,
501513 // even when there is no fresh upstream event to drive a new scan.
502- c .sendResolvedTs (d , d . sentResolvedTs . Load () )
514+ c .sendResolvedTs (d , watermark )
503515 return true
504516}
505517
@@ -722,7 +734,30 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang
722734
723735 if dataRange .CommitTsEnd <= dataRange .CommitTsStart {
724736 updateMetricEventServiceSkipResolvedTsCount (task .info .GetMode ())
725- if c .nudgeSyncPointCommitIfNeeded (task ) {
737+ nextSyncPointTs := uint64 (0 )
738+ preparingTs := uint64 (0 )
739+ inFlightTs := uint64 (0 )
740+ if task .enableSyncPoint {
741+ nextSyncPointTs = task .nextSyncPoint .Load ()
742+ preparingTs = task .changefeedStat .getSyncPointPreparingTs ()
743+ inFlightTs = task .changefeedStat .syncPointInFlightTs .Load ()
744+ }
745+ nudged := c .nudgeSyncPointCommitIfNeeded (task )
746+ log .Debug ("scan range empty after capping" ,
747+ zap .Stringer ("changefeedID" , task .changefeedStat .changefeedID ),
748+ zap .Stringer ("dispatcherID" , task .id ),
749+ zap .Uint64 ("startTs" , dataRange .CommitTsStart ),
750+ zap .Uint64 ("endTs" , dataRange .CommitTsEnd ),
751+ zap .Uint64 ("scanMaxTs" , scanMaxTs ),
752+ zap .Uint64 ("ddlResolvedTs" , ddlState .ResolvedTs ),
753+ zap .Uint64 ("ddlCommitTs" , ddlState .MaxEventCommitTs ),
754+ zap .Bool ("hasPendingDDLEventInCurrentRange" , hasPendingDDLEventInCurrentRange ),
755+ zap .Uint64 ("nextSyncPointTs" , nextSyncPointTs ),
756+ zap .Uint64 ("preparingTs" , preparingTs ),
757+ zap .Uint64 ("inFlightTs" , inFlightTs ),
758+ zap .Bool ("nudgedSyncPointCommit" , nudged ),
759+ )
760+ if nudged {
726761 return false , common.DataRange {}
727762 }
728763 // Scan range can become empty after applying capping (for example, scan window).
@@ -756,15 +791,41 @@ func (c *eventBroker) capCommitTsEndBySyncPoint(task scanTask, commitTsEnd uint6
756791 if ! task .enableSyncPoint {
757792 return commitTsEnd
758793 }
794+ originalCommitTsEnd := commitTsEnd
759795 c .fastForwardSyncPointIfNeeded (task )
796+
797+ cappedByNextSyncPoint := false
760798 nextSyncPoint := task .nextSyncPoint .Load ()
761799 if nextSyncPoint > 0 && commitTsEnd > nextSyncPoint {
762800 task .changefeedStat .tryEnterSyncPointPrepare (nextSyncPoint )
763801 commitTsEnd = nextSyncPoint
802+ cappedByNextSyncPoint = true
764803 }
804+
805+ cappedByPreparingTs := false
765806 preparingTs := task .changefeedStat .getSyncPointPreparingTs ()
766- if preparingTs > 0 && ! task .changefeedStat .isSyncPointInCommitStage (preparingTs ) {
767- commitTsEnd = min (commitTsEnd , preparingTs )
807+ inCommitStage := false
808+ if preparingTs > 0 {
809+ inCommitStage = task .changefeedStat .isSyncPointInCommitStage (preparingTs )
810+ if ! inCommitStage {
811+ newCommitTsEnd := min (commitTsEnd , preparingTs )
812+ cappedByPreparingTs = newCommitTsEnd < commitTsEnd
813+ commitTsEnd = newCommitTsEnd
814+ }
815+ }
816+
817+ if commitTsEnd < originalCommitTsEnd {
818+ log .Debug ("scan range commitTsEnd capped by syncpoint" ,
819+ zap .Stringer ("changefeedID" , task .changefeedStat .changefeedID ),
820+ zap .Stringer ("dispatcherID" , task .id ),
821+ zap .Uint64 ("oldCommitTsEnd" , originalCommitTsEnd ),
822+ zap .Uint64 ("newCommitTsEnd" , commitTsEnd ),
823+ zap .Bool ("cappedByNextSyncPoint" , cappedByNextSyncPoint ),
824+ zap .Bool ("cappedByPreparingTs" , cappedByPreparingTs ),
825+ zap .Uint64 ("nextSyncPointTs" , nextSyncPoint ),
826+ zap .Uint64 ("preparingTs" , preparingTs ),
827+ zap .Bool ("inCommitStage" , inCommitStage ),
828+ )
768829 }
769830 return commitTsEnd
770831}
@@ -1536,6 +1597,24 @@ func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error {
15361597
15371598 newStat := newDispatcherStat (dispatcherInfo , uint64 (len (c .taskChan )), uint64 (len (c .messageCh )), tableInfo , status )
15381599 newStat .copyStatistics (oldStat )
1600+ if newStat .enableSyncPoint {
1601+ oldNextSyncPoint := oldStat .nextSyncPoint .Load ()
1602+ newNextSyncPoint := newStat .nextSyncPoint .Load ()
1603+ if newNextSyncPoint > 0 && oldNextSyncPoint > 0 && newNextSyncPoint < oldNextSyncPoint {
1604+ log .Warn ("dispatcher syncpoint moved backward after reset" ,
1605+ zap .Stringer ("changefeedID" , changefeedID ),
1606+ zap .Stringer ("dispatcherID" , dispatcherID ),
1607+ zap .Uint64 ("oldEpoch" , oldStat .epoch ),
1608+ zap .Uint64 ("newEpoch" , newStat .epoch ),
1609+ zap .Uint64 ("oldStartTs" , oldStat .info .GetStartTs ()),
1610+ zap .Uint64 ("newStartTs" , dispatcherInfo .GetStartTs ()),
1611+ zap .Uint64 ("oldNextSyncPointTs" , oldNextSyncPoint ),
1612+ zap .Uint64 ("newNextSyncPointTs" , newNextSyncPoint ),
1613+ zap .Uint64 ("preparingTs" , status .getSyncPointPreparingTs ()),
1614+ zap .Uint64 ("inFlightTs" , status .syncPointInFlightTs .Load ()),
1615+ )
1616+ }
1617+ }
15391618
15401619 for {
15411620 if statPtr .CompareAndSwap (oldStat , newStat ) {
0 commit comments