-
Notifications
You must be signed in to change notification settings - Fork 48
Expand file tree
/
Copy pathmaintainer_controller.go
More file actions
281 lines (250 loc) · 11.4 KB
/
maintainer_controller.go
File metadata and controls
281 lines (250 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package maintainer
import (
"sync"
"time"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/maintainer/operator"
"github.com/pingcap/ticdc/maintainer/replica"
mscheduler "github.com/pingcap/ticdc/maintainer/scheduler"
"github.com/pingcap/ticdc/maintainer/span"
"github.com/pingcap/ticdc/maintainer/split"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/node"
pkgscheduler "github.com/pingcap/ticdc/pkg/scheduler"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/ticdc/utils/threadpool"
"go.uber.org/zap"
)
// Controller schedules and balance tables
// there are 3 main components in the controller, scheduler, span controller and operator controller
type Controller struct {
// bootstrapped set to true after initialize all necessary resources,
// it's not affected by new node join the cluster.
bootstrapped bool
startTs uint64
schedulerController *pkgscheduler.Controller
operatorController *operator.Controller
redoOperatorController *operator.Controller
spanController *span.Controller
redoSpanController *span.Controller
barrier *Barrier
redoBarrier *Barrier
messageCenter messaging.MessageCenter
nodeManager *watcher.NodeManager
splitter *split.Splitter
replicaConfig *config.ReplicaConfig
changefeedID common.ChangeFeedID
taskPool threadpool.ThreadPool
// Store the task handles, it's used to stop the task handlers when the controller is stopped.
taskHandles []*threadpool.TaskHandle
taskHandlesMu sync.RWMutex
enableTableAcrossNodes bool
batchSize int
keyspaceMeta common.KeyspaceMeta
enableRedo bool
// drainState keeps the latest dispatcher drain target visible to this
// maintainer even before drain-aware schedulers are introduced.
drainState *mscheduler.DrainState
}
func NewController(changefeedID common.ChangeFeedID,
checkpointTs uint64,
taskPool threadpool.ThreadPool,
replicaConfig *config.ReplicaConfig,
ddlSpan, redoDDLSpan *replica.SpanReplication,
batchSize int, balanceInterval time.Duration,
refresher *replica.RegionCountRefresher,
keyspaceMeta common.KeyspaceMeta,
enableRedo bool,
) *Controller {
mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
var (
enableTableAcrossNodes bool
splitter *split.Splitter
)
if replicaConfig != nil && util.GetOrZero(replicaConfig.Scheduler.EnableTableAcrossNodes) {
enableTableAcrossNodes = true
splitter = split.NewSplitter(keyspaceMeta.ID, changefeedID, replicaConfig.Scheduler)
}
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
// Create span controller
var schedulerCfg *config.ChangefeedSchedulerConfig
if replicaConfig != nil {
schedulerCfg = replicaConfig.Scheduler
}
spanController := span.NewController(changefeedID, ddlSpan, splitter, schedulerCfg, refresher, keyspaceMeta.ID, common.DefaultMode)
var (
redoSpanController *span.Controller
redoOC *operator.Controller
)
if enableRedo {
redoSpanController = span.NewController(changefeedID, redoDDLSpan, splitter, schedulerCfg, refresher, keyspaceMeta.ID, common.RedoMode)
redoOC = operator.NewOperatorController(changefeedID, redoSpanController, batchSize, common.RedoMode)
}
// Create operator controller using spanController
oc := operator.NewOperatorController(changefeedID, spanController, batchSize, common.DefaultMode)
sc := NewScheduleController(
changefeedID, batchSize, oc, redoOC, spanController, redoSpanController, balanceInterval, splitter, schedulerCfg,
)
return &Controller{
startTs: checkpointTs,
changefeedID: changefeedID,
bootstrapped: false,
schedulerController: sc,
operatorController: oc,
redoOperatorController: redoOC,
spanController: spanController,
redoSpanController: redoSpanController,
messageCenter: mc,
nodeManager: nodeManager,
taskPool: taskPool,
replicaConfig: replicaConfig,
enableTableAcrossNodes: enableTableAcrossNodes,
batchSize: batchSize,
splitter: splitter,
keyspaceMeta: keyspaceMeta,
enableRedo: enableRedo,
drainState: mscheduler.NewDrainState(),
}
}
// HandleStatus handle the status report from the node
func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) {
// HandleStatus reconciles runtime dispatcher reports with maintainer-side state.
//
// In the steady state, spanController (desired tasks), operatorController (in-flight scheduling),
// and dispatchers (actual runtime) agree. During failover / DDL / in-flight operators however,
// we can observe temporarily inconsistent combinations, for example:
// - dispatcher reports Working but maintainer has no task (orphan dispatcher, usually after failover).
// - dispatcher reports Stopped/Removed but maintainer has no operator (operator state lost on failover).
//
// The rules below make the system converge:
// 1) Orphan Working dispatcher without an operator => actively remove it to avoid leaks.
// 2) Non-working dispatcher without an operator => mark the span absent so scheduler can recreate it.
for _, status := range statusList {
dispatcherID := common.NewDispatcherIDFromPB(status.ID)
operatorController := c.getOperatorController(status.Mode)
spanController := c.getSpanController(status.Mode)
operatorController.UpdateOperatorStatus(dispatcherID, from, status)
stm := spanController.GetTaskByID(dispatcherID)
if stm == nil {
// If maintainer doesn't know this dispatcherID, most statuses are late/outdated and can be ignored.
// We only need to act when the runtime says the dispatcher is Working, because that implies there's
// still an active dispatcher consuming resources and potentially producing output.
if status.ComponentStatus != heartbeatpb.ComponentState_Working {
continue
}
if op := operatorController.GetOperator(dispatcherID); op == nil {
// No task + no operator => the dispatcher is orphaned (e.g. previous maintainer crashed after creating it,
// or lost operator state during failover). Remove it to avoid leaks and duplicated outputs.
log.Warn("no span found, remove it",
zap.String("changefeed", c.changefeedID.Name()),
zap.String("from", from.String()),
zap.Any("status", status),
zap.String("dispatcherID", dispatcherID.String()))
// If the span is not found but status is Working, we need to remove it from dispatcher.
_ = c.messageCenter.SendCommand(replica.NewRemoveDispatcherMessage(from, c.changefeedID, status.ID, nil, status.Mode, heartbeatpb.OperatorType_O_Remove))
}
continue
}
nodeID := stm.GetNodeID()
if nodeID != from {
// todo: handle the case that the nodeID is mismatch
log.Warn("nodeID not match",
zap.String("changefeed", c.changefeedID.Name()),
zap.Any("from", from),
zap.Stringer("node", nodeID))
continue
}
spanController.UpdateStatus(stm, status)
// Fallback: dispatcher becomes non-working without an operator.
//
// In normal scheduling flow, a dispatcher should transition to Stopped/Removed as part of a maintainer
// operator (Remove/Move/Split...). However, after maintainer failover we can lose operatorController state
// while dispatcher managers keep executing the already-issued requests.
//
// A real example is a "remove request in transit" during bootstrap:
// - Old maintainer sends a Remove (e.g. the remove-origin phase of Move), but the request hasn't reached
// dispatcher manager yet.
// - New maintainer bootstraps from dispatcher manager snapshots and sees the dispatcher as Working, with
// no in-flight operator reported in bootstrap response.
// - After bootstrap, the in-transit Remove arrives, the dispatcher is removed, and the new maintainer
// observes a terminal status without a corresponding operator.
//
// In these cases we'd observe a non-working status but have no operator to drive the follow-up
// rescheduling, so we mark the span absent to let the scheduler recreate it.
//
// Safety against message reordering/resend:
// - We only reach here when stm != nil and stm.GetNodeID() == from (checked above). If the span was already
// rebound to a different node, we skip it, so late statuses from the old node won't trigger rescheduling.
// - MarkSpanAbsent is idempotent and only affects the scheduler state, so even if we get duplicate terminal
// statuses, the worst case is an extra no-op absent mark.
if status.ComponentStatus == heartbeatpb.ComponentState_Stopped ||
status.ComponentStatus == heartbeatpb.ComponentState_Removed {
if op := operatorController.GetOperator(dispatcherID); op == nil {
log.Warn("dispatcher becomes non-working without operator, mark span absent for rescheduling",
zap.String("changefeed", c.changefeedID.Name()),
zap.String("from", from.String()),
zap.String("dispatcherID", dispatcherID.String()),
zap.Any("status", status))
spanController.MarkSpanAbsent(stm)
}
}
}
}
func (c *Controller) GetMinCheckpointTs(minCheckpointTs uint64) uint64 {
minCheckpointTsForOperator := c.operatorController.GetMinCheckpointTs(minCheckpointTs)
minCheckpointTsForSpan := c.spanController.GetMinCheckpointTsForNonReplicatingSpans(minCheckpointTs)
return min(minCheckpointTsForOperator, minCheckpointTsForSpan)
}
func (c *Controller) Stop() {
c.taskHandlesMu.RLock()
for _, handler := range c.taskHandles {
handler.Cancel()
}
c.taskHandlesMu.RUnlock()
c.operatorController.Close()
if c.enableRedo {
c.redoOperatorController.Close()
}
}
func (c *Controller) GetKeyspaceID() uint32 {
return c.keyspaceMeta.ID
}
// RemoveNode is called when a node is removed
func (c *Controller) RemoveNode(id node.ID) {
if c.enableRedo {
c.redoOperatorController.OnNodeRemoved(id)
}
c.operatorController.OnNodeRemoved(id)
}
func (c *Controller) GetMinRedoCheckpointTs(minCheckpointTs uint64) uint64 {
minCheckpointTsForOperator := c.redoOperatorController.GetMinCheckpointTs(minCheckpointTs)
minCheckpointTsForSpan := c.redoSpanController.GetMinCheckpointTsForNonReplicatingSpans(minCheckpointTs)
return min(minCheckpointTsForOperator, minCheckpointTsForSpan)
}
// SetDispatcherDrainTarget applies the newest drain target visible to this
// changefeed. Older epochs are ignored so local state does not regress.
func (c *Controller) SetDispatcherDrainTarget(target node.ID, epoch uint64) {
c.drainState.SetDispatcherDrainTarget(target, epoch)
}
// getDispatcherDrainTarget returns the current drain target snapshot used by
// status reporting and later drain-aware schedulers.
func (c *Controller) getDispatcherDrainTarget() (node.ID, uint64) {
return c.drainState.DispatcherDrainTarget()
}