Skip to content

Commit 732e7ef

Browse files
committed
Create a v2 snapshot when running etcdutl migrate command
Also added test to cover the etcdutl migrate command Signed-off-by: Benjamin Wang <[email protected]>
1 parent 32cfd45 commit 732e7ef

File tree

4 files changed

+374
-6
lines changed

4 files changed

+374
-6
lines changed

etcdutl/etcdutl/common.go

+93
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,19 @@ package etcdutl
1616

1717
import (
1818
"errors"
19+
"fmt"
1920

2021
"go.uber.org/zap"
2122
"go.uber.org/zap/zapcore"
2223

2324
"go.etcd.io/etcd/client/pkg/v3/logutil"
2425
"go.etcd.io/etcd/pkg/v3/cobrautl"
26+
"go.etcd.io/etcd/server/v3/etcdserver"
27+
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
2528
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
29+
"go.etcd.io/etcd/server/v3/storage/backend"
2630
"go.etcd.io/etcd/server/v3/storage/datadir"
31+
"go.etcd.io/etcd/server/v3/storage/schema"
2732
"go.etcd.io/etcd/server/v3/storage/wal"
2833
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
2934
"go.etcd.io/raft/v3/raftpb"
@@ -68,3 +73,91 @@ func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, erro
6873

6974
return snapshot, nil
7075
}
76+
77+
func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error {
78+
var (
79+
lg = GetLogger()
80+
81+
snapDir = datadir.ToSnapDir(dataDir)
82+
walDir = datadir.ToWALDir(dataDir)
83+
)
84+
85+
ci, term := schema.ReadConsistentIndex(be.ReadTx())
86+
87+
cl := membership.NewCluster(lg)
88+
cl.SetBackend(schema.NewMembershipBackend(lg, be))
89+
cl.UnsafeLoad()
90+
91+
latestWALSnap, err := getLatestWALSnap(lg, dataDir)
92+
if err != nil {
93+
return err
94+
}
95+
96+
// Each time before creating the v2 snapshot, etcdserve always flush
97+
// the backend storage (bbolt db), so the consistent index should never
98+
// less than the Index or term of the latest snapshot.
99+
if ci < latestWALSnap.Index || term < latestWALSnap.Term {
100+
// This should never happen
101+
return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWALSnap.Index, latestWALSnap.Term)
102+
}
103+
104+
voters, learners := getVotersAndLearners(cl)
105+
confState := raftpb.ConfState{
106+
Voters: voters,
107+
Learners: learners,
108+
}
109+
110+
// create the v2 snaspshot file
111+
raftSnap := raftpb.Snapshot{
112+
Data: etcdserver.GetMembershipInfoInV2Format(lg, cl),
113+
Metadata: raftpb.SnapshotMetadata{
114+
Index: ci,
115+
Term: term,
116+
ConfState: confState,
117+
},
118+
}
119+
sn := snap.New(lg, snapDir)
120+
if err = sn.SaveSnap(raftSnap); err != nil {
121+
return err
122+
}
123+
124+
// save WAL snapshot record
125+
w, err := wal.Open(lg, walDir, latestWALSnap)
126+
if err != nil {
127+
return err
128+
}
129+
defer w.Close()
130+
// We must read all records to locate the tail of the last valid WAL file.
131+
_, st, _, err := w.ReadAll()
132+
if err != nil {
133+
return err
134+
}
135+
136+
if err := w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState}); err != nil {
137+
return err
138+
}
139+
140+
if term >= st.Term && ci > st.Commit {
141+
if err := w.Save(raftpb.HardState{Term: term, Commit: ci, Vote: st.Vote}, nil); err != nil {
142+
return err
143+
}
144+
}
145+
return w.Sync()
146+
}
147+
148+
func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) {
149+
var (
150+
voters []uint64
151+
learners []uint64
152+
)
153+
for _, m := range cl.Members() {
154+
if m.IsLearner {
155+
learners = append(learners, uint64(m.ID))
156+
continue
157+
}
158+
159+
voters = append(voters, uint64(m.ID))
160+
}
161+
162+
return voters, learners
163+
}

