From 681a6e754c99feeec58a70878f4488e46adbe132 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Thu, 19 Mar 2026 20:19:53 -0400 Subject: [PATCH] Fix bugs in FjallStorage This commit updates the integration tests to use FjallStorage and fix bugs in FjallStorage. --- server/src/log_storage/fjall_storage.rs | 256 +++++++++++++++++++++++- server/src/raft_node.rs | 11 +- server/tests/integration_test.rs | 12 +- 3 files changed, 258 insertions(+), 21 deletions(-) diff --git a/server/src/log_storage/fjall_storage.rs b/server/src/log_storage/fjall_storage.rs index c7fd80f..4e3ac90 100644 --- a/server/src/log_storage/fjall_storage.rs +++ b/server/src/log_storage/fjall_storage.rs @@ -5,8 +5,7 @@ use crate::Result; use crate::log_storage::{Entry, HardState, LogStorage}; -use crate::util::usize_to_u64; -use fjall::{PersistMode, UserValue}; +use fjall::UserValue; use raft::eraftpb::{ConfState, Snapshot}; use raft::{GetEntriesContext, RaftState, Storage, StorageError}; @@ -56,8 +55,33 @@ impl FjallStorage { None => 0, }; + // Load hard state from disk if it exists + let mut raft_state = RaftState::default(); + if let (Some(commit), Some(term), Some(vote)) = ( + hard_state_keyspace.get(COMMIT_KEY)?, + hard_state_keyspace.get(TERM_KEY)?, + hard_state_keyspace.get(VOTE_KEY)?, + ) { + let commit_bytes = commit + .as_ref() + .try_into() + .map_err(|_| "invalid commit value in hard state")?; + let term_bytes = term + .as_ref() + .try_into() + .map_err(|_| "invalid term value in hard state")?; + let vote_bytes = vote + .as_ref() + .try_into() + .map_err(|_| "invalid vote value in hard state")?; + + raft_state.hard_state.commit = u64::from_le_bytes(commit_bytes); + raft_state.hard_state.term = u64::from_le_bytes(term_bytes); + raft_state.hard_state.vote = u64::from_le_bytes(vote_bytes); + } + Ok(Self { - raft_state: RaftState::default(), + raft_state, first_log_index, last_log_index, db, @@ -93,12 +117,17 @@ impl Storage for FjallStorage { _context: GetEntriesContext, ) -> raft::Result> { assert!(low <= high, "low {low} is larger than high {high}"); - assert!(high <= self.last_index()? + 1); + if low < self.first_log_index { + return Err(raft::Error::Store(StorageError::Compacted)); + } + if high > self.last_log_index + 1 { + return Err(raft::Error::Store(StorageError::Unavailable)); + } let size = std::cmp::min(high - low, max_size.into().unwrap_or(u64::MAX)); let high = low + size; - let low = low.to_le_bytes(); - let high = high.to_le_bytes(); + let low = raft_index_to_key(low); + let high = raft_index_to_key(high); let entries = self .log_keyspace @@ -121,8 +150,13 @@ impl Storage for FjallStorage { if idx == 0 { return Ok(0); } + if idx < self.first_log_index { + return Err(raft::Error::Store(StorageError::Compacted)); + } - let entry = self.entry(idx)?.expect("invalid index"); + let entry = self + .entry(idx)? + .ok_or(raft::Error::Store(StorageError::Unavailable))?; Ok(entry.term) } @@ -145,21 +179,50 @@ impl LogStorage for FjallStorage { } fn append(&mut self, entries: &[raft::eraftpb::Entry]) -> raft::Result<()> { - let mut batch = self.db.batch().durability(Some(PersistMode::SyncAll)); + if entries.is_empty() { + return Ok(()); + } + + assert!( + self.first_log_index <= entries[0].index, + "overwrite compacted raft logs, compacted: {}, append: {}", + self.first_log_index - 1, + entries[0].index, + ); + assert!( + self.last_log_index + 1 >= entries[0].index, + "raft logs should be continuous, last index: {}, new appended: {}", + self.last_log_index, + entries[0].index, + ); + + let mut batch = self.db.batch(); + // Remove all entries overwritten by the new entries. + for index in entries[0].index..=self.last_log_index { + batch.remove(&self.log_keyspace, raft_index_to_key(index)); + } for entry in entries { let entry: Entry = entry.into(); - batch.insert(&self.log_keyspace, entry.index.to_le_bytes(), entry); + batch.insert(&self.log_keyspace, raft_index_to_key(entry.index), entry); } batch.commit().map_err(fjall_error_to_raft_error)?; - self.last_log_index += usize_to_u64(entries.len()); + self.last_log_index = entries.last().expect("empty entries return early").index; + + assert!( + self.raft_state.hard_state.commit <= self.last_log_index, + "commit index is invalid, commit: {}, last_log_index: {}", + self.raft_state.hard_state.commit, + self.last_log_index + ); + Ok(()) } fn set_hard_state(&mut self, hard_state: raft::prelude::HardState) -> raft::Result<()> { self.raft_state.hard_state = hard_state; let HardState { commit, term, vote } = (&self.raft_state.hard_state).into(); - let mut batch = self.db.batch().durability(Some(PersistMode::SyncAll)); + let mut batch = self.db.batch(); batch.insert(&self.hard_state_keyspace, COMMIT_KEY, commit.to_le_bytes()); batch.insert(&self.hard_state_keyspace, TERM_KEY, term.to_le_bytes()); batch.insert(&self.hard_state_keyspace, VOTE_KEY, vote.to_le_bytes()); @@ -176,6 +239,12 @@ impl LogStorage for FjallStorage { } } +fn raft_index_to_key(index: u64) -> [u8; size_of::()] { + // Use big endian so that the lexicographical ordering of keys is consistent with the numeric + // ordering of indices. + index.to_be_bytes() +} + impl TryFrom for Entry { type Error = Box; @@ -196,3 +265,168 @@ fn fjall_error_to_raft_error(e: fjall::Error) -> raft::Error { e => raft::Error::Store(StorageError::Other(e.into())), } } + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + use protobuf::{CachedSize, UnknownFields}; + use raft::Storage; + use raft::prelude::EntryType; + use tempfile::TempDir; + + #[test] + fn test_fjall_storage_orders_entries_by_numeric_index() { + let temp_dir = TempDir::new().unwrap(); + let db = fjall::Database::builder(temp_dir.path()).open().unwrap(); + let log_keyspace = db + .keyspace("log", fjall::KeyspaceCreateOptions::default) + .unwrap(); + let hard_state_keyspace = db + .keyspace("hard_state", fjall::KeyspaceCreateOptions::default) + .unwrap(); + + let mut storage = FjallStorage::new(db.clone(), log_keyspace, hard_state_keyspace).unwrap(); + + let make_entry = |index: u64, term: u64| raft::prelude::Entry { + entry_type: EntryType::EntryNormal, + term, + index, + data: Bytes::from_static(b"payload"), + context: Bytes::new(), + sync_log: false, + unknown_fields: UnknownFields::default(), + cached_size: CachedSize::default(), + }; + + let entries = vec![make_entry(1, 1), make_entry(2, 1), make_entry(3, 1)]; + + storage.append(&entries).unwrap(); + + assert_eq!(storage.first_index().unwrap(), 1); + assert_eq!(storage.last_index().unwrap(), 3); + + let fetched = storage + .entries(1, 4, None, GetEntriesContext::empty(false)) + .unwrap(); + + let fetched_indexes: Vec<_> = fetched.into_iter().map(|e| e.index).collect(); + assert_eq!(fetched_indexes, vec![1, 2, 3]); + + drop(storage); + + let log_keyspace = db + .keyspace("log", fjall::KeyspaceCreateOptions::default) + .unwrap(); + let hard_state_keyspace = db + .keyspace("hard_state", fjall::KeyspaceCreateOptions::default) + .unwrap(); + let storage = FjallStorage::new(db, log_keyspace, hard_state_keyspace).unwrap(); + + let fetched = storage + .entries(1, 4, None, GetEntriesContext::empty(false)) + .unwrap(); + + let fetched_indexes: Vec<_> = fetched.into_iter().map(|e| e.index).collect(); + assert_eq!(fetched_indexes, vec![1, 2, 3]); + } + + #[test] + fn test_fjall_storage_append_truncates_log_at_overwrite() { + let temp_dir = TempDir::new().unwrap(); + let db = fjall::Database::builder(temp_dir.path()).open().unwrap(); + let log_keyspace = db + .keyspace("log", fjall::KeyspaceCreateOptions::default) + .unwrap(); + let hard_state_keyspace = db + .keyspace("hard_state", fjall::KeyspaceCreateOptions::default) + .unwrap(); + + let mut storage = FjallStorage::new(db, log_keyspace, hard_state_keyspace).unwrap(); + + let make_entry = |index: u64, term: u64| raft::prelude::Entry { + entry_type: EntryType::EntryNormal, + term, + index, + data: Bytes::from_static(b"payload"), + context: Bytes::new(), + sync_log: false, + unknown_fields: UnknownFields::default(), + cached_size: CachedSize::default(), + }; + + // Append 1, 2, 3 + storage + .append(&[make_entry(1, 1), make_entry(2, 1), make_entry(3, 1)]) + .unwrap(); + assert_eq!(storage.last_index().unwrap(), 3); + + // Overwrite at index 2 with new entry at term 2 + storage.append(&[make_entry(2, 2)]).unwrap(); + assert_eq!(storage.last_index().unwrap(), 2); + + let entries = storage + .entries(1, 3, None, GetEntriesContext::empty(false)) + .unwrap(); + assert_eq!(entries.len(), 2); + assert_eq!(entries[0].index, 1); + assert_eq!(entries[0].term, 1); + assert_eq!(entries[1].index, 2); + assert_eq!(entries[1].term, 2); + + // Ensure 3 is gone + assert!(storage.entry(3).unwrap().is_none()); + } + + #[test] + fn test_fjall_storage_append_truncates_log_at_overwrite_with_offset() { + let temp_dir = TempDir::new().unwrap(); + let db = fjall::Database::builder(temp_dir.path()).open().unwrap(); + let log_keyspace = db + .keyspace("log", fjall::KeyspaceCreateOptions::default) + .unwrap(); + let hard_state_keyspace = db + .keyspace("hard_state", fjall::KeyspaceCreateOptions::default) + .unwrap(); + + let mut storage = FjallStorage::new(db, log_keyspace, hard_state_keyspace).unwrap(); + // Force first_log_index to 10 + storage.first_log_index = 10; + storage.last_log_index = 9; + + let make_entry = |index: u64, term: u64| raft::prelude::Entry { + entry_type: EntryType::EntryNormal, + term, + index, + data: Bytes::from_static(b"payload"), + context: Bytes::new(), + sync_log: false, + unknown_fields: UnknownFields::default(), + cached_size: CachedSize::default(), + }; + + // Pretend we have entries 10, 11, 12 + // We need to set first_log_index manually or by compacting, + // but new() initializes first_log_index from the first key in log_keyspace. + + storage + .append(&[make_entry(10, 1), make_entry(11, 1), make_entry(12, 1)]) + .unwrap(); + assert_eq!(storage.first_index().unwrap(), 10); + assert_eq!(storage.last_index().unwrap(), 12); + + // Overwrite at index 11 + storage.append(&[make_entry(11, 2)]).unwrap(); + assert_eq!(storage.last_index().unwrap(), 11); + + let entries = storage + .entries(10, 12, None, GetEntriesContext::empty(false)) + .unwrap(); + assert_eq!(entries.len(), 2); + assert_eq!(entries[0].index, 10); + assert_eq!(entries[1].index, 11); + assert_eq!(entries[1].term, 2); + + assert!(storage.entry(12).unwrap().is_none()); + } +} diff --git a/server/src/raft_node.rs b/server/src/raft_node.rs index 1cd3df5..b444b3e 100644 --- a/server/src/raft_node.rs +++ b/server/src/raft_node.rs @@ -18,7 +18,7 @@ use protobuf::{Message as ProtobufMessage, ProtobufResult}; use raft::prelude::{ConfState, Entry}; use raft::{Config, RawNode}; use std::collections::HashMap; -use std::time::{Duration, Instant}; +use std::time::Duration; use tokio::select; use tokio::sync::{mpsc, oneshot}; use tokio::task::{JoinHandle, spawn_blocking}; @@ -122,10 +122,9 @@ impl Node { /// /// Returns an error if Raft processing fails. pub async fn run(mut self, cancellation_token: CancellationToken) -> Result<()> { - let mut timeout = self.raft_tick_interval; + let mut ticker = tokio::time::interval(self.raft_tick_interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { - let start = Instant::now(); - select! { message = self.rx.recv() => { match message { @@ -201,14 +200,12 @@ impl Node { } None => break, } - timeout = timeout.saturating_sub(start.elapsed()); } - () = tokio::time::sleep(timeout) => { + _ = ticker.tick() => { if self.raft_group.tick().await { self.on_ready().await?; } - timeout = self.raft_tick_interval; } () = cancellation_token.cancelled() => break, diff --git a/server/tests/integration_test.rs b/server/tests/integration_test.rs index 9b9619e..0c047af 100644 --- a/server/tests/integration_test.rs +++ b/server/tests/integration_test.rs @@ -1,3 +1,4 @@ +use andross_server::log_storage::FjallStorage; use andross_server::service::kv_service_client::KvServiceClient; use andross_server::service::{CommandRequest, CommandResponse}; use andross_server::{ @@ -5,7 +6,7 @@ use andross_server::{ test_database_from_state, }; use bytes::Bytes; -use raft::storage::MemStorage; +use fjall::KeyspaceCreateOptions; use std::collections::HashMap; use std::time::Duration; use tempfile::TempDir; @@ -252,13 +253,18 @@ async fn start_servers(mut server_init_state: ServerInitState) -> Vec