@@ -109,6 +109,7 @@ const (
109
109
readyPercentThreshold = 0.9
110
110
111
111
DowngradeEnabledPath = "/downgrade/enabled"
112
+ memorySnapshotCount = 100
112
113
)
113
114
114
115
var (
@@ -293,6 +294,7 @@ type EtcdServer struct {
293
294
* AccessController
294
295
// forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
295
296
// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
297
+ // TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
296
298
forceDiskSnapshot bool
297
299
corruptionChecker CorruptionChecker
298
300
}
@@ -1195,17 +1197,27 @@ func (s *EtcdServer) ForceSnapshot() {
1195
1197
}
1196
1198
1197
1199
func (s * EtcdServer ) snapshotIfNeededAndCompactRaftLog (ep * etcdProgress ) {
1198
- if ! s .shouldSnapshot (ep ) {
1200
+ //TODO: Remove disk snapshot in v3.7
1201
+ shouldSnapshotToDisk := s .shouldSnapshotToDisk (ep )
1202
+ shouldSnapshotToMemory := s .shouldSnapshotToMemory (ep )
1203
+ if ! shouldSnapshotToDisk && ! shouldSnapshotToMemory {
1199
1204
return
1200
1205
}
1201
- s .snapshot (ep )
1206
+ if shouldSnapshotToDisk {
1207
+ s .forceDiskSnapshot = false
1208
+ }
1209
+ s .snapshot (ep , shouldSnapshotToDisk )
1202
1210
s .compactRaftLog (ep .appliedi )
1203
1211
}
1204
1212
1205
- func (s * EtcdServer ) shouldSnapshot (ep * etcdProgress ) bool {
1213
+ func (s * EtcdServer ) shouldSnapshotToDisk (ep * etcdProgress ) bool {
1206
1214
return (s .forceDiskSnapshot && ep .appliedi != ep .diskSnapshotIndex ) || (ep .appliedi - ep .diskSnapshotIndex > s .Cfg .SnapshotCount )
1207
1215
}
1208
1216
1217
+ func (s * EtcdServer ) shouldSnapshotToMemory (ep * etcdProgress ) bool {
1218
+ return ep .appliedi > ep .memorySnapshotIndex + memorySnapshotCount
1219
+ }
1220
+
1209
1221
func (s * EtcdServer ) hasMultipleVotingMembers () bool {
1210
1222
return s .cluster != nil && len (s .cluster .VotingMemberIDs ()) > 1
1211
1223
}
@@ -2119,28 +2131,29 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
2119
2131
}
2120
2132
2121
2133
// TODO: non-blocking snapshot
2122
- func (s * EtcdServer ) snapshot (ep * etcdProgress ) {
2134
+ func (s * EtcdServer ) snapshot (ep * etcdProgress , toDisk bool ) {
2123
2135
lg := s .Logger ()
2124
- lg .Info (
2125
- "triggering snapshot" ,
2126
- zap .String ("local-member-id" , s .MemberID ().String ()),
2127
- zap .Uint64 ("local-member-applied-index" , ep .appliedi ),
2128
- zap .Uint64 ("local-member-snapshot-index" , ep .diskSnapshotIndex ),
2129
- zap .Uint64 ("local-member-snapshot-count" , s .Cfg .SnapshotCount ),
2130
- zap .Bool ("snapshot-forced" , s .forceDiskSnapshot ),
2131
- )
2132
- s .forceDiskSnapshot = false
2133
- d := GetMembershipInfoInV2Format (s .Logger (), s .cluster )
2134
- // commit kv to write metadata (for example: consistent index) to disk.
2135
- //
2136
- // This guarantees that Backend's consistent_index is >= index of last snapshot.
2137
- //
2138
- // KV().commit() updates the consistent index in backend.
2139
- // All operations that update consistent index must be called sequentially
2140
- // from applyAll function.
2141
- // So KV().Commit() cannot run in parallel with toApply. It has to be called outside
2142
- // the go routine created below.
2143
- s .KV ().Commit ()
2136
+ d := GetMembershipInfoInV2Format (lg , s .cluster )
2137
+ if toDisk {
2138
+ s .Logger ().Info (
2139
+ "triggering snapshot" ,
2140
+ zap .String ("local-member-id" , s .MemberID ().String ()),
2141
+ zap .Uint64 ("local-member-applied-index" , ep .appliedi ),
2142
+ zap .Uint64 ("local-member-snapshot-index" , ep .diskSnapshotIndex ),
2143
+ zap .Uint64 ("local-member-snapshot-count" , s .Cfg .SnapshotCount ),
2144
+ zap .Bool ("snapshot-forced" , s .forceDiskSnapshot ),
2145
+ )
2146
+ // commit kv to write metadata (for example: consistent index) to disk.
2147
+ //
2148
+ // This guarantees that Backend's consistent_index is >= index of last snapshot.
2149
+ //
2150
+ // KV().commit() updates the consistent index in backend.
2151
+ // All operations that update consistent index must be called sequentially
2152
+ // from applyAll function.
2153
+ // So KV().Commit() cannot run in parallel with toApply. It has to be called outside
2154
+ // the go routine created below.
2155
+ s .KV ().Commit ()
2156
+ }
2144
2157
2145
2158
// For backward compatibility, generate v2 snapshot from v3 state.
2146
2159
snap , err := s .r .raftStorage .CreateSnapshot (ep .appliedi , & ep .confState , d )
@@ -2152,23 +2165,25 @@ func (s *EtcdServer) snapshot(ep *etcdProgress) {
2152
2165
}
2153
2166
lg .Panic ("failed to create snapshot" , zap .Error (err ))
2154
2167
}
2168
+ ep .memorySnapshotIndex = ep .appliedi
2155
2169
2156
2170
verifyConsistentIndexIsLatest (lg , snap , s .consistIndex .ConsistentIndex ())
2157
2171
2158
- // SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
2159
- if err = s . r . storage . SaveSnap ( snap ); err != nil {
2160
- lg . Panic ( "failed to save snapshot" , zap . Error ( err ))
2161
- }
2162
- ep . diskSnapshotIndex = ep . appliedi
2163
- ep .memorySnapshotIndex = ep .appliedi
2164
- if err = s .r .storage .Release (snap ); err != nil {
2165
- lg .Panic ("failed to release wal" , zap .Error (err ))
2166
- }
2172
+ if toDisk {
2173
+ // SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
2174
+ if err = s . r . storage . SaveSnap ( snap ); err != nil {
2175
+ lg . Panic ( "failed to save snapshot" , zap . Error ( err ))
2176
+ }
2177
+ ep .diskSnapshotIndex = ep .appliedi
2178
+ if err = s .r .storage .Release (snap ); err != nil {
2179
+ lg .Panic ("failed to release wal" , zap .Error (err ))
2180
+ }
2167
2181
2168
- lg .Info (
2169
- "saved snapshot" ,
2170
- zap .Uint64 ("snapshot-index" , snap .Metadata .Index ),
2171
- )
2182
+ lg .Info (
2183
+ "saved snapshot to disk" ,
2184
+ zap .Uint64 ("snapshot-index" , snap .Metadata .Index ),
2185
+ )
2186
+ }
2172
2187
}
2173
2188
2174
2189
func (s * EtcdServer ) compactRaftLog (snapi uint64 ) {
@@ -2189,7 +2204,6 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
2189
2204
if snapi > s .Cfg .SnapshotCatchUpEntries {
2190
2205
compacti = snapi - s .Cfg .SnapshotCatchUpEntries
2191
2206
}
2192
-
2193
2207
err := s .r .raftStorage .Compact (compacti )
2194
2208
if err != nil {
2195
2209
// the compaction was done asynchronously with the progress of raft.
@@ -2199,7 +2213,7 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
2199
2213
}
2200
2214
lg .Panic ("failed to compact" , zap .Error (err ))
2201
2215
}
2202
- lg .Info (
2216
+ lg .Debug (
2203
2217
"compacted Raft logs" ,
2204
2218
zap .Uint64 ("compact-index" , compacti ),
2205
2219
)
0 commit comments