Skip to content

Commit 935f0c0

Browse files
authored
Merge pull request #19471 from fuweid/backport-DowngradeInfo-3.6
[3.6] Backport DowngradeInfo proto change
2 parents 0f89474 + a743e47 commit 935f0c0

File tree

11 files changed

+776
-357
lines changed

11 files changed

+776
-357
lines changed

Documentation/dev-guide/apispec/swagger/rpc.swagger.json

+17
Original file line numberDiff line numberDiff line change
@@ -2135,6 +2135,19 @@
21352135
}
21362136
}
21372137
},
2138+
"etcdserverpbDowngradeInfo": {
2139+
"type": "object",
2140+
"properties": {
2141+
"enabled": {
2142+
"type": "boolean",
2143+
"description": "enabled indicates whether the cluster is enabled to downgrade."
2144+
},
2145+
"targetVersion": {
2146+
"type": "string",
2147+
"description": "targetVersion is the target downgrade version."
2148+
}
2149+
}
2150+
},
21382151
"etcdserverpbDowngradeRequest": {
21392152
"type": "object",
21402153
"properties": {
@@ -2840,6 +2853,10 @@
28402853
"type": "string",
28412854
"format": "int64",
28422855
"title": "dbSizeQuota is the configured etcd storage quota in bytes (the value passed to etcd instance by flag --quota-backend-bytes)"
2856+
},
2857+
"downgradeInfo": {
2858+
"$ref": "#/definitions/etcdserverpbDowngradeInfo",
2859+
"description": "downgradeInfo indicates if there is downgrade process."
28432860
}
28442861
}
28452862
},

api/etcdserverpb/rpc.pb.go

+610-323
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/etcdserverpb/rpc.proto

+9
Original file line numberDiff line numberDiff line change
@@ -1198,6 +1198,15 @@ message StatusResponse {
11981198
string storageVersion = 11 [(versionpb.etcd_version_field)="3.6"];
11991199
// dbSizeQuota is the configured etcd storage quota in bytes (the value passed to etcd instance by flag --quota-backend-bytes)
12001200
int64 dbSizeQuota = 12 [(versionpb.etcd_version_field)="3.6"];
1201+
// downgradeInfo indicates if there is downgrade process.
1202+
DowngradeInfo downgradeInfo = 13 [(versionpb.etcd_version_field)="3.6"];
1203+
}
1204+
1205+
message DowngradeInfo {
1206+
// enabled indicates whether the cluster is enabled to downgrade.
1207+
bool enabled = 1;
1208+
// targetVersion is the target downgrade version.
1209+
string targetVersion = 2;
12011210
}
12021211

