Skip to content

Change: Add: RaftStateMachine::save_snapshot() #1353

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,22 +162,14 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
snapshot_id,
};

let snapshot = StoredSnapshot {
meta: meta.clone(),
data: data.clone(),
let snapshot = Snapshot {
meta,
snapshot: Cursor::new(data),
};

{
let mut current_snapshot = self.current_snapshot.write().await;
*current_snapshot = Some(snapshot);
}

tracing::info!(snapshot_size, "log compaction complete");
self.save_snapshot(&snapshot).await?;

Ok(Snapshot {
meta,
snapshot: Cursor::new(data),
})
Ok(snapshot)
}
}

Expand Down Expand Up @@ -304,9 +296,25 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
*sm = new_sm;
}

// Update current snapshot.
let mut current_snapshot = self.current_snapshot.write().await;
*current_snapshot = Some(new_snapshot);
Ok(())
}

async fn save_snapshot(&mut self, snapshot: &Snapshot<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
let new_snapshot = StoredSnapshot {
meta: snapshot.meta.clone(),
data: snapshot.snapshot.clone().into_inner(),
};

let mut current = self.current_snapshot.write().await;

// Only save it if the new snapshot contains more recent data than the current snapshot.

let current_last = current.as_ref().and_then(|s| s.meta.last_log_id);
if new_snapshot.meta.last_log_id <= current_last {
return Ok(());
}

*current = Some(new_snapshot);
Ok(())
}

Expand Down
39 changes: 29 additions & 10 deletions examples/raft-kv-memstore-grpc/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,23 +141,42 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: SnapshotData) -> Result<(), StorageError> {
tracing::info!("install snapshot");

let new_snapshot = StoredSnapshot {
meta: meta.clone(),
data: snapshot,
};

// Update the state machine.
{
let d: pb::StateMachineData = prost::Message::decode(new_snapshot.data.as_ref())
.map_err(|e| StorageError::read_snapshot(None, &e))?;
let d: pb::StateMachineData =
prost::Message::decode(snapshot.as_ref()).map_err(|e| StorageError::read_snapshot(None, &e))?;

let mut state_machine = self.state_machine.lock().unwrap();
*state_machine = d;
}

// Update current snapshot.
let mut current_snapshot = self.current_snapshot.lock().unwrap();
*current_snapshot = Some(new_snapshot);
let snapshot = Snapshot {
meta: meta.clone(),
snapshot,
};

self.save_snapshot(&snapshot).await?;

Ok(())
}

async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> {
let new_snapshot = StoredSnapshot {
meta: snapshot.meta.clone(),
data: snapshot.snapshot.clone(),
};

let mut current = self.current_snapshot.lock().unwrap();

// Only save it if the new snapshot contains more recent data than the current snapshot.

let current_last = current.as_ref().and_then(|s| s.meta.last_log_id);
if new_snapshot.meta.last_log_id <= current_last {
return Ok(());
}

*current = Some(new_snapshot);

Ok(())
}

Expand Down
37 changes: 28 additions & 9 deletions examples/raft-kv-memstore-network-v2/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,40 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: SnapshotData) -> Result<(), StorageError> {
tracing::info!("install snapshot");

let new_snapshot = StoredSnapshot {
meta: meta.clone(),
data: snapshot,
};

// Update the state machine.
{
let updated_state_machine: StateMachineData = new_snapshot.data.clone();
let updated_state_machine: StateMachineData = snapshot.clone();
let mut state_machine = self.state_machine.lock().unwrap();
*state_machine = updated_state_machine;
}

// Update current snapshot.
let mut current_snapshot = self.current_snapshot.lock().unwrap();
*current_snapshot = Some(new_snapshot);
let snapshot = Snapshot {
meta: meta.clone(),
snapshot,
};

self.save_snapshot(&snapshot).await?;

Ok(())
}

async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> {
let new_snapshot = StoredSnapshot {
meta: snapshot.meta.clone(),
data: snapshot.snapshot.clone(),
};

let mut current = self.current_snapshot.lock().unwrap();

// Only save it if the new snapshot contains more recent data than the current snapshot.

let current_last = current.as_ref().and_then(|s| s.meta.last_log_id);
if new_snapshot.meta.last_log_id <= current_last {
return Ok(());
}

*current = Some(new_snapshot);

Ok(())
}

Expand Down
39 changes: 25 additions & 14 deletions examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,14 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
snapshot_id: snapshot_id.clone(),
};

