Skip to content

Commit 88bba4d

Browse files
authored
Merge pull request #19813 from serathius/refactor-membership-apply
Simplify apply by handling membership apply through the same applyV3 stack
2 parents 1a3a5a4 + a286590 commit 88bba4d

File tree

7 files changed

+62
-83
lines changed

7 files changed

+62
-83
lines changed

server/etcdserver/apply/apply.go

+16-27
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,13 @@ type Result struct {
6363
Trace *traceutil.Trace
6464
}
6565

66-
type applyFunc func(r *pb.InternalRaftRequest) *Result
66+
type applyFunc func(*pb.InternalRaftRequest, membership.ShouldApplyV3) *Result
6767

6868
// applierV3 is the interface for processing V3 raft messages
6969
type applierV3 interface {
7070
// Apply executes the generic portion of application logic for the current applier, but
7171
// delegates the actual execution to the applyFunc method.
72-
Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) *Result
72+
Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result
7373

7474
Put(p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
7575
Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error)
@@ -103,6 +103,9 @@ type applierV3 interface {
103103
RoleDelete(ua *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
104104
UserList(ua *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
105105
RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
106+
ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3)
107+
ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3)
108+
DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3)
106109
}
107110

108111
type ApplierOptions struct {
@@ -135,8 +138,8 @@ func newApplierV3Backend(opts ApplierOptions) applierV3 {
135138
}
136139
}
137140

138-
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) *Result {
139-
return applyFunc(r)
141+
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result {
142+
return applyFunc(r, shouldApplyV3)
140143
}
141144

142145
func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
@@ -384,39 +387,25 @@ func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleList
384387
return resp, err
385388
}
386389

387-
type ApplierMembership struct {
388-
lg *zap.Logger
389-
cluster *membership.RaftCluster
390-
snapshotServer SnapshotServer
391-
}
392-
393-
func NewApplierMembership(lg *zap.Logger, cluster *membership.RaftCluster, snapshotServer SnapshotServer) *ApplierMembership {
394-
return &ApplierMembership{
395-
lg: lg,
396-
cluster: cluster,
397-
snapshotServer: snapshotServer,
398-
}
399-
}
400-
401-
func (a *ApplierMembership) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
402-
prevVersion := a.cluster.Version()
390+
func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
391+
prevVersion := a.options.Cluster.Version()
403392
newVersion := semver.Must(semver.NewVersion(r.Ver))
404-
a.cluster.SetVersion(newVersion, api.UpdateCapability, shouldApplyV3)
393+
a.options.Cluster.SetVersion(newVersion, api.UpdateCapability, shouldApplyV3)
405394
// Force snapshot after cluster version downgrade.
406395
if prevVersion != nil && newVersion.LessThan(*prevVersion) {
407-
lg := a.lg
396+
lg := a.options.Logger
408397
if lg != nil {
409398
lg.Info("Cluster version downgrade detected, forcing snapshot",
410399
zap.String("prev-cluster-version", prevVersion.String()),
411400
zap.String("new-cluster-version", newVersion.String()),
412401
)
413402
}
414-
a.snapshotServer.ForceSnapshot()
403+
a.options.SnapshotServer.ForceSnapshot()
415404
}
416405
}
417406

418-
func (a *ApplierMembership) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
419-
a.cluster.UpdateAttributes(
407+
func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
408+
a.options.Cluster.UpdateAttributes(
420409
types.ID(r.Member_ID),
421410
membership.Attributes{
422411
Name: r.MemberAttributes.Name,
@@ -426,12 +415,12 @@ func (a *ApplierMembership) ClusterMemberAttrSet(r *membershippb.ClusterMemberAt
426415
)
427416
}
428417

429-
func (a *ApplierMembership) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
418+
func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
430419
d := version.DowngradeInfo{Enabled: false}
431420
if r.Enabled {
432421
d = version.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
433422
}
434-
a.cluster.SetDowngradeInfo(&d, shouldApplyV3)
423+
a.options.Cluster.SetDowngradeInfo(&d, shouldApplyV3)
435424
}
436425

437426
type quotaApplierV3 struct {

server/etcdserver/apply/apply_auth.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
2121
"go.etcd.io/etcd/pkg/v3/traceutil"
2222
"go.etcd.io/etcd/server/v3/auth"
23+
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
2324
"go.etcd.io/etcd/server/v3/etcdserver/txn"
2425
"go.etcd.io/etcd/server/v3/lease"
2526
)
@@ -40,7 +41,7 @@ func newAuthApplierV3(as auth.AuthStore, base applierV3, lessor lease.Lessor) *a
4041
return &authApplierV3{applierV3: base, as: as, lessor: lessor}
4142
}
4243

