Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 2 additions & 39 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,6 @@ env:
CARGO_TERM_COLOR: always

jobs:
nightly:
runs-on: ${{ matrix.os }}
strategy:
matrix:
# tests with sanitizer on `ubuntu-latest` is supported by
# self-defined `nightly test` job, so we don't need to test it here.
os: [ macos-latest ]
steps:
- uses: actions/checkout@v2
with:
ref: ${{ github.event.pull_request.head.sha }}
- name: Install nightly toolchain
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2025-04-03
override: true
components: rustfmt, clippy, rust-src
- uses: Swatinem/rust-cache@v1
with:
sharedKey: ${{ matrix.os }}
- name: Cache dependencies
if: ${{ matrix.os == 'ubuntu-latest' }}
run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install grcov; fi
- name: Format
run: |
make format
git diff --exit-code
- name: Clippy
run: make clippy
env:
EXTRA_CARGO_ARGS: '--fix'
- name: Run tests
run: make test
env:
RUST_BACKTRACE: 1
EXTRA_CARGO_ARGS: '--verbose'
stable:
runs-on: ${{ matrix.os }}
strategy:
Expand Down Expand Up @@ -83,7 +46,7 @@ jobs:
WITH_STABLE_TOOLCHAIN: 'force'
coverage:
runs-on: ubuntu-latest
needs: nightly
needs: stable
steps:
- uses: actions/checkout@v2
with:
Expand All @@ -92,7 +55,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2025-04-03
toolchain: nightly-2026-01-30
override: true
components: llvm-tools-preview
- uses: Swatinem/rust-cache@v1
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "raft-engine"
version = "0.4.2"
authors = ["The TiKV Project Developers"]
edition = "2018"
edition = "2024"
rust-version = "1.85.0"
Comment on lines +5 to 6
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, let's look at the repository structure
git ls-files | head -20

Repository: tikv/raft-engine

Length of output: 360


🏁 Script executed:

# Check for 'gen' usage as identifier
rg -n --type rust '\bgen\b' -g '!target/' -g '!Cargo.toml' | head -50

Repository: tikv/raft-engine

Length of output: 250


🏁 Script executed:

# Check unsafe fn patterns that might be affected by 2024 edition changes
ast-grep --pattern 'unsafe fn $NAME($$$) { $$$ }'

Repository: tikv/raft-engine

Length of output: 1751


🏁 Script executed:

# Let's also check the full Cargo.toml context
cat Cargo.toml

Repository: tikv/raft-engine

Length of output: 2619


Edition jump from 2018 → 2024 is significant — confirm this aligns with project requirements and downstream consumers.

The 2024 edition introduces breaking changes accumulated across two edition boundaries (the skipped 2021 edition plus 2024 itself), including the unsafe_op_in_unsafe_fn lint and the new lifetime capture rules for impl Trait. The MSRV of 1.85.0 correctly matches the minimum toolchain required by the 2024 edition.

A code scan shows no locally declared gen identifiers; however, gen becomes a full keyword in the 2024 edition, so the .gen() method calls from rand must be rewritten as .r#gen() (or migrated to random() in rand 0.9+) — cargo fix --edition performs this rewrite automatically. No problematic unsafe function patterns were found, so the migration otherwise appears sound. Ensure CI passes cleanly with this edition, and verify compatibility with downstream consumers like TiKV that will depend on 2024-edition artifacts.

🤖 Prompt for AI Agents
In `@Cargo.toml` around lines 5 - 6, You bumped the Cargo.toml edition to "2024"
while setting rust-version = "1.85.0"; confirm this change aligns with project
and downstream requirements by running full CI and integration tests, validate
downstream consumers (e.g., TiKV) can consume 2024-edition artifacts, scan the
codebase for patterns affected by the edition (search for unsafe functions that
may trigger the unsafe_op_in_unsafe_fn lint and uses of impl Trait with inferred
lifetime captures), and if any incompatibilities appear either fix the offending
code or revert the edition change until downstream/CI compatibility is ensured.

