@@ -192,67 +192,135 @@ func (r *raftNode) tick() {
192192// start prepares and starts raftNode in a new goroutine. It is no longer safe
193193// to modify the fields after it has been started.
194194func (r * raftNode ) start (rh * raftReadyHandler ) {
195+ type hardStateMessage struct {
196+ message raftpb.Message
197+ hardState raftpb.HardState
198+ }
199+ toAppend := make (chan hardStateMessage , 1 )
200+ toApply := make (chan raftpb.Message , 1 )
201+
202+ go func () {
203+ for {
204+ select {
205+ case messageHardState := <- toAppend :
206+ m := messageHardState .message
207+
208+ // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
209+ // ensure that recovery after a snapshot restore is possible.
210+ if m .Snapshot != nil {
211+ r .lg .Info ("Save snap" , zap .Any ("snap" , m .Snapshot ))
212+ // gofail: var raftBeforeSaveSnap struct{}
213+ if err := r .storage .SaveSnap (* m .Snapshot ); err != nil {
214+ r .lg .Fatal ("failed to save Raft snapshot" , zap .Error (err ))
215+ }
216+ // gofail: var raftAfterSaveSnap struct{}
217+ }
218+
219+ // gofail: var raftBeforeSave struct{}
220+ r .lg .Info ("Save entries" , zap .Any ("entries" , m .Entries ))
221+ if err := r .storage .Save (messageHardState .hardState , m .Entries ); err != nil {
222+ r .lg .Fatal ("failed to save Raft hard state and entries" , zap .Error (err ))
223+ }
224+ //if !raft.IsEmptyHardState(hardState) {
225+ // proposalsCommitted.Set(float64(hardState.Commit))
226+ //}
227+ // gofail: var raftAfterSave struct{}
228+
229+ if m .Snapshot != nil {
230+ r .lg .Info ("Sync wal" )
231+ // Force WAL to fsync its hard state before Release() releases
232+ // old data from the WAL. Otherwise could get an error like:
233+ // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
234+ // See https://github.com/etcd-io/etcd/issues/10219 for more details.
235+ if err := r .storage .Sync (); err != nil {
236+ r .lg .Fatal ("failed to sync Raft snapshot" , zap .Error (err ))
237+ }
238+
239+ // etcdserver now claim the snapshot has been persisted onto the disk
240+ // gofail: var raftBeforeApplySnap struct{}
241+ r .raftStorage .ApplySnapshot (* m .Snapshot )
242+ r .lg .Info ("applied incoming Raft snapshot" , zap .Uint64 ("snapshot-index" , m .Snapshot .Metadata .Index ))
243+ // gofail: var raftAfterApplySnap struct{}
244+
245+ if err := r .storage .Release (* m .Snapshot ); err != nil {
246+ r .lg .Fatal ("failed to release Raft wal" , zap .Error (err ))
247+ }
248+ // gofail: var raftAfterWALRelease struct{}
249+ }
250+ r .lg .Info ("Append entries" , zap .Any ("entries" , m .Entries ))
251+ r .raftStorage .Append (m .Entries )
252+ r .lg .Info ("Append sent responses" , zap .Any ("entries" , m .Responses ))
253+ r .transport .Send (m .Responses )
254+ case <- r .stopped :
255+ return
256+ }
257+ }
258+ }()
259+
260+ go func () {
261+ for {
262+ select {
263+ case m := <- toApply :
264+ ap , stopped := r .handleApply (rh , m .Entries , m .Snapshot )
265+ if stopped {
266+ return
267+ }
268+ ap .NotifyRaftLogPersisted ()
269+ confChanged := includesConfigChange (m .Entries )
270+ if confChanged {
271+ if ap .WaitForApply (r .stopped ) {
272+ return
273+ }
274+ }
275+ // gofail: var raftBeforeFollowerSend struct{}
276+ r .transport .Send (m .Responses )
277+ // gofail: var raftBeforeAdvance struct{}
278+ if confChanged {
279+ ap .NotifyRaftAdvanced ()
280+ }
281+ case <- r .stopped :
282+ return
283+ }
284+ }
285+ }()
195286
196287 go func () {
197288 defer r .onStop ()
198- islead := false
199289
200290 for {
201291 select {
202292 case <- r .ticker .C :
203293 r .tick ()
204294 case rd := <- r .Ready ():
295+ r .lg .Info ("Ready" )
205296 if rd .SoftState != nil {
206- r .lg .Info ("Handle soft state " , zap .Any ("soft-state" , rd .SoftState ))
207- islead = r .handleSoftState (rh , rd .SoftState , rd .RaftState )
297+ r .lg .Info ("SoftState " , zap .Any ("soft-state" , rd .SoftState ))
298+ r .handleSoftState (rh , rd .SoftState , rd .RaftState )
208299 }
209300 if len (rd .ReadStates ) != 0 {
210- r .lg .Info ("Handle ready states " , zap .Any ("ready-state" , rd .ReadStates ))
301+ r .lg .Info ("ReadyStates " , zap .Any ("ready-state" , rd .ReadStates ))
211302 if r .handleReadyStates (rd .ReadStates ) {
212303 return
213304 }
214305 }
215-
216- r .lg .Info ("Apply entries" , zap .Any ("entries" , rd .CommittedEntries ))
217- ap , stopped := r .handleApply (rh , rd .CommittedEntries , rd .Snapshot )
218- if stopped {
219- return
220- }
221-
222- // the leader can write to its disk in parallel with replicating to the followers and then
223- // writing to their disks.
224- // For more details, check raft thesis 10.2.1
225- if islead {
226- r .lg .Info ("Send messages" , zap .Any ("messages" , rd .Messages ))
227- // gofail: var raftBeforeLeaderSend struct{}
228- r .transport .Send (r .processMessages (rd .Messages ))
229- }
230- r .lg .Info ("Handle hard state" , zap .Any ("hard-state" , rd .HardState ))
231- r .handleHardStateAndSnapshot (rd .Snapshot , rd .HardState , rd .Entries , ap )
232- r .lg .Info ("Append entries" , zap .Any ("entries" , rd .Entries ))
233- r .raftStorage .Append (rd .Entries )
234- var processedMessages []raftpb.Message
235- if ! islead {
236- // finish processing incoming messages before we signal notifyc chan
237- processedMessages = r .processMessages (rd .Messages )
306+ if ! raft .IsEmptyHardState (rd .HardState ) {
307+ r .lg .Info ("HardState" , zap .Any ("hard-state" , rd .HardState ))
238308 }
239- ap .NotifyRaftLogPersisted ()
240- confChanged := includesConfigChange (rd .CommittedEntries )
241- if ! islead {
242- if confChanged {
243- if ap .WaitForApply (r .stopped ) {
244- return
309+ for _ , m := range rd .Messages {
310+ switch m .To {
311+ case raft .LocalApplyThread :
312+ r .lg .Info ("Message apply" , zap .Any ("message" , m ))
313+ toApply <- m
314+ case raft .LocalAppendThread :
315+ r .lg .Info ("Message append" , zap .Any ("message" , m ))
316+ toAppend <- hardStateMessage {
317+ hardState : rd .HardState ,
318+ message : m ,
245319 }
320+ default :
321+ r .lg .Info ("Message sent" , zap .Any ("message" , m ))
322+ r .transport .Send ([]raftpb.Message {m })
246323 }
247- // gofail: var raftBeforeFollowerSend struct{}
248- r .lg .Info ("Send messages" , zap .Any ("messages" , rd .Messages ))
249- r .transport .Send (processedMessages )
250- }
251- // gofail: var raftBeforeAdvance struct{}
252- r .lg .Info ("Advance" )
253- r .Advance ()
254- if confChanged {
255- ap .NotifyRaftAdvanced ()
256324 }
257325 case <- r .stopped :
258326 return
@@ -297,15 +365,17 @@ func (r *raftNode) handleReadyStates(rs []raft.ReadState) bool {
297365 return false
298366}
299367
300- func (r * raftNode ) handleApply (rh * raftReadyHandler , committedEntries []raftpb.Entry , snapshot raftpb.Snapshot ) (* toApply , bool ) {
368+ func (r * raftNode ) handleApply (rh * raftReadyHandler , committedEntries []raftpb.Entry , snapshot * raftpb.Snapshot ) (* toApply , bool ) {
301369 notifyc := make (chan struct {}, 1 )
302370 raftAdvancedC := make (chan struct {}, 1 )
303371 ap := toApply {
304372 entries : committedEntries ,
305- snapshot : snapshot ,
306373 notifyc : notifyc ,
307374 raftAdvancedC : raftAdvancedC ,
308375 }
376+ if snapshot != nil {
377+ ap .snapshot = * snapshot
378+ }
309379
310380 var ci uint64
311381 if len (ap .entries ) != 0 {
@@ -325,51 +395,10 @@ func (r *raftNode) handleApply(rh *raftReadyHandler, committedEntries []raftpb.E
325395 return & ap , false
326396}
327397
328- func (r * raftNode ) handleHardStateAndSnapshot (snapshot raftpb.Snapshot , hardState raftpb.HardState , entries []raftpb.Entry , ap * toApply ) {
329- // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
330- // ensure that recovery after a snapshot restore is possible.
331- if ! raft .IsEmptySnap (snapshot ) {
332- // gofail: var raftBeforeSaveSnap struct{}
333- if err := r .storage .SaveSnap (snapshot ); err != nil {
334- r .lg .Fatal ("failed to save Raft snapshot" , zap .Error (err ))
335- }
336- // gofail: var raftAfterSaveSnap struct{}
337- }
338-
339- // gofail: var raftBeforeSave struct{}
340- if err := r .storage .Save (hardState , entries ); err != nil {
341- r .lg .Fatal ("failed to save Raft hard state and entries" , zap .Error (err ))
342- }
343- if ! raft .IsEmptyHardState (hardState ) {
344- proposalsCommitted .Set (float64 (hardState .Commit ))
345- }
346- // gofail: var raftAfterSave struct{}
347-
348- if ! raft .IsEmptySnap (snapshot ) {
349- // Force WAL to fsync its hard state before Release() releases
350- // old data from the WAL. Otherwise could get an error like:
351- // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
352- // See https://github.com/etcd-io/etcd/issues/10219 for more details.
353- if err := r .storage .Sync (); err != nil {
354- r .lg .Fatal ("failed to sync Raft snapshot" , zap .Error (err ))
355- }
356-
357- // etcdserver now claim the snapshot has been persisted onto the disk
358- ap .NotifySnapshotPersisted ()
359- r .handleSnapshot (snapshot )
360- }
398+ func (r * raftNode ) handleHardStateAndSnapshot (snapshot * raftpb.Snapshot , hardState raftpb.HardState , entries []raftpb.Entry ) {
361399}
362400
363401func (r * raftNode ) handleSnapshot (snapshot raftpb.Snapshot ) {
364- // gofail: var raftBeforeApplySnap struct{}
365- r .raftStorage .ApplySnapshot (snapshot )
366- r .lg .Info ("applied incoming Raft snapshot" , zap .Uint64 ("snapshot-index" , snapshot .Metadata .Index ))
367- // gofail: var raftAfterApplySnap struct{}
368-
369- if err := r .storage .Release (snapshot ); err != nil {
370- r .lg .Fatal ("failed to release Raft wal" , zap .Error (err ))
371- }
372- // gofail: var raftAfterWALRelease struct{}
373402}
374403
375404func includesConfigChange (entries []raftpb.Entry ) bool {
@@ -432,13 +461,7 @@ func (r *raftNode) apply() chan toApply {
432461}
433462
434463func (r * raftNode ) stop () {
435- select {
436- case r .stopped <- struct {}{}:
437- // Not already stopped, so trigger it
438- case <- r .done :
439- // Has already been stopped - no need to do anything
440- return
441- }
464+ close (r .stopped )
442465 // Block until the stop has been acknowledged by start()
443466 <- r .done
444467}
0 commit comments