Commit 4068189

Merge pull request #18825 from serathius/snapshot-separate
Run a separate in-memory snapshot to reduce the number of entries stored in raft memory storage
2 parents 6e8b3e3 + 4989834 commit 4068189

File tree: 4 files changed, +134 -78 lines
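
The gist of the change, as a self-contained sketch (the type, constant names, and threshold value for snapshotCount below are simplified stand-ins, not the real etcd types; memorySnapshotCount mirrors the new constant in the server.go diff further down): a cheap in-memory snapshot, followed by raft-log compaction, is now triggered far more often than the full disk snapshot, which remains gated by --snapshot-count.

package main

import "fmt"

// Simplified stand-ins for the real etcd types; only the fields needed to
// illustrate the two snapshot triggers from this PR are included.
type progress struct {
    appliedIndex        uint64
    diskSnapshotIndex   uint64
    memorySnapshotIndex uint64
}

const (
    snapshotCount       = 10000 // stand-in for the configured --snapshot-count (disk snapshots)
    memorySnapshotCount = 100   // new constant from this PR (in-memory snapshots)
)

func shouldSnapshotToDisk(p progress, forced bool) bool {
    return (forced && p.appliedIndex != p.diskSnapshotIndex) ||
        p.appliedIndex-p.diskSnapshotIndex > snapshotCount
}

func shouldSnapshotToMemory(p progress) bool {
    return p.appliedIndex > p.memorySnapshotIndex+memorySnapshotCount
}

func main() {
    p := progress{appliedIndex: 250}
    // A disk snapshot is far from due, but an in-memory snapshot (and the
    // raft-log compaction that follows it) is already warranted.
    fmt.Println(shouldSnapshotToDisk(p, false), shouldSnapshotToMemory(p)) // false true
}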

server/etcdserver/api/membership/cluster.go (+1 -1)

@@ -899,7 +899,7 @@ func (c *RaftCluster) Store(store v2store.Store) {
         if m.ClientURLs != nil {
             mustUpdateMemberAttrInStore(c.lg, store, m)
         }
-        c.lg.Info(
+        c.lg.Debug(
             "snapshot storing member",
             zap.String("id", m.ID.String()),
             zap.Strings("peer-urls", m.PeerURLs),

server/etcdserver/server.go (+73 -60)

@@ -109,6 +109,7 @@ const (
     readyPercentThreshold = 0.9

     DowngradeEnabledPath = "/downgrade/enabled"
+    memorySnapshotCount  = 100
 )

 var (
@@ -291,9 +292,10 @@ type EtcdServer struct {
     clusterVersionChanged *notify.Notifier

     *AccessController
-    // forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
+    // forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
     // Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
-    forceSnapshot     bool
+    // TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
+    forceDiskSnapshot bool
     corruptionChecker CorruptionChecker
 }

@@ -741,10 +743,11 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
 }

 type etcdProgress struct {
-    confState raftpb.ConfState
-    snapi     uint64
-    appliedt  uint64
-    appliedi  uint64
+    confState           raftpb.ConfState
+    diskSnapshotIndex   uint64
+    memorySnapshotIndex uint64
+    appliedt            uint64
+    appliedi            uint64
 }

 // raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
@@ -809,10 +812,11 @@ func (s *EtcdServer) run() {
     s.r.start(rh)

     ep := etcdProgress{
-        confState: sn.Metadata.ConfState,
-        snapi:     sn.Metadata.Index,
-        appliedt:  sn.Metadata.Term,
-        appliedi:  sn.Metadata.Index,
+        confState:           sn.Metadata.ConfState,
+        diskSnapshotIndex:   sn.Metadata.Index,
+        memorySnapshotIndex: sn.Metadata.Index,
+        appliedt:            sn.Metadata.Term,
+        appliedi:            sn.Metadata.Index,
     }

     defer func() {
@@ -979,7 +983,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
     // storage, since the raft routine might be slower than toApply routine.
     <-apply.notifyc

-    s.triggerSnapshot(ep)
+    s.snapshotIfNeededAndCompactRaftLog(ep)
     select {
     // snapshot requested via send()
     case m := <-s.r.msgSnapC:
@@ -998,15 +1002,15 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
     lg := s.Logger()
     lg.Info(
         "applying snapshot",
-        zap.Uint64("current-snapshot-index", ep.snapi),
+        zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
         zap.Uint64("current-applied-index", ep.appliedi),
         zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
         zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
     )
     defer func() {
         lg.Info(
             "applied snapshot",
-            zap.Uint64("current-snapshot-index", ep.snapi),
+            zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
             zap.Uint64("current-applied-index", ep.appliedi),
             zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
             zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
@@ -1017,7 +1021,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
     if toApply.snapshot.Metadata.Index <= ep.appliedi {
         lg.Panic(
             "unexpected leader snapshot from outdated index",
-            zap.Uint64("current-snapshot-index", ep.snapi),
+            zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
             zap.Uint64("current-applied-index", ep.appliedi),
             zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
             zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
@@ -1132,7 +1136,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {

     ep.appliedt = toApply.snapshot.Metadata.Term
     ep.appliedi = toApply.snapshot.Metadata.Index
-    ep.snapi = ep.appliedi
+    ep.diskSnapshotIndex = ep.appliedi
+    ep.memorySnapshotIndex = ep.appliedi
     ep.confState = toApply.snapshot.Metadata.ConfState

     // As backends and implementations like alarmsStore changed, we need
@@ -1188,31 +1193,26 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
 }

 func (s *EtcdServer) ForceSnapshot() {
-    s.forceSnapshot = true
+    s.forceDiskSnapshot = true
 }

-func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
-    if !s.shouldSnapshot(ep) {
+func (s *EtcdServer) snapshotIfNeededAndCompactRaftLog(ep *etcdProgress) {
+    //TODO: Remove disk snapshot in v3.7
+    shouldSnapshotToDisk := s.shouldSnapshotToDisk(ep)
+    shouldSnapshotToMemory := s.shouldSnapshotToMemory(ep)
+    if !shouldSnapshotToDisk && !shouldSnapshotToMemory {
         return
     }
-    lg := s.Logger()
-    lg.Info(
-        "triggering snapshot",
-        zap.String("local-member-id", s.MemberID().String()),
-        zap.Uint64("local-member-applied-index", ep.appliedi),
-        zap.Uint64("local-member-snapshot-index", ep.snapi),
-        zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
-        zap.Bool("snapshot-forced", s.forceSnapshot),
-    )
-    s.forceSnapshot = false
-
-    s.snapshot(ep.appliedi, ep.confState)
+    s.snapshot(ep, shouldSnapshotToDisk)
     s.compactRaftLog(ep.appliedi)
-    ep.snapi = ep.appliedi
 }

-func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
-    return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount)
+func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
+    return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
+}
+
+func (s *EtcdServer) shouldSnapshotToMemory(ep *etcdProgress) bool {
+    return ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount
 }

 func (s *EtcdServer) hasMultipleVotingMembers() bool {
@@ -2128,23 +2128,33 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 }

 // TODO: non-blocking snapshot
-func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
-    d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
-    // commit kv to write metadata (for example: consistent index) to disk.
-    //
-    // This guarantees that Backend's consistent_index is >= index of last snapshot.
-    //
-    // KV().commit() updates the consistent index in backend.
-    // All operations that update consistent index must be called sequentially
-    // from applyAll function.
-    // So KV().Commit() cannot run in parallel with toApply. It has to be called outside
-    // the go routine created below.
-    s.KV().Commit()
-
+func (s *EtcdServer) snapshot(ep *etcdProgress, toDisk bool) {
     lg := s.Logger()
+    d := GetMembershipInfoInV2Format(lg, s.cluster)
+    if toDisk {
+        s.Logger().Info(
+            "triggering snapshot",
+            zap.String("local-member-id", s.MemberID().String()),
+            zap.Uint64("local-member-applied-index", ep.appliedi),
+            zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
+            zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
+            zap.Bool("snapshot-forced", s.forceDiskSnapshot),
+        )
+        s.forceDiskSnapshot = false
+        // commit kv to write metadata (for example: consistent index) to disk.
+        //
+        // This guarantees that Backend's consistent_index is >= index of last snapshot.
+        //
+        // KV().commit() updates the consistent index in backend.
+        // All operations that update consistent index must be called sequentially
+        // from applyAll function.
+        // So KV().Commit() cannot run in parallel with toApply. It has to be called outside
+        // the go routine created below.
+        s.KV().Commit()
+    }

     // For backward compatibility, generate v2 snapshot from v3 state.
-    snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
+    snap, err := s.r.raftStorage.CreateSnapshot(ep.appliedi, &ep.confState, d)
     if err != nil {
         // the snapshot was done asynchronously with the progress of raft.
         // raft might have already got a newer snapshot.
@@ -2153,21 +2163,25 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
         }
         lg.Panic("failed to create snapshot", zap.Error(err))
     }
+    ep.memorySnapshotIndex = ep.appliedi

     verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())

-    // SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
-    if err = s.r.storage.SaveSnap(snap); err != nil {
-        lg.Panic("failed to save snapshot", zap.Error(err))
-    }
-    if err = s.r.storage.Release(snap); err != nil {
-        lg.Panic("failed to release wal", zap.Error(err))
-    }
+    if toDisk {
+        // SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
+        if err = s.r.storage.SaveSnap(snap); err != nil {
+            lg.Panic("failed to save snapshot", zap.Error(err))
+        }
+        ep.diskSnapshotIndex = ep.appliedi
+        if err = s.r.storage.Release(snap); err != nil {
+            lg.Panic("failed to release wal", zap.Error(err))
+        }

-    lg.Info(
-        "saved snapshot",
-        zap.Uint64("snapshot-index", snap.Metadata.Index),
-    )
+        lg.Info(
+            "saved snapshot to disk",
+            zap.Uint64("snapshot-index", snap.Metadata.Index),
+        )
+    }
 }

 func (s *EtcdServer) compactRaftLog(snapi uint64) {
@@ -2188,7 +2202,6 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
     if snapi > s.Cfg.SnapshotCatchUpEntries {
         compacti = snapi - s.Cfg.SnapshotCatchUpEntries
     }
-
     err := s.r.raftStorage.Compact(compacti)
     if err != nil {
         // the compaction was done asynchronously with the progress of raft.
@@ -2198,7 +2211,7 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
         }
         lg.Panic("failed to compact", zap.Error(err))
     }
-    lg.Info(
+    lg.Debug(
         "compacted Raft logs",
         zap.Uint64("compact-index", compacti),
     )
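
Why this reduces raft memory usage, sketched as an illustrative bound (this helper is not code from the commit, and the config values in main are assumptions, not etcd defaults): before this change the raft log in MemoryStorage was only compacted when a disk snapshot fired, so it could grow toward SnapshotCount plus SnapshotCatchUpEntries entries; with an in-memory snapshot every memorySnapshotCount (100) applied entries, compaction runs far more often and the retained log stays close to SnapshotCatchUpEntries.

package main

import "fmt"

// Illustrative only: rough worst-case number of raft log entries retained in
// raft.MemoryStorage. compactRaftLog trims the log to appliedIndex-catchUpEntries
// whenever a snapshot is taken (disk-only before this PR, disk or memory after
// it), and the log then grows by at most one snapshot interval before the next trim.
func maxRetainedEntries(snapshotInterval, catchUpEntries uint64) uint64 {
    return snapshotInterval + catchUpEntries
}

func main() {
    // Assumed config values for illustration: SnapshotCount=10000, SnapshotCatchUpEntries=5000.
    fmt.Println(maxRetainedEntries(10000, 5000)) // before: compaction only at disk snapshots
    fmt.Println(maxRetainedEntries(100, 5000))   // after: compaction at every memory snapshot
}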

server/etcdserver/server_test.go (+57 -16)

@@ -627,8 +627,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
     }
 }

-// TestSnapshot should snapshot the store and cut the persistent
-func TestSnapshot(t *testing.T) {
+// TestSnapshotDisk should save the snapshot to disk and release old snapshots
+func TestSnapshotDisk(t *testing.T) {
     revertFunc := verify.DisableVerifications()
     defer revertFunc()

@@ -667,24 +667,65 @@
         gaction, _ := p.Wait(2)
         defer func() { ch <- struct{}{} }()

-        if len(gaction) != 2 {
-            t.Errorf("len(action) = %d, want 2", len(gaction))
-            return
-        }
-        if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
-            t.Errorf("action = %s, want SaveSnap", gaction[0])
-        }
+        assert.Len(t, gaction, 2)
+        assert.Equal(t, testutil.Action{Name: "SaveSnap"}, gaction[0])
+        assert.Equal(t, testutil.Action{Name: "Release"}, gaction[1])
+    }()
+    ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
+    srv.snapshot(&ep, true)
+    <-ch
+    assert.Empty(t, st.Action())
+    assert.Equal(t, uint64(1), ep.diskSnapshotIndex)
+    assert.Equal(t, uint64(1), ep.memorySnapshotIndex)
+}

-        if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Release"}) {
-            t.Errorf("action = %s, want Release", gaction[1])
-        }
+func TestSnapshotMemory(t *testing.T) {
+    revertFunc := verify.DisableVerifications()
+    defer revertFunc()
+
+    be, _ := betesting.NewDefaultTmpBackend(t)
+    defer betesting.Close(t, be)
+
+    s := raft.NewMemoryStorage()
+    s.Append([]raftpb.Entry{{Index: 1}})
+    st := mockstore.NewRecorderStream()
+    p := mockstorage.NewStorageRecorderStream("")
+    r := newRaftNode(raftNodeConfig{
+        lg:          zaptest.NewLogger(t),
+        Node:        newNodeNop(),
+        raftStorage: s,
+        storage:     p,
+    })
+    srv := &EtcdServer{
+        lgMu:         new(sync.RWMutex),
+        lg:           zaptest.NewLogger(t),
+        r:            *r,
+        v2store:      st,
+        consistIndex: cindex.NewConsistentIndex(be),
+    }
+    srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+    defer func() {
+        assert.NoError(t, srv.kv.Close())
     }()
+    srv.be = be

-    srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
+    cl := membership.NewCluster(zaptest.NewLogger(t))
+    srv.cluster = cl
+
+    ch := make(chan struct{}, 1)
+
+    go func() {
+        gaction, _ := p.Wait(1)
+        defer func() { ch <- struct{}{} }()
+
+        assert.Empty(t, gaction)
+    }()
+    ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
+    srv.snapshot(&ep, false)
     <-ch
-    if len(st.Action()) != 0 {
-        t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
-    }
+    assert.Empty(t, st.Action())
+    assert.Equal(t, uint64(0), ep.diskSnapshotIndex)
+    assert.Equal(t, uint64(1), ep.memorySnapshotIndex)
 }

 // TestSnapshotOrdering ensures raft persists snapshot onto disk before

tests/integration/v3_watch_restore_test.go (+3 -1)

@@ -111,7 +111,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
     //
     // Since there is no way to confirm server has compacted the log, we
     // use log monitor to watch and expect "compacted Raft logs" content.
-    expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
+    // In v3.6 we no longer generate the "compacted Raft logs" line, as raft compaction happens independently of snapshots.
+    // For now let's use the snapshot log, which should be equivalent to compaction.
+    expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "saved snapshot to disk", 2)

     // After RecoverPartition, leader L will send snapshot to slow F_m0
     // follower, because F_m0(index:8) is 'out of date' compared to
