Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions components/raftstore/src/store/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub struct Config {
/// The maximum raft log numbers that applied_index can be ahead of
/// persisted_index.
pub max_apply_unpersisted_log_limit: u64,
/// Number of Raft ticks between follower read index request retries.
pub raft_read_index_retry_interval_ticks: usize,
// follower will reject this follower request to avoid falling behind leader too far,
// when the read index is ahead of the sum between the applied index and
// follower_read_max_log_gap,
Expand Down Expand Up @@ -459,6 +461,7 @@ impl Default for Config {
raft_log_gc_count_limit: None,
raft_log_gc_size_limit: None,
max_apply_unpersisted_log_limit: 1024,
raft_read_index_retry_interval_ticks: 4,
follower_read_max_log_gap: 100,
raft_log_reserve_max_ticks: 6,
raft_engine_purge_interval: ReadableDuration::secs(10),
Expand Down Expand Up @@ -1048,6 +1051,12 @@ impl Config {
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["raft_log_gc_size_limit"])
.set(self.raft_log_gc_size_limit.unwrap_or_default().0 as f64);
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["max_apply_unpersisted_log_limit"])
.set(self.max_apply_unpersisted_log_limit as f64);
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["raft_read_index_retry_interval_ticks"])
.set(self.raft_read_index_retry_interval_ticks as f64);
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["raft_log_reserve_max_ticks"])
.set(self.raft_log_reserve_max_ticks as f64);
Expand Down
95 changes: 10 additions & 85 deletions components/raftstore/src/store/read_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,18 @@ impl<C: ErrorCallback> ReadIndexQueue<C> {
}
/// Check whether it's necessary to retry pending read requests.
/// Return true if all such conditions are satisfied:
/// 1. more than an election timeout elapsed from the last request push;
/// 2. more than an election timeout elapsed from the last retry;
/// 1. More than the retry interval (in ticks) has elapsed since the last
/// request push.
/// 2. More than the retry interval (in ticks) has elapsed since the last
/// retry.
/// 3. There are still unresolved requests in the queue.
pub fn check_needs_retry(&mut self, cfg: &Config) -> bool {
if self.reads.len() == self.ready_cnt {
return false;
}

if self.retry_countdown == usize::MAX {
self.retry_countdown = cfg.raft_election_timeout_ticks - 1;
self.retry_countdown = cfg.raft_read_index_retry_interval_ticks - 1;
return false;
}

Expand All @@ -143,7 +145,7 @@ impl<C: ErrorCallback> ReadIndexQueue<C> {
return false;
}

self.retry_countdown = cfg.raft_election_timeout_ticks;
self.retry_countdown = cfg.raft_read_index_retry_interval_ticks;
true
}

Expand Down Expand Up @@ -289,28 +291,10 @@ impl<C: ErrorCallback> ReadIndexQueue<C> {
if min_changed_offset != usize::MAX {
self.ready_cnt = cmp::max(self.ready_cnt, max_changed_offset + 1);
}
if max_changed_offset > 0 {
self.fold(min_changed_offset, max_changed_offset);
}
}

fn fold(&mut self, min_changed_offset: usize, max_changed_offset: usize) {
let mut r_idx = self.reads[max_changed_offset].read_index.unwrap();
let mut check_offset = max_changed_offset - 1;
loop {
let l_idx = self.reads[check_offset].read_index.unwrap_or(u64::MAX);
if l_idx > r_idx {
self.reads[check_offset].read_index = Some(r_idx);
} else if check_offset < min_changed_offset {
break;
} else {
r_idx = l_idx;
}
if check_offset == 0 {
break;
}
check_offset -= 1;
}
// NOTE: We should not try to fold these read index requests anymore:
// an earlier request can rely on a higher committed index because of
// transaction locks when 1PC/async-commit is used.
// See https://github.com/tikv/tikv/issues/17018 for more details.
}

