Skip to content

Commit 3f30099

Browse files
committed
Run a separate in memory snapshot to reduce number of entries stored in raft memory storage
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent 3de0018 commit 3f30099

File tree

3 files changed

+71
-55
lines changed

3 files changed

+71
-55
lines changed

server/etcdserver/api/membership/cluster.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -899,7 +899,7 @@ func (c *RaftCluster) Store(store v2store.Store) {
899899
if m.ClientURLs != nil {
900900
mustUpdateMemberAttrInStore(c.lg, store, m)
901901
}
902-
c.lg.Info(
902+
c.lg.Debug(
903903
"snapshot storing member",
904904
zap.String("id", m.ID.String()),
905905
zap.Strings("peer-urls", m.PeerURLs),

server/etcdserver/server.go

+67-53
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ const (
109109
readyPercentThreshold = 0.9
110110

111111
DowngradeEnabledPath = "/downgrade/enabled"
112+
memorySnapshotCount = 100
112113
)
113114

114115
var (
@@ -291,9 +292,10 @@ type EtcdServer struct {
291292
clusterVersionChanged *notify.Notifier
292293

293294
*AccessController
294-
// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
295+
// forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
295296
// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
296-
forceSnapshot bool
297+
// TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
298+
forceDiskSnapshot bool
297299
corruptionChecker CorruptionChecker
298300
}
299301

@@ -741,10 +743,11 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
741743
}
742744

743745
type etcdProgress struct {
744-
confState raftpb.ConfState
745-
snapi uint64
746-
appliedt uint64
747-
appliedi uint64
746+
confState raftpb.ConfState
747+
diskSnapshotIndex uint64
748+
memorySnapshotIndex uint64
749+
appliedt uint64
750+
appliedi uint64
748751
}
749752

750753
// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
@@ -809,10 +812,11 @@ func (s *EtcdServer) run() {
809812
s.r.start(rh)
810813

811814
ep := etcdProgress{
812-
confState: sn.Metadata.ConfState,
813-
snapi: sn.Metadata.Index,
814-
appliedt: sn.Metadata.Term,
815-
appliedi: sn.Metadata.Index,
815+
confState: sn.Metadata.ConfState,
816+
diskSnapshotIndex: sn.Metadata.Index,
817+
memorySnapshotIndex: sn.Metadata.Index,
818+
appliedt: sn.Metadata.Term,
819+
appliedi: sn.Metadata.Index,
816820
}
817821

818822
defer func() {
@@ -998,15 +1002,15 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
9981002
lg := s.Logger()
9991003
lg.Info(
10001004
"applying snapshot",
1001-
zap.Uint64("current-snapshot-index", ep.snapi),
1005+
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
10021006
zap.Uint64("current-applied-index", ep.appliedi),
10031007
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
10041008
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
10051009
)
10061010
defer func() {
10071011
lg.Info(
10081012
"applied snapshot",
1009-
zap.Uint64("current-snapshot-index", ep.snapi),
1013+
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
10101014
zap.Uint64("current-applied-index", ep.appliedi),
10111015
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
10121016
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
@@ -1017,7 +1021,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
10171021
if toApply.snapshot.Metadata.Index <= ep.appliedi {
10181022
lg.Panic(
10191023
"unexpected leader snapshot from outdated index",
1020-
zap.Uint64("current-snapshot-index", ep.snapi),
1024+
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
10211025
zap.Uint64("current-applied-index", ep.appliedi),
10221026
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
10231027
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
@@ -1132,7 +1136,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
11321136

11331137
ep.appliedt = toApply.snapshot.Metadata.Term
11341138
ep.appliedi = toApply.snapshot.Metadata.Index
1135-
ep.snapi = ep.appliedi
1139+
ep.diskSnapshotIndex = ep.appliedi
1140+
ep.memorySnapshotIndex = ep.appliedi
11361141
ep.confState = toApply.snapshot.Metadata.ConfState
11371142

11381143
// As backends and implementations like alarmsStore changed, we need
@@ -1188,31 +1193,36 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
11881193
}
11891194

11901195
func (s *EtcdServer) ForceSnapshot() {
1191-
s.forceSnapshot = true
1196+
s.forceDiskSnapshot = true
11921197
}
11931198

11941199
func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
1195-
if !s.shouldSnapshot(ep) {
1200+
if !s.shouldSnapshotToDisk(ep) {
1201+
if ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount {
1202+
s.snapshot(ep, false)
1203+
s.compactRaftLog(ep.appliedi)
1204+
ep.memorySnapshotIndex = ep.appliedi
1205+
}
11961206
return
11971207
}
1208+
//TODO: Remove disk snapshot in v3.7
11981209
lg := s.Logger()
11991210
lg.Info(
12001211
"triggering snapshot",
12011212
zap.String("local-member-id", s.MemberID().String()),
12021213
zap.Uint64("local-member-applied-index", ep.appliedi),
1203-
zap.Uint64("local-member-snapshot-index", ep.snapi),
1214+
zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
12041215
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
1205-
zap.Bool("snapshot-forced", s.forceSnapshot),
1216+
zap.Bool("snapshot-forced", s.forceDiskSnapshot),
12061217
)
1207-
s.forceSnapshot = false
1218+
s.forceDiskSnapshot = false
12081219

1209-
s.snapshot(ep.appliedi, ep.confState)
1220+
s.snapshot(ep, true)
12101221
s.compactRaftLog(ep.appliedi)
1211-
ep.snapi = ep.appliedi
12121222
}
12131223

1214-
func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
1215-
return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount)
1224+
func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
1225+
return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
12161226
}
12171227

12181228
func (s *EtcdServer) hasMultipleVotingMembers() bool {
@@ -2132,23 +2142,24 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
21322142
}
21332143

21342144
// TODO: non-blocking snapshot
2135-
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
2136-
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
2137-
// commit kv to write metadata (for example: consistent index) to disk.
2138-
//
2139-
// This guarantees that Backend's consistent_index is >= index of last snapshot.
2140-
//
2141-
// KV().commit() updates the consistent index in backend.
2142-
// All operations that update consistent index must be called sequentially
2143-
// from applyAll function.
2144-
// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
2145-
// the go routine created below.
2146-
s.KV().Commit()
2147-
2145+
func (s *EtcdServer) snapshot(ep *etcdProgress, toDisk bool) {
21482146
lg := s.Logger()
2147+
d := GetMembershipInfoInV2Format(lg, s.cluster)
2148+
if toDisk {
2149+
// commit kv to write metadata (for example: consistent index) to disk.
2150+
//
2151+
// This guarantees that Backend's consistent_index is >= index of last snapshot.
2152+
//
2153+
// KV().commit() updates the consistent index in backend.
2154+
// All operations that update consistent index must be called sequentially
2155+
// from applyAll function.
2156+
// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
2157+
// the go routine created below.
2158+
s.KV().Commit()
2159+
}
21492160

21502161
// For backward compatibility, generate v2 snapshot from v3 state.
2151-
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
2162+
snap, err := s.r.raftStorage.CreateSnapshot(ep.appliedi, &ep.confState, d)
21522163
if err != nil {
21532164
// the snapshot was done asynchronously with the progress of raft.
21542165
// raft might have already got a newer snapshot.
@@ -2157,21 +2168,25 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
21572168
}
21582169
lg.Panic("failed to create snapshot", zap.Error(err))
21592170
}
2171+
ep.memorySnapshotIndex = ep.appliedi
21602172

21612173
verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
21622174

2163-
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
2164-
if err = s.r.storage.SaveSnap(snap); err != nil {
2165-
lg.Panic("failed to save snapshot", zap.Error(err))
2166-
}
2167-
if err = s.r.storage.Release(snap); err != nil {
2168-
lg.Panic("failed to release wal", zap.Error(err))
2169-
}
2175+
if toDisk {
2176+
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
2177+
if err = s.r.storage.SaveSnap(snap); err != nil {
2178+
lg.Panic("failed to save snapshot", zap.Error(err))
2179+
}
2180+
ep.diskSnapshotIndex = ep.appliedi
2181+
if err = s.r.storage.Release(snap); err != nil {
2182+
lg.Panic("failed to release wal", zap.Error(err))
2183+
}
21702184

2171-
lg.Info(
2172-
"saved snapshot",
2173-
zap.Uint64("snapshot-index", snap.Metadata.Index),
2174-
)
2185+
lg.Info(
2186+
"saved snapshot to disk",
2187+
zap.Uint64("snapshot-index", snap.Metadata.Index),
2188+
)
2189+
}
21752190
}
21762191

21772192
func (s *EtcdServer) compactRaftLog(snapi uint64) {
@@ -2188,11 +2203,10 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
21882203
}
21892204

21902205
// keep some in memory log entries for slow followers.
2191-
compacti := uint64(1)
2192-
if snapi > s.Cfg.SnapshotCatchUpEntries {
2193-
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
2206+
if snapi <= s.Cfg.SnapshotCatchUpEntries {
2207+
return
21942208
}
2195-
2209+
compacti := snapi - s.Cfg.SnapshotCatchUpEntries
21962210
err := s.r.raftStorage.Compact(compacti)
21972211
if err != nil {
21982212
// the compaction was done asynchronously with the progress of raft.
@@ -2202,7 +2216,7 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
22022216
}
22032217
lg.Panic("failed to compact", zap.Error(err))
22042218
}
2205-
lg.Info(
2219+
lg.Debug(
22062220
"compacted Raft logs",
22072221
zap.Uint64("compact-index", compacti),
22082222
)

tests/integration/v3_watch_restore_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
110110
//
111111
// Since there is no way to confirm server has compacted the log, we
112112
// use log monitor to watch and expect "compacted Raft logs" content.
113-
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
113+
// In v3.6 we no longer generates "compacted Raft logs" log as raft compaction happens independently to snapshot.
114+
// For now let's use snapshot log which should be equivalent to compaction.
115+
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "saved snapshot to disk", 2)
114116

115117
// After RecoverPartition, leader L will send snapshot to slow F_m0
116118
// follower, because F_m0(index:8) is 'out of date' compared to

0 commit comments

Comments
 (0)