12031212
message AuthEnableRequest {

etcdctl/ctlv3/command/printer.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package command
1717
import (
1818
"errors"
1919
"fmt"
20+
"strconv"
2021
"strings"
2122

2223
"github.com/dustin/go-humanize"
@@ -220,7 +221,7 @@ func makeEndpointHealthTable(healthList []epHealth) (hdr []string, rows [][]stri
220221
func makeEndpointStatusTable(statusList []epStatus) (hdr []string, rows [][]string) {
221222
hdr = []string{
222223
"endpoint", "ID", "version", "storage version", "db size", "in use", "percentage not in use", "quota", "is leader", "is learner", "raft term",
223-
"raft index", "raft applied index", "errors",
224+
"raft index", "raft applied index", "errors", "downgrade target version", "downgrade enabled",
224225
}
225226
for _, status := range statusList {
226227
rows = append(rows, []string{
@@ -238,6 +239,8 @@ func makeEndpointStatusTable(statusList []epStatus) (hdr []string, rows [][]stri
238239
fmt.Sprint(status.Resp.RaftIndex),
239240
fmt.Sprint(status.Resp.RaftAppliedIndex),
240241
fmt.Sprint(strings.Join(status.Resp.Errors, ", ")),
242+
status.Resp.DowngradeInfo.GetTargetVersion(),
243+
strconv.FormatBool(status.Resp.DowngradeInfo.GetEnabled()),
241244
})
242245
}
243246
return hdr, rows

etcdctl/ctlv3/command/printer_fields.go

+2
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ func (p *fieldsPrinter) EndpointStatus(eps []epStatus) {
203203
fmt.Println(`"RaftAppliedIndex" :`, ep.Resp.RaftAppliedIndex)
204204
fmt.Println(`"Errors" :`, ep.Resp.Errors)
205205
fmt.Printf("\"Endpoint\" : %q\n", ep.Ep)
206+
fmt.Printf("\"DowngradeTargetVersion\" : %q\n", ep.Resp.DowngradeInfo.GetTargetVersion())
207+
fmt.Println(`"DowngradeEnabled" :`, ep.Resp.DowngradeInfo.GetEnabled())
206208
fmt.Println()
207209
}
208210
}

scripts/etcd_version_annotations.txt

+4
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ etcdserverpb.DeleteRangeResponse: "3.0"
150150
etcdserverpb.DeleteRangeResponse.deleted: ""
151151
etcdserverpb.DeleteRangeResponse.header: ""
152152
etcdserverpb.DeleteRangeResponse.prev_kvs: "3.1"
153+
etcdserverpb.DowngradeInfo: ""
154+
etcdserverpb.DowngradeInfo.enabled: ""
155+
etcdserverpb.DowngradeInfo.targetVersion: ""
153156
etcdserverpb.DowngradeRequest: "3.5"
154157
etcdserverpb.DowngradeRequest.CANCEL: ""
155158
etcdserverpb.DowngradeRequest.DowngradeAction: "3.5"
@@ -382,6 +385,7 @@ etcdserverpb.StatusResponse: "3.0"
382385
etcdserverpb.StatusResponse.dbSize: ""
383386
etcdserverpb.StatusResponse.dbSizeInUse: "3.4"
384387
etcdserverpb.StatusResponse.dbSizeQuota: "3.6"
388+
etcdserverpb.StatusResponse.downgradeInfo: "3.6"
385389
etcdserverpb.StatusResponse.errors: "3.4"
386390
etcdserverpb.StatusResponse.header: ""
387391
etcdserverpb.StatusResponse.isLearner: "3.4"

server/etcdserver/api/v3rpc/maintenance.go

+7
Original file line numberDiff line numberDiff line change
@@ -262,10 +262,17 @@ func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (
262262
DbSizeInUse: ms.bg.Backend().SizeInUse(),
263263
IsLearner: ms.cs.IsLearner(),
264264
DbSizeQuota: ms.cg.Config().QuotaBackendBytes,
265+
DowngradeInfo: &pb.DowngradeInfo{Enabled: false},
265266
}
266267
if storageVersion := ms.vs.GetStorageVersion(); storageVersion != nil {
267268
resp.StorageVersion = storageVersion.String()
268269
}
270+
if downgradeInfo := ms.vs.GetDowngradeInfo(); downgradeInfo != nil {
271+
resp.DowngradeInfo = &pb.DowngradeInfo{
272+
Enabled: downgradeInfo.Enabled,
273+
TargetVersion: downgradeInfo.TargetVersion,
274+
}
275+
}
269276
if resp.Leader == raft.None {
270277
resp.Errors = append(resp.Errors, errors.ErrNoLeader.Error())
271278
}

tests/e2e/cluster_downgrade_test.go

+24-24
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/stretchr/testify/assert"
2626
"github.com/stretchr/testify/require"
2727

28+
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
2829
"go.etcd.io/etcd/api/v3/version"
2930
"go.etcd.io/etcd/client/pkg/v3/fileutil"
3031
"go.etcd.io/etcd/client/pkg/v3/types"
@@ -51,6 +52,10 @@ func TestDowngradeUpgradeClusterOf1(t *testing.T) {
5152
testDowngradeUpgrade(t, 1, 1, false, noCancellation)
5253
}
5354

55+
func TestDowngradeUpgrade2InClusterOf3(t *testing.T) {
56+
testDowngradeUpgrade(t, 2, 3, false, noCancellation)
57+
}
58+
5459
func TestDowngradeUpgradeClusterOf3(t *testing.T) {
5560
testDowngradeUpgrade(t, 3, 3, false, noCancellation)
5661
}
@@ -128,6 +133,9 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
128133
time.Sleep(etcdserver.HealthInterval)
129134
}
130135

136+
t.Log("Downgrade should be disabled")
137+
e2e.ValidateDowngradeInfo(t, epc, &pb.DowngradeInfo{Enabled: false})
138+
131139
t.Log("Adding member to test membership, but a learner avoid breaking quorum")
132140
resp, err := cc.MemberAddAsLearner(context.Background(), "fake1", []string{"http://127.0.0.1:1001"})
133141
require.NoError(t, err)
@@ -150,6 +158,10 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
150158
return // No need to perform downgrading, end the test here
151159
}
152160
e2e.DowngradeEnable(t, epc, lastVersion)
161+
162+
t.Log("Downgrade should be enabled")
163+
e2e.ValidateDowngradeInfo(t, epc, &pb.DowngradeInfo{Enabled: true, TargetVersion: lastClusterVersion.String()})
164+
153165
if triggerCancellation == cancelRightAfterEnable {
154166
t.Logf("Cancelling downgrade right after enabling (no node is downgraded yet)")
155167
e2e.DowngradeCancel(t, epc)
@@ -162,10 +174,10 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
162174
t.Logf("Elect members for operations on members: %v", membersToChange)
163175

164176
t.Logf("Starting downgrade process to %q", lastVersionStr)
165-
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, currentVersion, lastClusterVersion)
177+
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, true, currentVersion, lastClusterVersion)
166178
require.NoError(t, err)
167179
if len(membersToChange) == len(epc.Procs) {
168-
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
180+
e2e.AssertProcessLogs(t, epc.Procs[epc.WaitLeader(t)], "the cluster has been downgraded")
169181
}
170182

171183
t.Log("Downgrade complete")
@@ -198,10 +210,19 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
198210
beforeMembers, beforeKV = getMembersAndKeys(t, cc)
199211

200212
t.Logf("Starting upgrade process to %q", currentVersionStr)
201-
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, lastClusterVersion, currentVersion)
213+
downgradeEnabled := triggerCancellation == noCancellation && numberOfMembersToDowngrade < clusterSize
214+
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, downgradeEnabled, lastClusterVersion, currentVersion)
202215
require.NoError(t, err)
203216
t.Log("Upgrade complete")
204217

