Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,22 @@ jobs:
- uses: Swatinem/rust-cache@v1
with:
sharedKey: ${{ matrix.os }}-stable
- name: Install gRPC dependencies
run: |
sudo apt-get update
sudo apt-get install -y pkg-config libgrpc-dev libgrpc++-dev protobuf-compiler-grpc
- name: Clippy
run: make clippy
env:
WITH_STABLE_TOOLCHAIN: 'force'
GRPCIO_SYS_USE_PKG_CONFIG: '1'
- name: Run tests
run: make test
env:
RUST_BACKTRACE: 1
EXTRA_CARGO_ARGS: '--verbose'
WITH_STABLE_TOOLCHAIN: 'force'
GRPCIO_SYS_USE_PKG_CONFIG: '1'
coverage:
runs-on: ubuntu-latest
needs: nightly
Expand All @@ -98,6 +104,10 @@ jobs:
- uses: Swatinem/rust-cache@v1
with:
sharedKey: ubuntu-latest
- name: Install gRPC dependencies
run: |
sudo apt-get update
sudo apt-get install -y pkg-config libgrpc-dev libgrpc++-dev protobuf-compiler-grpc
- name: Install grcov
run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov; fi
- name: Run tests
Expand All @@ -106,6 +116,7 @@ jobs:
RUSTFLAGS: '-Cinstrument-coverage'
LLVM_PROFILE_FILE: '%p-%m.profraw'
EXTRA_CARGO_ARGS: '--verbose'
GRPCIO_SYS_USE_PKG_CONFIG: '1'
- name: Run grcov
run: grcov `find . \( -name "*.profraw" \) -print` --binary-path target/debug/deps/ -s . -t lcov --branch --ignore-not-existing --ignore '../**' --ignore '/*' -o coverage.lcov
- name: Upload
Expand Down
12 changes: 8 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
name = "raft-engine"
version = "0.4.2"
authors = ["The TiKV Project Developers"]
edition = "2018"
rust-version = "1.75.0"
edition = "2024"
rust-version = "1.85.0"
description = "A persistent storage engine for Multi-Raft logs"
readme = "README.md"
repository = "https://github.com/tikv/raft-engine"
Expand Down Expand Up @@ -58,7 +58,7 @@ protobuf = "2"
rayon = "1.5"
rhai = { version = "1.7", features = ["sync"], optional = true }
scopeguard = "1.1"
serde = { version = "=1.0.194", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
serde_repr = "0.1"
strum = { version = "0.26.2", features = ["derive"] }
thiserror = "1.0"
Expand All @@ -67,14 +67,18 @@ thiserror = "1.0"
criterion = "0.4"
ctor = "0.2"
env_logger = "0.10"
kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = [
grpcio = { version = "=0.10.2", default-features = false, features = [
"protobuf-codec",
] }
kvproto = { git = "https://github.com/pingcap/kvproto.git", branch = "release-8.5", default-features = false, features = [
"protobuf-codec",
] }
raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = [
"protobuf-codec",
] }
rand = "0.8"
rand_distr = "0.4"
serde_json = "=1.0.146"
tempfile = "3.6"
toml = "0.8"

Expand Down
29 changes: 19 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,31 @@ format:
cargo ${TOOLCHAIN_ARGS} fmt --all

CLIPPY_WHITELIST += -A clippy::bool_assert_comparison
CMAKE_COMPAT := CMAKE="$(CURDIR)/scripts/cmake-wrapper.sh"
## Run clippy.
clippy:
# Fresh lockfile resolution can pull grpcio/grpcio-sys versions that fail on
# macOS arm64 CI images. Force compatible selections before linting.
@if cargo ${TOOLCHAIN_ARGS} tree --all-features -i 'grpcio@0.13.0' >/dev/null 2>&1; then \
cargo ${TOOLCHAIN_ARGS} update -p grpcio@0.13.0 --precise 0.10.2; \
fi
@if cargo ${TOOLCHAIN_ARGS} tree --all-features -i 'grpcio-sys@0.10.1+1.44.0' >/dev/null 2>&1; then \
cargo ${TOOLCHAIN_ARGS} update -p grpcio-sys@0.10.1+1.44.0 --precise 0.10.3+1.44.0-patched; \
fi
ifdef WITH_NIGHTLY_FEATURES
cargo ${TOOLCHAIN_ARGS} clippy --all --features nightly_group,failpoints --all-targets -- -D clippy::all ${CLIPPY_WHITELIST}
${CMAKE_COMPAT} cargo ${TOOLCHAIN_ARGS} clippy --all --features nightly_group,failpoints --all-targets -- -D clippy::all ${CLIPPY_WHITELIST}
else
cargo ${TOOLCHAIN_ARGS} clippy --all --features failpoints --all-targets -- -D clippy::all ${CLIPPY_WHITELIST}
${CMAKE_COMPAT} cargo ${TOOLCHAIN_ARGS} clippy --all --features failpoints --all-targets -- -D clippy::all ${CLIPPY_WHITELIST}
endif

## Run tests.
test:
ifdef WITH_NIGHTLY_FEATURES
cargo ${TOOLCHAIN_ARGS} test --all --features nightly_group ${EXTRA_CARGO_ARGS} -- --nocapture
cargo ${TOOLCHAIN_ARGS} test --test failpoints --features nightly_group,failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
${CMAKE_COMPAT} cargo ${TOOLCHAIN_ARGS} test --all --features nightly_group ${EXTRA_CARGO_ARGS} -- --nocapture
${CMAKE_COMPAT} cargo ${TOOLCHAIN_ARGS} test --test failpoints --features nightly_group,failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
else
cargo ${TOOLCHAIN_ARGS} test --all ${EXTRA_CARGO_ARGS} -- --nocapture
cargo ${TOOLCHAIN_ARGS} test --test failpoints --features failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
${CMAKE_COMPAT} cargo ${TOOLCHAIN_ARGS} test --all ${EXTRA_CARGO_ARGS} -- --nocapture
${CMAKE_COMPAT} cargo ${TOOLCHAIN_ARGS} test --test failpoints --features failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
endif

## Run tests with various features for maximum code coverage.
Expand All @@ -66,10 +75,10 @@ test_matrix:
$(error Must run test matrix with nightly features. Please reset WITH_STABLE_TOOLCHAIN.)
else
test_matrix: test
cargo ${TOOLCHAIN_ARGS} test --all ${EXTRA_CARGO_ARGS} -- --nocapture
cargo ${TOOLCHAIN_ARGS} test --test failpoints --features failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
cargo ${TOOLCHAIN_ARGS} test --all --features nightly_group,std_fs ${EXTRA_CARGO_ARGS} -- --nocapture
cargo ${TOOLCHAIN_ARGS} test --test failpoints --features nightly_group,std_fs,failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
${CMAKE_COMPAT} cargo ${TOOLCHAIN_ARGS} test --all ${EXTRA_CARGO_ARGS} -- --nocapture
${CMAKE_COMPAT} cargo ${TOOLCHAIN_ARGS} test --test failpoints --features failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
${CMAKE_COMPAT} cargo ${TOOLCHAIN_ARGS} test --all --features nightly_group,std_fs ${EXTRA_CARGO_ARGS} -- --nocapture
${CMAKE_COMPAT} cargo ${TOOLCHAIN_ARGS} test --test failpoints --features nightly_group,std_fs,failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
endif

## Build raft-engine-ctl.
Expand Down
2 changes: 1 addition & 1 deletion examples/fork.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::path::Path;
use std::sync::Arc;

use raft_engine::env::DefaultFileSystem;
use raft_engine::Config;
use raft_engine::Engine;
use raft_engine::env::DefaultFileSystem;

fn main() {
let mut args = std::env::args();
Expand Down
12 changes: 12 additions & 0 deletions scripts/cmake-wrapper.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/usr/bin/env bash
set -euo pipefail

if [[ $# -gt 0 ]]; then
case "$1" in
--build|--install|--open|--help|--version)
exec cmake "$@"
;;
esac
fi

exec cmake -DCMAKE_POLICY_VERSION_MINIMUM=3.5 "$@"
10 changes: 6 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use log::{info, warn};
use serde::{Deserialize, Serialize};

use crate::pipe_log::Version;
use crate::{util::ReadableSize, Result};
use crate::{Result, util::ReadableSize};

const MIN_RECOVERY_READ_BLOCK_SIZE: usize = 512;
const MIN_RECOVERY_THREADS: usize = 1;
Expand Down Expand Up @@ -343,9 +343,11 @@ mod tests {
let mut load: Config = toml::from_str(old).unwrap();
load.sanitize().unwrap();
// Downgrade to older version.
assert!(toml::to_string(&load)
.unwrap()
.contains("tolerate-corrupted-tail-records"));
assert!(
toml::to_string(&load)
.unwrap()
.contains("tolerate-corrupted-tail-records")
);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion src/consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

use hashbrown::HashMap;

use crate::Result;
use crate::file_pipe_log::ReplayMachine;
use crate::log_batch::{LogItemBatch, LogItemContent};
use crate::pipe_log::{FileId, LogQueue};
use crate::Result;

/// A `ConsistencyChecker` scans for log entry holes in a log queue. It will
/// return a list of corrupted raft groups along with their last valid log
Expand Down
68 changes: 40 additions & 28 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
use std::cell::{Cell, RefCell};
use std::marker::PhantomData;
use std::path::Path;
use std::sync::{mpsc, Arc, Mutex};
use std::sync::{Arc, Mutex, mpsc};
use std::thread::{Builder as ThreadBuilder, JoinHandle};
use std::time::{Duration, Instant};

use log::{error, info};
use protobuf::{parse_from_bytes, Message};
use protobuf::{Message, parse_from_bytes};

use crate::config::{Config, RecoveryMode};
use crate::consistency::ConsistencyChecker;
Expand All @@ -22,7 +22,7 @@ use crate::metrics::*;
use crate::pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog};
use crate::purge::{PurgeHook, PurgeManager};
use crate::write_barrier::{WriteBarrier, Writer};
use crate::{perf_context, Error, GlobalStats, Result};
use crate::{Error, GlobalStats, Result, perf_context};

const METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(30);
/// Max times for `write`.
Expand Down Expand Up @@ -106,11 +106,13 @@ where
let memtables_clone = memtables.clone();
let metrics_flusher = ThreadBuilder::new()
.name("re-metrics".into())
.spawn(move || loop {
stats_clone.flush_metrics();
memtables_clone.flush_metrics();
if rx.recv_timeout(METRICS_FLUSH_INTERVAL).is_ok() {
break;
.spawn(move || {
loop {
stats_clone.flush_metrics();
memtables_clone.flush_metrics();
if rx.recv_timeout(METRICS_FLUSH_INTERVAL).is_ok() {
break;
}
}
})?;

Expand Down Expand Up @@ -648,14 +650,14 @@ where
pub(crate) mod tests {
use super::*;
use crate::env::{ObfuscatedFileSystem, Permission};
use crate::file_pipe_log::{parse_reserved_file_name, FileNameExt};
use crate::file_pipe_log::{FileNameExt, parse_reserved_file_name};
use crate::log_batch::AtomicGroupBuilder;
use crate::pipe_log::Version;
use crate::test_util::{generate_entries, PanicGuard};
use crate::test_util::{PanicGuard, generate_entries};
use crate::util::ReadableSize;
use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
use rand::{thread_rng, Rng};
use rand::{Rng, thread_rng};
use std::collections::{BTreeSet, HashSet};
use std::fs::OpenOptions;
use std::path::PathBuf;
Expand Down Expand Up @@ -1231,9 +1233,11 @@ pub(crate) mod tests {
// GC all log entries. Won't trigger purge because total size is not enough.
let count = engine.compact_to(1, 100);
assert_eq!(count, 100);
assert!(!engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
assert!(
!engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append)
);

// Append more logs to make total size greater than `purge_threshold`.
for index in 100..250 {
Expand All @@ -1243,9 +1247,11 @@ pub(crate) mod tests {
// GC first 101 log entries.
assert_eq!(engine.compact_to(1, 101), 1);
// Needs to purge because the total size is greater than `purge_threshold`.
assert!(engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
assert!(
engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append)
);

let old_min_file_seq = engine.file_span(LogQueue::Append).0;
let will_force_compact = engine.purge_expired_files().unwrap();
Expand All @@ -1259,9 +1265,11 @@ pub(crate) mod tests {

assert_eq!(engine.compact_to(1, 102), 1);
// Needs to purge because the total size is greater than `purge_threshold`.
assert!(engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
assert!(
engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append)
);
let will_force_compact = engine.purge_expired_files().unwrap();
// The region needs to be force compacted because the threshold is reached.
assert!(!will_force_compact.is_empty());
Expand Down Expand Up @@ -1350,9 +1358,11 @@ pub(crate) mod tests {
engine.append(11, 1, 11, Some(&data));

// The engine needs purge, and all old entries should be rewritten.
assert!(engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
assert!(
engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append)
);
assert!(engine.purge_expired_files().unwrap().is_empty());
assert!(engine.file_span(LogQueue::Append).0 > 1);

Expand Down Expand Up @@ -1386,9 +1396,11 @@ pub(crate) mod tests {
}
}

assert!(engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
assert!(
engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append)
);
assert!(engine.purge_expired_files().unwrap().is_empty());
}

Expand All @@ -1410,7 +1422,7 @@ pub(crate) mod tests {
let empty_entry = Entry::new();
assert_eq!(empty_entry.compute_size(), 0);
log_batch
.add_entries::<Entry>(0, &[empty_entry.clone()])
.add_entries::<Entry>(0, std::slice::from_ref(&empty_entry))
.unwrap();
engine.write(&mut log_batch, false).unwrap();
let empty_state = RaftLocalState::new();
Expand All @@ -1420,7 +1432,7 @@ pub(crate) mod tests {
.unwrap();
engine.write(&mut log_batch, false).unwrap();
log_batch
.add_entries::<Entry>(2, &[empty_entry.clone()])
.add_entries::<Entry>(2, std::slice::from_ref(&empty_entry))
.unwrap();
log_batch
.put_message(2, b"key".to_vec(), &empty_state)
Expand Down
4 changes: 2 additions & 2 deletions src/env/log_fd/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use log::error;
use std::io::Result as IoResult;
use std::os::unix::io::RawFd;

use nix::NixPath;
use nix::errno::Errno;
use nix::fcntl::{self, OFlag};
use nix::sys::stat::Mode;
use nix::sys::uio::{pread, pwrite};
use nix::unistd::{close, ftruncate, lseek, Whence};
use nix::NixPath;
use nix::unistd::{Whence, close, ftruncate, lseek};

fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error {
let kind = std::io::Error::from(e).kind();
Expand Down
2 changes: 1 addition & 1 deletion src/env/obfuscated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::env::{DefaultFileSystem, FileSystem, Permission, WriteExt};

Expand Down
3 changes: 1 addition & 2 deletions src/file_pipe_log/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,12 @@ impl<F: FileSystem> LogFileWriter<F> {
}
self.capacity += alloc;
}
self.writer.write_all(buf).map_err(|e| {
self.writer.write_all(buf).inspect_err(|_| {
self.writer
.seek(SeekFrom::Start(self.written as u64))
.unwrap_or_else(|e| {
panic!("failed to reseek after write failure: {}", e);
});
e
})?;
self.written = new_written;
Ok(())
Expand Down
Loading
Loading