
Commit 6cebf97

Run a separate in-memory snapshot to reduce the number of entries stored in raft memory storage
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent: cfe958d

File tree: 4 files changed (+112 -44 lines)
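
At a glance, the commit splits snapshot triggering in two: the existing disk snapshot, still gated by SnapshotCount, and a new in-memory snapshot taken roughly every memorySnapshotCount (100) applied entries whose only purpose is to let the raft log held in memory be compacted more often. The standalone sketch below restates the two predicates from the server.go diff; memorySnapshotCount and the predicate bodies are taken from the commit, while the surrounding progress/server types, the package main scaffolding, and the example snapshotCount of 10000 are illustrative assumptions, not etcd's real types or defaults.

// Minimal sketch of the new dual trigger logic; see the server.go hunks below
// for the real methods on EtcdServer.
package main

import "fmt"

const memorySnapshotCount = 100 // constant introduced by this commit

type progress struct { // stands in for etcdProgress
    appliedi            uint64
    diskSnapshotIndex   uint64
    memorySnapshotIndex uint64
}

type server struct { // stands in for EtcdServer
    forceDiskSnapshot bool
    snapshotCount     uint64 // stands in for s.Cfg.SnapshotCount
}

func (s *server) shouldSnapshotToDisk(ep *progress) bool {
    return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) ||
        (ep.appliedi-ep.diskSnapshotIndex > s.snapshotCount)
}

func (s *server) shouldSnapshotToMemory(ep *progress) bool {
    return ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount
}

func main() {
    s := &server{snapshotCount: 10000} // example value only
    ep := &progress{appliedi: 250}
    // The in-memory snapshot fires roughly every 100 applied entries, while the
    // disk snapshot still waits for snapshotCount entries or a forced snapshot,
    // so the raft in-memory log is trimmed far more often than the disk path runs.
    fmt.Println(s.shouldSnapshotToDisk(ep), s.shouldSnapshotToMemory(ep)) // false true
}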

server/etcdserver/api/membership/cluster.go (+1 -1)

@@ -899,7 +899,7 @@ func (c *RaftCluster) Store(store v2store.Store) {
         if m.ClientURLs != nil {
             mustUpdateMemberAttrInStore(c.lg, store, m)
         }
-        c.lg.Info(
+        c.lg.Debug(
             "snapshot storing member",
             zap.String("id", m.ID.String()),
             zap.Strings("peer-urls", m.PeerURLs),
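
Both messages demoted to Debug in this commit (the per-member line above and the "compacted Raft logs" line in server.go below) are on the snapshot/compaction path, which after this change runs for every in-memory snapshot (roughly every 100 applied entries) rather than only for the much rarer disk snapshot, so keeping them at Info would be noisy. A minimal sketch of the effect, assuming go.uber.org/zap (the logger etcd uses) with its production config, which enables Info and above:

// Sketch only: with an Info-level logger, Debug messages are dropped, so the
// demoted per-snapshot lines disappear from normal logs.
package main

import "go.uber.org/zap"

func main() {
    lg, err := zap.NewProduction() // Info-level logger by default
    if err != nil {
        panic(err)
    }
    defer func() { _ = lg.Sync() }()

    lg.Debug("compacted Raft logs", zap.Uint64("compact-index", 900))     // filtered out
    lg.Info("saved snapshot to disk", zap.Uint64("snapshot-index", 1000)) // printed
}

The integration-test change at the bottom of the commit follows the same logic: it now waits for the Info-level "saved snapshot to disk" line instead of "compacted Raft logs".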

server/etcdserver/server.go (+51 -39)

@@ -109,6 +109,7 @@ const (
     readyPercentThreshold = 0.9

     DowngradeEnabledPath = "/downgrade/enabled"
+    memorySnapshotCount = 100
 )

 var (
@@ -293,6 +294,7 @@ type EtcdServer struct {
     *AccessController
     // forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
     // Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
+    // TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
     forceDiskSnapshot bool
     corruptionChecker CorruptionChecker
 }
@@ -1195,17 +1197,24 @@ func (s *EtcdServer) ForceSnapshot() {
 }

 func (s *EtcdServer) snapshotIfNeededAndCompactRaftLog(ep *etcdProgress) {
-    if !s.shouldSnapshot(ep) {
+    //TODO: Remove disk snapshot in v3.7
+    shouldSnapshotToDisk := s.shouldSnapshotToDisk(ep)
+    shouldSnapshotToMemory := s.shouldSnapshotToMemory(ep)
+    if !shouldSnapshotToDisk && !shouldSnapshotToMemory {
         return
     }
-    s.snapshot(ep)
+    s.snapshot(ep, shouldSnapshotToDisk)
     s.compactRaftLog(ep.appliedi)
 }

-func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
+func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
     return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
 }

+func (s *EtcdServer) shouldSnapshotToMemory(ep *etcdProgress) bool {
+    return ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount
+}
+
 func (s *EtcdServer) hasMultipleVotingMembers() bool {
     return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1
 }
@@ -2119,28 +2128,30 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 }

 // TODO: non-blocking snapshot
-func (s *EtcdServer) snapshot(ep *etcdProgress) {
+func (s *EtcdServer) snapshot(ep *etcdProgress, toDisk bool) {
     lg := s.Logger()
-    lg.Info(
-        "triggering snapshot",
-        zap.String("local-member-id", s.MemberID().String()),
-        zap.Uint64("local-member-applied-index", ep.appliedi),
-        zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
-        zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
-        zap.Bool("snapshot-forced", s.forceDiskSnapshot),
-    )
-    s.forceDiskSnapshot = false
-    d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
-    // commit kv to write metadata (for example: consistent index) to disk.
-    //
-    // This guarantees that Backend's consistent_index is >= index of last snapshot.
-    //
-    // KV().commit() updates the consistent index in backend.
-    // All operations that update consistent index must be called sequentially
-    // from applyAll function.
-    // So KV().Commit() cannot run in parallel with toApply. It has to be called outside
-    // the go routine created below.
-    s.KV().Commit()
+    d := GetMembershipInfoInV2Format(lg, s.cluster)
+    if toDisk {
+        s.Logger().Info(
+            "triggering snapshot",
+            zap.String("local-member-id", s.MemberID().String()),
+            zap.Uint64("local-member-applied-index", ep.appliedi),
+            zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
+            zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
+            zap.Bool("snapshot-forced", s.forceDiskSnapshot),
+        )
+        s.forceDiskSnapshot = false
+        // commit kv to write metadata (for example: consistent index) to disk.
+        //
+        // This guarantees that Backend's consistent_index is >= index of last snapshot.
+        //
+        // KV().commit() updates the consistent index in backend.
+        // All operations that update consistent index must be called sequentially
+        // from applyAll function.
+        // So KV().Commit() cannot run in parallel with toApply. It has to be called outside
+        // the go routine created below.
+        s.KV().Commit()
+    }

     // For backward compatibility, generate v2 snapshot from v3 state.
     snap, err := s.r.raftStorage.CreateSnapshot(ep.appliedi, &ep.confState, d)
@@ -2152,23 +2163,25 @@ func (s *EtcdServer) snapshot(ep *etcdProgress) {
         }
         lg.Panic("failed to create snapshot", zap.Error(err))
     }
+    ep.memorySnapshotIndex = ep.appliedi

     verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())

-    // SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
-    if err = s.r.storage.SaveSnap(snap); err != nil {
-        lg.Panic("failed to save snapshot", zap.Error(err))
-    }
-    ep.diskSnapshotIndex = ep.appliedi
-    ep.memorySnapshotIndex = ep.appliedi
-    if err = s.r.storage.Release(snap); err != nil {
-        lg.Panic("failed to release wal", zap.Error(err))
-    }
+    if toDisk {
+        // SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
+        if err = s.r.storage.SaveSnap(snap); err != nil {
+            lg.Panic("failed to save snapshot", zap.Error(err))
+        }
+        ep.diskSnapshotIndex = ep.appliedi
+        if err = s.r.storage.Release(snap); err != nil {
+            lg.Panic("failed to release wal", zap.Error(err))
+        }

-    lg.Info(
-        "saved snapshot",
-        zap.Uint64("snapshot-index", snap.Metadata.Index),
-    )
+        lg.Info(
+            "saved snapshot to disk",
+            zap.Uint64("snapshot-index", snap.Metadata.Index),
+        )
+    }
 }

 func (s *EtcdServer) compactRaftLog(snapi uint64) {
@@ -2189,7 +2202,6 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
     if snapi > s.Cfg.SnapshotCatchUpEntries {
         compacti = snapi - s.Cfg.SnapshotCatchUpEntries
     }
-
     err := s.r.raftStorage.Compact(compacti)
     if err != nil {
         // the compaction was done asynchronously with the progress of raft.
@@ -2199,7 +2211,7 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
         }
         lg.Panic("failed to compact", zap.Error(err))
     }
-    lg.Info(
+    lg.Debug(
         "compacted Raft logs",
         zap.Uint64("compact-index", compacti),
     )
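
The server.go changes keep raft's in-memory log small by pairing the now more frequent snapshot with the existing compactRaftLog step: entries can only be safely dropped from memory up to an index that a snapshot covers, and a window of SnapshotCatchUpEntries is retained so slow followers can still catch up from the log instead of needing a full snapshot. Below is a standalone sketch of the underlying storage calls, assuming the go.etcd.io/raft/v3 module that etcd's main branch uses; the 1000-entry log and 100-entry catch-up window are arbitrary example values, not etcd configuration.

// Standalone sketch (not taken from the commit) of the CreateSnapshot/Compact
// pair that snapshot() and compactRaftLog() drive on s.r.raftStorage.
package main

import (
    "fmt"

    "go.etcd.io/raft/v3"
    "go.etcd.io/raft/v3/raftpb"
)

func main() {
    ms := raft.NewMemoryStorage()

    // Pretend 1000 entries have been appended and applied.
    ents := make([]raftpb.Entry, 1000)
    for i := range ents {
        ents[i] = raftpb.Entry{Index: uint64(i + 1), Term: 1}
    }
    if err := ms.Append(ents); err != nil {
        panic(err)
    }

    // Snapshot at the applied index, then compact while keeping a catch-up
    // window of 100 entries for slow followers (the role SnapshotCatchUpEntries plays).
    applied, catchUp := uint64(1000), uint64(100)
    if _, err := ms.CreateSnapshot(applied, &raftpb.ConfState{Voters: []uint64{1}}, nil); err != nil {
        panic(err)
    }
    if err := ms.Compact(applied - catchUp); err != nil {
        panic(err)
    }

    first, _ := ms.FirstIndex()
    last, _ := ms.LastIndex()
    fmt.Printf("in-memory raft log now spans [%d, %d]\n", first, last) // [901, 1000]
}

With this commit, the snapshot-then-compact cycle runs on every in-memory snapshot instead of only on the rare disk snapshot, so the number of entries held in raft memory storage stays roughly bounded by SnapshotCatchUpEntries plus memorySnapshotCount.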

server/etcdserver/server_test.go (+57 -3)

@@ -627,8 +627,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
     }
 }

-// TestSnapshot should snapshot the store and cut the persistent
-func TestSnapshot(t *testing.T) {
+// TestSnapshotDisk should save the snapshot to disk and release old snapshots
+func TestSnapshotDisk(t *testing.T) {
     revertFunc := verify.DisableVerifications()
     defer revertFunc()

@@ -680,7 +680,7 @@ func TestSnapshot(t *testing.T) {
         }
     }()
     ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
-    srv.snapshot(&ep)
+    srv.snapshot(&ep, true)
     <-ch
     if len(st.Action()) != 0 {
         t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
@@ -689,6 +689,60 @@
     assert.Equal(t, uint64(1), ep.memorySnapshotIndex)
 }

+func TestSnapshotMemory(t *testing.T) {
+    revertFunc := verify.DisableVerifications()
+    defer revertFunc()
+
+    be, _ := betesting.NewDefaultTmpBackend(t)
+    defer betesting.Close(t, be)
+
+    s := raft.NewMemoryStorage()
+    s.Append([]raftpb.Entry{{Index: 1}})
+    st := mockstore.NewRecorderStream()
+    p := mockstorage.NewStorageRecorderStream("")
+    r := newRaftNode(raftNodeConfig{
+        lg:          zaptest.NewLogger(t),
+        Node:        newNodeNop(),
+        raftStorage: s,
+        storage:     p,
+    })
+    srv := &EtcdServer{
+        lgMu:         new(sync.RWMutex),
+        lg:           zaptest.NewLogger(t),
+        r:            *r,
+        v2store:      st,
+        consistIndex: cindex.NewConsistentIndex(be),
+    }
+    srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+    defer func() {
+        assert.NoError(t, srv.kv.Close())
+    }()
+    srv.be = be
+
+    cl := membership.NewCluster(zaptest.NewLogger(t))
+    srv.cluster = cl
+
+    ch := make(chan struct{}, 1)
+
+    go func() {
+        gaction, _ := p.Wait(1)
+        defer func() { ch <- struct{}{} }()
+
+        if len(gaction) != 0 {
+            t.Errorf("len(action) = %d, want 0", len(gaction))
+            return
+        }
+    }()
+    ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
+    srv.snapshot(&ep, false)
+    <-ch
+    if len(st.Action()) != 0 {
+        t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
+    }
+    assert.Equal(t, uint64(0), ep.diskSnapshotIndex)
+    assert.Equal(t, uint64(1), ep.memorySnapshotIndex)
+}
+
 // TestSnapshotOrdering ensures raft persists snapshot onto disk before
 // snapshot db is applied.
 func TestSnapshotOrdering(t *testing.T) {

tests/integration/v3_watch_restore_test.go (+3 -1)

@@ -111,7 +111,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
     //
     // Since there is no way to confirm server has compacted the log, we
     // use log monitor to watch and expect "compacted Raft logs" content.
-    expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
+    // In v3.6 we no longer generate the "compacted Raft logs" message, as raft compaction happens independently of snapshots.
+    // For now, use the snapshot log line, which should be equivalent to compaction.
+    expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "saved snapshot to disk", 2)

     // After RecoverPartition, leader L will send snapshot to slow F_m0
     // follower, because F_m0(index:8) is 'out of date' compared to
