Skip to content

Commit 83355d5

Browse files
authored
Add support of changing raft peers via HTTP api (#232)
1 parent 67e45f6 commit 83355d5

File tree

14 files changed

+236
-31
lines changed

14 files changed

+236
-31
lines changed

consts/context_key.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const (
2323
ContextKeyStore = "_context_key_storage"
2424
ContextKeyCluster = "_context_key_cluster"
2525
ContextKeyClusterShard = "_context_key_cluster_shard"
26-
ContextKeyHost = "_context_key_host"
26+
ContextKeyRaftNode = "_context_key_raft_node"
2727
)
2828

2929
const (

controller/controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ func (c *Controller) resume(ctx context.Context) error {
104104
}
105105
for _, cluster := range clusters {
106106
c.addCluster(ns, cluster)
107+
logger.Get().Debug("Resume the cluster", zap.String("namespace", ns), zap.String("cluster", cluster))
107108
}
108109
}
109110
return nil

server/api/handler.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@
2020

2121
package api
2222

23-
import "github.com/apache/kvrocks-controller/store"
23+
import (
24+
"github.com/apache/kvrocks-controller/store"
25+
)
2426

2527
type Handler struct {
2628
Namespace *NamespaceHandler
2729
Cluster *ClusterHandler
2830
Shard *ShardHandler
2931
Node *NodeHandler
32+
Raft *RaftHandler
3033
}
3134

3235
func NewHandler(s *store.ClusterStore) *Handler {
@@ -35,5 +38,6 @@ func NewHandler(s *store.ClusterStore) *Handler {
3538
Cluster: &ClusterHandler{s: s},
3639
Shard: &ShardHandler{s: s},
3740
Node: &NodeHandler{s: s},
41+
Raft: &RaftHandler{},
3842
}
3943
}

server/api/raft.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*
19+
*/
20+
21+
package api
22+
23+
import (
24+
"errors"
25+
"fmt"
26+
"strings"
27+
28+
"github.com/apache/kvrocks-controller/consts"
29+
"github.com/apache/kvrocks-controller/logger"
30+
"github.com/apache/kvrocks-controller/server/helper"
31+
"github.com/apache/kvrocks-controller/store/engine/raft"
32+
33+
"github.com/gin-gonic/gin"
34+
"go.uber.org/zap"
35+
)
36+
37+
// Supported values of MemberRequest.Operation (compared case-insensitively).
const (
	// OperationAdd asks for a peer to be added.
	OperationAdd = "add"
	// OperationRemove asks for a peer to be removed.
	OperationRemove = "remove"
)

// RaftHandler holds the HTTP handlers for listing and updating raft peers.
type RaftHandler struct{}

// MemberRequest is the JSON payload describing a raft peer change.
//
// NOTE(review): gin's default binder reads the `binding` struct tag, so the
// `validate` tags below are presumably not enforced during BindJSON; the
// constraints that matter are therefore checked explicitly in validate().
type MemberRequest struct {
	// ID is the raft node ID of the peer. It must be greater than 0.
	ID uint64 `json:"id" validate:"required,gt=0"`
	// Operation is either OperationAdd or OperationRemove.
	Operation string `json:"operation" validate:"required"`
	// Peer is the address of the peer; required when adding a peer.
	Peer string `json:"peer"`
}

// validate normalizes Operation to lower case and checks the request:
//   - ID must be non-zero,
//   - Operation must be OperationAdd or OperationRemove,
//   - Peer must be set when adding a peer.
//
// It returns a descriptive error for the first violated rule, or nil.
func (r *MemberRequest) validate() error {
	r.Operation = strings.ToLower(r.Operation)
	// A missing "id" field decodes to 0, which is not a usable raft node ID.
	if r.ID == 0 {
		return errors.New("id must be greater than 0")
	}
	if r.Operation != OperationAdd && r.Operation != OperationRemove {
		return fmt.Errorf("operation must be one of [%s]",
			strings.Join([]string{OperationAdd, OperationRemove}, ","))
	}
	if r.Operation == OperationAdd && r.Peer == "" {
		return fmt.Errorf("peer should NOT be empty")
	}
	return nil
}
61+
62+
func (handler *RaftHandler) ListPeers(c *gin.Context) {
63+
raftNode, _ := c.MustGet(consts.ContextKeyRaftNode).(*raft.Node)
64+
helper.ResponseOK(c, gin.H{
65+
"leader": raftNode.GetRaftLead(),
66+
"peers": raftNode.ListPeers(),
67+
})
68+
}
69+
70+
func (handler *RaftHandler) UpdatePeer(c *gin.Context) {
71+
var req MemberRequest
72+
if err := c.BindJSON(&req); err != nil {
73+
helper.ResponseBadRequest(c, err)
74+
return
75+
}
76+
if err := req.validate(); err != nil {
77+
helper.ResponseBadRequest(c, err)
78+
return
79+
}
80+
81+
raftNode, _ := c.MustGet(consts.ContextKeyRaftNode).(*raft.Node)
82+
peers := raftNode.ListPeers()
83+
84+
var err error
85+
if req.Operation == OperationAdd {
86+
for _, peer := range peers {
87+
if peer == req.Peer {
88+
helper.ResponseError(c, fmt.Errorf("peer '%s' already exists", req.Peer))
89+
return
90+
}
91+
}
92+
err = raftNode.AddPeer(c, req.ID, req.Peer)
93+
} else {
94+
if _, ok := peers[req.ID]; !ok {
95+
helper.ResponseBadRequest(c, errors.New("peer not exists"))
96+
return
97+
}
98+
if len(peers) == 1 {
99+
helper.ResponseBadRequest(c, errors.New("can't remove the last peer"))
100+
return
101+
}
102+
err = raftNode.RemovePeer(c, req.ID)
103+
}
104+
if err != nil {
105+
helper.ResponseError(c, err)
106+
} else {
107+
logger.Get().With(zap.Any("request", req)).Info("Update peer success")
108+
helper.ResponseOK(c, nil)
109+
}
110+
}

server/middleware/middleware.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"strconv"
2727
"time"
2828

29+
"github.com/apache/kvrocks-controller/store/engine/raft"
2930
"github.com/gin-gonic/gin"
3031
"github.com/prometheus/client_golang/prometheus"
3132

@@ -66,7 +67,11 @@ func RedirectIfNotLeader(c *gin.Context) {
6667
c.Abort()
6768
return
6869
}
69-
if !storage.IsLeader() {
70+
71+
_, isRaftMode := storage.GetEngine().(*raft.Node)
72+
// Raft engine will forward the request to the leader node under the hood,
73+
// so we don't need to do the redirect.
74+
if !storage.IsLeader() && !isRaftMode {
7075
if !c.GetBool(consts.HeaderIsRedirect) {
7176
c.Set(consts.HeaderIsRedirect, true)
7277
peerAddr := helper.ExtractAddrFromSessionID(storage.Leader())
@@ -131,3 +136,15 @@ func RequiredClusterShard(c *gin.Context) {
131136
c.Set(consts.ContextKeyClusterShard, shard)
132137
c.Next()
133138
}
139+
140+
func RequiredRaftEngine(c *gin.Context) {
141+
storage, _ := c.MustGet(consts.ContextKeyStore).(*store.ClusterStore)
142+
raftNode, ok := storage.GetEngine().(*raft.Node)
143+
if !ok {
144+
helper.ResponseBadRequest(c, errors.New("raft engine is not enabled"))
145+
c.Abort()
146+
return
147+
}
148+
c.Set(consts.ContextKeyRaftNode, raftNode)
149+
c.Next()
150+
}

server/route.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@ func (srv *Server) initHandlers() {
4747

4848
apiV1 := engine.Group("/api/v1/")
4949
{
50+
raftAPI := apiV1.Group("raft")
51+
{
52+
raftAPI.Use(middleware.RequiredRaftEngine)
53+
raftAPI.POST("/peers", handler.Raft.UpdatePeer)
54+
raftAPI.GET("/peers", handler.Raft.ListPeers)
55+
}
56+
5057
namespaces := apiV1.Group("namespaces")
5158
{
5259
namespaces.GET("", handler.Namespace.List)

store/engine/engine.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
* under the License.
1818
*
1919
*/
20+
2021
package engine
2122

2223
import (

store/engine/raft/config.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,23 @@
2020

2121
package raft
2222

23-
import "errors"
23+
import (
24+
"errors"
25+
"strings"
26+
)
27+
28+
// Accepted values for Config.ClusterState.
const (
	// ClusterStateNew is the "new" cluster state.
	ClusterStateNew = "new"
	// ClusterStateExisting is the "existing" cluster state.
	ClusterStateExisting = "existing"
)
2432

2533
type Config struct {
2634
// ID is the identity of the local raft. ID cannot be 0.
2735
ID uint64 `yaml:"id"`
2836
// DataDir is the directory to store the raft data which includes snapshot and WALs.
2937
DataDir string `yaml:"data_dir"`
30-
// Join should be set to true if the node is joining an existing cluster.
31-
Join bool `yaml:"join"`
38+
// ClusterState is the state of the cluster, can be one of "new" and "existing".
39+
ClusterState string `yaml:"cluster_state"`
3240
// Peers is the list of raft peers.
3341
Peers []string `yaml:"peers"`
3442
// HeartbeatSeconds is the interval to send heartbeat message. Default is 2 seconds.
@@ -47,10 +55,15 @@ func (c *Config) validate() error {
4755
if c.ID > uint64(len(c.Peers)) {
4856
return errors.New("ID cannot be greater than the number of peers")
4957
}
58+
clusterState := strings.ToLower(c.ClusterState)
59+
if clusterState != ClusterStateNew && clusterState != ClusterStateExisting {
60+
return errors.New("cluster state must be one of [new, existing]")
61+
}
5062
return nil
5163
}
5264

5365
func (c *Config) init() {
66+
c.ClusterState = ClusterStateNew
5467
if c.DataDir == "" {
5568
c.DataDir = "."
5669
}

store/engine/raft/config_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
func TestConfig_Validate(t *testing.T) {
3030
c := &Config{}
31+
c.init()
3132

3233
// missing ID
3334
require.ErrorContains(t, c.validate(), "ID cannot be 0")
@@ -40,6 +41,12 @@ func TestConfig_Validate(t *testing.T) {
4041
// ID greater than the number of peers
4142
c.ID = 2
4243
require.ErrorContains(t, c.validate(), "ID cannot be greater than the number of peers")
44+
45+
c.ID = 1
46+
c.ClusterState = "invalid"
47+
require.ErrorContains(t, c.validate(), "cluster state must be one of [new, existing]")
48+
c.ClusterState = ClusterStateNew
49+
require.NoError(t, c.validate())
4350
}
4451

4552
func TestConfig_Init(t *testing.T) {

store/engine/raft/node.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,12 @@ func (n *Node) Addr() string {
115115
return n.addr
116116
}
117117

118-
func (n *Node) Peers() []string {
119-
peers := make([]string, 0)
118+
func (n *Node) ListPeers() map[uint64]string {
119+
peers := make(map[uint64]string)
120120
n.peers.Range(func(key, value interface{}) bool {
121+
id, _ := key.(uint64)
121122
peer, _ := value.(string)
122-
peers = append(peers, peer)
123+
peers[id] = peer
123124
return true
124125
})
125126
return peers
@@ -165,7 +166,7 @@ func (n *Node) run() error {
165166
n.snapshotIndex = snapshot.Metadata.Index
166167
n.confState = snapshot.Metadata.ConfState
167168

168-
if n.config.Join || walExists {
169+
if n.config.ClusterState == ClusterStateExisting || walExists {
169170
n.raftNode = raft.RestartNode(raftConfig)
170171
} else {
171172
n.raftNode = raft.StartNode(raftConfig, peers)
@@ -174,6 +175,7 @@ func (n *Node) run() error {
174175
if err := n.runTransport(); err != nil {
175176
return err
176177
}
178+
n.watchLeaderChange()
177179
return n.runRaftMessages()
178180
}
179181

@@ -224,10 +226,33 @@ func (n *Node) runTransport() error {
224226
return nil
225227
}
226228

229+
func (n *Node) watchLeaderChange() {
230+
n.wg.Add(1)
231+
go func() {
232+
defer n.wg.Done()
233+
234+
ticker := time.NewTicker(time.Second)
235+
defer ticker.Stop()
236+
for {
237+
select {
238+
case <-n.shutdown:
239+
return
240+
case <-ticker.C:
241+
lead := n.GetRaftLead()
242+
if lead != n.leader {
243+
n.leader = lead
244+
n.leaderChanged <- true
245+
n.logger.Info("Found leader changed", zap.Uint64("leader", lead))
246+
}
247+
}
248+
}
249+
}()
250+
}
251+
227252
func (n *Node) runRaftMessages() error {
228253
n.wg.Add(1)
229254
go func() {
230-
ticker := time.NewTicker(100 * time.Millisecond)
255+
ticker := time.NewTicker(time.Second)
231256
defer func() {
232257
ticker.Stop()
233258
n.wg.Done()
@@ -252,9 +277,7 @@ func (n *Node) runRaftMessages() error {
252277
if err := n.applySnapshot(rd.Snapshot); err != nil {
253278
n.logger.Error("Failed to apply snapshot", zap.Error(err))
254279
}
255-
if len(rd.Entries) > 0 {
256-
_ = n.dataStore.raftStorage.Append(rd.Entries)
257-
}
280+
_ = n.dataStore.raftStorage.Append(rd.Entries)
258281

259282
for _, msg := range rd.Messages {
260283
if msg.Type == raftpb.MsgApp {
@@ -465,13 +488,14 @@ func (n *Node) applyEntry(entry raftpb.Entry) error {
465488
n.peers.Store(cc.NodeID, string(cc.Context))
466489
}
467490
case raftpb.ConfChangeRemoveNode:
468-
n.peers.Delete(cc.NodeID)
469-
n.transport.RemovePeer(types.ID(cc.NodeID))
470491
if cc.NodeID == n.config.ID {
492+
n.logger.Info("I have been removed from the cluster, will shutdown")
471493
n.Close()
472-
n.logger.Info("Node removed from the cluster")
473494
return nil
474495
}
496+
n.transport.RemovePeer(types.ID(cc.NodeID))
497+
n.peers.Delete(cc.NodeID)
498+
n.logger.Info("Remove the peer", zap.Uint64("node_id", cc.NodeID))
475499
case raftpb.ConfChangeUpdateNode:
476500
n.transport.UpdatePeer(types.ID(cc.NodeID), []string{string(cc.Context)})
477501
if _, ok := n.peers.Load(cc.NodeID); ok {

0 commit comments

Comments
 (0)