pub fn gc(&mut self) {
Expand Down Expand Up @@ -507,65 +491,6 @@ mod tests {
use super::*;
use crate::store::Callback;

// Exercises `advance_replica_reads` on a queue of 100 pending reads and checks
// both `ready_cnt` and the per-request `read_index` values after responses
// arrive out of order.
#[test]
fn test_read_queue_fold() {
// Start with a non-zero `handled_cnt` so that the offsets stored in
// `contexts` differ from positions inside `reads`.
let mut queue = ReadIndexQueue::<Callback<KvTestSnapshot>> {
handled_cnt: 125,
..Default::default()
};
// Enqueue 100 requests, registering each id in `contexts` at
// `handled_cnt + position`.
for _ in 0..100 {
let id = Uuid::new_v4();
queue.reads.push_back(ReadIndexRequest::with_command(
id,
RaftCmdRequest::default(),
Callback::None,
Timespec::new(0, 0),
));

let offset = queue.handled_cnt + queue.reads.len() - 1;
queue.contexts.insert(id, offset);
}

// An empty batch of responses must leave `ready_cnt` at 0.
queue.advance_replica_reads(Vec::new());
assert_eq!(queue.ready_cnt, 0);

// Answering the first two requests one at a time advances `ready_cnt` by one
// each time. The third tuple element is presumably the returned read index
// (it is what the assertions below compare against) -- TODO confirm.
queue.advance_replica_reads(vec![(queue.reads[0].id, None, 100)]);
assert_eq!(queue.ready_cnt, 1);

queue.advance_replica_reads(vec![(queue.reads[1].id, None, 100)]);
assert_eq!(queue.ready_cnt, 2);

// Out-of-order responses for positions 77..=84: the highest answered position
// is 84, and the test expects `ready_cnt` to become 85.
queue.advance_replica_reads(vec![
(queue.reads[80].id, None, 80),
(queue.reads[84].id, None, 100),
(queue.reads[82].id, None, 70),
(queue.reads[78].id, None, 120),
(queue.reads[77].id, None, 40),
]);
assert_eq!(queue.ready_cnt, 85);

// A second batch that only touches lower positions (17..=24) is expected to
// leave `ready_cnt` unchanged.
queue.advance_replica_reads(vec![
(queue.reads[20].id, None, 80),
(queue.reads[24].id, None, 100),
(queue.reads[22].id, None, 70),
(queue.reads[18].id, None, 120),
(queue.reads[17].id, None, 40),
]);
assert_eq!(queue.ready_cnt, 85);

// Expected folded values: each request ends up with the smallest index
// reported at or after its position (40 up to 77, 70 for 78..=82, 100 at 84).
// Position 83 is not covered by any of these loops.
for i in 0..78 {
assert_eq!(queue.reads[i].read_index.unwrap(), 40, "#{} failed", i);
}
for i in 78..83 {
assert_eq!(queue.reads[i].read_index.unwrap(), 70, "#{} failed", i);
}
for i in 84..85 {
assert_eq!(queue.reads[i].read_index.unwrap(), 100, "#{} failed", i);
}

// Clear the queue before it is dropped at the end of the test.
queue.clear_all(None);
}

#[test]
fn test_become_leader_then_become_follower() {
let mut queue = ReadIndexQueue::<Callback<KvTestSnapshot>> {
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ fn test_serde_custom_tikv_config() {
raft_log_gc_threshold: 12,
raft_log_gc_count_limit: Some(12),
raft_log_gc_size_limit: Some(ReadableSize::kb(1)),
raft_read_index_retry_interval_ticks: 123,
follower_read_max_log_gap: 100,
raft_log_reserve_max_ticks: 100,
raft_engine_purge_interval: ReadableDuration::minutes(20),
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/config/test-custom.toml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ raft-log-gc-tick-interval = "12s"
raft-log-gc-threshold = 12
raft-log-gc-count-limit = 12
raft-log-gc-size-limit = "1KB"
raft-read-index-retry-interval-ticks = 123
raft-log-reserve-max-ticks = 100
raft-engine-purge-interval = "20m"
max-manual-flush-rate = 5.0
Expand Down
Loading