Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 245 additions & 11 deletions server/src/log_storage/fjall_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

use crate::Result;
use crate::log_storage::{Entry, HardState, LogStorage};
use crate::util::usize_to_u64;
use fjall::{PersistMode, UserValue};
use fjall::UserValue;
use raft::eraftpb::{ConfState, Snapshot};
use raft::{GetEntriesContext, RaftState, Storage, StorageError};

Expand Down Expand Up @@ -56,8 +55,33 @@ impl FjallStorage {
None => 0,
};

// Load hard state from disk if it exists
let mut raft_state = RaftState::default();
if let (Some(commit), Some(term), Some(vote)) = (
hard_state_keyspace.get(COMMIT_KEY)?,
hard_state_keyspace.get(TERM_KEY)?,
hard_state_keyspace.get(VOTE_KEY)?,
) {
let commit_bytes = commit
.as_ref()
.try_into()
.map_err(|_| "invalid commit value in hard state")?;
let term_bytes = term
.as_ref()
.try_into()
.map_err(|_| "invalid term value in hard state")?;
let vote_bytes = vote
.as_ref()
.try_into()
.map_err(|_| "invalid vote value in hard state")?;

raft_state.hard_state.commit = u64::from_le_bytes(commit_bytes);
raft_state.hard_state.term = u64::from_le_bytes(term_bytes);
raft_state.hard_state.vote = u64::from_le_bytes(vote_bytes);
}

