Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,060 changes: 856 additions & 204 deletions heartbeatpb/heartbeat.pb.go

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions heartbeatpb/heartbeat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,22 @@ message MaintainerHeartbeat {
repeated MaintainerStatus statuses = 1;
}

// DrainProgress is maintainer-observed progress for dispatcher drain on one target.
message DrainProgress {
// target_node_id is the node currently being evacuated for this changefeed.
// It matches the active dispatcher drain target seen by this maintainer.
string target_node_id = 1;
// target_epoch is the epoch attached to target_node_id.
// Reviewers should read (target_node_id, target_epoch) as one monotonic target snapshot.
uint64 target_epoch = 2;
// target_dispatcher_count is the current number of this changefeed's
// dispatchers still hosted on target_node_id in the maintainer snapshot.
uint32 target_dispatcher_count = 3;
// target_inflight_drain_move_count is the number of unfinished move
// operators already evacuating dispatchers away from target_node_id.
uint32 target_inflight_drain_move_count = 4;
}

message MaintainerStatus {
ChangefeedID changefeedID = 1;
string feed_state = 2;
Expand All @@ -147,6 +163,9 @@ message MaintainerStatus {
repeated RunningError err = 5;
bool bootstrap_done = 6;
uint64 lastSyncedTs = 7; // last synced ts of all tables in the changefeed, used in /:changefeed_id/synced API
// drain_progress reports the active dispatcher drain target observed by this maintainer.
// Nil means no active dispatcher drain target.
DrainProgress drain_progress = 8;
}

// NodeLiveness is node-reported liveness.
Expand All @@ -168,6 +187,10 @@ enum NodeLiveness {
message NodeHeartbeat {
NodeLiveness liveness = 1;
uint64 node_epoch = 2;
// dispatcher_drain_target_* reports the manager-level dispatcher drain target
// currently applied on this node. Empty target means the drain target is clear.
string dispatcher_drain_target_node_id = 3;
uint64 dispatcher_drain_target_epoch = 4;
}

// SetNodeLivenessRequest asks a node to transition its local liveness.
Expand All @@ -182,6 +205,13 @@ message SetNodeLivenessResponse {
uint64 node_epoch = 2;
}

// SetDispatcherDrainTargetRequest asks a maintainer manager to apply the dispatcher drain target.
// target_node_id can be empty to clear the current target.
message SetDispatcherDrainTargetRequest {
string target_node_id = 1;
uint64 target_epoch = 2;
}

message CoordinatorBootstrapRequest {
int64 version = 1;
}
Expand Down
6 changes: 6 additions & 0 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,12 @@ func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus {
return status
}

// SetDispatcherDrainTarget applies the newest drain target to this maintainer.
func (m *Maintainer) SetDispatcherDrainTarget(target node.ID, epoch uint64) {
m.controller.SetDispatcherDrainTarget(target, epoch)
m.statusChanged.Store(true)
}

func (m *Maintainer) initialize() error {
start := time.Now()
log.Info("start to initialize changefeed maintainer",
Expand Down
22 changes: 20 additions & 2 deletions maintainer/maintainer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/maintainer/operator"
"github.com/pingcap/ticdc/maintainer/replica"
mscheduler "github.com/pingcap/ticdc/maintainer/scheduler"
"github.com/pingcap/ticdc/maintainer/span"
"github.com/pingcap/ticdc/maintainer/split"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/scheduler"
pkgscheduler "github.com/pingcap/ticdc/pkg/scheduler"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/ticdc/utils/threadpool"
Expand All @@ -43,7 +44,7 @@ type Controller struct {
bootstrapped bool
startTs uint64

schedulerController *scheduler.Controller
schedulerController *pkgscheduler.Controller
operatorController *operator.Controller
redoOperatorController *operator.Controller
spanController *span.Controller
Expand All @@ -70,6 +71,10 @@ type Controller struct {

keyspaceMeta common.KeyspaceMeta
enableRedo bool

// drainState keeps the latest dispatcher drain target visible to this
// maintainer even before drain-aware schedulers are introduced.
drainState *mscheduler.DrainState
}

func NewController(changefeedID common.ChangeFeedID,
Expand Down Expand Up @@ -135,6 +140,7 @@ func NewController(changefeedID common.ChangeFeedID,
splitter: splitter,
keyspaceMeta: keyspaceMeta,
enableRedo: enableRedo,
drainState: mscheduler.NewDrainState(),
}
}

Expand Down Expand Up @@ -261,3 +267,15 @@ func (c *Controller) GetMinRedoCheckpointTs(minCheckpointTs uint64) uint64 {
minCheckpointTsForSpan := c.redoSpanController.GetMinCheckpointTsForNonReplicatingSpans(minCheckpointTs)
return min(minCheckpointTsForOperator, minCheckpointTsForSpan)
}

// SetDispatcherDrainTarget applies the newest drain target visible to this
// changefeed. Older epochs are ignored so local state does not regress.
func (c *Controller) SetDispatcherDrainTarget(target node.ID, epoch uint64) {
c.drainState.SetDispatcherDrainTarget(target, epoch)
}

// getDispatcherDrainTarget returns the current drain target snapshot used by
// status reporting and later drain-aware schedulers.
func (c *Controller) getDispatcherDrainTarget() (node.ID, uint64) {
return c.drainState.DispatcherDrainTarget()
}
Loading
Loading