Skip to content

Commit d8f3b56

Browse files
committed
Create a v2 snapshot when running etcdutl migrate command
Signed-off-by: Benjamin Wang <[email protected]>
1 parent 9db8dcb commit d8f3b56

File tree

4 files changed

+226
-42
lines changed

4 files changed

+226
-42
lines changed

etcdutl/ctl.go

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func init() {
4545
etcdutl.NewVersionCommand(),
4646
etcdutl.NewCompletionCommand(),
4747
etcdutl.NewMigrateCommand(),
48+
etcdutl.NewV2SnapshotCommand(), // TODO: remove in 3.8
4849
)
4950
}
5051

etcdutl/etcdutl/common.go

+120
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,24 @@
1515
package etcdutl
1616

1717
import (
18+
"errors"
19+
"fmt"
20+
21+
"github.com/coreos/go-semver/semver"
1822
"go.uber.org/zap"
1923
"go.uber.org/zap/zapcore"
2024

2125
"go.etcd.io/etcd/client/pkg/v3/logutil"
2226
"go.etcd.io/etcd/pkg/v3/cobrautl"
27+
"go.etcd.io/etcd/server/v3/etcdserver"
28+
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
29+
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
30+
"go.etcd.io/etcd/server/v3/storage/backend"
31+
"go.etcd.io/etcd/server/v3/storage/datadir"
32+
"go.etcd.io/etcd/server/v3/storage/schema"
33+
"go.etcd.io/etcd/server/v3/storage/wal"
34+
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
35+
"go.etcd.io/raft/v3/raftpb"
2336
)
2437

2538
func GetLogger() *zap.Logger {
@@ -32,3 +45,110 @@ func GetLogger() *zap.Logger {
3245
}
3346
return lg
3447
}
48+
49+
func getLatestWALSnap(lg *zap.Logger, dataDir string) (walpb.Snapshot, error) {
50+
snapshot, err := getLatestV2Snapshot(lg, dataDir)
51+
if err != nil {
52+
return walpb.Snapshot{}, err
53+
}
54+
55+
var walsnap walpb.Snapshot
56+
if snapshot != nil {
57+
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
58+
}
59+
return walsnap, nil
60+
}
61+
62+
func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, error) {
63+
walPath := datadir.ToWALDir(dataDir)
64+
walSnaps, err := wal.ValidSnapshotEntries(lg, walPath)
65+
if err != nil {
66+
return nil, err
67+
}
68+
69+
ss := snap.New(lg, datadir.ToSnapDir(dataDir))
70+
snapshot, err := ss.LoadNewestAvailable(walSnaps)
71+
if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
72+
return nil, err
73+
}
74+
75+
return snapshot, nil
76+
}
77+
78+
func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error {
79+
var (
80+
lg = GetLogger()
81+
82+
snapDir = datadir.ToSnapDir(dataDir)
83+
walDir = datadir.ToWALDir(dataDir)
84+
)
85+
86+
ci, term := schema.ReadConsistentIndex(be.ReadTx())
87+
88+
cl := membership.NewCluster(lg)
89+
cl.SetBackend(schema.NewMembershipBackend(lg, be))
90+
cl.Recover(func(*zap.Logger, *semver.Version) {})
91+
92+
latestWALSnap, err := getLatestWALSnap(lg, dataDir)
93+
if err != nil {
94+
return err
95+
}
96+
97+
// Each time before creating the v2 snapshot, etcdserve always flush
98+
// the backend storage (bbolt db), so the consistent index should never
99+
// less than the Index or term of the latest snapshot.
100+
if ci < latestWALSnap.Index || term < latestWALSnap.Term {
101+
// This should never happen
102+
return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWALSnap.Index, latestWALSnap.Term)
103+
}
104+
105+
voters, learners := getVotersAndLearners(cl)
106+
confState := raftpb.ConfState{
107+
Voters: voters,
108+
Learners: learners,
109+
}
110+
111+
// create the v2 snaspshot file
112+
raftSnap := raftpb.Snapshot{
113+
Data: etcdserver.GetMembershipInfoInV2Format(lg, cl),
114+
Metadata: raftpb.SnapshotMetadata{
115+
Index: ci,
116+
Term: term,
117+
ConfState: confState,
118+
},
119+
}
120+
sn := snap.New(lg, snapDir)
121+
if err := sn.SaveSnap(raftSnap); err != nil {
122+
return err
123+
}
124+
125+
// save WAL snapshot record
126+
w, err := wal.Open(lg, walDir, latestWALSnap)
127+
if err != nil {
128+
return err
129+
}
130+
defer w.Close()
131+
// We must read all records to locate the tail of the last valid WAL file.
132+
if _, _, _, err = w.ReadAll(); err != nil {
133+
return err
134+
}
135+
136+
return w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState})
137+
}
138+
139+
func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) {
140+
var (
141+
voters []uint64
142+
learners []uint64
143+
)
144+
for _, m := range cl.Members() {
145+
if m.IsLearner {
146+
learners = append(learners, uint64(m.ID))
147+
continue
148+
}
149+
150+
voters = append(voters, uint64(m.ID))
151+
}
152+
153+
return voters, learners
154+
}