218+
if downgradeEnabled {
219+
t.Log("Downgrade should be still enabled")
220+
e2e.ValidateDowngradeInfo(t, epc, &pb.DowngradeInfo{Enabled: true, TargetVersion: lastClusterVersion.String()})
221+
} else {
222+
t.Log("Downgrade should be disabled")
223+
e2e.ValidateDowngradeInfo(t, epc, &pb.DowngradeInfo{Enabled: false})
224+
}
225+
205226
afterMembers, afterKV = getMembersAndKeys(t, cc)
206227
assert.Equal(t, beforeKV.Kvs, afterKV.Kvs)
207228
assert.Equal(t, beforeMembers.Members, afterMembers.Members)
@@ -224,27 +245,6 @@ func newCluster(t *testing.T, clusterSize int, snapshotCount uint64) *e2e.EtcdPr
224245
return epc
225246
}
226247

227-
func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
228-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
229-
defer cancel()
230-
for i := 0; i < len(epc.Procs); i++ {
231-
endpoints := epc.Procs[i].EndpointsGRPC()
232-
cli, err := clientv3.New(clientv3.Config{
233-
Endpoints: endpoints,
234-
DialTimeout: 3 * time.Second,
235-
})
236-
require.NoError(t, err)
237-
defer cli.Close()
238-
resp, err := cli.Status(ctx, endpoints[0])
239-
require.NoError(t, err)
240-
if resp.Header.GetMemberId() == resp.Leader {
241-
return epc.Procs[i]
242-
}
243-
}
244-
t.Fatal("Leader not found")
245-
return nil
246-
}
247-
248248
func generateSnapshot(t *testing.T, snapshotCount uint64, cc *e2e.EtcdctlV3) {
249249
ctx, cancel := context.WithCancel(context.Background())
250250
defer cancel()

tests/framework/e2e/cluster.go

+16
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"testing"
2929
"time"
3030

31+
"github.com/coreos/go-semver/semver"
3132
"go.uber.org/zap"
3233
"go.uber.org/zap/zaptest"
3334

@@ -711,6 +712,21 @@ func (cfg *EtcdProcessClusterConfig) binaryPath(i int) string {
711712
return execPath
712713
}
713714

715+
func (epc *EtcdProcessCluster) MinServerVersion() (*semver.Version, error) {
716+
var minVersion *semver.Version
717+
for _, member := range epc.Procs {
718+
ver, err := GetVersionFromBinary(member.Config().ExecPath)
719+
if err != nil {
720+
return nil, fmt.Errorf("failed to get version from member %s binary: %w", member.Config().Name, err)
721+
}
722+
723+
if minVersion == nil || ver.LessThan(*minVersion) {
724+
minVersion = ver
725+
}
726+
}
727+
return minVersion, nil
728+
}
729+
714730
func values(cfg embed.Config) map[string]string {
715731
fs := flag.NewFlagSet("etcd", flag.ContinueOnError)
716732
cfg.AddFlags(fs)

tests/framework/e2e/downgrade.go

+73-6
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ import (
2727
"github.com/stretchr/testify/require"
2828
"go.uber.org/zap"
2929

30+
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
3031
"go.etcd.io/etcd/api/v3/version"
32+
"go.etcd.io/etcd/server/v3/etcdserver"
3133
"go.etcd.io/etcd/tests/v3/framework/testutils"
3234
)
3335

@@ -46,7 +48,6 @@ func DowngradeEnable(t *testing.T, epc *EtcdProcessCluster, ver *semver.Version)
4648
Server: OffsetMinor(ver, 1).String(),
4749
Storage: ver.String(),
4850
})
49-
AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
5051
}
5152

