Skip to content

Commit 2b2b7ec

Browse files
committed
maintainer,heartbeatpb: add drain target plumbing
1 parent 567506c commit 2b2b7ec

File tree

10 files changed

+1644
-442
lines changed

10 files changed

+1644
-442
lines changed

heartbeatpb/heartbeat.pb.go

Lines changed: 848 additions & 204 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

heartbeatpb/heartbeat.proto

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ message MaintainerHeartbeat {
139139
repeated MaintainerStatus statuses = 1;
140140
}
141141

142+
// DrainProgress is the maintainer-observed progress of a dispatcher drain on one target, identified by (target_node_id, target_epoch).
143+
message DrainProgress {
144+
string target_node_id = 1; // ID of the drain target node
145+
uint64 target_epoch = 2; // epoch of the drain target this progress refers to
146+
uint32 target_dispatcher_count = 3; // presumably the number of dispatchers still on the target node — confirm
147+
uint32 target_inflight_drain_move_count = 4; // presumably the number of in-flight moves draining dispatchers off the target — confirm
148+
}
149+
142150
message MaintainerStatus {
143151
ChangefeedID changefeedID = 1;
144152
string feed_state = 2;
@@ -147,6 +155,9 @@ message MaintainerStatus {
147155
repeated RunningError err = 5;
148156
bool bootstrap_done = 6;
149157
uint64 lastSyncedTs = 7; // last synced ts of all tables in the changefeed, used in /:changefeed_id/synced API
158+
// drain_progress reports the active dispatcher drain target observed by this maintainer.
159+
// Nil means no active dispatcher drain target.
160+
DrainProgress drain_progress = 8;
150161
}
151162

152163
// NodeLiveness is node-reported liveness.
@@ -168,6 +179,10 @@ enum NodeLiveness {
168179
message NodeHeartbeat {
169180
NodeLiveness liveness = 1; // node-reported liveness
170181
uint64 node_epoch = 2;
182+
// The dispatcher_drain_target_* fields report the manager-level dispatcher drain
183+
// target currently applied on this node. An empty target node ID means the drain target is clear.
184+
string dispatcher_drain_target_node_id = 3; // empty when no drain target is applied
185+
uint64 dispatcher_drain_target_epoch = 4; // epoch paired with dispatcher_drain_target_node_id
171186
}
172187

173188
// SetNodeLivenessRequest asks a node to transition its local liveness.
@@ -182,6 +197,13 @@ message SetNodeLivenessResponse {
182197
uint64 node_epoch = 2;
183198
}
184199

200+
// SetDispatcherDrainTargetRequest asks a maintainer manager to apply the dispatcher drain target.
201+
// target_node_id can be empty to clear the current target.
202+
message SetDispatcherDrainTargetRequest {
203+
string target_node_id = 1; // drain target node ID; empty clears the current target
204+
uint64 target_epoch = 2; // epoch of this target; NOTE(review): receivers are documented to ignore older epochs (see maintainer Controller.SetDispatcherDrainTarget) — confirm
205+
}
206+
185207
message CoordinatorBootstrapRequest {
186208
int64 version = 1; // the only field of this request; NOTE(review): version semantics are not shown in this diff — confirm
187209
}

maintainer/maintainer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,12 @@ func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus {
383383
return status
384384
}
385385

386+
// SetDispatcherDrainTarget forwards the drain target (node ID and epoch) to this maintainer's controller and then marks the maintainer status as changed.
387+
func (m *Maintainer) SetDispatcherDrainTarget(target node.ID, epoch uint64) {
388+
m.controller.SetDispatcherDrainTarget(target, epoch)
389+
m.statusChanged.Store(true) // NOTE(review): set unconditionally; the controller call returns nothing, so a target it ignores as stale still flips this flag — confirm intended
390+
}
391+
386392
func (m *Maintainer) initialize() error {
387393
start := time.Now()
388394
log.Info("start to initialize changefeed maintainer",

maintainer/maintainer_controller.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@ import (
2121
"github.com/pingcap/ticdc/heartbeatpb"
2222
"github.com/pingcap/ticdc/maintainer/operator"
2323
"github.com/pingcap/ticdc/maintainer/replica"
24+
mscheduler "github.com/pingcap/ticdc/maintainer/scheduler"
2425
"github.com/pingcap/ticdc/maintainer/span"
2526
"github.com/pingcap/ticdc/maintainer/split"
2627
"github.com/pingcap/ticdc/pkg/common"
2728
appcontext "github.com/pingcap/ticdc/pkg/common/context"
2829
"github.com/pingcap/ticdc/pkg/config"
2930
"github.com/pingcap/ticdc/pkg/messaging"
3031
"github.com/pingcap/ticdc/pkg/node"
31-
"github.com/pingcap/ticdc/pkg/scheduler"
32+
pkgscheduler "github.com/pingcap/ticdc/pkg/scheduler"
3233
"github.com/pingcap/ticdc/pkg/util"
3334
"github.com/pingcap/ticdc/server/watcher"
3435
"github.com/pingcap/ticdc/utils/threadpool"
@@ -43,7 +44,7 @@ type Controller struct {
4344
bootstrapped bool
4445
startTs uint64
4546

46-
schedulerController *scheduler.Controller
47+
schedulerController *pkgscheduler.Controller
4748
operatorController *operator.Controller
4849
redoOperatorController *operator.Controller
4950
spanController *span.Controller
@@ -70,6 +71,10 @@ type Controller struct {
7071

7172
keyspaceMeta common.KeyspaceMeta
7273
enableRedo bool
74+
75+
// drainState keeps the latest dispatcher drain target visible to this
76+
// maintainer even before drain-aware schedulers are introduced.
77+
drainState *mscheduler.DrainState
7378
}
7479

7580
func NewController(changefeedID common.ChangeFeedID,
@@ -135,6 +140,7 @@ func NewController(changefeedID common.ChangeFeedID,
135140
splitter: splitter,
136141
keyspaceMeta: keyspaceMeta,
137142
enableRedo: enableRedo,
143+
drainState: mscheduler.NewDrainState(),
138144
}
139145
}
140146

@@ -261,3 +267,15 @@ func (c *Controller) GetMinRedoCheckpointTs(minCheckpointTs uint64) uint64 {
261267
minCheckpointTsForSpan := c.redoSpanController.GetMinCheckpointTsForNonReplicatingSpans(minCheckpointTs)
262268
return min(minCheckpointTsForOperator, minCheckpointTsForSpan)
263269
}
270+
271+
// SetDispatcherDrainTarget applies the newest drain target visible to this
272+
// changefeed by delegating to drainState. Older epochs are ignored so local state does not regress.
273+
func (c *Controller) SetDispatcherDrainTarget(target node.ID, epoch uint64) {
274+
c.drainState.SetDispatcherDrainTarget(target, epoch) // no epoch check here; NOTE(review): stale-epoch filtering is expected inside mscheduler.DrainState — confirm
275+
}
276+
277+
// getDispatcherDrainTarget returns the current drain target snapshot (node ID and epoch) used by
278+
// status reporting and later drain-aware schedulers. The values come straight from drainState.
279+
func (c *Controller) getDispatcherDrainTarget() (node.ID, uint64) {
280+
return c.drainState.DispatcherDrainTarget()
281+
}

0 commit comments

Comments
 (0)