43-
func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) *Result {
44+
func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result {
4445
aa.mu.Lock()
4546
defer aa.mu.Unlock()
4647
if r.Header != nil {
@@ -56,7 +57,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) *
5657
return &Result{Err: err}
5758
}
5859
}
59-
ret := aa.applierV3.Apply(r, applyFunc)
60+
ret := aa.applierV3.Apply(r, shouldApplyV3, applyFunc)
6061
aa.authInfo.Username = ""
6162
aa.authInfo.Revision = 0
6263
return ret

server/etcdserver/apply/apply_auth_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func dummyIndexWaiter(_ uint64) <-chan struct{} {
4444
return ch
4545
}
4646

47-
func dummyApplyFunc(_ *pb.InternalRaftRequest) *Result {
47+
func dummyApplyFunc(_ *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
4848
return &Result{}
4949
}
5050

@@ -217,7 +217,7 @@ func TestAuthApplierV3_Apply(t *testing.T) {
217217
mustCreateRolesAndEnableAuth(t, authApplier)
218218
for _, tc := range tcs {
219219
t.Run(tc.name, func(t *testing.T) {
220-
result := authApplier.Apply(tc.request, dummyApplyFunc)
220+
result := authApplier.Apply(tc.request, membership.ApplyBoth, dummyApplyFunc)
221221
require.Equalf(t, result, tc.expectResult, "Apply: got %v, expect: %v", result, tc.expectResult)
222222
})
223223
}
@@ -384,7 +384,7 @@ func TestAuthApplierV3_AdminPermission(t *testing.T) {
384384
if tc.adminPermissionNeeded {
385385
tc.request.Header = &pb.RequestHeader{Username: userReadOnly}
386386
}
387-
result := authApplier.Apply(tc.request, dummyApplyFunc)
387+
result := authApplier.Apply(tc.request, membership.ApplyBoth, dummyApplyFunc)
388388
require.Equalf(t, errors.Is(result.Err, auth.ErrPermissionDenied), tc.adminPermissionNeeded, "Admin permission needed")
389389
})
390390
}

server/etcdserver/apply/uber_applier.go

+30-4
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@ import (
2121
"go.uber.org/zap"
2222

2323
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
24+
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
2425
"go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
2526
"go.etcd.io/etcd/server/v3/etcdserver/txn"
2627
"go.etcd.io/etcd/server/v3/storage/mvcc"
2728
)
2829

2930
type UberApplier interface {
30-
Apply(r *pb.InternalRaftRequest) *Result
31+
Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result
3132
}
3233

3334
type uberApplier struct {
@@ -78,18 +79,18 @@ func (a *uberApplier) restoreAlarms() {
7879
}
7980
}
8081

81-
func (a *uberApplier) Apply(r *pb.InternalRaftRequest) *Result {
82+
func (a *uberApplier) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
8283
// We first execute chain of Apply() calls down the hierarchy:
8384
// (i.e. CorruptApplier -> CappedApplier -> Auth -> Quota -> Backend),
8485
// then dispatch() unpacks the request to a specific method (like Put),
8586
// that gets executed down the hierarchy again:
8687
// i.e. CorruptApplier.Put(CappedApplier.Put(...(BackendApplier.Put(...)))).
87-
return a.applyV3.Apply(r, a.dispatch)
88+
return a.applyV3.Apply(r, shouldApplyV3, a.dispatch)
8889
}
8990