description = "A persistent storage engine for Multi-Raft logs"
readme = "README.md"
Expand Down Expand Up @@ -58,7 +58,7 @@ protobuf = "2"
rayon = "1.5"
rhai = { version = "1.7", features = ["sync"], optional = true }
scopeguard = "1.1"
serde = { version = "=1.0.194", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
serde_repr = "0.1"
strum = { version = "0.26.2", features = ["derive"] }
thiserror = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion examples/fork.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::path::Path;
use std::sync::Arc;

use raft_engine::env::DefaultFileSystem;
use raft_engine::Config;
use raft_engine::Engine;
use raft_engine::env::DefaultFileSystem;

fn main() {
let mut args = std::env::args();
Expand Down
10 changes: 6 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use log::{info, warn};
use serde::{Deserialize, Serialize};

use crate::pipe_log::Version;
use crate::{util::ReadableSize, Result};
use crate::{Result, util::ReadableSize};

const MIN_RECOVERY_READ_BLOCK_SIZE: usize = 512;
const MIN_RECOVERY_THREADS: usize = 1;
Expand Down Expand Up @@ -343,9 +343,11 @@ mod tests {
let mut load: Config = toml::from_str(old).unwrap();
load.sanitize().unwrap();
// Downgrade to older version.
assert!(toml::to_string(&load)
.unwrap()
.contains("tolerate-corrupted-tail-records"));
assert!(
toml::to_string(&load)
.unwrap()
.contains("tolerate-corrupted-tail-records")
);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion src/consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

use hashbrown::HashMap;

use crate::Result;
use crate::file_pipe_log::ReplayMachine;
use crate::log_batch::{LogItemBatch, LogItemContent};
use crate::pipe_log::{FileId, LogQueue};
use crate::Result;

/// A `ConsistencyChecker` scans for log entry holes in a log queue. It will
/// return a list of corrupted raft groups along with their last valid log
Expand Down
68 changes: 40 additions & 28 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
use std::cell::{Cell, RefCell};
use std::marker::PhantomData;
use std::path::Path;
use std::sync::{mpsc, Arc, Mutex};
use std::sync::{Arc, Mutex, mpsc};
use std::thread::{Builder as ThreadBuilder, JoinHandle};
use std::time::{Duration, Instant};

use log::{error, info};
use protobuf::{parse_from_bytes, Message};
use protobuf::{Message, parse_from_bytes};

use crate::config::{Config, RecoveryMode};
use crate::consistency::ConsistencyChecker;
Expand All @@ -22,7 +22,7 @@ use crate::metrics::*;
use crate::pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog};
use crate::purge::{PurgeHook, PurgeManager};
use crate::write_barrier::{WriteBarrier, Writer};
use crate::{perf_context, Error, GlobalStats, Result};
use crate::{Error, GlobalStats, Result, perf_context};

const METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(30);
/// Max times for `write`.
Expand Down Expand Up @@ -106,11 +106,13 @@ where
let memtables_clone = memtables.clone();
let metrics_flusher = ThreadBuilder::new()
.name("re-metrics".into())
.spawn(move || loop {
stats_clone.flush_metrics();
memtables_clone.flush_metrics();
if rx.recv_timeout(METRICS_FLUSH_INTERVAL).is_ok() {
break;
.spawn(move || {
loop {
stats_clone.flush_metrics();
memtables_clone.flush_metrics();
if rx.recv_timeout(METRICS_FLUSH_INTERVAL).is_ok() {
break;
}
}
})?;

Expand Down Expand Up @@ -648,14 +650,14 @@ where
pub(crate) mod tests {
use super::*;
use crate::env::{ObfuscatedFileSystem, Permission};
use crate::file_pipe_log::{parse_reserved_file_name, FileNameExt};
use crate::file_pipe_log::{FileNameExt, parse_reserved_file_name};
use crate::log_batch::AtomicGroupBuilder;
use crate::pipe_log::Version;
use crate::test_util::{generate_entries, PanicGuard};
use crate::test_util::{PanicGuard, generate_entries};
use crate::util::ReadableSize;
use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
use rand::{thread_rng, Rng};
use rand::{Rng, thread_rng};
use std::collections::{BTreeSet, HashSet};
use std::fs::OpenOptions;
use std::path::PathBuf;
Expand Down Expand Up @@ -1231,9 +1233,11 @@ pub(crate) mod tests {
// GC all log entries. Won't trigger purge because total size is not enough.
let count = engine.compact_to(1, 100);
assert_eq!(count, 100);
assert!(!engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
assert!(
!engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append)
);

// Append more logs to make total size greater than `purge_threshold`.
for index in 100..250 {
Expand All @@ -1243,9 +1247,11 @@ pub(crate) mod tests {
// GC first 101 log entries.
assert_eq!(engine.compact_to(1, 101), 1);
// Needs to purge because the total size is greater than `purge_threshold`.
assert!(engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
assert!(
engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append)
);

let old_min_file_seq = engine.file_span(LogQueue::Append).0;
let will_force_compact = engine.purge_expired_files().unwrap();
Expand All @@ -1259,9 +1265,11 @@ pub(crate) mod tests {

assert_eq!(engine.compact_to(1, 102), 1);
// Needs to purge because the total size is greater than `purge_threshold`.
assert!(engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
assert!(
engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append)
);
let will_force_compact = engine.purge_expired_files().unwrap();
// The region needs to be force compacted because the threshold is reached.
assert!(!will_force_compact.is_empty());
Expand Down Expand Up @@ -1350,9 +1358,11 @@ pub(crate) mod tests {
engine.append(11, 1, 11, Some(&data));

// The engine needs purge, and all old entries should be rewritten.
assert!(engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
assert!(
engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append)
);
assert!(engine.purge_expired_files().unwrap().is_empty());
assert!(engine.file_span(LogQueue::Append).0 > 1);

Expand Down Expand Up @@ -1386,9 +1396,11 @@ pub(crate) mod tests {
}
}

assert!(engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append));
assert!(
engine
.purge_manager
.needs_rewrite_log_files(LogQueue::Append)
);
assert!(engine.purge_expired_files().unwrap().is_empty());
}

Expand All @@ -1410,7 +1422,7 @@ pub(crate) mod tests {
let empty_entry = Entry::new();
assert_eq!(empty_entry.compute_size(), 0);
log_batch
.add_entries::<Entry>(0, &[empty_entry.clone()])
.add_entries::<Entry>(0, std::slice::from_ref(&empty_entry))
.unwrap();
engine.write(&mut log_batch, false).unwrap();
let empty_state = RaftLocalState::new();
Expand All @@ -1420,7 +1432,7 @@ pub(crate) mod tests {
.unwrap();
engine.write(&mut log_batch, false).unwrap();
log_batch
.add_entries::<Entry>(2, &[empty_entry.clone()])
.add_entries::<Entry>(2, std::slice::from_ref(&empty_entry))
.unwrap();
log_batch
.put_message(2, b"key".to_vec(), &empty_state)
Expand Down
4 changes: 2 additions & 2 deletions src/env/log_fd/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use log::error;
use std::io::Result as IoResult;
use std::os::unix::io::RawFd;

use nix::NixPath;
use nix::errno::Errno;
use nix::fcntl::{self, OFlag};
use nix::sys::stat::Mode;
use nix::sys::uio::{pread, pwrite};
use nix::unistd::{close, ftruncate, lseek, Whence};
use nix::NixPath;
use nix::unistd::{Whence, close, ftruncate, lseek};

fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error {
let kind = std::io::Error::from(e).kind();
Expand Down
2 changes: 1 addition & 1 deletion src/env/obfuscated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::env::{DefaultFileSystem, FileSystem, Permission, WriteExt};

Expand Down
4 changes: 2 additions & 2 deletions src/file_pipe_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod pipe;
mod pipe_builder;
mod reader;

pub use format::{parse_reserved_file_name, FileNameExt};
pub use format::{FileNameExt, parse_reserved_file_name};
pub use pipe::DualPipes as FilePipeLog;
pub use pipe_builder::{
DefaultMachineFactory, DualPipesBuilder as FilePipeLogBuilder, RecoveryConfig, ReplayMachine,
Expand Down Expand Up @@ -173,7 +173,7 @@ pub mod debug {
use crate::env::DefaultFileSystem;
use crate::log_batch::{Command, LogBatch};
use crate::pipe_log::{FileBlockHandle, LogFileContext, LogQueue, Version};
use crate::test_util::{generate_entries, PanicGuard};
use crate::test_util::{PanicGuard, generate_entries};
use raft::eraftpb::Entry;

#[test]
Expand Down
12 changes: 7 additions & 5 deletions src/file_pipe_log/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use crate::metrics::*;
use crate::pipe_log::{
FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog, ReactiveBytes,
};
use crate::{perf_context, Error, Result};
use crate::{Error, Result, perf_context};

use super::format::{build_reserved_file_name, FileNameExt, LogFileFormat};
use super::format::{FileNameExt, LogFileFormat, build_reserved_file_name};
use super::log_file::build_file_reader;
use super::log_file::{build_file_writer, LogFileWriter};
use super::log_file::{LogFileWriter, build_file_writer};

pub type PathId = usize;
pub type Paths = Vec<PathBuf>;
Expand Down Expand Up @@ -614,8 +614,10 @@ mod tests {
// Only one thread can hold file lock
let r2 = new_test_pipes(&cfg);

assert!(format!("{}", r2.err().unwrap())
.contains("maybe another instance is using this directory"));
assert!(
format!("{}", r2.err().unwrap())
.contains("maybe another instance is using this directory")
);
}

#[test]
Expand Down
12 changes: 6 additions & 6 deletions src/file_pipe_log/pipe_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ use crate::config::{Config, RecoveryMode};
use crate::env::{FileSystem, Handle, Permission};
use crate::errors::is_no_space_err;
use crate::event_listener::EventListener;
use crate::log_batch::{LogItemBatch, LOG_BATCH_HEADER_LEN};
use crate::log_batch::{LOG_BATCH_HEADER_LEN, LogItemBatch};
use crate::pipe_log::{FileId, FileSeq, LogQueue};
use crate::util::{Factory, ReadableSize};
use crate::{Error, Result};

use super::format::{
build_reserved_file_name, lock_file_path, parse_reserved_file_name, FileNameExt, LogFileFormat,
FileNameExt, LogFileFormat, build_reserved_file_name, lock_file_path, parse_reserved_file_name,
};
use super::log_file::build_file_reader;
use super::pipe::{
find_available_dir, DualPipes, File, PathId, Paths, SinglePipe, DEFAULT_FIRST_FILE_SEQ,
DEFAULT_FIRST_FILE_SEQ, DualPipes, File, PathId, Paths, SinglePipe, find_available_dir,
};
use super::reader::LogItemBatchFileReader;

Expand Down Expand Up @@ -240,9 +240,9 @@ impl<F: FileSystem> DualPipesBuilder<F> {
self.scan_dir(&dir, lock)?;
}

self.append_file_names.sort_by(|a, b| a.seq.cmp(&b.seq));
self.rewrite_file_names.sort_by(|a, b| a.seq.cmp(&b.seq));
self.recycled_file_names.sort_by(|a, b| a.seq.cmp(&b.seq));
self.append_file_names.sort_by_key(|a| a.seq);
self.rewrite_file_names.sort_by_key(|a| a.seq);
self.recycled_file_names.sort_by_key(|a| a.seq);
Ok(())
}

Expand Down
Loading
Loading