
Commit d0cff01

Run a separate in memory snapshot to reduce number of entries stored in raft memory storage
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent 3de0018 commit d0cff01
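Context for the change: raft's MemoryStorage can take a snapshot purely in memory and then compact the log behind it, so the raft log held in memory can be trimmed much more often than the expensive disk snapshot is written. Below is a minimal, self-contained sketch of that pattern, assuming the standalone go.etcd.io/raft/v3 module; the maybeCompactInMemory helper, its parameters, and the dummy entries are illustrative only, and memorySnapshotCount simply mirrors the constant introduced in server.go further down. It is a sketch of the idea, not etcd's code.

// Sketch: keep a raft MemoryStorage compacted by creating cheap in-memory
// snapshots, without persisting anything to disk. Only the
// MemoryStorage.CreateSnapshot and MemoryStorage.Compact calls are real
// go.etcd.io/raft/v3 APIs; the helper itself is hypothetical.
package main

import (
	"errors"
	"fmt"

	"go.etcd.io/raft/v3"
	"go.etcd.io/raft/v3/raftpb"
)

const memorySnapshotCount = 10 // mirrors the constant added in server.go below

// maybeCompactInMemory (hypothetical helper) snapshots and compacts storage
// once enough entries have been applied since the last in-memory snapshot.
// It returns the index of the latest in-memory snapshot.
func maybeCompactInMemory(storage *raft.MemoryStorage, appliedIndex, lastSnapIndex uint64, cs raftpb.ConfState, data []byte) (uint64, error) {
	if appliedIndex <= lastSnapIndex+memorySnapshotCount {
		return lastSnapIndex, nil // not enough new entries yet
	}
	// CreateSnapshot only records snapshot metadata inside MemoryStorage;
	// it does not touch the WAL or the snapshot directory.
	if _, err := storage.CreateSnapshot(appliedIndex, &cs, data); err != nil {
		if errors.Is(err, raft.ErrSnapOutOfDate) {
			return lastSnapIndex, nil // raft already has a newer snapshot
		}
		return lastSnapIndex, err
	}
	// Compact drops in-memory log entries up to appliedIndex.
	if err := storage.Compact(appliedIndex); err != nil && !errors.Is(err, raft.ErrCompacted) {
		return lastSnapIndex, err
	}
	return appliedIndex, nil
}

func main() {
	storage := raft.NewMemoryStorage()
	// Append some dummy entries so there is something to compact.
	ents := make([]raftpb.Entry, 0, 20)
	for i := uint64(1); i <= 20; i++ {
		ents = append(ents, raftpb.Entry{Index: i, Term: 1})
	}
	storage.Append(ents)

	cs := raftpb.ConfState{Voters: []uint64{1}}
	snapIndex, err := maybeCompactInMemory(storage, 20, 0, cs, nil)
	if err != nil {
		panic(err)
	}
	first, _ := storage.FirstIndex()
	fmt.Println("in-memory snapshot index:", snapIndex, "first retained entry:", first)
}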

File tree

server/etcdserver/api/membership/cluster.go
server/etcdserver/server.go
server/etcdserver/server_test.go

3 files changed: +58 -34 lines changed

server/etcdserver/api/membership/cluster.go  (-6)

@@ -899,12 +899,6 @@ func (c *RaftCluster) Store(store v2store.Store) {
 		if m.ClientURLs != nil {
 			mustUpdateMemberAttrInStore(c.lg, store, m)
 		}
-		c.lg.Info(
-			"snapshot storing member",
-			zap.String("id", m.ID.String()),
-			zap.Strings("peer-urls", m.PeerURLs),
-			zap.Bool("is-learner", m.IsLearner),
-		)
 	}
 	for id := range c.removed {
 		//We do not need to delete the member since the store is empty.

server/etcdserver/server.go  (+57 -27)

@@ -109,6 +109,7 @@ const (
 	readyPercentThreshold = 0.9
 
 	DowngradeEnabledPath = "/downgrade/enabled"
+	memorySnapshotCount = 10
 )
 
 var (
@@ -291,9 +292,10 @@ type EtcdServer struct {
 	clusterVersionChanged *notify.Notifier
 
 	*AccessController
-	// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
+	// forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
 	// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
-	forceSnapshot     bool
+	// TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
+	forceDiskSnapshot bool
 	corruptionChecker CorruptionChecker
 }
 
@@ -741,10 +743,11 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
 }
 
 type etcdProgress struct {
-	confState raftpb.ConfState
-	snapi     uint64
-	appliedt  uint64
-	appliedi  uint64
+	confState           raftpb.ConfState
+	diskSnapshotIndex   uint64
+	memorySnapshotIndex uint64
+	appliedt            uint64
+	appliedi            uint64
 }
 
 // raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
@@ -809,10 +812,11 @@ func (s *EtcdServer) run() {
 	s.r.start(rh)
 
 	ep := etcdProgress{
-		confState: sn.Metadata.ConfState,
-		snapi:     sn.Metadata.Index,
-		appliedt:  sn.Metadata.Term,
-		appliedi:  sn.Metadata.Index,
+		confState:           sn.Metadata.ConfState,
+		diskSnapshotIndex:   sn.Metadata.Index,
+		memorySnapshotIndex: sn.Metadata.Index,
+		appliedt:            sn.Metadata.Term,
+		appliedi:            sn.Metadata.Index,
 	}
 
 	defer func() {
@@ -998,15 +1002,15 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
 	lg := s.Logger()
 	lg.Info(
 		"applying snapshot",
-		zap.Uint64("current-snapshot-index", ep.snapi),
+		zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
 		zap.Uint64("current-applied-index", ep.appliedi),
 		zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
 		zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
 	)
 	defer func() {
 		lg.Info(
 			"applied snapshot",
-			zap.Uint64("current-snapshot-index", ep.snapi),
+			zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
 			zap.Uint64("current-applied-index", ep.appliedi),
 			zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
 			zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
@@ -1017,7 +1021,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
 	if toApply.snapshot.Metadata.Index <= ep.appliedi {
 		lg.Panic(
 			"unexpected leader snapshot from outdated index",
-			zap.Uint64("current-snapshot-index", ep.snapi),
+			zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
 			zap.Uint64("current-applied-index", ep.appliedi),
 			zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
 			zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
@@ -1132,7 +1136,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
 
 	ep.appliedt = toApply.snapshot.Metadata.Term
 	ep.appliedi = toApply.snapshot.Metadata.Index
-	ep.snapi = ep.appliedi
+	ep.diskSnapshotIndex = ep.appliedi
+	ep.memorySnapshotIndex = ep.appliedi
 	ep.confState = toApply.snapshot.Metadata.ConfState
 
 	// As backends and implementations like alarmsStore changed, we need
@@ -1188,31 +1193,37 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
 }
 
 func (s *EtcdServer) ForceSnapshot() {
-	s.forceSnapshot = true
+	s.forceDiskSnapshot = true
 }
 
 func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
-	if !s.shouldSnapshot(ep) {
+	if !s.shouldSnapshotToDisk(ep) {
+		if ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount {
+			s.snapshotToMemory(ep.appliedi, ep.confState)
+			s.compactRaftLog(ep.appliedi)
+			ep.memorySnapshotIndex = ep.appliedi
+		}
 		return
 	}
+	//TODO: Remove disk snapshot in v3.7
 	lg := s.Logger()
 	lg.Info(
 		"triggering snapshot",
 		zap.String("local-member-id", s.MemberID().String()),
 		zap.Uint64("local-member-applied-index", ep.appliedi),
-		zap.Uint64("local-member-snapshot-index", ep.snapi),
+		zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
 		zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
-		zap.Bool("snapshot-forced", s.forceSnapshot),
+		zap.Bool("snapshot-forced", s.forceDiskSnapshot),
 	)
-	s.forceSnapshot = false
+	s.forceDiskSnapshot = false
 
-	s.snapshot(ep.appliedi, ep.confState)
+	s.snapshotToDisk(ep.appliedi, ep.confState)
 	s.compactRaftLog(ep.appliedi)
-	ep.snapi = ep.appliedi
+	ep.diskSnapshotIndex = ep.appliedi
 }
 
-func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
-	return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount)
+func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
+	return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
 }
 
 func (s *EtcdServer) hasMultipleVotingMembers() bool {
@@ -2132,7 +2143,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 }
 
 // TODO: non-blocking snapshot
-func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
+func (s *EtcdServer) snapshotToDisk(snapi uint64, confState raftpb.ConfState) {
 	d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
 	// commit kv to write metadata (for example: consistent index) to disk.
 	//
@@ -2174,6 +2185,25 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 	)
 }
 
+func (s *EtcdServer) snapshotToMemory(snapi uint64, confState raftpb.ConfState) {
+	d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
+
+	lg := s.Logger()
+
+	// For backward compatibility, generate v2 snapshot from v3 state.
+	snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
+	if err != nil {
+		// the snapshot was done asynchronously with the progress of raft.
+		// raft might have already got a newer snapshot.
+		if errorspkg.Is(err, raft.ErrSnapOutOfDate) {
+			return
+		}
+		lg.Panic("failed to create snapshot", zap.Error(err))
+	}
+
+	verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
+}
+
 func (s *EtcdServer) compactRaftLog(snapi uint64) {
 	lg := s.Logger()
 
@@ -2189,10 +2219,10 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
 
 	// keep some in memory log entries for slow followers.
 	compacti := uint64(1)
-	if snapi > s.Cfg.SnapshotCatchUpEntries {
-		compacti = snapi - s.Cfg.SnapshotCatchUpEntries
+	if snapi <= s.Cfg.SnapshotCatchUpEntries {
+		return
 	}
-
+	compacti = snapi - s.Cfg.SnapshotCatchUpEntries
 	err := s.r.raftStorage.Compact(compacti)
 	if err != nil {
 		// the compaction was done asynchronously with the progress of raft.
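Reading the new flow as a whole: shouldSnapshotToDisk still gates the expensive disk snapshot on Cfg.SnapshotCount (or an explicit force after a downgrade), while every memorySnapshotCount (10) applied entries an in-memory snapshot plus log compaction now runs in between, so the raft memory storage stays close to Cfg.SnapshotCatchUpEntries entries instead of growing until the next disk snapshot. A small worked example of the index arithmetic follows; the concrete numbers are hypothetical, and only SnapshotCount, SnapshotCatchUpEntries, and memorySnapshotCount correspond to real etcd settings.

// Hypothetical values illustrating the cadence and compaction math in
// triggerSnapshot/compactRaftLog; the constants mirror etcd config knobs,
// but the concrete numbers are made up for this example.
package main

import "fmt"

const (
	snapshotCount          = 10000 // Cfg.SnapshotCount: disk snapshot cadence (example value)
	snapshotCatchUpEntries = 5000  // Cfg.SnapshotCatchUpEntries: entries kept for slow followers (example value)
	memorySnapshotCount    = 10    // new in-memory snapshot cadence
)

func main() {
	appliedi := uint64(12345)            // current applied index
	diskSnapshotIndex := uint64(10000)   // last disk snapshot
	memorySnapshotIndex := uint64(12330) // last in-memory snapshot

	// Disk snapshot only after snapshotCount new entries (or when forced).
	toDisk := appliedi-diskSnapshotIndex > snapshotCount // 2345 > 10000 -> false

	// Otherwise an in-memory snapshot fires every memorySnapshotCount entries.
	toMemory := appliedi > memorySnapshotIndex+memorySnapshotCount // 12345 > 12340 -> true

	// Either path compacts the raft log but keeps the most recent
	// snapshotCatchUpEntries entries so slow followers can still catch up.
	var compacti uint64
	if appliedi > snapshotCatchUpEntries {
		compacti = appliedi - snapshotCatchUpEntries // 12345 - 5000 = 7345
	}

	fmt.Println(toDisk, toMemory, compacti) // false true 7345
}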

server/etcdserver/server_test.go  (+1 -1)

@@ -649,7 +649,7 @@ func TestSnapshot(t *testing.T) {
 		}
 	}()
 
-	srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
+	srv.snapshotToDisk(1, raftpb.ConfState{Voters: []uint64{1}})
 	<-ch
 	if len(st.Action()) != 0 {
 		t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