etcdutl/etcdutl/migrate_command.go

+44-42
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package etcdutl
1616

1717
import (
18-
"errors"
1918
"fmt"
2019
"strings"
2120

@@ -25,12 +24,10 @@ import (
2524

2625
"go.etcd.io/etcd/api/v3/version"
2726
"go.etcd.io/etcd/pkg/v3/cobrautl"
28-
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
2927
"go.etcd.io/etcd/server/v3/storage/backend"
3028
"go.etcd.io/etcd/server/v3/storage/datadir"
3129
"go.etcd.io/etcd/server/v3/storage/schema"
3230
"go.etcd.io/etcd/server/v3/storage/wal"
33-
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
3431
)
3532

3633
// NewMigrateCommand prints out the version of etcd.
@@ -77,8 +74,9 @@ func (o *migrateOptions) AddFlags(cmd *cobra.Command) {
7774

7875
func (o *migrateOptions) Config() (*migrateConfig, error) {
7976
c := &migrateConfig{
80-
force: o.force,
81-
lg: GetLogger(),
77+
force: o.force,
78+
dataDir: o.dataDir,
79+
lg: GetLogger(),
8280
}
8381
var err error
8482
dotCount := strings.Count(o.targetVersion, ".")
@@ -93,67 +91,69 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
9391
return nil, fmt.Errorf(`target version %q not supported. Minimal "3.5"`, storageVersionToString(c.targetVersion))
9492
}
9593

96-
dbPath := datadir.ToBackendFileName(o.dataDir)
97-
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)
94+
return c, nil
95+
}
9896

99-
walPath := datadir.ToWALDir(o.dataDir)
100-
walSnap, err := getLatestWALSnap(c.lg, o.dataDir)
97+
type migrateConfig struct {
98+
lg *zap.Logger
99+
be backend.Backend
100+
targetVersion *semver.Version
101+
walVersion schema.WALVersion
102+
dataDir string
103+
force bool
104+
}
105+
106+
func (c *migrateConfig) finalize() error {
107+
walPath := datadir.ToWALDir(c.dataDir)
108+
walSnap, err := getLatestWALSnap(c.lg, c.dataDir)
101109
if err != nil {
102-
return nil, fmt.Errorf("failed to get the lastest snapshot: %w", err)
110+
return fmt.Errorf("failed to get the lastest snapshot: %w", err)
103111
}
104112
w, err := wal.OpenForRead(c.lg, walPath, walSnap)
105113
if err != nil {
106-
return nil, fmt.Errorf(`failed to open wal: %w`, err)
114+
return fmt.Errorf(`failed to open wal: %w`, err)
107115
}
108116
defer w.Close()
109117
c.walVersion, err = wal.ReadWALVersion(w)
110118
if err != nil {
111-
return nil, fmt.Errorf(`failed to read wal: %w`, err)
112-
}
113-
114-
return c, nil
115-
}
116-
117-
func getLatestWALSnap(lg *zap.Logger, dataDir string) (walpb.Snapshot, error) {
118-
walPath := datadir.ToWALDir(dataDir)
119-
walSnaps, err := wal.ValidSnapshotEntries(lg, walPath)
120-
if err != nil {
121-
return walpb.Snapshot{}, err
122-
}
123-
124-
ss := snap.New(lg, datadir.ToSnapDir(dataDir))
125-
snapshot, err := ss.LoadNewestAvailable(walSnaps)
126-
if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
127-
return walpb.Snapshot{}, err
128-
}
129-
130-
var walsnap walpb.Snapshot
131-
if snapshot != nil {
132-
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
119+
return fmt.Errorf(`failed to read wal: %w`, err)
133120
}
134-
return walsnap, nil
135-
}
136121