let snapshot = StoredSnapshot {
meta: meta.clone(),
data: snapshot_id.clone(),
let snapshot = Snapshot {
meta,
snapshot: snapshot_id,
};

{
let mut current_snapshot = self.current_snapshot.lock().unwrap();
*current_snapshot = Some(snapshot);
}
self.save_snapshot(&snapshot).await?;

Ok(Snapshot {
meta,
snapshot: snapshot_id,
})
Ok(snapshot)
}
}

Expand Down Expand Up @@ -201,9 +195,26 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
*state_machine = updated_state_machine;
}

// Update current snapshot.
let mut current_snapshot = self.current_snapshot.lock().unwrap();
*current_snapshot = Some(new_snapshot);
Ok(())
}

async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> {
let new_snapshot = StoredSnapshot {
meta: snapshot.meta.clone(),
data: snapshot.snapshot.clone(),
};

let mut current = self.current_snapshot.lock().unwrap();

// Only save it if the new snapshot contains more recent data than the current snapshot.

let current_last = current.as_ref().and_then(|s| s.meta.last_log_id);

if new_snapshot.meta.last_log_id <= current_last {
return Ok(());
}

*current = Some(new_snapshot);
Ok(())
}

Expand Down
39 changes: 25 additions & 14 deletions examples/raft-kv-memstore-singlethreaded/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,20 +157,14 @@ impl RaftSnapshotBuilder<TypeConfig> for Rc<StateMachineStore> {
snapshot_id,
};

let snapshot = StoredSnapshot {
meta: meta.clone(),
data: data.clone(),
let snapshot = Snapshot {
meta,
snapshot: Cursor::new(data),
};

{
let mut current_snapshot = self.current_snapshot.borrow_mut();
*current_snapshot = Some(snapshot);
}
self.save_snapshot(&snapshot).await?;

Ok(Snapshot {
meta,
snapshot: Cursor::new(data),
})
Ok(snapshot)
}
}

Expand Down Expand Up @@ -238,9 +232,26 @@ impl RaftStateMachine<TypeConfig> for Rc<StateMachineStore> {
*state_machine = updated_state_machine;
}

// Update current snapshot.
let mut current_snapshot = self.current_snapshot.borrow_mut();
*current_snapshot = Some(new_snapshot);
Ok(())
}

async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> {
let new_snapshot = StoredSnapshot {
meta: snapshot.meta.clone(),
data: snapshot.snapshot.clone().into_inner(),
};

let mut current = self.current_snapshot.borrow_mut();

// Only save it if the new snapshot contains more recent data than the current snapshot.

let current_last = current.as_ref().and_then(|s| s.meta.last_log_id);
if new_snapshot.meta.last_log_id <= current_last {
return Ok(());
}

*current = Some(new_snapshot);

Ok(())
}

Expand Down
39 changes: 23 additions & 16 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,6 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
let last_applied_log = state_machine.last_applied_log;
let last_membership = state_machine.last_membership.clone();

// Lock the current snapshot before releasing the lock on the state machine, to avoid a race
// condition on the written snapshot
let mut current_snapshot = self.current_snapshot.write().await;
drop(state_machine);

let snapshot_idx = self.snapshot_idx.fetch_add(1, Ordering::Relaxed) + 1;
Expand All @@ -116,17 +113,14 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
snapshot_id,
};

let snapshot = StoredSnapshot {
let snapshot = Snapshot {
meta: meta.clone(),
data: data.clone(),
snapshot: Cursor::new(data.clone()),
};

*current_snapshot = Some(snapshot);
self.save_snapshot(&snapshot).await?;

Ok(Snapshot {
meta,
snapshot: Cursor::new(data),
})
Ok(snapshot)
}
}

Expand Down Expand Up @@ -203,13 +197,26 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
let mut state_machine = self.state_machine.write().await;
*state_machine = updated_state_machine;

// Lock the current snapshot before releasing the lock on the state machine, to avoid a race
// condition on the written snapshot
let mut current_snapshot = self.current_snapshot.write().await;
drop(state_machine);
Ok(())
}

async fn save_snapshot(&mut self, snapshot: &Snapshot<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
let new_snapshot = StoredSnapshot {
meta: snapshot.meta.clone(),
data: snapshot.snapshot.clone().into_inner(),
};

let mut current = self.current_snapshot.write().await;

// Only save it if the new snapshot contains more recent data than the current snapshot.

let current_last = current.as_ref().and_then(|s| s.meta.last_log_id);
if new_snapshot.meta.last_log_id <= current_last {
return Ok(());
}

*current = Some(new_snapshot);

// Update current snapshot.
*current_snapshot = Some(new_snapshot);
Ok(())
}

Expand Down
Loading
Loading