forked from tikv/raft-engine
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsistency.rs
More file actions
71 lines (65 loc) · 2.54 KB
/
consistency.rs
File metadata and controls
71 lines (65 loc) · 2.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
use hashbrown::HashMap;
use crate::Result;
use crate::file_pipe_log::ReplayMachine;
use crate::log_batch::{LogItemBatch, LogItemContent};
use crate::pipe_log::{FileId, LogQueue};
/// A `ConsistencyChecker` scans for log entry holes in a log queue. It will
/// return a list of corrupted raft groups along with their last valid log
/// index.
///
/// A hole is recorded when a non-empty batch of entry indexes for a raft
/// group starts more than one past the last index previously seen for that
/// group. Only the first hole recorded for each group is kept.
#[derive(Default)]
pub struct ConsistencyChecker {
// Mappings from raft group id to (first-index, last-index): the first index
// of the first non-empty batch seen for the group, and the last index of the
// most recently seen one.
raft_groups: HashMap<u64, (u64, u64)>,
// Mappings from raft group id to last valid index, i.e. the last index that
// was seen before the first detected hole.
corrupted: HashMap<u64, u64>,
}
impl ConsistencyChecker {
    /// Consumes the checker and hands back the corrupted raft groups it has
    /// recorded, keyed by raft group id and mapped to the last valid log
    /// index of each group.
    pub fn finish(self) -> HashMap<u64, u64> {
        // The per-group index ranges are only bookkeeping; drop them here.
        let Self { corrupted, .. } = self;
        corrupted
    }
}
impl ReplayMachine for ConsistencyChecker {
fn replay(&mut self, item_batch: LogItemBatch, _file_id: FileId) -> Result<()> {
for item in item_batch.iter() {
if let LogItemContent::EntryIndexes(ents) = &item.content {
if !ents.0.is_empty() {
let incoming_first_index = ents.0.first().unwrap().index;
let incoming_last_index = ents.0.last().unwrap().index;
let index_range = self
.raft_groups
.entry(item.raft_group_id)
.or_insert((incoming_first_index, incoming_last_index));
if index_range.1 + 1 < incoming_first_index {
self.corrupted
.entry(item.raft_group_id)
.or_insert(index_range.1);
}
index_range.1 = incoming_last_index;
}
}
}
Ok(())
}
fn merge(&mut self, mut rhs: Self, _queue: LogQueue) -> Result<()> {
let mut corrupted_between_rhs: HashMap<u64, u64> = HashMap::default();
for (id, (first, last)) in rhs.raft_groups.drain() {
self.raft_groups
.entry(id)
.and_modify(|(_, l)| {
if *l + 1 < first {
corrupted_between_rhs.insert(id, *l);
}
*l = last;
})
.or_insert((first, last));
}
for (id, last_index) in corrupted_between_rhs.drain() {
self.corrupted.entry(id).or_insert(last_index);
}
for (id, last_index) in rhs.corrupted.drain() {
self.corrupted.entry(id).or_insert(last_index);
}
Ok(())
}
}