137-
type migrateConfig struct {
138-
lg *zap.Logger
139-
be backend.Backend
140-
targetVersion *semver.Version
141-
walVersion schema.WALVersion
142-
force bool
122+
return nil
143123
}
144124

145125
func migrateCommandFunc(c *migrateConfig) error {
126+
dbPath := datadir.ToBackendFileName(c.dataDir)
127+
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)
146128
defer c.be.Close()
129+
147130
tx := c.be.BatchTx()
148131
current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx())
149132
if err != nil {
150-
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
133+
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older", zap.Error(err))
151134
return err
152135
}
153136
if current == *c.targetVersion {
154137
c.lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
155138
return nil
156139
}
140+
141+
// Update cluster version
142+
be := schema.NewMembershipBackend(c.lg, c.be)
143+
be.MustSaveClusterVersionToBackend(c.targetVersion)
144+
145+
// forcibly create a v2 snapshot file
146+
// TODO: remove in 3.8
147+
if err = createV2SnapshotFromV3Store(c.dataDir, c.be); err != nil {
148+
c.lg.Error("Failed to create v2 snapshot file", zap.Error(err))
149+
return err
150+
}
151+
152+
if err = c.finalize(); err != nil {
153+
c.lg.Error("Failed to finalize config", zap.Error(err))
154+
return err
155+
}
156+
157157
err = schema.Migrate(c.lg, tx, c.walVersion, *c.targetVersion)
158158
if err != nil {
159159
if !c.force {
@@ -162,7 +162,9 @@ func migrateCommandFunc(c *migrateConfig) error {
162162
c.lg.Info("normal migrate failed, trying with force", zap.Error(err))
163163
migrateForce(c.lg, tx, c.targetVersion)
164164
}
165+
165166
c.be.ForceCommit()
167+
166168
return nil
167169
}
168170

etcdutl/etcdutl/v2snapshot_command.go

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2025 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package etcdutl
16+
17+
import (
18+
"fmt"
19+
"path/filepath"
20+
21+
"github.com/spf13/cobra"
22+
23+
"go.etcd.io/etcd/pkg/v3/cobrautl"
24+
"go.etcd.io/etcd/server/v3/storage/backend"
25+
"go.etcd.io/etcd/server/v3/storage/datadir"
26+
)
27+
28+
var createV2SnapDataDir string
29+
30+
// NewV2SnapshotCommand returns the cobra command for "v2snapshot".
31+
// TODO: remove the command in 3.8
32+
func NewV2SnapshotCommand() *cobra.Command {
33+
cmd := &cobra.Command{
34+
Use: "v2snapshot <subcommand>",
35+
Short: "Manages etcd node v2snapshots",
36+
}
37+
cmd.AddCommand(newV2SnapshotCreateCommand())
38+
return cmd
39+
}
40+
41+
func newV2SnapshotCreateCommand() *cobra.Command {
42+
cmd := &cobra.Command{
43+
Use: "create",
44+
Short: "Create a new v2 snapshot file",
45+
Run: v2SnapshotCreateFunc,
46+
}
47+
cmd.Flags().StringVar(&createV2SnapDataDir, "data-dir", "", "Required. Path to the etcd data directory.")
48+
cmd.MarkFlagRequired("data-dir")
49+
cmd.MarkFlagDirname("data-dir")
50+
return cmd
51+
}
52+
53+
func v2SnapshotCreateFunc(_ *cobra.Command, _ []string) {
54+
be := backend.NewDefaultBackend(GetLogger(), filepath.Join(datadir.ToSnapDir(createV2SnapDataDir), "db"))
55+
defer be.Close()
56+
57+
if err := createV2SnapshotFromV3Store(createV2SnapDataDir, be); err != nil {
58+
cobrautl.ExitWithError(cobrautl.ExitError, err)
59+
}
60+
fmt.Println("Created a v2 snapshot file.")
61+
}

0 commit comments

Comments
 (0)