5253
t.Log("Cluster is ready for downgrade")
@@ -82,14 +83,59 @@ func DowngradeCancel(t *testing.T, epc *EtcdProcessCluster) {
8283
t.Log("Cluster downgrade cancellation is completed")
8384
}
8485

85-
func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, numberOfMembersToChange int, currentVersion, targetVersion *semver.Version) error {
86+
func ValidateDowngradeInfo(t *testing.T, clus *EtcdProcessCluster, expected *pb.DowngradeInfo) {
87+
cfg := clus.Cfg
88+
89+
for i := 0; i < len(clus.Procs); i++ {
90+
member := clus.Procs[i]
91+
mc := member.Etcdctl()
92+
mName := member.Config().Name
93+
94+
testutils.ExecuteWithTimeout(t, 1*time.Minute, func() {
95+
for {
96+
statuses, err := mc.Status(context.Background())
97+
if err != nil {
98+
cfg.Logger.Warn("failed to get member status and retrying",
99+
zap.Error(err),
100+
zap.String("member", mName))
101+
102+
time.Sleep(time.Second)
103+
continue
104+
}
105+
106+
require.Lenf(t, statuses, 1, "member %s", mName)
107+
got := (*pb.StatusResponse)(statuses[0]).GetDowngradeInfo()
108+
109+
if got.GetEnabled() == expected.GetEnabled() && got.GetTargetVersion() == expected.GetTargetVersion() {
110+
cfg.Logger.Info("DowngradeInfo match", zap.String("member", mName))
111+
break
112+
}
113+
114+
cfg.Logger.Warn("DowngradeInfo didn't match retrying",
115+
zap.String("member", mName),
116+
zap.Dict("expected",
117+
zap.Bool("Enabled", expected.GetEnabled()),
118+
zap.String("TargetVersion", expected.GetTargetVersion()),
119+
),
120+
zap.Dict("got",
121+
zap.Bool("Enabled", got.GetEnabled()),
122+
zap.String("TargetVersion", got.GetTargetVersion()),
123+
),
124+
)
125+
time.Sleep(time.Second)
126+
}
127+
})
128+
}
129+
}
130+
131+
func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, numberOfMembersToChange int, downgradeEnabled bool, currentVersion, targetVersion *semver.Version) error {
86132
membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
87133
t.Logf("Elect members for operations on members: %v", membersToChange)
88134

89-
return DowngradeUpgradeMembersByID(t, lg, clus, membersToChange, currentVersion, targetVersion)
135+
return DowngradeUpgradeMembersByID(t, lg, clus, membersToChange, downgradeEnabled, currentVersion, targetVersion)
90136
}
91137

92-
func DowngradeUpgradeMembersByID(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, membersToChange []int, currentVersion, targetVersion *semver.Version) error {
138+
func DowngradeUpgradeMembersByID(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, membersToChange []int, downgradeEnabled bool, currentVersion, targetVersion *semver.Version) error {
93139
if lg == nil {
94140
lg = clus.lg
95141
}
@@ -100,7 +146,6 @@ func DowngradeUpgradeMembersByID(t *testing.T, lg *zap.Logger, clus *EtcdProcess
100146
opString = "downgrading"
101147
newExecPath = BinPath.EtcdLastRelease
102148
}
103-
104149
for _, memberID := range membersToChange {
105150
member := clus.Procs[memberID]
106151
if member.Config().ExecPath == newExecPath {
@@ -117,11 +162,33 @@ func DowngradeUpgradeMembersByID(t *testing.T, lg *zap.Logger, clus *EtcdProcess
117162
return err
118163
}
119164
}
165+
166+
t.Log("Waiting health interval to make sure the leader propagates version to new processes")
167+
time.Sleep(etcdserver.HealthInterval)
168+
120169
lg.Info("Validating versions")
170+
clusterVersion := targetVersion
171+
if !isDowngrade {
172+
if downgradeEnabled {
173+
// If the downgrade isn't cancelled yet, then the cluster
174+
// version will always stay at the lower version, no matter
175+
// what's the binary version of each member.
176+
clusterVersion = currentVersion
177+
} else {
178+
// If the downgrade has already been cancelled, then the
179+
// cluster version is the minimal server version.
180+
minVer, err := clus.MinServerVersion()
181+
if err != nil {
182+
return fmt.Errorf("failed to get min server version: %w", err)
183+
}
184+
clusterVersion = minVer
185+
}
186+
}
187+
121188
for _, memberID := range membersToChange {
122189
member := clus.Procs[memberID]
123190
ValidateVersion(t, clus.Cfg, member, version.Versions{
124-
Cluster: targetVersion.String(),
191+
Cluster: clusterVersion.String(),
125192
Server: targetVersion.String(),
126193
})
127194
}

0 commit comments

Comments
 (0)