A generic Raft consensus library built on openraft with gRPC transport.
- Generic State Machine: Define your own
Request,Response, and state machine types - gRPC Transport: Built-in node-to-node communication via tonic
- Unified Storage: Single redb storage backend supporting both persistent and in-memory modes
- Leader Forwarding: Automatic forwarding of write requests to the current leader
- Snapshot Support: Automatic snapshotting and recovery with configurable thresholds
Add to your Cargo.toml:
[dependencies]
mraft = { git = "https://github.com/MalteJ/mraft" }use mraft::{RaftNode, NodeConfig, StorageBackend, StateMachine};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
// Define your request and response types
#[derive(Clone, Debug, Serialize, Deserialize)]
enum Command {
Set { key: String, value: String },
Delete { key: String },
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
struct Response {
success: bool,
previous_value: Option<String>,
}
// Define your state machine
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
struct KvStore {
data: HashMap<String, String>,
}
impl StateMachine<Command, Response> for KvStore {
type Event = (); // No events needed for this example
fn apply(&mut self, cmd: Command) -> (Response, Vec<Self::Event>) {
match cmd {
Command::Set { key, value } => {
let prev = self.data.insert(key, value);
(Response { success: true, previous_value: prev }, vec![])
}
Command::Delete { key } => {
let prev = self.data.remove(&key);
(Response { success: true, previous_value: prev }, vec![])
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let config = NodeConfig::single(1, "127.0.0.1:5001".to_string());
let mut node: RaftNode<Command, Response, KvStore> = RaftNode::new(config).await?;
node.start().await?;
node.initialize_cluster().await?;
// Write data (replicates to all nodes)
let resp = node.write(Command::Set {
key: "hello".to_string(),
value: "world".to_string(),
}).await?;
println!("Response: {:?}", resp);
// Read state directly
let state = node.get_state().await;
println!("Value: {:?}", state.data.get("hello"));
Ok(())
}mraft uses redb for both persistent and in-memory storage:
use mraft::{NodeConfig, StorageBackend};
use std::path::PathBuf;
// In-memory (for testing)
let config = NodeConfig {
id: 1,
listen_addr: "127.0.0.1:5001".to_string(),
peers: Default::default(),
storage: StorageBackend::Memory,
raft_config: None,
};
// Persistent (for production)
let config = NodeConfig {
id: 1,
listen_addr: "127.0.0.1:5001".to_string(),
peers: Default::default(),
storage: StorageBackend::Persistent {
path: PathBuf::from("/var/lib/myapp/raft.db"),
},
raft_config: None,
};use std::collections::BTreeMap;
let config = NodeConfig {
id: 1,
listen_addr: "127.0.0.1:5001".to_string(),
peers: BTreeMap::from([
(2, "127.0.0.1:5002".to_string()),
(3, "127.0.0.1:5003".to_string()),
]),
storage: StorageBackend::Memory,
raft_config: None,
};
let mut node: RaftNode<Command, Response, KvStore> = RaftNode::new(config).await?;
node.start().await?;
node.initialize_cluster().await?; // Only on first node!let config = NodeConfig {
id: 2, // or 3
listen_addr: "127.0.0.1:5002".to_string(), // or 5003
peers: BTreeMap::from([
(1, "127.0.0.1:5001".to_string()),
(3, "127.0.0.1:5003".to_string()),
]),
storage: StorageBackend::Memory,
raft_config: None,
};
let mut node: RaftNode<Command, Response, KvStore> = RaftNode::new(config).await?;
node.start().await?;
// Don't call initialize_cluster() on joining nodes// Write to leader (fails if not leader)
let resp = node.write(command).await?;
// Forward to leader if not leader
let resp = node.write_or_forward(command).await?;
// Explicitly forward to leader
let resp = node.forward_to_leader(command).await?;// Check if this node is the leader
if node.is_leader() {
println!("I am the leader");
}
// Get current leader ID
if let Some(leader_id) = node.current_leader() {
println!("Leader is node {}", leader_id);
}
// Wait for leader election
let leader = node.wait_for_leader(Duration::from_secs(5)).await;
// Get Raft metrics
let metrics = node.metrics();
println!("Commit index: {:?}", metrics.last_applied);// Read current state (local, no consensus)
let state: KvStore = node.get_state().await;// Graceful shutdown
node.shutdown().await?;┌─────────────────────────────────────────┐
│ RaftNode<Req, Resp, S> │
├─────────────────────────────────────────┤
│ write() / write_or_forward() │ ← Client writes
├─────────────────────────────────────────┤
│ openraft::Raft │ ← Consensus
├─────────────────────────────────────────┤
│ GrpcNetwork │ ← Node-to-node RPCs
├─────────────────────────────────────────┤
│ Store<Req, Resp, S> │
│ ├── Log Storage (redb) │
│ └── StateMachine<Req, Resp> │ ← Your business logic
└─────────────────────────────────────────┘
- Rust 2024 edition (nightly or stable with edition = "2024")
- openraft 0.9
- tokio runtime
MIT