Skip to content

Commit e99e528

Browse files
[FIXED] Disallow concurrent meta member changes
Signed-off-by: Maurice van Veen <[email protected]>
1 parent f0772ac commit e99e528

File tree

4 files changed

+63
-3
lines changed

4 files changed

+63
-3
lines changed

server/errors.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,5 +1998,15 @@
19981998
"help": "",
19991999
"url": "",
20002000
"deprecates": ""
2001+
},
2002+
{
2003+
"constant": "JSClusterServerMemberChangeInflightErr",
2004+
"code": 400,
2005+
"error_code": 10202,
2006+
"description": "cluster member change is in progress",
2007+
"comment": "",
2008+
"help": "",
2009+
"url": "",
2010+
"deprecates": ""
20012011
}
20022012
]

server/jetstream_api.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2399,6 +2399,13 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac
23992399
js.mu.Lock()
24002400
defer js.mu.Unlock()
24012401

2402+
// Another peer-remove is already in progress, don't allow multiple concurrent changes.
2403+
if cc.peerRemoveReply != nil {
2404+
resp.Error = NewJSClusterServerMemberChangeInflightError()
2405+
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2406+
return
2407+
}
2408+
24022409
var found string
24032410
for _, p := range meta.Peers() {
24042411
// If Peer is specified, it takes precedence

server/jetstream_cluster_1_test.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11127,7 +11127,8 @@ func TestJetStreamClusterMetaPeerRemoveResponseAfterQuorum(t *testing.T) {
1112711127
}
1112811128

1112911129
// Shutdown a majority.
11130-
remove := s3.Name()
11130+
remove1 := s3.Name()
11131+
remove2 := s2.Name()
1113111132
s1.Shutdown()
1113211133
s2.Shutdown()
1113311134
s3.Shutdown()
@@ -11138,21 +11139,35 @@ func TestJetStreamClusterMetaPeerRemoveResponseAfterQuorum(t *testing.T) {
1113811139
defer sub.Drain()
1113911140

1114011141
// Since a majority is down, we'll expect to time out since there's no quorum.
11141-
req := &JSApiMetaServerRemoveRequest{Server: remove}
11142+
req := &JSApiMetaServerRemoveRequest{Server: remove1}
1114211143
jsreq, err := json.Marshal(req)
1114311144
require_NoError(t, err)
1114411145
require_NoError(t, nc.PublishRequest(JSApiRemoveServer, reply, jsreq))
1114511146
_, err = sub.NextMsg(time.Second)
1114611147
require_Error(t, err, nats.ErrTimeout)
1114711148

11149+
// Retrying the request should fail, as the leader has already removed it as its peer.
1114811150
rmsg, err := nc.Request(JSApiRemoveServer, jsreq, time.Second)
1114911151
require_NoError(t, err)
1115011152
var resp JSApiMetaServerRemoveResponse
1115111153
require_NoError(t, json.Unmarshal(rmsg.Data, &resp))
1115211154
if resp.Error == nil {
1115311155
t.Fatalf("Expected an error, got none")
1115411156
}
11155-
require_Error(t, resp.Error, NewJSClusterServerNotMemberError())
11157+
require_Error(t, resp.Error, NewJSClusterServerMemberChangeInflightError())
11158+
11159+
// Don't allow concurrent meta membership changes.
11160+
req = &JSApiMetaServerRemoveRequest{Server: remove2}
11161+
jsreq, err = json.Marshal(req)
11162+
require_NoError(t, err)
11163+
rmsg, err = nc.Request(JSApiRemoveServer, jsreq, time.Second)
11164+
require_NoError(t, err)
11165+
resp = JSApiMetaServerRemoveResponse{}
11166+
require_NoError(t, json.Unmarshal(rmsg.Data, &resp))
11167+
if resp.Error == nil {
11168+
t.Fatalf("Expected an error, got none")
11169+
}
11170+
require_Error(t, resp.Error, NewJSClusterServerMemberChangeInflightError())
1115611171

1115711172
// Bring back one server so that the peer-remove can get quorum now.
1115811173
// The response should come in shortly after.
@@ -11169,6 +11184,20 @@ func TestJetStreamClusterMetaPeerRemoveResponseAfterQuorum(t *testing.T) {
1116911184
require_NoError(t, resp.Error)
1117011185
}
1117111186
require_True(t, resp.Success)
11187+
11188+
// A retry of the first peer-remove should return an error that the peer is already removed.
11189+
// Both a "success" response and this error should be used to know the peer-remove was successful.
11190+
req = &JSApiMetaServerRemoveRequest{Server: remove1}
11191+
jsreq, err = json.Marshal(req)
11192+
require_NoError(t, err)
11193+
rmsg, err = nc.Request(JSApiRemoveServer, jsreq, time.Second)
11194+
require_NoError(t, err)
11195+
resp = JSApiMetaServerRemoveResponse{}
11196+
require_NoError(t, json.Unmarshal(rmsg.Data, &resp))
11197+
if resp.Error == nil {
11198+
t.Fatalf("Expected an error, got none")
11199+
}
11200+
require_Error(t, resp.Error, NewJSClusterServerNotMemberError())
1117211201
}
1117311202

1117411203
//

server/jetstream_errors_generated.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ const (
5959
// JSClusterRequiredErr JetStream clustering support required
6060
JSClusterRequiredErr ErrorIdentifier = 10010
6161

62+
// JSClusterServerMemberChangeInflightErr cluster member change is in progress
63+
JSClusterServerMemberChangeInflightErr ErrorIdentifier = 10202
64+
6265
// JSClusterServerNotMemberErr server is not a member of the cluster
6366
JSClusterServerNotMemberErr ErrorIdentifier = 10044
6467

@@ -626,6 +629,7 @@ var (
626629
JSClusterNotLeaderErr: {Code: 500, ErrCode: 10009, Description: "JetStream cluster can not handle request"},
627630
JSClusterPeerNotMemberErr: {Code: 400, ErrCode: 10040, Description: "peer not a member"},
628631
JSClusterRequiredErr: {Code: 503, ErrCode: 10010, Description: "JetStream clustering support required"},
632+
JSClusterServerMemberChangeInflightErr: {Code: 400, ErrCode: 10202, Description: "cluster member change is in progress"},
629633
JSClusterServerNotMemberErr: {Code: 400, ErrCode: 10044, Description: "server is not a member of the cluster"},
630634
JSClusterTagsErr: {Code: 400, ErrCode: 10011, Description: "tags placement not supported for operation"},
631635
JSClusterUnSupportFeatureErr: {Code: 503, ErrCode: 10036, Description: "not currently supported in clustered mode"},
@@ -1031,6 +1035,16 @@ func NewJSClusterRequiredError(opts ...ErrorOption) *ApiError {
10311035
return ApiErrors[JSClusterRequiredErr]
10321036
}
10331037

1038+
// NewJSClusterServerMemberChangeInflightError creates a new JSClusterServerMemberChangeInflightErr error: "cluster member change is in progress"
1039+
func NewJSClusterServerMemberChangeInflightError(opts ...ErrorOption) *ApiError {
1040+
eopts := parseOpts(opts)
1041+
if ae, ok := eopts.err.(*ApiError); ok {
1042+
return ae
1043+
}
1044+
1045+
return ApiErrors[JSClusterServerMemberChangeInflightErr]
1046+
}
1047+
10341048
// NewJSClusterServerNotMemberError creates a new JSClusterServerNotMemberErr error: "server is not a member of the cluster"
10351049
func NewJSClusterServerNotMemberError(opts ...ErrorOption) *ApiError {
10361050
eopts := parseOpts(opts)

0 commit comments

Comments
 (0)