9091
// dispatch translates the request (r) into appropriate call (like Put) on
9192
// the underlying applyV3 object.
92-
func (a *uberApplier) dispatch(r *pb.InternalRaftRequest) *Result {
93+
func (a *uberApplier) dispatch(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
9394
op := "unknown"
9495
ar := &Result{}
9596
defer func(start time.Time) {
@@ -101,6 +102,31 @@ func (a *uberApplier) dispatch(r *pb.InternalRaftRequest) *Result {
101102
}
102103
}(time.Now())
103104

105+
switch {
106+
case r.ClusterVersionSet != nil:
107+
op = "ClusterVersionSet" // Implemented in 3.5.x
108+
a.applyV3.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
109+
return ar
110+
case r.ClusterMemberAttrSet != nil:
111+
op = "ClusterMemberAttrSet" // Implemented in 3.5.x
112+
a.applyV3.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
113+
return ar
114+
case r.DowngradeInfoSet != nil:
115+
op = "DowngradeInfoSet" // Implemented in 3.5.x
116+
a.applyV3.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
117+
return ar
118+
case r.DowngradeVersionTest != nil:
119+
op = "DowngradeVersionTest" // Implemented in 3.6 for test only
120+
// do nothing, we are just to ensure etcdserver don't panic in case
121+
// users(test cases) intentionally inject DowngradeVersionTestRequest
122+
// into the WAL files.
123+
return ar
124+
default:
125+
}
126+
if !shouldApplyV3 {
127+
return nil
128+
}
129+
104130
switch {
105131
case r.Range != nil:
106132
op = "Range"

server/etcdserver/apply/uber_applier_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,13 @@ func TestUberApplier_Alarm_Corrupt(t *testing.T) {
130130
MemberID: memberID,
131131
Alarm: pb.AlarmType_CORRUPT,
132132
},
133-
})
133+
}, membership.ApplyBoth)
134134
require.NotNil(t, result)
135135
require.NoError(t, result.Err)
136136

137137
for _, tc := range tcs {
138138
t.Run(tc.name, func(t *testing.T) {
139-
result = ua.Apply(tc.request)
139+
result = ua.Apply(tc.request, membership.ApplyBoth)
140140
require.NotNil(t, result)
141141
require.Equalf(t, tc.expectError, result.Err, "Apply: got %v, expect: %v", result.Err, tc.expectError)
142142
})
@@ -232,13 +232,13 @@ func TestUberApplier_Alarm_Quota(t *testing.T) {
232232
MemberID: memberID,
233233
Alarm: pb.AlarmType_NOSPACE,
234234
},
235-
})
235+
}, membership.ApplyBoth)
236236
require.NotNil(t, result)
237237
require.NoError(t, result.Err)
238238

239239
for _, tc := range tcs {
240240
t.Run(tc.name, func(t *testing.T) {
241-
result = ua.Apply(tc.request)
241+
result = ua.Apply(tc.request, membership.ApplyBoth)
242242
require.NotNil(t, result)
243243
require.Equalf(t, tc.expectError, result.Err, "Apply: got %v, expect: %v", result.Err, tc.expectError)
244244
})
@@ -255,11 +255,11 @@ func TestUberApplier_Alarm_Deactivate(t *testing.T) {
255255
MemberID: memberID,
256256
Alarm: pb.AlarmType_NOSPACE,
257257
},
258-
})
258+
}, membership.ApplyBoth)
259259
require.NotNil(t, result)
260260
require.NoError(t, result.Err)
261261

262-
result = ua.Apply(&pb.InternalRaftRequest{Put: &pb.PutRequest{Key: []byte(key)}})
262+
result = ua.Apply(&pb.InternalRaftRequest{Put: &pb.PutRequest{Key: []byte(key)}}, membership.ApplyBoth)
263263
require.NotNil(t, result)
264264
require.Equalf(t, errors.ErrNoSpace, result.Err, "Apply: got %v, expect: %v", result.Err, errors.ErrNoSpace)
265265

