@@ -202,67 +202,135 @@ func (r *raftNode) getLatestTickTs() time.Time {
 // start prepares and starts raftNode in a new goroutine. It is no longer safe
 // to modify the fields after it has been started.
 func (r *raftNode) start(rh *raftReadyHandler) {
+	type hardStateMessage struct {
+		message   raftpb.Message
+		hardState raftpb.HardState
+	}
+	toAppend := make(chan hardStateMessage, 1)
+	toApply := make(chan raftpb.Message, 1)
+
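+	// Append goroutine: persists snapshots, hard state, and entries handed over
+	// by the Ready loop, appends the entries to the in-memory raft storage, and
+	// then sends the responses raft attached to the append message.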
+	go func() {
+		for {
+			select {
+			case messageHardState := <-toAppend:
+				m := messageHardState.message
+
+				// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
+				// ensure that recovery after a snapshot restore is possible.
+				if m.Snapshot != nil {
+					r.lg.Info("Save snap", zap.Any("snap", m.Snapshot))
+					// gofail: var raftBeforeSaveSnap struct{}
+					if err := r.storage.SaveSnap(*m.Snapshot); err != nil {
+						r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
+					}
+					// gofail: var raftAfterSaveSnap struct{}
+				}
+
+				// gofail: var raftBeforeSave struct{}
+				r.lg.Info("Save entries", zap.Any("entries", m.Entries))
+				if err := r.storage.Save(messageHardState.hardState, m.Entries); err != nil {
+					r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
+				}
+				//if !raft.IsEmptyHardState(hardState) {
+				//	proposalsCommitted.Set(float64(hardState.Commit))
+				//}
+				// gofail: var raftAfterSave struct{}
+
+				if m.Snapshot != nil {
+					r.lg.Info("Sync wal")
+					// Force WAL to fsync its hard state before Release() releases
+					// old data from the WAL. Otherwise could get an error like:
+					// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
+					// See https://github.com/etcd-io/etcd/issues/10219 for more details.
+					if err := r.storage.Sync(); err != nil {
+						r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
+					}
+
+					// etcdserver now claim the snapshot has been persisted onto the disk
+					// gofail: var raftBeforeApplySnap struct{}
+					r.raftStorage.ApplySnapshot(*m.Snapshot)
+					r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index))
+					// gofail: var raftAfterApplySnap struct{}
+
+					if err := r.storage.Release(*m.Snapshot); err != nil {
+						r.lg.Fatal("failed to release Raft wal", zap.Error(err))
+					}
+					// gofail: var raftAfterWALRelease struct{}
+				}
+				r.lg.Info("Append entries", zap.Any("entries", m.Entries))
+				r.raftStorage.Append(m.Entries)
+				r.lg.Info("Append sent responses", zap.Any("entries", m.Responses))
+				r.transport.Send(m.Responses)
+			case <-r.stopped:
+				return
+			}
+		}
+	}()
+
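+	// Apply goroutine: hands committed entries and snapshots to the apply layer,
+	// waits for configuration changes to be applied before replying, and then
+	// sends the responses raft attached to the apply message.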
+	go func() {
+		for {
+			select {
+			case m := <-toApply:
+				ap, stopped := r.handleApply(rh, m.Entries, m.Snapshot)
+				if stopped {
+					return
+				}
+				ap.NotifyRaftLogPersisted()
+				confChanged := includesConfigChange(m.Entries)
+				if confChanged {
+					if ap.WaitForApply(r.stopped) {
+						return
+					}
+				}
+				// gofail: var raftBeforeFollowerSend struct{}
+				r.transport.Send(m.Responses)
+				// gofail: var raftBeforeAdvance struct{}
+				if confChanged {
+					ap.NotifyRaftAdvanced()
+				}
+			case <-r.stopped:
+				return
+			}
+		}
+	}()
 
 	go func() {
 		defer r.onStop()
-		islead := false
 
 		for {
 			select {
 			case <-r.ticker.C:
 				r.tick()
 			case rd := <-r.Ready():
+				r.lg.Info("Ready")
 				if rd.SoftState != nil {
-					r.lg.Info("Handle soft state ", zap.Any("soft-state", rd.SoftState))
-					islead = r.handleSoftState(rh, rd.SoftState, rd.RaftState)
+					r.lg.Info("SoftState ", zap.Any("soft-state", rd.SoftState))
+					r.handleSoftState(rh, rd.SoftState, rd.RaftState)
 				}
 				if len(rd.ReadStates) != 0 {
-					r.lg.Info("Handle ready states ", zap.Any("ready-state", rd.ReadStates))
+					r.lg.Info("ReadyStates ", zap.Any("ready-state", rd.ReadStates))
 					if r.handleReadyStates(rd.ReadStates) {
 						return
 					}
 				}
-
-				r.lg.Info("Apply entries", zap.Any("entries", rd.CommittedEntries))
-				ap, stopped := r.handleApply(rh, rd.CommittedEntries, rd.Snapshot)
-				if stopped {
-					return
-				}
-
-				// the leader can write to its disk in parallel with replicating to the followers and then
-				// writing to their disks.
-				// For more details, check raft thesis 10.2.1
-				if islead {
-					r.lg.Info("Send messages", zap.Any("messages", rd.Messages))
-					// gofail: var raftBeforeLeaderSend struct{}
-					r.transport.Send(r.processMessages(rd.Messages))
-				}
-				r.lg.Info("Handle hard state", zap.Any("hard-state", rd.HardState))
-				r.handleHardStateAndSnapshot(rd.Snapshot, rd.HardState, rd.Entries, ap)
-				r.lg.Info("Append entries", zap.Any("entries", rd.Entries))
-				r.raftStorage.Append(rd.Entries)
-				var processedMessages []raftpb.Message
-				if !islead {
-					// finish processing incoming messages before we signal notifyc chan
-					processedMessages = r.processMessages(rd.Messages)
+				if !raft.IsEmptyHardState(rd.HardState) {
+					r.lg.Info("HardState", zap.Any("hard-state", rd.HardState))
 				}
-				ap.NotifyRaftLogPersisted()
-				confChanged := includesConfigChange(rd.CommittedEntries)
-				if !islead {
-					if confChanged {
-						if ap.WaitForApply(r.stopped) {
-							return
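+				// Route raft's outgoing messages: messages addressed to the local
+				// apply/append threads go to the goroutines above; everything else
+				// goes to peers via the transport.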
+				for _, m := range rd.Messages {
+					switch m.To {
+					case raft.LocalApplyThread:
+						r.lg.Info("Message apply", zap.Any("message", m))
+						toApply <- m
+					case raft.LocalAppendThread:
+						r.lg.Info("Message append", zap.Any("message", m))
+						toAppend <- hardStateMessage{
+							hardState: rd.HardState,
+							message:   m,
 						}
+					default:
+						r.lg.Info("Message sent", zap.Any("message", m))
+						r.transport.Send([]raftpb.Message{m})
 					}
-					// gofail: var raftBeforeFollowerSend struct{}
-					r.lg.Info("Send messages", zap.Any("messages", rd.Messages))
-					r.transport.Send(processedMessages)
-				}
-				// gofail: var raftBeforeAdvance struct{}
-				r.lg.Info("Advance")
-				r.Advance()
-				if confChanged {
-					ap.NotifyRaftAdvanced()
 				}
 			case <-r.stopped:
 				return
@@ -307,15 +375,17 @@ func (r *raftNode) handleReadyStates(rs []raft.ReadState) bool {
 	return false
 }
 
-func (r *raftNode) handleApply(rh *raftReadyHandler, committedEntries []raftpb.Entry, snapshot raftpb.Snapshot) (*toApply, bool) {
+func (r *raftNode) handleApply(rh *raftReadyHandler, committedEntries []raftpb.Entry, snapshot *raftpb.Snapshot) (*toApply, bool) {
 	notifyc := make(chan struct{}, 1)
 	raftAdvancedC := make(chan struct{}, 1)
 	ap := toApply{
 		entries:       committedEntries,
-		snapshot:      snapshot,
 		notifyc:       notifyc,
 		raftAdvancedC: raftAdvancedC,
 	}
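+	// The snapshot now arrives as a pointer; copy it into toApply only when one
+	// is actually present.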
+	if snapshot != nil {
+		ap.snapshot = *snapshot
+	}
 
 	var ci uint64
 	if len(ap.entries) != 0 {
@@ -335,51 +405,10 @@ func (r *raftNode) handleApply(rh *raftReadyHandler, committedEntries []raftpb.E
 	return &ap, false
 }
 
-func (r *raftNode) handleHardStateAndSnapshot(snapshot raftpb.Snapshot, hardState raftpb.HardState, entries []raftpb.Entry, ap *toApply) {
-	// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
-	// ensure that recovery after a snapshot restore is possible.
-	if !raft.IsEmptySnap(snapshot) {
-		// gofail: var raftBeforeSaveSnap struct{}
-		if err := r.storage.SaveSnap(snapshot); err != nil {
-			r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
-		}
-		// gofail: var raftAfterSaveSnap struct{}
-	}
-
-	// gofail: var raftBeforeSave struct{}
-	if err := r.storage.Save(hardState, entries); err != nil {
-		r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
-	}
-	if !raft.IsEmptyHardState(hardState) {
-		proposalsCommitted.Set(float64(hardState.Commit))
-	}
-	// gofail: var raftAfterSave struct{}
-
-	if !raft.IsEmptySnap(snapshot) {
-		// Force WAL to fsync its hard state before Release() releases
-		// old data from the WAL. Otherwise could get an error like:
-		// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
-		// See https://github.com/etcd-io/etcd/issues/10219 for more details.
-		if err := r.storage.Sync(); err != nil {
-			r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
-		}
-
-		// etcdserver now claim the snapshot has been persisted onto the disk
-		ap.NotifySnapshotPersisted()
-		r.handleSnapshot(snapshot)
-	}
+func (r *raftNode) handleHardStateAndSnapshot(snapshot *raftpb.Snapshot, hardState raftpb.HardState, entries []raftpb.Entry) {
 }
 
 func (r *raftNode) handleSnapshot(snapshot raftpb.Snapshot) {
-	// gofail: var raftBeforeApplySnap struct{}
-	r.raftStorage.ApplySnapshot(snapshot)
-	r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", snapshot.Metadata.Index))
-	// gofail: var raftAfterApplySnap struct{}
-
-	if err := r.storage.Release(snapshot); err != nil {
-		r.lg.Fatal("failed to release Raft wal", zap.Error(err))
-	}
-	// gofail: var raftAfterWALRelease struct{}
 }
 
 func includesConfigChange(entries []raftpb.Entry) bool {
@@ -442,13 +471,7 @@ func (r *raftNode) apply() chan toApply {
 }
 
 func (r *raftNode) stop() {
-	select {
-	case r.stopped <- struct{}{}:
-		// Not already stopped, so trigger it
-	case <-r.done:
-		// Has already been stopped - no need to do anything
-		return
-	}
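+	// Closing stopped (rather than sending a single value) lets every goroutine
+	// started in start() observe the shutdown.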
+	close(r.stopped)
 	// Block until the stop has been acknowledged by start()
 	<-r.done
 }