Skip to content

Commit 524f2de

Browse files
[FIXED] Disallow concurrent meta member changes
Signed-off-by: Maurice van Veen <[email protected]>
1 parent f0772ac commit 524f2de

File tree

4 files changed

+48
-2
lines changed

4 files changed

+48
-2
lines changed

server/errors.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,5 +1998,15 @@
19981998
"help": "",
19991999
"url": "",
20002000
"deprecates": ""
2001+
},
2002+
{
2003+
"constant": "JSClusterServerMemberChangeInflightErr",
2004+
"code": 400,
2005+
"error_code": 10202,
2006+
"description": "cluster member change is in progress",
2007+
"comment": "",
2008+
"help": "",
2009+
"url": "",
2010+
"deprecates": ""
20012011
}
20022012
]

server/jetstream_api.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2422,6 +2422,13 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac
24222422
return
24232423
}
24242424

2425+
// Another peer-remove is already in progress; don't allow multiple concurrent changes.
2426+
if cc.peerRemoveReply != nil {
2427+
resp.Error = NewJSClusterServerMemberChangeInflightError()
2428+
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2429+
return
2430+
}
2431+
24252432
// So we have a valid peer.
24262433
if err := meta.ProposeRemovePeer(found); err == nil {
24272434
if cc.peerRemoveReply == nil {

server/jetstream_cluster_1_test.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11127,7 +11127,8 @@ func TestJetStreamClusterMetaPeerRemoveResponseAfterQuorum(t *testing.T) {
1112711127
}
1112811128

1112911129
// Shutdown a majority.
11130-
remove := s3.Name()
11130+
remove1 := s3.Name()
11131+
remove2 := s2.Name()
1113111132
s1.Shutdown()
1113211133
s2.Shutdown()
1113311134
s3.Shutdown()
@@ -11138,13 +11139,14 @@ func TestJetStreamClusterMetaPeerRemoveResponseAfterQuorum(t *testing.T) {
1113811139
defer sub.Drain()
1113911140

1114011141
// With a majority down there's no quorum, so we expect the request to time out.
11141-
req := &JSApiMetaServerRemoveRequest{Server: remove}
11142+
req := &JSApiMetaServerRemoveRequest{Server: remove1}
1114211143
jsreq, err := json.Marshal(req)
1114311144
require_NoError(t, err)
1114411145
require_NoError(t, nc.PublishRequest(JSApiRemoveServer, reply, jsreq))
1114511146
_, err = sub.NextMsg(time.Second)
1114611147
require_Error(t, err, nats.ErrTimeout)
1114711148

11149+
// Retrying the request should fail, as the leader has already removed that server from its peers.
1114811150
rmsg, err := nc.Request(JSApiRemoveServer, jsreq, time.Second)
1114911151
require_NoError(t, err)
1115011152
var resp JSApiMetaServerRemoveResponse
@@ -11154,6 +11156,19 @@ func TestJetStreamClusterMetaPeerRemoveResponseAfterQuorum(t *testing.T) {
1115411156
}
1115511157
require_Error(t, resp.Error, NewJSClusterServerNotMemberError())
1115611158

11159+
// Don't allow concurrent meta membership changes.
11160+
req = &JSApiMetaServerRemoveRequest{Server: remove2}
11161+
jsreq, err = json.Marshal(req)
11162+
require_NoError(t, err)
11163+
rmsg, err = nc.Request(JSApiRemoveServer, jsreq, time.Second)
11164+
require_NoError(t, err)
11165+
resp = JSApiMetaServerRemoveResponse{}
11166+
require_NoError(t, json.Unmarshal(rmsg.Data, &resp))
11167+
if resp.Error == nil {
11168+
t.Fatalf("Expected an error, got none")
11169+
}
11170+
require_Error(t, resp.Error, NewJSClusterServerMemberChangeInflightError())
11171+
1115711172
// Bring back one server so that the peer-remove can get quorum now.
1115811173
// The response should come in shortly after.
1115911174
c.restartServer(s1)

server/jetstream_errors_generated.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ const (
5959
// JSClusterRequiredErr JetStream clustering support required
6060
JSClusterRequiredErr ErrorIdentifier = 10010
6161

62+
// JSClusterServerMemberChangeInflightErr cluster member change is in progress
63+
JSClusterServerMemberChangeInflightErr ErrorIdentifier = 10202
64+
6265
// JSClusterServerNotMemberErr server is not a member of the cluster
6366
JSClusterServerNotMemberErr ErrorIdentifier = 10044
6467

@@ -626,6 +629,7 @@ var (
626629
JSClusterNotLeaderErr: {Code: 500, ErrCode: 10009, Description: "JetStream cluster can not handle request"},
627630
JSClusterPeerNotMemberErr: {Code: 400, ErrCode: 10040, Description: "peer not a member"},
628631
JSClusterRequiredErr: {Code: 503, ErrCode: 10010, Description: "JetStream clustering support required"},
632+
JSClusterServerMemberChangeInflightErr: {Code: 400, ErrCode: 10202, Description: "cluster member change is in progress"},
629633
JSClusterServerNotMemberErr: {Code: 400, ErrCode: 10044, Description: "server is not a member of the cluster"},
630634
JSClusterTagsErr: {Code: 400, ErrCode: 10011, Description: "tags placement not supported for operation"},
631635
JSClusterUnSupportFeatureErr: {Code: 503, ErrCode: 10036, Description: "not currently supported in clustered mode"},
@@ -1031,6 +1035,16 @@ func NewJSClusterRequiredError(opts ...ErrorOption) *ApiError {
10311035
return ApiErrors[JSClusterRequiredErr]
10321036
}
10331037

1038+
// NewJSClusterServerMemberChangeInflightError creates a new JSClusterServerMemberChangeInflightErr error: "cluster member change is in progress"
1039+
func NewJSClusterServerMemberChangeInflightError(opts ...ErrorOption) *ApiError {
1040+
eopts := parseOpts(opts)
1041+
if ae, ok := eopts.err.(*ApiError); ok {
1042+
return ae
1043+
}
1044+
1045+
return ApiErrors[JSClusterServerMemberChangeInflightErr]
1046+
}
1047+
10341048
// NewJSClusterServerNotMemberError creates a new JSClusterServerNotMemberErr error: "server is not a member of the cluster"
10351049
func NewJSClusterServerNotMemberError(opts ...ErrorOption) *ApiError {
10361050
eopts := parseOpts(opts)

0 commit comments

Comments
 (0)