Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions downstreamadapter/eventcollector/dispatcher_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,21 @@ func (d *dispatcherStat) doReset(serverID node.ID, resetTs uint64) {
}
// remove the dispatcher from the dynamic stream
resetRequest := d.newDispatcherResetRequest(d.eventCollector.getLocalServerID().String(), resetTs, epoch)
if d.target.EnableSyncPoint() {
req := resetRequest.DispatcherRequest
log.Info("send reset dispatcher syncpoint request",
zap.Stringer("changefeedID", d.target.GetChangefeedID()),
zap.Stringer("dispatcher", d.getDispatcherID()),
zap.Stringer("eventServiceID", serverID),
zap.Uint64("epoch", epoch),
zap.Uint64("resetTs", resetTs),
zap.Uint64("checkpointTs", d.target.GetCheckpointTs()),
zap.Uint64("lastEventCommitTs", d.lastEventCommitTs.Load()),
zap.Uint64("syncPointTs", req.SyncPointTs),
zap.Uint64("syncPointIntervalSeconds", req.SyncPointInterval),
zap.Bool("skipSyncpointAtStartTs", d.target.GetSkipSyncpointAtStartTs()),
)
}
Comment on lines +227 to +241
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The new Info log added here is redundant with the existing Info log on line 244. Both logs are triggered during a dispatcher reset when syncpoint is enabled, and they share several fields (changefeedID, dispatcher, eventServiceID, epoch, resetTs). It is recommended to merge these logs into a single Info log with optional fields or change this new log to Debug level to reduce log noise.

Suggested change
if d.target.EnableSyncPoint() {
req := resetRequest.DispatcherRequest
log.Info("send reset dispatcher syncpoint request",
zap.Stringer("changefeedID", d.target.GetChangefeedID()),
zap.Stringer("dispatcher", d.getDispatcherID()),
zap.Stringer("eventServiceID", serverID),
zap.Uint64("epoch", epoch),
zap.Uint64("resetTs", resetTs),
zap.Uint64("checkpointTs", d.target.GetCheckpointTs()),
zap.Uint64("lastEventCommitTs", d.lastEventCommitTs.Load()),
zap.Uint64("syncPointTs", req.SyncPointTs),
zap.Uint64("syncPointIntervalSeconds", req.SyncPointInterval),
zap.Bool("skipSyncpointAtStartTs", d.target.GetSkipSyncpointAtStartTs()),
)
}
if d.target.EnableSyncPoint() {
req := resetRequest.DispatcherRequest
log.Debug("send reset dispatcher syncpoint request",
zap.Stringer("changefeedID", d.target.GetChangefeedID()),
zap.Stringer("dispatcher", d.getDispatcherID()),
zap.Stringer("eventServiceID", serverID),
zap.Uint64("epoch", epoch),
zap.Uint64("resetTs", resetTs),
zap.Uint64("checkpointTs", d.target.GetCheckpointTs()),
zap.Uint64("lastEventCommitTs", d.lastEventCommitTs.Load()),
zap.Uint64("syncPointTs", req.SyncPointTs),
zap.Uint64("syncPointIntervalSeconds", req.SyncPointInterval),
zap.Bool("skipSyncpointAtStartTs", d.target.GetSkipSyncpointAtStartTs()),
)
}

