Skip to content

Commit 40ee827

Browse files
author
Junlong Gao
committed
Passing all of 3B except split
1 parent 7c1b34e commit 40ee827

File tree

6 files changed

+50
-28
lines changed

6 files changed

+50
-28
lines changed

entrypoint.sh

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ while true; do
1212
iter=$((iter+1))
1313
echo "iteration $iter"
1414

15-
make project1 | tee out
16-
make project2 | tee out
17-
make project3a | tee out
15+
if ! go test ./kv/test_raftstore -run TestConfChangeRecover3B -v | tee output; then
16+
break
17+
fi
18+
#make project1 | tee out
19+
#make project2 | tee out
20+
#make project3a | tee out
1821
done

kv/raftstore/peer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ func (p *peer) sendRaftMessage(msg eraftpb.Message, trans Transport) error {
376376
if toPeer == nil {
377377
return fmt.Errorf("failed to lookup recipient peer %v in region %v", msg.To, p.regionId)
378378
}
379-
log.Debugf("%v, send raft msg %v from %v to %v", p.Tag, msg.MsgType, fromPeer, toPeer)
379+
log.Infof("%v, send raft msg %v from %v to %v", p.Tag, msg.MsgType, fromPeer, toPeer)
380380

381381
sendMsg.FromPeer = &fromPeer
382382
sendMsg.ToPeer = toPeer

kv/raftstore/peer_msg_handler.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,20 @@ func (d *peerMsgHandler) HandleRaftReady() {
5757
if err != nil {
5858
panic(err)
5959
}
60-
// if this node is just joining a group with a snapshot from leader, also restore
61-
// the region state including version/conf numbering
6260
if snapshotRet != nil {
63-
d.ctx.storeMeta.regionRanges.ReplaceOrInsert(&regionItem{region: snapshotRet.Region})
64-
// assert d.Region() == this one
61+
// if this node is just joining a group with a snapshot from leader, also restore
62+
// the region state including version/conf numbering
63+
d.SetRegion(snapshotRet.Region)
64+
65+
d.ctx.storeMeta.regions[d.regionId] = d.Region()
66+
d.ctx.storeMeta.regionRanges.ReplaceOrInsert(&regionItem{region: d.Region()})
67+
68+
d.peer.peerCache = map[uint64]*metapb.Peer{}
69+
for _, p := range d.Region().Peers {
70+
d.insertPeerCache(p)
71+
}
6572
}
73+
6674
applied := d.peerStorage.applyState.AppliedIndex
6775
// send messages here:
6876
for _, m := range rd.Messages {
@@ -77,6 +85,10 @@ func (d *peerMsgHandler) HandleRaftReady() {
7785
// apply the committed entries to the state machine
7886
kvWB := engine_util.WriteBatch{}
7987
for _, m := range rd.CommittedEntries {
88+
if m.Index <= d.peerStorage.applyState.AppliedIndex {
89+
log.Infof("Node %v see stale index %v <= %v", d.PeerId(), m.Index, d.peerStorage.applyState.AppliedIndex)
90+
continue
91+
}
8092
log.Infof("%v applying index %v, region %v", d.Tag, m.Index, d.Region())
8193
if m.EntryType == pb.EntryType_EntryConfChange {
8294
var cc pb.ConfChange
@@ -121,7 +133,7 @@ func (d *peerMsgHandler) HandleRaftReady() {
121133
}
122134
}
123135
if idx < 0 {
124-
panic(idx)
136+
log.Panicf("%v:%v, looking for %v", d.Tag, d.Region(), cc.NodeId)
125137
}
126138
if cc.NodeId == d.PeerId() {
127139
// Don't increase the conf number here. Otherwise when getting restarted,
@@ -152,6 +164,9 @@ func (d *peerMsgHandler) HandleRaftReady() {
152164
}
153165
}
154166

167+
// finally, advance applied state
168+
d.peerStorage.applyState.AppliedIndex = m.Index
169+
kvWB.SetMeta(meta.ApplyStateKey(d.Region().Id), d.peerStorage.applyState)
155170
} else if len(m.Data) > 0 {
156171
var r raft_cmdpb.RaftCmdRequest
157172
err := r.Unmarshal(m.Data)
@@ -161,10 +176,6 @@ func (d *peerMsgHandler) HandleRaftReady() {
161176

162177
log.Debug("Node %v processing index %v, term %v", d.PeerId(), m.Index, m.Term)
163178

164-
if m.Index <= d.peerStorage.applyState.AppliedIndex {
165-
log.Debug("Node %v see stale index %v <= %v", d.PeerId(), m.Index, d.peerStorage.applyState.AppliedIndex)
166-
continue
167-
}
168179
for _, cmd := range r.Requests {
169180
switch cmd.CmdType {
170181
case raft_cmdpb.CmdType_Put:
@@ -422,6 +433,7 @@ func (d *peerMsgHandler) proposeRaftCommand(msg *raft_cmdpb.RaftCmdRequest, cb *
422433
case raft_cmdpb.AdminCmdType_ChangePeer:
423434
// There can be at most one pending peer adding/removing, so we check if this is done.
424435
changeReq := msg.AdminRequest.ChangePeer
436+
log.Infof("Issuing conf change request: %v", changeReq)
425437
switch changeReq.ChangeType {
426438
case pb.ConfChangeType_AddNode:
427439
if d.RaftGroup.Raft.IsMember(changeReq.Peer.Id) {
@@ -447,7 +459,6 @@ func (d *peerMsgHandler) proposeRaftCommand(msg *raft_cmdpb.RaftCmdRequest, cb *
447459
}
448460
cb.Done(resp)
449461
case pb.ConfChangeType_RemoveNode:
450-
log.Infof("Getting remove node request: %v", changeReq)
451462
if !d.RaftGroup.Raft.IsMember(changeReq.Peer.Id) {
452463
// done
453464
log.Infof("Done with node request :%v", changeReq)

kv/raftstore/peer_storage.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,11 @@ func (ps *PeerStorage) Snapshot() (eraftpb.Snapshot, error) {
167167
return snapshot, raft.ErrSnapshotTemporarilyUnavailable
168168
}
169169
ps.snapState.StateType = snap.SnapState_Relax
170-
if snapshot.GetMetadata() != nil && ps.validateSnap(&snapshot) {
171-
return snapshot, nil
170+
if snapshot.GetMetadata() != nil {
171+
ps.snapTriedCnt = 0
172+
if ps.validateSnap(&snapshot) {
173+
return snapshot, nil
174+
}
172175
} else {
173176
log.Warnf("%s failed to try generating snapshot, times: %d", ps.Tag, ps.snapTriedCnt)
174177
}
@@ -358,7 +361,6 @@ func (ps *PeerStorage) ApplySnapshot(snapshot *eraftpb.Snapshot, kvWB *engine_ut
358361
}
359362

360363
oldRegion := ps.Region()
361-
ps.region = snapData.Region
362364

363365
err := ps.clearMeta(raftWB, kvWB)
364366
if err != nil {
@@ -367,11 +369,11 @@ func (ps *PeerStorage) ApplySnapshot(snapshot *eraftpb.Snapshot, kvWB *engine_ut
367369
done := make(chan bool)
368370
ps.snapState = snap.SnapState{StateType: snap.SnapState_Applying}
369371
ps.regionSched <- &runner.RegionTaskApply{
370-
RegionId: ps.Region().Id,
372+
RegionId: snapData.Region.Id,
371373
Notifier: done,
372374
SnapMeta: snapshot.Metadata,
373-
StartKey: ps.Region().StartKey,
374-
EndKey: ps.Region().EndKey,
375+
StartKey: snapData.Region.StartKey,
376+
EndKey: snapData.Region.EndKey,
375377
}
376378

377379
log.Infof("%v applying snapshot %v %v", ps.Tag, snapshot.Metadata.Index, snapshot.Metadata.Term)
@@ -398,11 +400,11 @@ func (ps *PeerStorage) ApplySnapshot(snapshot *eraftpb.Snapshot, kvWB *engine_ut
398400
if term != snapshot.Metadata.Term {
399401
panic(term)
400402
}
401-
log.Infof("%v after applying snapshot, got %v %v %v", ps.Tag, snapshot.Metadata.Index, term, ps.Region())
403+
log.Infof("%v after applying snapshot, got %v %v %v", ps.Tag, snapshot.Metadata.Index, term, snapData.Region)
402404
// ps.clearExtraData()
403405
return &ApplySnapResult{
404406
PrevRegion: oldRegion,
405-
Region: ps.Region(),
407+
Region: snapData.Region,
406408
}, nil
407409
}
408410

kv/test_raftstore/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ func MustSamePeers(left *metapb.Region, right *metapb.Region) {
539539
}
540540
for _, p := range left.GetPeers() {
541541
if FindPeer(right, p.GetStoreId()) == nil {
542-
panic("not found the peer")
542+
log.Infof("not found the peer l:%v vs r:%v", left, right)
543543
}
544544
}
545545
}

raft/raft.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func (r *Raft) sendAppend(to uint64) bool {
239239
if err != nil {
240240
panic(err)
241241
}
242-
log.Debugf("Leader %v sending snapshot %v to %v", r.id, r.RaftLog.committed, to)
242+
log.Debugf("Leader %v sending snapshot %v to %v", r.id, snap.Metadata, to)
243243
r.send(pb.Message{
244244
MsgType: pb.MessageType_MsgSnapshot,
245245
To: to,
@@ -292,12 +292,16 @@ func (r *Raft) handleAppendEntries(m pb.Message) {
292292

293293
r.electionElapsed = 0
294294
if !r.RaftLog.CheckMatch(m.Index, m.LogTerm) {
295+
// XXX: replying with the committed index as a hint can be further optimized by calling r.RaftLog.Compare
296+
// to get the first conflicting index instead; cf. the Raft dissertation.
295297
r.send(pb.Message{
296298
To: m.From,
297299
MsgType: pb.MessageType_MsgAppendResponse,
298300
Reject: true,
301+
302+
Index: r.RaftLog.committed, // hint a reply
299303
})
300-
log.Infof("Node %v reject append (%v,%v)", r.id, m.Index, m.Term)
304+
log.Infof("Node %v reject append (%v,%v), hint %v", r.id, m.Index, m.LogTerm, r.RaftLog.committed)
301305
} else {
302306
if len(m.Entries) == 0 {
303307
r.send(pb.Message{
@@ -362,6 +366,8 @@ func (r *Raft) tryUpdateCommitIdx(newCommitIdx uint64) bool {
362366
log.Panicf("Term goes backwards: %v -> %v", term, r.Term)
363367
}
364368
if term != r.Term {
369+
// assert term < r.Term
370+
// A leader cannot use an entry from an older term to advance the commit index; cf. Figure 8 of the Raft paper.
365371
return false
366372
}
367373

@@ -388,10 +394,10 @@ func (r *Raft) handleAppendEntriesResp(m pb.Message) {
388394
}
389395

390396
if m.Reject {
391-
// If the response of append is duplicated (or leader sent 2 append entries get 2 same responses),
392-
// don't blindly decrease the next index.
393397
if r.Prs[m.From].Next > 0 {
394-
r.Prs[m.From].Next = max(r.Prs[m.From].Next-1, r.Prs[m.From].Match+1)
398+
// If the append response is duplicated (or the leader sent two appends and received two identical responses),
399+
// don't blindly decrease the next index.
400+
r.Prs[m.From].Next = m.Index + 1
395401
}
396402
r.sendAppend(m.From)
397403
} else {

0 commit comments

Comments
 (0)