@@ -270,11 +270,11 @@ func TestUberApplier_Alarm_Deactivate(t *testing.T) {
270270
MemberID: memberID,
271271
Alarm: pb.AlarmType_NOSPACE,
272272
},
273-
})
273+
}, membership.ApplyBoth)
274274
require.NotNil(t, result)
275275
require.NoError(t, result.Err)
276276

277-
result = ua.Apply(&pb.InternalRaftRequest{Put: &pb.PutRequest{Key: []byte(key)}})
277+
result = ua.Apply(&pb.InternalRaftRequest{Put: &pb.PutRequest{Key: []byte(key)}}, membership.ApplyBoth)
278278
require.NotNil(t, result)
279279
assert.NoError(t, result.Err)
280280
}

server/etcdserver/server.go

+1-38
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ import (
6262
"go.etcd.io/etcd/server/v3/etcdserver/apply"
6363
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
6464
"go.etcd.io/etcd/server/v3/etcdserver/errors"
65-
"go.etcd.io/etcd/server/v3/etcdserver/txn"
6665
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
6766
"go.etcd.io/etcd/server/v3/features"
6867
"go.etcd.io/etcd/server/v3/lease"
@@ -1985,7 +1984,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.
19851984
if !needResult && raftReq.Txn != nil {
19861985
removeNeedlessRangeReqs(raftReq.Txn)
19871986
}
1988-
ar = s.applyInternalRaftRequest(&raftReq, shouldApplyV3)
1987+
ar = s.uberApply.Apply(&raftReq, shouldApplyV3)
19891988
}
19901989

19911990
// do not re-toApply applied entries.
@@ -2021,42 +2020,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.
20212020
})
20222021
}
20232022

2024-
func (s *EtcdServer) applyInternalRaftRequest(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *apply.Result {
2025-
if r.ClusterVersionSet == nil && r.ClusterMemberAttrSet == nil && r.DowngradeInfoSet == nil && r.DowngradeVersionTest == nil {
2026-
if !shouldApplyV3 {
2027-
return nil
2028-
}
2029-
return s.uberApply.Apply(r)
2030-
}
2031-
membershipApplier := apply.NewApplierMembership(s.lg, s.cluster, s)
2032-
op := "unknown"
2033-
defer func(start time.Time) {
2034-
txn.ApplySecObserve("v3", op, true, time.Since(start))
2035-
txn.WarnOfExpensiveRequest(s.lg, s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, nil, nil)
2036-
}(time.Now())
2037-
switch {
2038-
case r.ClusterVersionSet != nil:
2039-
op = "ClusterVersionSet" // Implemented in 3.5.x
2040-
membershipApplier.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
2041-
return &apply.Result{}
2042-
case r.ClusterMemberAttrSet != nil:
2043-
op = "ClusterMemberAttrSet" // Implemented in 3.5.x
2044-
membershipApplier.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
2045-
case r.DowngradeInfoSet != nil:
2046-
op = "DowngradeInfoSet" // Implemented in 3.5.x
2047-
membershipApplier.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
2048-
case r.DowngradeVersionTest != nil:
2049-
op = "DowngradeVersionTest" // Implemented in 3.6 for test only
2050-
// do nothing, we are just to ensure etcdserver don't panic in case
2051-
// users(test cases) intentionally inject DowngradeVersionTestRequest
2052-
// into the WAL files.
2053-
default:
2054-
s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
2055-
return nil
2056-
}
2057-
return &apply.Result{}
2058-
}
2059-
20602023
func noSideEffect(r *pb.InternalRaftRequest) bool {
20612024
return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil || r.AuthStatus != nil
20622025
}

server/etcdserver/server_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func TestApplyRepeat(t *testing.T) {
149149

150150
type uberApplierMock struct{}
151151

152-
func (uberApplierMock) Apply(r *pb.InternalRaftRequest) *apply2.Result {
152+
func (uberApplierMock) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *apply2.Result {
153153
return &apply2.Result{}
154154
}
155155

0 commit comments

Comments
 (0)