etcdutl/etcdutl/common_test.go

+228
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,26 @@
1515
package etcdutl
1616

1717
import (
18+
"path"
19+
"path/filepath"
1820
"testing"
1921

22+
"github.com/coreos/go-semver/semver"
2023
"github.com/stretchr/testify/require"
2124
"go.uber.org/zap"
2225

2326
"go.etcd.io/etcd/api/v3/etcdserverpb"
2427
"go.etcd.io/etcd/client/pkg/v3/fileutil"
28+
"go.etcd.io/etcd/client/pkg/v3/types"
2529
"go.etcd.io/etcd/pkg/v3/pbutil"
30+
"go.etcd.io/etcd/server/v3/etcdserver"
31+
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
2632
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
33+
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
34+
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
35+
"go.etcd.io/etcd/server/v3/storage/backend"
2736
"go.etcd.io/etcd/server/v3/storage/datadir"
37+
"go.etcd.io/etcd/server/v3/storage/schema"
2838
"go.etcd.io/etcd/server/v3/storage/wal"
2939
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
3040
"go.etcd.io/raft/v3/raftpb"
@@ -141,3 +151,221 @@ func TestGetLatestWalSnap(t *testing.T) {
141151
})
142152
}
143153
}
154+
155+
func TestCreateV2SnapshotFromV3Store(t *testing.T) {
156+
testCases := []struct {
157+
name string
158+
initialIndex uint64
159+
initialTerm uint64
160+
initialCommitIndex uint64
161+
consistentIndex uint64
162+
term uint64
163+
clusterVersion string
164+
members []uint64
165+
learners []uint64
166+
removedMembers []uint64
167+
expectedErrMsg string
168+
}{
169+
{
170+
name: "unexpected term: less than the last snapshot.term",
171+
initialTerm: 4,
172+
initialIndex: 2,
173+
initialCommitIndex: 2,
174+
consistentIndex: 10,
175+
term: 2,
176+
expectedErrMsg: "less than the latest snapshot",
177+
},
178+
{
179+
name: "unexpected consistent index: less than the last snapshot.index",
180+
initialTerm: 2,
181+
initialIndex: 20,
182+
initialCommitIndex: 20,
183+
consistentIndex: 10,
184+
term: 3,
185+
expectedErrMsg: "less than the latest snapshot",
186+
},
187+
{
188+
name: "normal case with a smaller initial commit index",
189+
initialTerm: 3,
190+
initialIndex: 4,
191+
initialCommitIndex: 20,
192+
consistentIndex: 32,
193+
term: 4,
194+
clusterVersion: "3.5.0",
195+
members: []uint64{100, 200},
196+
learners: []uint64{300},
197+
removedMembers: []uint64{400, 500},
198+
},
199+
{
200+
name: "normal case with a bigger initial commit index",
201+
initialTerm: 3,
202+
initialIndex: 4,
203+
initialCommitIndex: 40,
204+
consistentIndex: 32,
205+
term: 4,
206+
clusterVersion: "3.5.0",
207+
members: []uint64{100, 200},
208+
learners: []uint64{300},
209+
removedMembers: []uint64{400, 500},
210+
},
211+
{
212+
name: "empty cluster version",
213+
initialTerm: 3,
214+
initialIndex: 4,
215+
initialCommitIndex: 20,
216+
consistentIndex: 45,
217+
term: 4,
218+
clusterVersion: "",
219+
members: []uint64{110, 200},
220+
learners: []uint64{350},
221+
removedMembers: []uint64{450, 500},
222+
},
223+
{
224+
name: "no learner",
225+
initialTerm: 3,
226+
initialIndex: 4,
227+
initialCommitIndex: 8,
228+
consistentIndex: 7,
229+
term: 5,
230+
clusterVersion: "3.5.0",
231+
members: []uint64{150, 200},
232+
removedMembers: []uint64{450, 550},
233+
},
234+
{
235+
name: "no removed members",
236+
initialTerm: 6,
237+
initialIndex: 40,
238+
initialCommitIndex: 40,
239+
consistentIndex: 41,
240+
term: 6,
241+
clusterVersion: "3.7.0",
242+
members: []uint64{160, 200},
243+
learners: []uint64{300},
244+
},
245+
{
246+
name: "no learner and removed members",
247+
initialTerm: 6,
248+
initialIndex: 18,
249+
initialCommitIndex: 40,
250+
consistentIndex: 19,
251+
term: 6,
252+
clusterVersion: "3.6.0",
253+
members: []uint64{120, 220},
254+
},
255+
}
256+
257+
for _, tc := range testCases {
258+
t.Run(tc.name, func(t *testing.T) {
259+
dataDir := t.TempDir()
260+
lg := zap.NewNop()
261+
262+
require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToMemberDir(dataDir)))
263+
require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToWALDir(dataDir)))
264+
require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToSnapDir(dataDir)))
265+
266+
// generate the initial state for wal and v2 snapshot,
267+
t.Log("Populate the wal file")
268+
w, err := wal.Create(lg, datadir.ToWALDir(dataDir), pbutil.MustMarshal(
269+
&etcdserverpb.Metadata{
270+
NodeID: 1,
271+
ClusterID: 2,
272+
},
273+
))
274+
require.NoError(t, err)
275+
err = w.SaveSnapshot(walpb.Snapshot{Index: tc.initialIndex, Term: tc.initialTerm, ConfState: &raftpb.ConfState{Voters: []uint64{1}}})
276+
require.NoError(t, err)
277+
err = w.Save(raftpb.HardState{Term: tc.initialTerm, Commit: tc.initialCommitIndex, Vote: 1}, nil)
278+
require.NoError(t, err)
279+
err = w.Close()
280+
require.NoError(t, err)
281+
282+
t.Log("Generate a v2 snapshot file")
283+
ss := snap.New(lg, datadir.ToSnapDir(dataDir))
284+
err = ss.SaveSnap(raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: tc.initialIndex, Term: tc.initialTerm, ConfState: raftpb.ConfState{Voters: []uint64{1}}}})
285+
require.NoError(t, err)
286+
287+
t.Log("Load and verify the latest v2 snapshot file")
288+
oldV2Snap, err := getLatestV2Snapshot(lg, dataDir)
289+
require.NoError(t, err)
290+
require.Equal(t, raftpb.SnapshotMetadata{Index: tc.initialIndex, Term: tc.initialTerm, ConfState: raftpb.ConfState{Voters: []uint64{1}}}, oldV2Snap.Metadata)
291+
292+
t.Log("Prepare the bbolt db")
293+
be := backend.NewDefaultBackend(lg, filepath.Join(dataDir, "member/snap/db"))
294+
schema.CreateMetaBucket(be.BatchTx())
295+
schema.NewMembershipBackend(lg, be).MustCreateBackendBuckets()
296+
297+
if len(tc.clusterVersion) > 0 {
298+
t.Logf("Populate the cluster version: %s", tc.clusterVersion)
299+
schema.NewMembershipBackend(lg, be).MustSaveClusterVersionToBackend(semver.New(tc.clusterVersion))
300+
} else {
301+
t.Log("Skip populating cluster version due to not provided")
302+
}
303+
304+
tx := be.BatchTx()
305+
tx.LockOutsideApply()
306+
t.Log("Populate the consistent index and term")
307+
ci := cindex.NewConsistentIndex(be)
308+
ci.SetConsistentIndex(tc.consistentIndex, tc.term)
309+
ci.UnsafeSave(tx)
310+
tx.Unlock()
311+
312+
t.Logf("Populate members: %d", len(tc.members))
313+
memberBackend := schema.NewMembershipBackend(lg, be)
314+
for _, mID := range tc.members {
315+
memberBackend.MustSaveMemberToBackend(&membership.Member{ID: types.ID(mID)})
316+
}
317+
318+
t.Logf("Populate learner: %d", len(tc.learners))
319+
for _, mID := range tc.learners {
320+
memberBackend.MustSaveMemberToBackend(&membership.Member{ID: types.ID(mID), RaftAttributes: membership.RaftAttributes{IsLearner: true}})
321+
}
322+
323+
t.Logf("Populate removed members: %d", len(tc.removedMembers))
324+
for _, mID := range tc.removedMembers {
325+
memberBackend.MustDeleteMemberFromBackend(types.ID(mID))
326+
}
327+
328+
t.Log("Committing bbolt db")
329+
be.ForceCommit()
330+
require.NoError(t, be.Close())
331+
332+
t.Log("Creating a new v2 snapshot file based on the v3 store")
333+
err = createV2SnapshotFromV3Store(dataDir, backend.NewDefaultBackend(lg, filepath.Join(dataDir, "member/snap/db")))
334+
if len(tc.expectedErrMsg) > 0 {
335+
require.ErrorContains(t, err, tc.expectedErrMsg)
336+
return
337+
}
338+
require.NoError(t, err)
339+
340+
t.Log("Loading & verifying the new latest v2 snapshot file")
341+
newV2Snap, err := getLatestV2Snapshot(lg, dataDir)
342+
require.NoError(t, err)
343+
require.Equal(t, raftpb.SnapshotMetadata{Index: tc.consistentIndex, Term: tc.term, ConfState: raftpb.ConfState{Voters: tc.members, Learners: tc.learners}}, newV2Snap.Metadata)
344+
345+
st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
346+
require.NoError(t, st.Recovery(newV2Snap.Data))
347+
348+
cv, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "version"), false, false)
349+
if len(tc.clusterVersion) > 0 {
350+
require.NoError(t, err)
351+
if !semver.New(*cv.Node.Value).Equal(*semver.New(tc.clusterVersion)) {
352+
t.Fatalf("Unexpected cluster version, got %s, want %s", semver.New(*cv.Node.Value).String(), tc.clusterVersion)
353+
}
354+
} else {
355+
require.ErrorContains(t, err, "Key not found")
356+
}
357+
358+
members, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "members"), true, true)
359+
require.NoError(t, err)
360+
require.Len(t, members.Node.Nodes, len(tc.members)+len(tc.learners))
361+
362+
removedMembers, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "removed_members"), true, true)
363+
if len(tc.removedMembers) > 0 {
364+
require.NoError(t, err)
365+
require.Equal(t, len(tc.removedMembers), len(removedMembers.Node.Nodes))
366+
} else {
367+
require.ErrorContains(t, err, "Key not found")
368+
}
369+
})
370+
}
371+
}

