Commit e4d3691

Run a separate in memory snapshot to reduce number of entries stored in raft memory storage
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent 414f75b commit e4d3691
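
For context on the change: raft's MemoryStorage keeps every appended entry in memory until a snapshot is taken and the log is compacted behind it. Before this commit, etcd only snapshotted and compacted on the (comparatively rare) disk snapshot path, so the in-memory raft log could grow to roughly --snapshot-count entries; taking an additional, cheap in-memory snapshot every memorySnapshotCount (100) applied entries lets the log be compacted far more often. Below is a minimal standalone sketch of that mechanism, assuming the standalone go.etcd.io/raft/v3 module; it is not code from this commit, and the entry counts and catch-up constant are illustrative only.

package main

import (
	"fmt"

	"go.etcd.io/raft/v3"
	pb "go.etcd.io/raft/v3/raftpb"
)

func main() {
	ms := raft.NewMemoryStorage()

	// Simulate 1000 applied entries accumulating in the raft memory storage.
	ents := make([]pb.Entry, 0, 1000)
	for i := uint64(1); i <= 1000; i++ {
		ents = append(ents, pb.Entry{Index: i, Term: 1})
	}
	if err := ms.Append(ents); err != nil {
		panic(err)
	}

	// Take a snapshot at the applied index. This only updates in-memory state;
	// no disk I/O is involved.
	cs := pb.ConfState{Voters: []uint64{1}}
	if _, err := ms.CreateSnapshot(1000, &cs, nil); err != nil {
		panic(err)
	}

	// Compact the log behind the snapshot, keeping a small tail of
	// catch-up entries for slow followers (illustrative value).
	const catchUpEntries = 100
	if err := ms.Compact(1000 - catchUpEntries); err != nil {
		panic(err)
	}

	first, _ := ms.FirstIndex()
	last, _ := ms.LastIndex()
	fmt.Printf("entries kept in memory: [%d, %d]\n", first, last) // [901, 1000]
}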

4 files changed, +118 -44 lines changed

server/etcdserver/api/membership/cluster.go (+1 -1)

@@ -899,7 +899,7 @@ func (c *RaftCluster) Store(store v2store.Store) {
 		if m.ClientURLs != nil {
 			mustUpdateMemberAttrInStore(c.lg, store, m)
 		}
-		c.lg.Info(
+		c.lg.Debug(
 			"snapshot storing member",
 			zap.String("id", m.ID.String()),
 			zap.Strings("peer-urls", m.PeerURLs),

server/etcdserver/server.go (+53 -39)

@@ -109,6 +109,7 @@ const (
 	readyPercentThreshold = 0.9
 
 	DowngradeEnabledPath = "/downgrade/enabled"
+	memorySnapshotCount = 100
 )
 
 var (
@@ -293,6 +294,7 @@ type EtcdServer struct {
 	*AccessController
 	// forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
 	// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
+	// TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
 	forceDiskSnapshot bool
 	corruptionChecker CorruptionChecker
 }
@@ -1195,17 +1197,27 @@ func (s *EtcdServer) ForceSnapshot() {
 }
 
 func (s *EtcdServer) snapshotIfNeededAndCompactRaftLog(ep *etcdProgress) {
-	if !s.shouldSnapshot(ep) {
+	//TODO: Remove disk snapshot in v3.7
+	shouldSnapshotToDisk := s.shouldSnapshotToDisk(ep)
+	shouldSnapshotToMemory := s.shouldSnapshotToMemory(ep)
+	if !shouldSnapshotToDisk && !shouldSnapshotToMemory {
 		return
 	}
-	s.snapshot(ep)
+	if shouldSnapshotToDisk {
+		s.forceDiskSnapshot = false
+	}
+	s.snapshot(ep, shouldSnapshotToDisk)
 	s.compactRaftLog(ep.appliedi)
 }
 
-func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
+func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
 	return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
 }
 
+func (s *EtcdServer) shouldSnapshotToMemory(ep *etcdProgress) bool {
+	return ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount
+}
+
 func (s *EtcdServer) hasMultipleVotingMembers() bool {
 	return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1
 }
@@ -2119,28 +2131,29 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 }
 
 // TODO: non-blocking snapshot
-func (s *EtcdServer) snapshot(ep *etcdProgress) {
+func (s *EtcdServer) snapshot(ep *etcdProgress, toDisk bool) {
 	lg := s.Logger()
-	lg.Info(
-		"triggering snapshot",
-		zap.String("local-member-id", s.MemberID().String()),
-		zap.Uint64("local-member-applied-index", ep.appliedi),
-		zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
-		zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
-		zap.Bool("snapshot-forced", s.forceDiskSnapshot),
-	)
-	s.forceDiskSnapshot = false
-	d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
-	// commit kv to write metadata (for example: consistent index) to disk.
-	//
-	// This guarantees that Backend's consistent_index is >= index of last snapshot.
-	//
-	// KV().commit() updates the consistent index in backend.
-	// All operations that update consistent index must be called sequentially
-	// from applyAll function.
-	// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
-	// the go routine created below.
-	s.KV().Commit()
+	d := GetMembershipInfoInV2Format(lg, s.cluster)
+	if toDisk {
+		s.Logger().Info(
+			"triggering snapshot",
+			zap.String("local-member-id", s.MemberID().String()),
+			zap.Uint64("local-member-applied-index", ep.appliedi),
+			zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
+			zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
+			zap.Bool("snapshot-forced", s.forceDiskSnapshot),
+		)
+		// commit kv to write metadata (for example: consistent index) to disk.
+		//
+		// This guarantees that Backend's consistent_index is >= index of last snapshot.
+		//
+		// KV().commit() updates the consistent index in backend.
+		// All operations that update consistent index must be called sequentially
+		// from applyAll function.
+		// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
+		// the go routine created below.
+		s.KV().Commit()
+	}
 
 	// For backward compatibility, generate v2 snapshot from v3 state.
 	snap, err := s.r.raftStorage.CreateSnapshot(ep.appliedi, &ep.confState, d)
@@ -2152,23 +2165,25 @@ func (s *EtcdServer) snapshot(ep *etcdProgress) {
 		}
 		lg.Panic("failed to create snapshot", zap.Error(err))
 	}
+	ep.memorySnapshotIndex = ep.appliedi
 
 	verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
 
-	// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
-	if err = s.r.storage.SaveSnap(snap); err != nil {
-		lg.Panic("failed to save snapshot", zap.Error(err))
-	}
-	ep.diskSnapshotIndex = ep.appliedi
-	ep.memorySnapshotIndex = ep.appliedi
-	if err = s.r.storage.Release(snap); err != nil {
-		lg.Panic("failed to release wal", zap.Error(err))
-	}
+	if toDisk {
+		// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
+		if err = s.r.storage.SaveSnap(snap); err != nil {
+			lg.Panic("failed to save snapshot", zap.Error(err))
+		}
+		ep.diskSnapshotIndex = ep.appliedi
+		if err = s.r.storage.Release(snap); err != nil {
+			lg.Panic("failed to release wal", zap.Error(err))
+		}
 
-	lg.Info(
-		"saved snapshot",
-		zap.Uint64("snapshot-index", snap.Metadata.Index),
-	)
+		lg.Info(
+			"saved snapshot to disk",
+			zap.Uint64("snapshot-index", snap.Metadata.Index),
+		)
+	}
 }
 
 func (s *EtcdServer) compactRaftLog(snapi uint64) {
@@ -2189,7 +2204,6 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
 	if snapi > s.Cfg.SnapshotCatchUpEntries {
 		compacti = snapi - s.Cfg.SnapshotCatchUpEntries
 	}
-
 	err := s.r.raftStorage.Compact(compacti)
 	if err != nil {
 		// the compaction was done asynchronously with the progress of raft.
@@ -2199,7 +2213,7 @@
 		}
 		lg.Panic("failed to compact", zap.Error(err))
 	}
-	lg.Info(
+	lg.Debug(
 		"compacted Raft logs",
 		zap.Uint64("compact-index", compacti),
 	)
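
To make the relationship between the two snapshot triggers above concrete, here is a standalone sketch of the decision logic with the server state reduced to plain values. The names mirror the diff, but this is illustrative only and not the etcd implementation; the --snapshot-count value used below is just an example.

package main

import "fmt"

const memorySnapshotCount = 100 // mirrors the constant added in the diff

type progress struct {
	appliedIndex        uint64
	diskSnapshotIndex   uint64
	memorySnapshotIndex uint64
}

// Disk snapshots stay tied to --snapshot-count (or a forced snapshot after downgrade).
func shouldSnapshotToDisk(p progress, snapshotCount uint64, forced bool) bool {
	return (forced && p.appliedIndex != p.diskSnapshotIndex) ||
		(p.appliedIndex-p.diskSnapshotIndex > snapshotCount)
}

// Memory-only snapshots fire every memorySnapshotCount applied entries.
func shouldSnapshotToMemory(p progress) bool {
	return p.appliedIndex > p.memorySnapshotIndex+memorySnapshotCount
}

func main() {
	p := progress{appliedIndex: 250, diskSnapshotIndex: 0, memorySnapshotIndex: 200}
	fmt.Println(shouldSnapshotToDisk(p, 10000, false)) // false: far from the disk threshold
	fmt.Println(shouldSnapshotToMemory(p))             // false: 250 <= 200+100

	p.appliedIndex = 301
	fmt.Println(shouldSnapshotToMemory(p)) // true: 301 > 300, so compact the raft log in memory
}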

server/etcdserver/server_test.go (+61 -3)

@@ -627,8 +627,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
 	}
 }
 
-// TestSnapshot should snapshot the store and cut the persistent
-func TestSnapshot(t *testing.T) {
+// TestSnapshotDisk should save the snapshot to disk and release old snapshots
+func TestSnapshotDisk(t *testing.T) {
 	revertFunc := verify.DisableVerifications()
 	defer revertFunc()
 
@@ -680,7 +680,7 @@
 		}
 	}()
 	ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
-	srv.snapshot(&ep)
+	srv.snapshot(&ep, true)
 	<-ch
 	if len(st.Action()) != 0 {
 		t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
@@ -693,6 +693,64 @@
 	}
 }
 
+func TestSnapshotMemory(t *testing.T) {
+	revertFunc := verify.DisableVerifications()
+	defer revertFunc()
+
+	be, _ := betesting.NewDefaultTmpBackend(t)
+	defer betesting.Close(t, be)
+
+	s := raft.NewMemoryStorage()
+	s.Append([]raftpb.Entry{{Index: 1}})
+	st := mockstore.NewRecorderStream()
+	p := mockstorage.NewStorageRecorderStream("")
+	r := newRaftNode(raftNodeConfig{
+		lg:          zaptest.NewLogger(t),
+		Node:        newNodeNop(),
+		raftStorage: s,
+		storage:     p,
+	})
+	srv := &EtcdServer{
+		lgMu:         new(sync.RWMutex),
+		lg:           zaptest.NewLogger(t),
+		r:            *r,
+		v2store:      st,
+		consistIndex: cindex.NewConsistentIndex(be),
+	}
+	srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+	defer func() {
+		assert.NoError(t, srv.kv.Close())
+	}()
+	srv.be = be
+
+	cl := membership.NewCluster(zaptest.NewLogger(t))
+	srv.cluster = cl
+
+	ch := make(chan struct{}, 1)
+
+	go func() {
+		gaction, _ := p.Wait(1)
+		defer func() { ch <- struct{}{} }()
+
+		if len(gaction) != 0 {
+			t.Errorf("len(action) = %d, want 0", len(gaction))
+			return
+		}
+	}()
+	ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
+	srv.snapshot(&ep, false)
+	<-ch
+	if len(st.Action()) != 0 {
+		t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
+	}
+	if ep.diskSnapshotIndex != 0 {
+		t.Errorf("ep.diskSnapshotIndex = %d, want 0", ep.diskSnapshotIndex)
+	}
+	if ep.memorySnapshotIndex != 1 {
+		t.Errorf("ep.memorySnapshotIndex = %d, want 1", ep.memorySnapshotIndex)
+	}
+}
+
 // TestSnapshotOrdering ensures raft persists snapshot onto disk before
 // snapshot db is applied.
 func TestSnapshotOrdering(t *testing.T) {

tests/integration/v3_watch_restore_test.go (+3 -1)

@@ -111,7 +111,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
 	//
 	// Since there is no way to confirm server has compacted the log, we
 	// use log monitor to watch and expect "compacted Raft logs" content.
-	expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
+	// In v3.6 we no longer generate the "compacted Raft logs" message, as raft compaction happens independently of snapshots.
+	// For now let's use the snapshot log line, which should be equivalent to compaction.
+	expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "saved snapshot to disk", 2)
 
 	// After RecoverPartition, leader L will send snapshot to slow F_m0
 	// follower, because F_m0(index:8) is 'out of date' compared to