msg := messaging.NewSingleTargetMessage(serverID, messaging.EventServiceTopic, resetRequest)
d.eventCollector.enqueueMessageForSend(msg)
log.Info("send reset dispatcher request to event service",
Expand Down
131 changes: 123 additions & 8 deletions pkg/eventservice/dispatcher_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,14 @@ func (c *changefeedStatus) getSyncPointPreparingTs() uint64 {
return c.syncPointPreparingTs.Load()
}

func (c *changefeedStatus) isDispatcherStaleForSyncpoint(dispatcher *dispatcherStat, now time.Time) bool {
lastHeartbeatTime := dispatcher.lastReceivedHeartbeatTime.Load()
if lastHeartbeatTime <= 0 {
return false
}
return now.Sub(time.Unix(lastHeartbeatTime, 0)) > scanWindowStaleDispatcherHeartbeatThreshold
}

// tryEnterSyncPointPrepare tries to enter syncpoint prepare stage for candidateTs.
// If a prepare ts already exists, the same ts is accepted, and a smaller ts can
// replace it before commit stage starts.
Expand All @@ -504,6 +512,11 @@ func (c *changefeedStatus) tryEnterSyncPointPrepare(candidateTs uint64) bool {
case c.syncPointInFlightTs.Load() != 0:
return false
case candidateTs < preparingTs:
log.Info("syncpoint prepare ts moved backward",
zap.Stringer("changefeedID", c.changefeedID),
zap.Uint64("oldPreparingTs", preparingTs),
zap.Uint64("newPreparingTs", candidateTs),
zap.Uint64("inFlightTs", c.syncPointInFlightTs.Load()))
c.syncPointPreparingTs.Store(candidateTs)
return true
default:
Expand All @@ -529,21 +542,62 @@ func (c *changefeedStatus) tryPromoteSyncPointToCommitIfReady() {
return
}

now := time.Now()
hasEligible := false
ready := true
blockingFound := false
var (
blockingDispatcherID common.DispatcherID
blockingSentResolvedTs uint64
blockingCheckpointTs uint64
blockingNextSyncPoint uint64
blockingSeq uint64
blockingEpoch uint64
)
c.dispatchers.Range(func(_ any, value any) bool {
dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load()
if dispatcher == nil || dispatcher.isRemoved.Load() || dispatcher.seq.Load() == 0 {
return true
}
if c.isDispatcherStaleForSyncpoint(dispatcher, now) {
return true
}
hasEligible = true
if dispatcher.sentResolvedTs.Load() < preparingTs {
sentResolvedTs := dispatcher.sentResolvedTs.Load()
if sentResolvedTs < preparingTs {
ready = false
if !blockingFound {
blockingFound = true
blockingDispatcherID = dispatcher.id
blockingSentResolvedTs = sentResolvedTs
blockingCheckpointTs = dispatcher.checkpointTs.Load()
blockingNextSyncPoint = dispatcher.nextSyncPoint.Load()
blockingSeq = dispatcher.seq.Load()
blockingEpoch = dispatcher.epoch
}
return false
}
return true
})
if !hasEligible || !ready {
if !hasEligible {
return
}
if !ready {
fields := []zap.Field{
zap.Stringer("changefeedID", c.changefeedID),
zap.Uint64("preparingTs", preparingTs),
}
if blockingFound {
fields = append(fields,
zap.Stringer("blockingDispatcherID", blockingDispatcherID),
zap.Uint64("blockingSentResolvedTs", blockingSentResolvedTs),
zap.Uint64("blockingCheckpointTs", blockingCheckpointTs),
zap.Uint64("blockingNextSyncPointTs", blockingNextSyncPoint),
zap.Uint64("blockingSeq", blockingSeq),
zap.Uint64("blockingEpoch", blockingEpoch),
)
}
log.Debug("syncpoint prepare stage blocked by dispatcher", fields...)
return
}
Comment on lines +585 to 602
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The construction of the fields slice and the subsequent append calls happen on every call to tryPromoteSyncPointToCommitIfReady when the syncpoint is blocked, even if the log level is higher than Debug. Since this function is called periodically (every 1 second) for every changefeed, this can lead to unnecessary allocations and GC pressure. Consider wrapping the logging logic in a check for the Debug level.

Suggested change
if !ready {
fields := []zap.Field{
zap.Stringer("changefeedID", c.changefeedID),
zap.Uint64("preparingTs", preparingTs),
}
if blockingFound {
fields = append(fields,
zap.Stringer("blockingDispatcherID", blockingDispatcherID),
zap.Uint64("blockingSentResolvedTs", blockingSentResolvedTs),
zap.Uint64("blockingCheckpointTs", blockingCheckpointTs),
zap.Uint64("blockingNextSyncPointTs", blockingNextSyncPoint),
zap.Uint64("blockingSeq", blockingSeq),
zap.Uint64("blockingEpoch", blockingEpoch),
)
}
log.Debug("syncpoint prepare stage blocked by dispatcher", fields...)
return
}
if !ready {
if log.GetLevel() <= zap.DebugLevel {
fields := []zap.Field{
zap.Stringer("changefeedID", c.changefeedID),
zap.Uint64("preparingTs", preparingTs),
}
if blockingFound {
fields = append(fields,
zap.Stringer("blockingDispatcherID", blockingDispatcherID),
zap.Uint64("blockingSentResolvedTs", blockingSentResolvedTs),
zap.Uint64("blockingCheckpointTs", blockingCheckpointTs),
zap.Uint64("blockingNextSyncPointTs", blockingNextSyncPoint),
zap.Uint64("blockingSeq", blockingSeq),
zap.Uint64("blockingEpoch", blockingEpoch),
)
}
log.Debug("syncpoint prepare stage blocked by dispatcher", fields...)
}
return
}


Expand All @@ -563,27 +617,88 @@ func (c *changefeedStatus) tryFinishSyncPointCommitIfAllEmitted() {
return
}

now := time.Now()
hasEligible := false
canAdvance := true
blockingFound := false
blockingReason := ""
var (
blockingDispatcherID common.DispatcherID
blockingNextSyncPointTs uint64
blockingCheckpointTs uint64
blockingSentResolvedTs uint64
blockingDispatcherSeq uint64
blockingDispatcherEpoch uint64
)
c.dispatchers.Range(func(_ any, value any) bool {
dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load()
if dispatcher == nil || dispatcher.isRemoved.Load() || dispatcher.seq.Load() == 0 {
return true
}
if dispatcher.nextSyncPoint.Load() <= inFlightTs {
if c.isDispatcherStaleForSyncpoint(dispatcher, now) {
return true
}
hasEligible = true
nextSyncPointTs := dispatcher.nextSyncPoint.Load()
checkpointTs := dispatcher.checkpointTs.Load()
if nextSyncPointTs <= inFlightTs {
canAdvance = false
if !blockingFound {
blockingFound = true
blockingReason = "nextSyncPointNotAdvanced"
blockingDispatcherID = dispatcher.id
blockingNextSyncPointTs = nextSyncPointTs
blockingCheckpointTs = checkpointTs
blockingSentResolvedTs = dispatcher.sentResolvedTs.Load()
blockingDispatcherSeq = dispatcher.seq.Load()
blockingDispatcherEpoch = dispatcher.epoch
}
return false
}
if dispatcher.checkpointTs.Load() <= inFlightTs {
if checkpointTs <= inFlightTs {
canAdvance = false
if !blockingFound {
blockingFound = true
blockingReason = "checkpointNotAdvanced"
blockingDispatcherID = dispatcher.id
blockingNextSyncPointTs = nextSyncPointTs
blockingCheckpointTs = checkpointTs
blockingSentResolvedTs = dispatcher.sentResolvedTs.Load()
blockingDispatcherSeq = dispatcher.seq.Load()
blockingDispatcherEpoch = dispatcher.epoch
}
return false
}
return true
})

if canAdvance {
c.syncPointInFlightTs.Store(0)
if c.syncPointPreparingTs.Load() == inFlightTs {
c.syncPointPreparingTs.Store(0)
if !hasEligible {
return
}

if !canAdvance {
fields := []zap.Field{
zap.Stringer("changefeedID", c.changefeedID),
zap.Uint64("inFlightTs", inFlightTs),
zap.Uint64("preparingTs", c.syncPointPreparingTs.Load()),
}
if blockingFound {
fields = append(fields,
zap.String("blockingReason", blockingReason),
zap.Stringer("blockingDispatcherID", blockingDispatcherID),
zap.Uint64("blockingNextSyncPointTs", blockingNextSyncPointTs),
zap.Uint64("blockingCheckpointTs", blockingCheckpointTs),
zap.Uint64("blockingSentResolvedTs", blockingSentResolvedTs),
zap.Uint64("blockingSeq", blockingDispatcherSeq),
zap.Uint64("blockingEpoch", blockingDispatcherEpoch),
)
}
log.Debug("syncpoint commit stage blocked by dispatcher", fields...)
return
}
Comment on lines +679 to +698
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to the previous comment, the fields slice is allocated and populated on every call when the commit stage is blocked, regardless of the active log level. This should be optimized to avoid unnecessary allocations in this periodic task.

	if !canAdvance {
		if log.GetLevel() <= zap.DebugLevel {
			fields := []zap.Field{
				zap.Stringer("changefeedID", c.changefeedID),
				zap.Uint64("inFlightTs", inFlightTs),
				zap.Uint64("preparingTs", c.syncPointPreparingTs.Load()),
			}
			if blockingFound {
				fields = append(fields,
					zap.String("blockingReason", blockingReason),
					zap.Stringer("blockingDispatcherID", blockingDispatcherID),
					zap.Uint64("blockingNextSyncPointTs", blockingNextSyncPointTs),
					zap.Uint64("blockingCheckpointTs", blockingCheckpointTs),
					zap.Uint64("blockingSentResolvedTs", blockingSentResolvedTs),
					zap.Uint64("blockingSeq", blockingDispatcherSeq),
					zap.Uint64("blockingEpoch", blockingDispatcherEpoch),
				)
			}
			log.Debug("syncpoint commit stage blocked by dispatcher", fields...)
		}
		return
	}


c.syncPointInFlightTs.Store(0)
if c.syncPointPreparingTs.Load() == inFlightTs {
c.syncPointPreparingTs.Store(0)
}
}
60 changes: 60 additions & 0 deletions pkg/eventservice/dispatcher_stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,63 @@ func TestSyncPointPrepareCannotLowerAfterPromote(t *testing.T) {
require.Equal(t, preparingTs, status.syncPointPreparingTs.Load())
require.Equal(t, preparingTs, status.syncPointInFlightTs.Load())
}

func TestSyncPointPreparePromotionSkipsStaleDispatcher(t *testing.T) {
t.Parallel()

status := newChangefeedStatus(common.NewChangefeedID4Test("default", "syncpoint-stale-prepare"), 10*time.Second)
preparingTs := uint64(200)
status.syncPointPreparingTs.Store(preparingTs)

fresh := &dispatcherStat{}
fresh.seq.Store(1)
fresh.sentResolvedTs.Store(preparingTs)
fresh.lastReceivedHeartbeatTime.Store(time.Now().Unix())
freshPtr := &atomic.Pointer[dispatcherStat]{}
freshPtr.Store(fresh)
status.addDispatcher(common.DispatcherID{Low: 1, High: 1}, freshPtr)

stale := &dispatcherStat{}
stale.seq.Store(1)
stale.sentResolvedTs.Store(preparingTs - 1)
stale.lastReceivedHeartbeatTime.Store(time.Now().Add(-scanWindowStaleDispatcherHeartbeatThreshold - time.Second).Unix())
stalePtr := &atomic.Pointer[dispatcherStat]{}
stalePtr.Store(stale)
status.addDispatcher(common.DispatcherID{Low: 2, High: 2}, stalePtr)

status.tryPromoteSyncPointToCommitIfReady()
require.True(t, status.isSyncPointInCommitStage(preparingTs))
}

func TestSyncPointCommitFinishSkipsStaleDispatcher(t *testing.T) {
t.Parallel()

status := newChangefeedStatus(common.NewChangefeedID4Test("default", "syncpoint-stale-commit"), 10*time.Second)
inFlightTs := uint64(300)
status.syncPointPreparingTs.Store(inFlightTs)
status.syncPointInFlightTs.Store(inFlightTs)

fresh := &dispatcherStat{}
fresh.seq.Store(1)
fresh.nextSyncPoint.Store(inFlightTs + 100)
fresh.checkpointTs.Store(inFlightTs + 100)
fresh.sentResolvedTs.Store(inFlightTs + 100)
fresh.lastReceivedHeartbeatTime.Store(time.Now().Unix())
freshPtr := &atomic.Pointer[dispatcherStat]{}
freshPtr.Store(fresh)
status.addDispatcher(common.DispatcherID{Low: 3, High: 3}, freshPtr)

stale := &dispatcherStat{}
stale.seq.Store(1)
stale.nextSyncPoint.Store(inFlightTs)
stale.checkpointTs.Store(inFlightTs)
stale.sentResolvedTs.Store(inFlightTs)
stale.lastReceivedHeartbeatTime.Store(time.Now().Add(-scanWindowStaleDispatcherHeartbeatThreshold - time.Second).Unix())
stalePtr := &atomic.Pointer[dispatcherStat]{}
stalePtr.Store(stale)
status.addDispatcher(common.DispatcherID{Low: 4, High: 4}, stalePtr)

status.tryFinishSyncPointCommitIfAllEmitted()
require.Equal(t, uint64(0), status.syncPointInFlightTs.Load())
require.Equal(t, uint64(0), status.syncPointPreparingTs.Load())
}
87 changes: 83 additions & 4 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,9 +497,21 @@ func (c *eventBroker) nudgeSyncPointCommitIfNeeded(d *dispatcherStat) bool {
if !c.shouldNudgeSyncPointCommit(d) {
return false
}
commitTs := d.nextSyncPoint.Load()
watermark := d.sentResolvedTs.Load()
preparingTs := d.changefeedStat.getSyncPointPreparingTs()
inFlightTs := d.changefeedStat.syncPointInFlightTs.Load()
log.Debug("nudge syncpoint commit with resolved event",
zap.Stringer("changefeedID", d.changefeedStat.changefeedID),
zap.Stringer("dispatcherID", d.id),
zap.Uint64("watermark", watermark),
zap.Uint64("nextSyncPointTs", commitTs),
zap.Uint64("preparingTs", preparingTs),
zap.Uint64("inFlightTs", inFlightTs),
zap.Bool("inCommitStage", d.changefeedStat.isSyncPointInCommitStage(commitTs)))
// Resend resolved-ts at current watermark to trigger syncpoint emission in commit stage,
// even when there is no fresh upstream event to drive a new scan.
c.sendResolvedTs(d, d.sentResolvedTs.Load())
c.sendResolvedTs(d, watermark)
return true
}

Expand Down Expand Up @@ -722,7 +734,30 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang

if dataRange.CommitTsEnd <= dataRange.CommitTsStart {
updateMetricEventServiceSkipResolvedTsCount(task.info.GetMode())
if c.nudgeSyncPointCommitIfNeeded(task) {
nextSyncPointTs := uint64(0)
preparingTs := uint64(0)
inFlightTs := uint64(0)
if task.enableSyncPoint {
nextSyncPointTs = task.nextSyncPoint.Load()
preparingTs = task.changefeedStat.getSyncPointPreparingTs()
inFlightTs = task.changefeedStat.syncPointInFlightTs.Load()
}
nudged := c.nudgeSyncPointCommitIfNeeded(task)
log.Debug("scan range empty after capping",
zap.Stringer("changefeedID", task.changefeedStat.changefeedID),
zap.Stringer("dispatcherID", task.id),
zap.Uint64("startTs", dataRange.CommitTsStart),
zap.Uint64("endTs", dataRange.CommitTsEnd),
zap.Uint64("scanMaxTs", scanMaxTs),
zap.Uint64("ddlResolvedTs", ddlState.ResolvedTs),
zap.Uint64("ddlCommitTs", ddlState.MaxEventCommitTs),
zap.Bool("hasPendingDDLEventInCurrentRange", hasPendingDDLEventInCurrentRange),
zap.Uint64("nextSyncPointTs", nextSyncPointTs),
zap.Uint64("preparingTs", preparingTs),
zap.Uint64("inFlightTs", inFlightTs),
zap.Bool("nudgedSyncPointCommit", nudged),
)
Comment on lines +745 to +759
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This log.Debug call is located in the scan path and includes many fields. While zap fields are relatively cheap to construct, the variadic arguments still cause a slice allocation on every call, even when Debug logging is disabled. Given this can be triggered frequently for caught-up or capped dispatchers, it should be wrapped in a level check to improve efficiency.

		nudged := c.nudgeSyncPointCommitIfNeeded(task)
		if log.GetLevel() <= zap.DebugLevel {
			log.Debug("scan range empty after capping",
				zap.Stringer("changefeedID", task.changefeedStat.changefeedID),
				zap.Stringer("dispatcherID", task.id),
				zap.Uint64("startTs", dataRange.CommitTsStart),
				zap.Uint64("endTs", dataRange.CommitTsEnd),
				zap.Uint64("scanMaxTs", scanMaxTs),
				zap.Uint64("ddlResolvedTs", ddlState.ResolvedTs),
				zap.Uint64("ddlCommitTs", ddlState.MaxEventCommitTs),
				zap.Bool("hasPendingDDLEventInCurrentRange", hasPendingDDLEventInCurrentRange),
				zap.Uint64("nextSyncPointTs", nextSyncPointTs),
				zap.Uint64("preparingTs", preparingTs),
				zap.Uint64("inFlightTs", inFlightTs),
				zap.Bool("nudgedSyncPointCommit", nudged),
			)
		}

if nudged {
return false, common.DataRange{}
}
// Scan range can become empty after applying capping (for example, scan window).
Expand Down Expand Up @@ -756,15 +791,41 @@ func (c *eventBroker) capCommitTsEndBySyncPoint(task scanTask, commitTsEnd uint6
if !task.enableSyncPoint {
return commitTsEnd
}
originalCommitTsEnd := commitTsEnd
c.fastForwardSyncPointIfNeeded(task)

cappedByNextSyncPoint := false
nextSyncPoint := task.nextSyncPoint.Load()
if nextSyncPoint > 0 && commitTsEnd > nextSyncPoint {
task.changefeedStat.tryEnterSyncPointPrepare(nextSyncPoint)
commitTsEnd = nextSyncPoint
cappedByNextSyncPoint = true
}

cappedByPreparingTs := false
preparingTs := task.changefeedStat.getSyncPointPreparingTs()
if preparingTs > 0 && !task.changefeedStat.isSyncPointInCommitStage(preparingTs) {
commitTsEnd = min(commitTsEnd, preparingTs)
inCommitStage := false
if preparingTs > 0 {
inCommitStage = task.changefeedStat.isSyncPointInCommitStage(preparingTs)
if !inCommitStage {
newCommitTsEnd := min(commitTsEnd, preparingTs)
cappedByPreparingTs = newCommitTsEnd < commitTsEnd
commitTsEnd = newCommitTsEnd
}
}

if commitTsEnd < originalCommitTsEnd {
log.Debug("scan range commitTsEnd capped by syncpoint",
zap.Stringer("changefeedID", task.changefeedStat.changefeedID),
zap.Stringer("dispatcherID", task.id),
zap.Uint64("oldCommitTsEnd", originalCommitTsEnd),
zap.Uint64("newCommitTsEnd", commitTsEnd),
zap.Bool("cappedByNextSyncPoint", cappedByNextSyncPoint),
zap.Bool("cappedByPreparingTs", cappedByPreparingTs),
zap.Uint64("nextSyncPointTs", nextSyncPoint),
zap.Uint64("preparingTs", preparingTs),
zap.Bool("inCommitStage", inCommitStage),
)
}
return commitTsEnd
}
Expand Down Expand Up @@ -1536,6 +1597,24 @@ func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error {

newStat := newDispatcherStat(dispatcherInfo, uint64(len(c.taskChan)), uint64(len(c.messageCh)), tableInfo, status)
newStat.copyStatistics(oldStat)
if newStat.enableSyncPoint {
oldNextSyncPoint := oldStat.nextSyncPoint.Load()
newNextSyncPoint := newStat.nextSyncPoint.Load()
if newNextSyncPoint > 0 && oldNextSyncPoint > 0 && newNextSyncPoint < oldNextSyncPoint {
log.Warn("dispatcher syncpoint moved backward after reset",
zap.Stringer("changefeedID", changefeedID),
zap.Stringer("dispatcherID", dispatcherID),
zap.Uint64("oldEpoch", oldStat.epoch),
zap.Uint64("newEpoch", newStat.epoch),
zap.Uint64("oldStartTs", oldStat.info.GetStartTs()),
zap.Uint64("newStartTs", dispatcherInfo.GetStartTs()),
zap.Uint64("oldNextSyncPointTs", oldNextSyncPoint),
zap.Uint64("newNextSyncPointTs", newNextSyncPoint),
zap.Uint64("preparingTs", status.getSyncPointPreparingTs()),
zap.Uint64("inFlightTs", status.syncPointInFlightTs.Load()),
)
}
}

for {
if statPtr.CompareAndSwap(oldStat, newStat) {
Expand Down
Loading
Loading