etcdutl/etcdutl/migrate_command.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -129,14 +129,28 @@ func migrateCommandFunc(c *migrateConfig) error {
129129
tx := be.BatchTx()
130130
current, err := schema.DetectSchemaVersion(c.lg, be.ReadTx())
131131
if err != nil {
132-
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
132+
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older", zap.Error(err))
133133
return err
134134
}
135135
if current == *c.targetVersion {
136136
c.lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
137137
return nil
138138
}
139139

140+
// only generate a v2 snapshot file for downgrade case
141+
if c.targetVersion.LessThan(current) {
142+
// Update cluster version
143+
schema.NewMembershipBackend(c.lg, be).MustSaveClusterVersionToBackend(c.targetVersion)
144+
145+
// forcibly create a v2 snapshot file
146+
// TODO: remove in 3.8
147+
if err = createV2SnapshotFromV3Store(c.dataDir, be); err != nil {
148+
c.lg.Error("Failed to create v2 snapshot file", zap.Error(err))
149+
return err
150+
}
151+
c.lg.Info("Generated a v2 snapshot file")
152+
}
153+
140154
if err = c.finalize(); err != nil {
141155
c.lg.Error("Failed to finalize config", zap.Error(err))
142156
return err
@@ -150,6 +164,7 @@ func migrateCommandFunc(c *migrateConfig) error {
150164
c.lg.Info("normal migrate failed, trying with force", zap.Error(err))
151165
migrateForce(c.lg, tx, c.targetVersion)
152166
}
167+
153168
be.ForceCommit()
154169
return nil
155170
}

0 commit comments

Comments
 (0)