@@ -306,31 +306,35 @@ func ClearMeta(engines *engine_util.Engines, kvWB, raftWB *engine_util.WriteBatc
306306
307307// Append the given entries to the raft log and update ps.raftState also delete log entries that will
308308// never be committed
309- func (ps * PeerStorage ) Append (entries []eraftpb.Entry , raftWB * engine_util.WriteBatch ) error {
309+ func (ps * PeerStorage ) Append (nodeId uint64 , entries []eraftpb.Entry , raftWB * engine_util.WriteBatch ) error {
310310 // Your Code Here (2B).
311311 if len (entries ) == 0 {
312312 return nil
313313 }
314314
315315 for _ , e := range entries {
316+ log .Infof ("Node %v appending log %v %v" , nodeId , e .Index , e .Term )
316317 raftWB .SetMeta (meta .RaftLogKey (ps .region .Id , e .Index ), & eraftpb.Entry {
317318 EntryType : eraftpb .EntryType_EntryNormal ,
318319 Index : e .Index ,
319320 Term : e .Term ,
320321 Data : e .Data ,
321322 })
322- }
323323
324- lastIndex := entries [len (entries )- 1 ].Index
325- lastTerm := entries [len (entries )- 1 ].Term
326- ps .raftState .LastIndex = lastIndex
327- ps .raftState .LastTerm = lastTerm
324+ if e .Index < ps .raftState .LastIndex {
325+ // This can happen if snapshot applied before calling this extended the last index state further
326+ continue
327+ }
328+ ps .raftState .LastIndex = e .Index
329+ ps .raftState .LastTerm = e .Term
330+ raftWB .SetMeta (meta .RaftStateKey (ps .region .Id ), ps .raftState )
331+ }
328332
329333 // prune more entries after last index
330334 txn := ps .Engines .Raft .NewTransaction (false )
331335 iter := txn .NewIterator (badger .DefaultIteratorOptions )
332336 defer iter .Close ()
333- for iter .Seek (meta .RaftLogKey (ps .region .Id , lastIndex + 1 )); iter .Valid () && ! meta .IsRaftStateKey (iter .Item ().Key ()); iter .Next () {
337+ for iter .Seek (meta .RaftLogKey (ps .region .Id , ps . raftState . LastIndex + 1 )); iter .Valid () && ! meta .IsRaftStateKey (iter .Item ().Key ()); iter .Next () {
334338 logIdx , err := meta .RaftLogIndex (iter .Item ().Key ())
335339 if err != nil {
336340 log .Panic (err )
@@ -344,54 +348,93 @@ func (ps *PeerStorage) Append(entries []eraftpb.Entry, raftWB *engine_util.Write
344348// Apply the peer with given snapshot
345349func (ps * PeerStorage ) ApplySnapshot (snapshot * eraftpb.Snapshot , kvWB * engine_util.WriteBatch , raftWB * engine_util.WriteBatch ) (* ApplySnapResult , error ) {
346350 log .Infof ("%v begin to apply snapshot" , ps .Tag )
347- snapData := new (rspb.RaftSnapshotData )
348- if err := snapData .Unmarshal (snapshot .Data ); err != nil {
349- return nil , err
350- }
351351
352352 // Hint: things need to do here including: update peer storage state like raftState and applyState, etc,
353353 // and send RegionTaskApply task to region worker through ps.regionSched, also remember call ps.clearMeta
354354 // and ps.clearExtraData to delete stale data
355355 // Your Code Here (2C).
356+ if snapshot .Metadata .Index <= ps .applyState .AppliedIndex {
357+ log .Infof ("%v stale snapshot %v <= %v" , ps .Tag , snapshot .Metadata .Index , ps .applyState .AppliedIndex )
358+ return nil , nil
359+ }
360+ done := make (chan bool )
361+ ps .snapState = snap.SnapState {StateType : snap .SnapState_Applying }
362+ ps .regionSched <- runner.RegionTaskApply {
363+ RegionId : ps .Region ().Id ,
364+ Notifier : done ,
365+ SnapMeta : snapshot .Metadata ,
366+ StartKey : ps .Region ().StartKey ,
367+ EndKey : ps .Region ().EndKey ,
368+ }
369+ <- done
370+
371+ ps .applyState .AppliedIndex = snapshot .Metadata .Index
372+ ps .applyState .TruncatedState = & rspb.RaftTruncatedState {
373+ Index : snapshot .Metadata .Index ,
374+ Term : snapshot .Metadata .Term ,
375+ XXX_NoUnkeyedLiteral : struct {}{},
376+ }
377+ kvWB .SetMeta (meta .ApplyStateKey (ps .Region ().Id ), ps .applyState )
378+
379+ if snapshot .Metadata .Index > ps .raftState .LastIndex {
380+ ps .raftState .LastIndex = snapshot .Metadata .Index
381+ ps .raftState .LastTerm = snapshot .Metadata .Term
382+ raftWB .SetMeta (meta .RaftStateKey (ps .Region ().Id ), ps .raftState )
383+ }
384+
385+ err := ps .clearMeta (raftWB , kvWB )
386+ if err != nil {
387+ panic (err )
388+ }
389+
390+ // ps.clearExtraData()
356391 return nil , nil
357392}
358393
359394// Save memory states to disk.
360395// Do not modify ready in this function, this is a requirement to advance the ready object properly later.
361- func (ps * PeerStorage ) SaveReadyState (ready * raft.Ready ) (* ApplySnapResult , error ) {
396+ func (ps * PeerStorage ) SaveReadyState (nodeId uint64 , ready * raft.Ready ) (* ApplySnapResult , error ) {
362397 // Hint: you may call `Append()` and `ApplySnapshot()` in this function
363398 // Your Code Here (2B/2C).
364399 rd := * ready
365- wb := engine_util.WriteBatch {}
366-
367- err := ps .Append (rd .Entries , & wb )
368- if err != nil {
369- log .Panic (err )
400+ raftWb := engine_util.WriteBatch {}
401+ kvWB := engine_util.WriteBatch {}
402+ /*
403+ Apply snapshot first, then process the new entries to be persisted into the log.
404+ The snapshot will replace the entire log state and the kv state, bring the last applied and committed state
405+ forward.
406+ */
407+ var ret * ApplySnapResult
408+ if rd .Snapshot != nil {
409+ status , err := ps .ApplySnapshot (rd .Snapshot , & kvWB , & raftWb )
410+ if err != nil {
411+ panic (err )
412+ }
413+ ret = status
370414 }
371415
372- currentState , err := meta . GetRaftLocalState ( ps .Engines . Raft , ps . region . Id )
416+ err := ps .Append ( nodeId , rd . Entries , & raftWb )
373417 if err != nil {
374418 log .Panic (err )
375419 }
420+
376421 if rd .HardState .Term != 0 {
377- currentState .HardState .Term = rd .HardState .Term
378- currentState .HardState .Vote = rd .HardState .Vote
379- currentState .HardState .Commit = rd .HardState .Commit
380- }
381- if len (rd .Entries ) > 0 {
382- currentState .LastIndex = rd .Entries [len (rd .Entries )- 1 ].Index
383- currentState .LastTerm = rd .Entries [len (rd .Entries )- 1 ].Term
422+ ps .raftState .HardState .Term = rd .HardState .Term
423+ ps .raftState .HardState .Vote = rd .HardState .Vote
424+ ps .raftState .HardState .Commit = rd .HardState .Commit
425+ raftWb .SetMeta (meta .RaftStateKey (ps .region .Id ), ps .raftState )
384426 }
385427
386- wb .SetMeta (meta .RaftStateKey (ps .region .Id ), currentState )
387-
388- err = ps .Engines .WriteRaft (& wb )
428+ err = ps .Engines .WriteRaft (& raftWb )
429+ if err != nil {
430+ log .Panic (err )
431+ }
432+ err = ps .Engines .WriteKV (& kvWB )
389433 if err != nil {
390434 log .Panic (err )
391435 }
392436
393- // applied is for snapshot?
394- return & ApplySnapResult {}, nil
437+ return ret , nil
395438}
396439
397440func (ps * PeerStorage ) ClearData () {
0 commit comments