Ok(Self {
raft_state: RaftState::default(),
raft_state,
first_log_index,
last_log_index,
db,
Expand Down Expand Up @@ -93,12 +117,17 @@ impl Storage for FjallStorage {
_context: GetEntriesContext,
) -> raft::Result<Vec<raft::prelude::Entry>> {
assert!(low <= high, "low {low} is larger than high {high}");
assert!(high <= self.last_index()? + 1);
if low < self.first_log_index {
return Err(raft::Error::Store(StorageError::Compacted));
}
if high > self.last_log_index + 1 {
return Err(raft::Error::Store(StorageError::Unavailable));
}

let size = std::cmp::min(high - low, max_size.into().unwrap_or(u64::MAX));
let high = low + size;
let low = low.to_le_bytes();
let high = high.to_le_bytes();
let low = raft_index_to_key(low);
let high = raft_index_to_key(high);

let entries = self
.log_keyspace
Expand All @@ -121,8 +150,13 @@ impl Storage for FjallStorage {
if idx == 0 {
return Ok(0);
}
if idx < self.first_log_index {
return Err(raft::Error::Store(StorageError::Compacted));
}

let entry = self.entry(idx)?.expect("invalid index");
let entry = self
.entry(idx)?
.ok_or(raft::Error::Store(StorageError::Unavailable))?;
Ok(entry.term)
}

Expand All @@ -145,21 +179,50 @@ impl LogStorage for FjallStorage {
}

fn append(&mut self, entries: &[raft::eraftpb::Entry]) -> raft::Result<()> {
let mut batch = self.db.batch().durability(Some(PersistMode::SyncAll));
if entries.is_empty() {
return Ok(());
}

assert!(
self.first_log_index <= entries[0].index,
"overwrite compacted raft logs, compacted: {}, append: {}",
self.first_log_index - 1,
entries[0].index,
);
assert!(
self.last_log_index + 1 >= entries[0].index,
"raft logs should be continuous, last index: {}, new appended: {}",
self.last_log_index,
entries[0].index,
);

let mut batch = self.db.batch();
// Remove all entries overwritten by the new entries.
for index in entries[0].index..=self.last_log_index {
batch.remove(&self.log_keyspace, raft_index_to_key(index));
}
for entry in entries {
let entry: Entry = entry.into();
batch.insert(&self.log_keyspace, entry.index.to_le_bytes(), entry);
batch.insert(&self.log_keyspace, raft_index_to_key(entry.index), entry);
}
batch.commit().map_err(fjall_error_to_raft_error)?;

self.last_log_index += usize_to_u64(entries.len());
self.last_log_index = entries.last().expect("empty entries return early").index;

assert!(
self.raft_state.hard_state.commit <= self.last_log_index,
"commit index is invalid, commit: {}, last_log_index: {}",
self.raft_state.hard_state.commit,
self.last_log_index
);

Ok(())
}

fn set_hard_state(&mut self, hard_state: raft::prelude::HardState) -> raft::Result<()> {
self.raft_state.hard_state = hard_state;
let HardState { commit, term, vote } = (&self.raft_state.hard_state).into();
let mut batch = self.db.batch().durability(Some(PersistMode::SyncAll));
let mut batch = self.db.batch();
batch.insert(&self.hard_state_keyspace, COMMIT_KEY, commit.to_le_bytes());
batch.insert(&self.hard_state_keyspace, TERM_KEY, term.to_le_bytes());
batch.insert(&self.hard_state_keyspace, VOTE_KEY, vote.to_le_bytes());
Expand All @@ -176,6 +239,12 @@ impl LogStorage for FjallStorage {
}
}

/// Encodes a raft log index as a fixed-width storage key.
///
/// Big-endian byte order is used so that the lexicographical ordering of the
/// resulting keys matches the numeric ordering of the indices, which keeps
/// range scans over the log keyspace in index order.
fn raft_index_to_key(index: u64) -> [u8; size_of::<u64>()] {
    u64::to_be_bytes(index)
}

impl TryFrom<UserValue> for Entry {
type Error = Box<dyn std::error::Error + Send + Sync>;

Expand All @@ -196,3 +265,168 @@ fn fjall_error_to_raft_error(e: fjall::Error) -> raft::Error {
e => raft::Error::Store(StorageError::Other(e.into())),
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use protobuf::{CachedSize, UnknownFields};
    use raft::Storage;
    use raft::prelude::EntryType;
    use tempfile::TempDir;

    /// Builds a normal raft log entry with a fixed payload.
    ///
    /// Shared fixture for all tests below; previously this closure was
    /// duplicated verbatim in each test.
    fn make_entry(index: u64, term: u64) -> raft::prelude::Entry {
        raft::prelude::Entry {
            entry_type: EntryType::EntryNormal,
            term,
            index,
            data: Bytes::from_static(b"payload"),
            context: Bytes::new(),
            sync_log: false,
            unknown_fields: UnknownFields::default(),
            cached_size: CachedSize::default(),
        }
    }

    /// Entries must come back from `entries()` in numeric index order, both
    /// for a live storage and after reopening from the same database.
    #[test]
    fn test_fjall_storage_orders_entries_by_numeric_index() {
        let temp_dir = TempDir::new().unwrap();
        let db = fjall::Database::builder(temp_dir.path()).open().unwrap();
        let log_keyspace = db
            .keyspace("log", fjall::KeyspaceCreateOptions::default)
            .unwrap();
        let hard_state_keyspace = db
            .keyspace("hard_state", fjall::KeyspaceCreateOptions::default)
            .unwrap();

        let mut storage = FjallStorage::new(db.clone(), log_keyspace, hard_state_keyspace).unwrap();

        let entries = vec![make_entry(1, 1), make_entry(2, 1), make_entry(3, 1)];

        storage.append(&entries).unwrap();

        assert_eq!(storage.first_index().unwrap(), 1);
        assert_eq!(storage.last_index().unwrap(), 3);

        let fetched = storage
            .entries(1, 4, None, GetEntriesContext::empty(false))
            .unwrap();

        let fetched_indexes: Vec<_> = fetched.into_iter().map(|e| e.index).collect();
        assert_eq!(fetched_indexes, vec![1, 2, 3]);

        // Reopen the storage against the same database to verify the ordering
        // still holds when entries are read back from disk.
        drop(storage);

        let log_keyspace = db
            .keyspace("log", fjall::KeyspaceCreateOptions::default)
            .unwrap();
        let hard_state_keyspace = db
            .keyspace("hard_state", fjall::KeyspaceCreateOptions::default)
            .unwrap();
        let storage = FjallStorage::new(db, log_keyspace, hard_state_keyspace).unwrap();

        let fetched = storage
            .entries(1, 4, None, GetEntriesContext::empty(false))
            .unwrap();

        let fetched_indexes: Vec<_> = fetched.into_iter().map(|e| e.index).collect();
        assert_eq!(fetched_indexes, vec![1, 2, 3]);
    }

    /// Appending an entry at an existing index must truncate the log: the
    /// overwritten entry and everything after it disappear.
    #[test]
    fn test_fjall_storage_append_truncates_log_at_overwrite() {
        let temp_dir = TempDir::new().unwrap();
        let db = fjall::Database::builder(temp_dir.path()).open().unwrap();
        let log_keyspace = db
            .keyspace("log", fjall::KeyspaceCreateOptions::default)
            .unwrap();
        let hard_state_keyspace = db
            .keyspace("hard_state", fjall::KeyspaceCreateOptions::default)
            .unwrap();

        let mut storage = FjallStorage::new(db, log_keyspace, hard_state_keyspace).unwrap();

        // Append 1, 2, 3
        storage
            .append(&[make_entry(1, 1), make_entry(2, 1), make_entry(3, 1)])
            .unwrap();
        assert_eq!(storage.last_index().unwrap(), 3);

        // Overwrite at index 2 with new entry at term 2
        storage.append(&[make_entry(2, 2)]).unwrap();
        assert_eq!(storage.last_index().unwrap(), 2);

        let entries = storage
            .entries(1, 3, None, GetEntriesContext::empty(false))
            .unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].index, 1);
        assert_eq!(entries[0].term, 1);
        assert_eq!(entries[1].index, 2);
        assert_eq!(entries[1].term, 2);

        // Ensure 3 is gone
        assert!(storage.entry(3).unwrap().is_none());
    }

    /// Same truncation-on-overwrite behavior as above, but with a log that
    /// does not start at index 1 (simulating a compacted prefix).
    #[test]
    fn test_fjall_storage_append_truncates_log_at_overwrite_with_offset() {
        let temp_dir = TempDir::new().unwrap();
        let db = fjall::Database::builder(temp_dir.path()).open().unwrap();
        let log_keyspace = db
            .keyspace("log", fjall::KeyspaceCreateOptions::default)
            .unwrap();
        let hard_state_keyspace = db
            .keyspace("hard_state", fjall::KeyspaceCreateOptions::default)
            .unwrap();

        let mut storage = FjallStorage::new(db, log_keyspace, hard_state_keyspace).unwrap();
        // `new()` derives first_log_index from the first key in log_keyspace,
        // so on an empty database we force the compacted-prefix state by hand:
        // first entry will be 10, and last_log_index = 9 marks the log empty.
        storage.first_log_index = 10;
        storage.last_log_index = 9;

        storage
            .append(&[make_entry(10, 1), make_entry(11, 1), make_entry(12, 1)])
            .unwrap();
        assert_eq!(storage.first_index().unwrap(), 10);
        assert_eq!(storage.last_index().unwrap(), 12);

        // Overwrite at index 11
        storage.append(&[make_entry(11, 2)]).unwrap();
        assert_eq!(storage.last_index().unwrap(), 11);

        let entries = storage
            .entries(10, 12, None, GetEntriesContext::empty(false))
            .unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].index, 10);
        assert_eq!(entries[1].index, 11);
        assert_eq!(entries[1].term, 2);

        assert!(storage.entry(12).unwrap().is_none());
    }
}
11 changes: 4 additions & 7 deletions server/src/raft_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use protobuf::{Message as ProtobufMessage, ProtobufResult};
use raft::prelude::{ConfState, Entry};
use raft::{Config, RawNode};
use std::collections::HashMap;
use std::time::{Duration, Instant};
use std::time::Duration;
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio::task::{JoinHandle, spawn_blocking};
Expand Down Expand Up @@ -122,10 +122,9 @@ impl Node {
///
/// Returns an error if Raft processing fails.
pub async fn run(mut self, cancellation_token: CancellationToken) -> Result<()> {
let mut timeout = self.raft_tick_interval;
let mut ticker = tokio::time::interval(self.raft_tick_interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
let start = Instant::now();

select! {
message = self.rx.recv() => {
match message {
Expand Down Expand Up @@ -201,14 +200,12 @@ impl Node {
}
None => break,
}
timeout = timeout.saturating_sub(start.elapsed());
}

() = tokio::time::sleep(timeout) => {
_ = ticker.tick() => {
if self.raft_group.tick().await {
self.on_ready().await?;
}
timeout = self.raft_tick_interval;
}

() = cancellation_token.cancelled() => break,
Expand Down
12 changes: 9 additions & 3 deletions server/tests/integration_test.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use andross_server::log_storage::FjallStorage;
use andross_server::service::kv_service_client::KvServiceClient;
use andross_server::service::{CommandRequest, CommandResponse};
use andross_server::{
AddrConfig, AndrossConfig, Command, parse_uri, start_server, test_database,
test_database_from_state,
};
use bytes::Bytes;
use raft::storage::MemStorage;
use fjall::KeyspaceCreateOptions;
use std::collections::HashMap;
use std::time::Duration;
use tempfile::TempDir;
Expand Down Expand Up @@ -252,13 +253,18 @@ async fn start_servers(mut server_init_state: ServerInitState) -> Vec<ServerHand
(db, temp_dir)
}
};
let log_keyspace = db.keyspace("log", KeyspaceCreateOptions::default).unwrap();
let hard_state_keyspace = db
.keyspace("hard_state", KeyspaceCreateOptions::default)
.unwrap();
let log_storage = FjallStorage::new(db.clone(), log_keyspace, hard_state_keyspace).unwrap();
let config = AndrossConfig {
id: node_id,
addr_config: AddrConfig::TcpListener(listener),
peers,
raft_tick_interval: Duration::from_millis(1),
raft_tick_interval: Duration::from_millis(100),
default_request_timeout: Duration::from_secs(5),
log_storage: MemStorage::new(),
log_storage,
db,
cancellation_token: cancellation_token.clone(),
};
Expand Down
Loading