Skip to content

Commit 058f883

Browse files
[IMPROVED] Meta peer-remove JS audit success response is not error
Signed-off-by: Maurice van Veen <[email protected]>
1 parent e99e528 commit 058f883

File tree

3 files changed

+70
-8
lines changed

3 files changed

+70
-8
lines changed

server/jetstream_api.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2432,9 +2432,10 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac
24322432
// So we have a valid peer.
24332433
if err := meta.ProposeRemovePeer(found); err == nil {
24342434
if cc.peerRemoveReply == nil {
2435-
cc.peerRemoveReply = make(map[string]string, 1)
2435+
cc.peerRemoveReply = make(map[string]peerRemoveInfo, 1)
24362436
}
2437-
cc.peerRemoveReply[found] = reply
2437+
// Only copy the request; the subject and reply are already copied.
2438+
cc.peerRemoveReply[found] = peerRemoveInfo{ci: ci, subject: subject, reply: reply, request: string(msg)}
24382439
}
24392440
}
24402441

server/jetstream_cluster.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type jetStreamCluster struct {
5353
inflight map[string]map[string]*inflightInfo
5454
// Holds a map of a peer ID to the peer-remove request info (including the reply subject), to only respond after gaining
5555
// quorum on the peer-remove action.
56-
peerRemoveReply map[string]string
56+
peerRemoveReply map[string]peerRemoveInfo
5757
// Signals meta-leader should check the stream assignments.
5858
streamsCheck bool
5959
// Server.
@@ -87,6 +87,14 @@ type inflightInfo struct {
8787
cfg *StreamConfig
8888
}
8989

90+
// peerRemoveInfo tracks an inflight peer-remove request so that the 'success'
// response can be sent after quorum. Its fields are handed to sendAPIResponse.
91+
type peerRemoveInfo struct {
92+
ci *ClientInfo // ClientInfo captured by the peer-remove request handler.
93+
subject string // Subject of the peer-remove request, as seen by the handler.
94+
reply string // Reply subject for the response; no response is sent when empty.
95+
request string // Copy of the raw request payload, stored as string(msg).
96+
}
97+
9098
// Used to guide placement of streams and meta controllers in clustered JetStream.
9199
type Placement struct {
92100
Cluster string `json:"cluster,omitempty"`
@@ -2116,12 +2124,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
21162124
s := js.srv
21172125
if s.JetStreamIsLeader() {
21182126
var (
2119-
reply string
2120-
ok bool
2127+
info peerRemoveInfo
2128+
ok bool
21212129
)
21222130
js.mu.Lock()
21232131
if cc := js.cluster; cc != nil && cc.peerRemoveReply != nil {
2124-
if reply, ok = cc.peerRemoveReply[peer]; ok {
2132+
if info, ok = cc.peerRemoveReply[peer]; ok {
21252133
delete(cc.peerRemoveReply, peer)
21262134
}
21272135
if len(cc.peerRemoveReply) == 0 {
@@ -2130,10 +2138,11 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
21302138
}
21312139
js.mu.Unlock()
21322140

2133-
if reply != _EMPTY_ {
2141+
if info.reply != _EMPTY_ {
2142+
sysAcc := s.SystemAccount()
21342143
var resp = JSApiMetaServerRemoveResponse{ApiResponse: ApiResponse{Type: JSApiMetaServerRemoveResponseType}}
21352144
resp.Success = true
2136-
s.sendInternalAccountMsg(nil, reply, s.jsonResponse(&resp))
2145+
s.sendAPIResponse(info.ci, sysAcc, info.subject, info.reply, info.request, s.jsonResponse(&resp))
21372146
}
21382147
}
21392148
}

server/jetstream_cluster_1_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11138,6 +11138,10 @@ func TestJetStreamClusterMetaPeerRemoveResponseAfterQuorum(t *testing.T) {
1113811138
require_NoError(t, err)
1113911139
defer sub.Drain()
1114011140

11141+
auditSub, err := nc.SubscribeSync(JSAuditAdvisory)
11142+
require_NoError(t, err)
11143+
defer auditSub.Drain()
11144+
1114111145
// Since a majority is down, we'll expect to time out since there's no quorum.
1114211146
req := &JSApiMetaServerRemoveRequest{Server: remove1}
1114311147
jsreq, err := json.Marshal(req)
@@ -11156,6 +11160,18 @@ func TestJetStreamClusterMetaPeerRemoveResponseAfterQuorum(t *testing.T) {
1115611160
}
1115711161
require_Error(t, resp.Error, NewJSClusterServerMemberChangeInflightError())
1115811162

11163+
// Audit should reflect the same.
11164+
rmsg, err = auditSub.NextMsg(time.Second)
11165+
require_NoError(t, err)
11166+
var auditResp JSAPIAudit
11167+
require_NoError(t, json.Unmarshal(rmsg.Data, &auditResp))
11168+
resp = JSApiMetaServerRemoveResponse{}
11169+
require_NoError(t, json.Unmarshal([]byte(auditResp.Response), &resp))
11170+
if resp.Error == nil {
11171+
t.Fatalf("Expected an error, got none")
11172+
}
11173+
require_Error(t, resp.Error, NewJSClusterServerMemberChangeInflightError())
11174+
1115911175
// Don't allow concurrent meta membership changes.
1116011176
req = &JSApiMetaServerRemoveRequest{Server: remove2}
1116111177
jsreq, err = json.Marshal(req)
@@ -11169,6 +11185,18 @@ func TestJetStreamClusterMetaPeerRemoveResponseAfterQuorum(t *testing.T) {
1116911185
}
1117011186
require_Error(t, resp.Error, NewJSClusterServerMemberChangeInflightError())
1117111187

11188+
// Audit should reflect the same.
11189+
rmsg, err = auditSub.NextMsg(time.Second)
11190+
require_NoError(t, err)
11191+
auditResp = JSAPIAudit{}
11192+
require_NoError(t, json.Unmarshal(rmsg.Data, &auditResp))
11193+
resp = JSApiMetaServerRemoveResponse{}
11194+
require_NoError(t, json.Unmarshal([]byte(auditResp.Response), &resp))
11195+
if resp.Error == nil {
11196+
t.Fatalf("Expected an error, got none")
11197+
}
11198+
require_Error(t, resp.Error, NewJSClusterServerMemberChangeInflightError())
11199+
1117211200
// Bring back one server so that the peer-remove can get quorum now.
1117311201
// The response should come in shortly after.
1117411202
c.restartServer(s1)
@@ -11185,6 +11213,18 @@ func TestJetStreamClusterMetaPeerRemoveResponseAfterQuorum(t *testing.T) {
1118511213
}
1118611214
require_True(t, resp.Success)
1118711215

11216+
// Audit should reflect the same.
11217+
rmsg, err = auditSub.NextMsg(time.Second)
11218+
require_NoError(t, err)
11219+
auditResp = JSAPIAudit{}
11220+
require_NoError(t, json.Unmarshal(rmsg.Data, &auditResp))
11221+
resp = JSApiMetaServerRemoveResponse{}
11222+
require_NoError(t, json.Unmarshal([]byte(auditResp.Response), &resp))
11223+
if resp.Error != nil {
11224+
require_NoError(t, resp.Error)
11225+
}
11226+
require_True(t, resp.Success)
11227+
1118811228
// A retry of the first peer-remove should return an error that the peer is already removed.
1118911229
// Both a "success" response and this error should be used to know the peer-remove was successful.
1119011230
req = &JSApiMetaServerRemoveRequest{Server: remove1}
@@ -11198,6 +11238,18 @@ func TestJetStreamClusterMetaPeerRemoveResponseAfterQuorum(t *testing.T) {
1119811238
t.Fatalf("Expected an error, got none")
1119911239
}
1120011240
require_Error(t, resp.Error, NewJSClusterServerNotMemberError())
11241+
11242+
// Audit should reflect the same.
11243+
rmsg, err = auditSub.NextMsg(time.Second)
11244+
require_NoError(t, err)
11245+
auditResp = JSAPIAudit{}
11246+
require_NoError(t, json.Unmarshal(rmsg.Data, &auditResp))
11247+
resp = JSApiMetaServerRemoveResponse{}
11248+
require_NoError(t, json.Unmarshal([]byte(auditResp.Response), &resp))
11249+
if resp.Error == nil {
11250+
t.Fatalf("Expected an error, got none")
11251+
}
11252+
require_Error(t, resp.Error, NewJSClusterServerNotMemberError())
1120111253
}
1120211254

1120311255
//

0 commit comments

Comments
 (0)