From 205b003064e1c8956d9a6bcf0e63015e48749f39 Mon Sep 17 00:00:00 2001 From: Zhang Yanpo Date: Fri, 4 Apr 2025 23:50:16 +0800 Subject: [PATCH] Change: Add `RaftStateMachine::save_snapshot()` method Previously, `RaftStateMachine::install_snapshot()` was responsible for two tasks: - Replacing the state machine with the given snapshot. - Persisting the snapshot to disk. This commit introduces a new method, `RaftStateMachine::save_snapshot()`, to handle the task of persisting snapshots. With this change: - `install_snapshot()` no longer needs to persist snapshots, though it can still do so without causing harm. - The responsibility for persisting snapshots is now clearly separated. - Part of #1333 Upgrade tip: - Implement snapshot persistence logic in the new `save_snapshot()` method. - It is recommended to remove snapshot persistence logic from `install_snapshot()` for better separation of concerns. --- cluster_benchmark/tests/benchmark/store.rs | 40 +++++---- .../raft-kv-memstore-grpc/src/store/mod.rs | 39 ++++++--- .../raft-kv-memstore-network-v2/src/store.rs | 37 ++++++-- .../src/store.rs | 39 ++++++--- .../src/store.rs | 39 ++++++--- examples/raft-kv-memstore/src/store/mod.rs | 39 +++++---- examples/raft-kv-rocksdb/src/store.rs | 77 +++++++++++++---- examples/rocksstore/src/lib.rs | 86 +++++++++++++------ openraft/src/core/sm/worker.rs | 14 ++- .../engine/handler/following_handler/mod.rs | 7 +- openraft/src/storage/v2/raft_state_machine.rs | 36 ++++++++ openraft/src/testing/log/suite.rs | 8 +- stores/memstore/src/lib.rs | 41 +++++---- .../client_api/t16_with_state_machine.rs | 4 + 14 files changed, 363 insertions(+), 143 deletions(-) diff --git a/cluster_benchmark/tests/benchmark/store.rs b/cluster_benchmark/tests/benchmark/store.rs index a3cf116c3..b00fe2603 100644 --- a/cluster_benchmark/tests/benchmark/store.rs +++ b/cluster_benchmark/tests/benchmark/store.rs @@ -162,22 +162,14 @@ impl RaftSnapshotBuilder for Arc { snapshot_id, }; - let snapshot = 
StoredSnapshot { - meta: meta.clone(), - data: data.clone(), + let snapshot = Snapshot { + meta, + snapshot: Cursor::new(data), }; - { - let mut current_snapshot = self.current_snapshot.write().await; - *current_snapshot = Some(snapshot); - } - - tracing::info!(snapshot_size, "log compaction complete"); + self.save_snapshot(&snapshot).await?; - Ok(Snapshot { - meta, - snapshot: Cursor::new(data), - }) + Ok(snapshot) } } @@ -304,9 +296,25 @@ impl RaftStateMachine for Arc { *sm = new_sm; } - // Update current snapshot. - let mut current_snapshot = self.current_snapshot.write().await; - *current_snapshot = Some(new_snapshot); + Ok(()) + } + + async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> { + let new_snapshot = StoredSnapshot { + meta: snapshot.meta.clone(), + data: snapshot.snapshot.clone().into_inner(), + }; + + let mut current = self.current_snapshot.write().await; + + // Only save it if the new snapshot contains more recent data than the current snapshot. + + let current_last = current.as_ref().and_then(|s| s.meta.last_log_id); + if new_snapshot.meta.last_log_id <= current_last { + return Ok(()); + } + + *current = Some(new_snapshot); Ok(()) } diff --git a/examples/raft-kv-memstore-grpc/src/store/mod.rs b/examples/raft-kv-memstore-grpc/src/store/mod.rs index a527d78a2..bb708741c 100644 --- a/examples/raft-kv-memstore-grpc/src/store/mod.rs +++ b/examples/raft-kv-memstore-grpc/src/store/mod.rs @@ -141,23 +141,42 @@ impl RaftStateMachine for Arc { async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: SnapshotData) -> Result<(), StorageError> { tracing::info!("install snapshot"); - let new_snapshot = StoredSnapshot { - meta: meta.clone(), - data: snapshot, - }; - // Update the state machine. 
{ - let d: pb::StateMachineData = prost::Message::decode(new_snapshot.data.as_ref()) - .map_err(|e| StorageError::read_snapshot(None, &e))?; + let d: pb::StateMachineData = + prost::Message::decode(snapshot.as_ref()).map_err(|e| StorageError::read_snapshot(None, &e))?; let mut state_machine = self.state_machine.lock().unwrap(); *state_machine = d; } - // Update current snapshot. - let mut current_snapshot = self.current_snapshot.lock().unwrap(); - *current_snapshot = Some(new_snapshot); + let snapshot = Snapshot { + meta: meta.clone(), + snapshot, + }; + + self.save_snapshot(&snapshot).await?; + + Ok(()) + } + + async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> { + let new_snapshot = StoredSnapshot { + meta: snapshot.meta.clone(), + data: snapshot.snapshot.clone(), + }; + + let mut current = self.current_snapshot.lock().unwrap(); + + // Only save it if the new snapshot contains more recent data than the current snapshot. + + let current_last = current.as_ref().and_then(|s| s.meta.last_log_id); + if new_snapshot.meta.last_log_id <= current_last { + return Ok(()); + } + + *current = Some(new_snapshot); + Ok(()) } diff --git a/examples/raft-kv-memstore-network-v2/src/store.rs b/examples/raft-kv-memstore-network-v2/src/store.rs index 40405200d..299393943 100644 --- a/examples/raft-kv-memstore-network-v2/src/store.rs +++ b/examples/raft-kv-memstore-network-v2/src/store.rs @@ -165,21 +165,40 @@ impl RaftStateMachine for Arc { async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: SnapshotData) -> Result<(), StorageError> { tracing::info!("install snapshot"); - let new_snapshot = StoredSnapshot { - meta: meta.clone(), - data: snapshot, - }; - // Update the state machine. 
{ - let updated_state_machine: StateMachineData = new_snapshot.data.clone(); + let updated_state_machine: StateMachineData = snapshot.clone(); let mut state_machine = self.state_machine.lock().unwrap(); *state_machine = updated_state_machine; } - // Update current snapshot. - let mut current_snapshot = self.current_snapshot.lock().unwrap(); - *current_snapshot = Some(new_snapshot); + let snapshot = Snapshot { + meta: meta.clone(), + snapshot, + }; + + self.save_snapshot(&snapshot).await?; + + Ok(()) + } + + async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> { + let new_snapshot = StoredSnapshot { + meta: snapshot.meta.clone(), + data: snapshot.snapshot.clone(), + }; + + let mut current = self.current_snapshot.lock().unwrap(); + + // Only save it if the new snapshot contains more recent data than the current snapshot. + + let current_last = current.as_ref().and_then(|s| s.meta.last_log_id); + if new_snapshot.meta.last_log_id <= current_last { + return Ok(()); + } + + *current = Some(new_snapshot); + Ok(()) } diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs index 24623a1cd..8207d37e9 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs @@ -123,20 +123,14 @@ impl RaftSnapshotBuilder for Arc { snapshot_id: snapshot_id.clone(), }; - let snapshot = StoredSnapshot { - meta: meta.clone(), - data: snapshot_id.clone(), + let snapshot = Snapshot { + meta, + snapshot: snapshot_id, }; - { - let mut current_snapshot = self.current_snapshot.lock().unwrap(); - *current_snapshot = Some(snapshot); - } + self.save_snapshot(&snapshot).await?; - Ok(Snapshot { - meta, - snapshot: snapshot_id, - }) + Ok(snapshot) } } @@ -201,9 +195,26 @@ impl RaftStateMachine for Arc { *state_machine = updated_state_machine; } - // Update current snapshot. 
- let mut current_snapshot = self.current_snapshot.lock().unwrap(); - *current_snapshot = Some(new_snapshot); + Ok(()) + } + + async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> { + let new_snapshot = StoredSnapshot { + meta: snapshot.meta.clone(), + data: snapshot.snapshot.clone(), + }; + + let mut current = self.current_snapshot.lock().unwrap(); + + // Only save it if the new snapshot contains more recent data than the current snapshot. + + let current_last = current.as_ref().and_then(|s| s.meta.last_log_id); + + if new_snapshot.meta.last_log_id <= current_last { + return Ok(()); + } + + *current = Some(new_snapshot); Ok(()) } diff --git a/examples/raft-kv-memstore-singlethreaded/src/store.rs b/examples/raft-kv-memstore-singlethreaded/src/store.rs index a5a3c6e62..f5620d6f0 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/store.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/store.rs @@ -157,20 +157,14 @@ impl RaftSnapshotBuilder for Rc { snapshot_id, }; - let snapshot = StoredSnapshot { - meta: meta.clone(), - data: data.clone(), + let snapshot = Snapshot { + meta, + snapshot: Cursor::new(data), }; - { - let mut current_snapshot = self.current_snapshot.borrow_mut(); - *current_snapshot = Some(snapshot); - } + self.save_snapshot(&snapshot).await?; - Ok(Snapshot { - meta, - snapshot: Cursor::new(data), - }) + Ok(snapshot) } } @@ -238,9 +232,26 @@ impl RaftStateMachine for Rc { *state_machine = updated_state_machine; } - // Update current snapshot. 
- let mut current_snapshot = self.current_snapshot.borrow_mut(); - *current_snapshot = Some(new_snapshot); + Ok(()) + } + + async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> { + let new_snapshot = StoredSnapshot { + meta: snapshot.meta.clone(), + data: snapshot.snapshot.clone().into_inner(), + }; + + let mut current = self.current_snapshot.borrow_mut(); + + // Only save it if the new snapshot contains more recent data than the current snapshot. + + let current_last = current.as_ref().and_then(|s| s.meta.last_log_id); + if new_snapshot.meta.last_log_id <= current_last { + return Ok(()); + } + + *current = Some(new_snapshot); + Ok(()) } diff --git a/examples/raft-kv-memstore/src/store/mod.rs b/examples/raft-kv-memstore/src/store/mod.rs index 72182a2ec..8b4961b48 100644 --- a/examples/raft-kv-memstore/src/store/mod.rs +++ b/examples/raft-kv-memstore/src/store/mod.rs @@ -98,9 +98,6 @@ impl RaftSnapshotBuilder for Arc { let last_applied_log = state_machine.last_applied_log; let last_membership = state_machine.last_membership.clone(); - // Lock the current snapshot before releasing the lock on the state machine, to avoid a race - // condition on the written snapshot - let mut current_snapshot = self.current_snapshot.write().await; drop(state_machine); let snapshot_idx = self.snapshot_idx.fetch_add(1, Ordering::Relaxed) + 1; @@ -116,17 +113,14 @@ impl RaftSnapshotBuilder for Arc { snapshot_id, }; - let snapshot = StoredSnapshot { + let snapshot = Snapshot { meta: meta.clone(), - data: data.clone(), + snapshot: Cursor::new(data.clone()), }; - *current_snapshot = Some(snapshot); + self.save_snapshot(&snapshot).await?; - Ok(Snapshot { - meta, - snapshot: Cursor::new(data), - }) + Ok(snapshot) } } @@ -203,13 +197,26 @@ impl RaftStateMachine for Arc { let mut state_machine = self.state_machine.write().await; *state_machine = updated_state_machine; - // Lock the current snapshot before releasing the lock on the state machine, to avoid a race - 
// condition on the written snapshot - let mut current_snapshot = self.current_snapshot.write().await; - drop(state_machine); + Ok(()) + } + + async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> { + let new_snapshot = StoredSnapshot { + meta: snapshot.meta.clone(), + data: snapshot.snapshot.clone().into_inner(), + }; + + let mut current = self.current_snapshot.write().await; + + // Only save it if the new snapshot contains more recent data than the current snapshot. + + let current_last = current.as_ref().and_then(|s| s.meta.last_log_id); + if new_snapshot.meta.last_log_id <= current_last { + return Ok(()); + } + + *current = Some(new_snapshot); - // Update current snapshot. - *current_snapshot = Some(new_snapshot); Ok(()) } diff --git a/examples/raft-kv-rocksdb/src/store.rs b/examples/raft-kv-rocksdb/src/store.rs index 3340a627d..439bc8040 100644 --- a/examples/raft-kv-rocksdb/src/store.rs +++ b/examples/raft-kv-rocksdb/src/store.rs @@ -13,6 +13,8 @@ use openraft::RaftSnapshotBuilder; use openraft_rocksstore::log_store::RocksLogStore; use rocksdb::ColumnFamily; use rocksdb::ColumnFamilyDescriptor; +use rocksdb::Direction; +use rocksdb::IteratorMode; use rocksdb::Options; use rocksdb::DB; use serde::Deserialize; @@ -100,17 +102,14 @@ impl RaftSnapshotBuilder for StateMachineStore { snapshot_id, }; - let snapshot = StoredSnapshot { - meta: meta.clone(), - data: kv_json.clone(), + let snapshot = Snapshot { + meta, + snapshot: Cursor::new(kv_json), }; - self.set_current_snapshot_(snapshot)?; + self.save_snapshot(&snapshot).await?; - Ok(Snapshot { - meta, - snapshot: Cursor::new(kv_json), - }) + Ok(snapshot) } } @@ -146,18 +145,49 @@ impl StateMachineStore { Ok(()) } + /// List all snapshots in the db, returns the last fn get_current_snapshot_(&self) -> StorageResult> { - Ok(self - .db - .get_cf(self.store(), b"snapshot") - .map_err(|e| StorageError::read(&e))? 
- .and_then(|v| serde_json::from_slice(&v).ok())) + let mut last_snapshot_key = None; + + let it = self.db.iterator_cf(self.store(), IteratorMode::From(b"snapshot-", Direction::Forward)); + + for kv in it { + let (key, _value) = kv.map_err(|e| StorageError::read(&e))?; + if key.starts_with(b"snapshot-") { + last_snapshot_key = Some(key.to_vec()); + } else { + break; + } + } + let Some(key) = last_snapshot_key else { + return Ok(None); + }; + + let data = self.db.get_cf(self.store(), &key).map_err(|e| StorageError::read(&e))?.unwrap(); + + let snap: StoredSnapshot = serde_json::from_slice(&data).map_err(|e| StorageError::read_snapshot(None, &e))?; + + Ok(Some(snap)) } + /// Save snapshot by last-log-id and when reading, get the last one as the current. + /// + /// So that writing an old one won't overwrite the newer one. + /// In a real world application, the old ones should be cleaned. fn set_current_snapshot_(&self, snap: StoredSnapshot) -> StorageResult<()> { + let last_log_id = snap.meta.last_log_id.as_ref(); + + let id_str = Self::order_preserved_log_id_string(last_log_id); + + let key = format!("snapshot-{id_str}"); self.db - .put_cf(self.store(), b"snapshot", serde_json::to_vec(&snap).unwrap().as_slice()) + .put_cf( + self.store(), + key.as_bytes(), + serde_json::to_vec(&snap).unwrap().as_slice(), + ) .map_err(|e| StorageError::write_snapshot(Some(snap.meta.signature()), &e))?; + self.flush(ErrorSubject::Snapshot(Some(snap.meta.signature())), ErrorVerb::Write)?; Ok(()) } @@ -170,6 +200,14 @@ impl StateMachineStore { fn store(&self) -> &ColumnFamily { self.db.cf_handle("store").unwrap() } + + fn order_preserved_log_id_string(log_id: Option<&LogId>) -> String { + let term = log_id.map(|l| l.committed_leader_id().term).unwrap_or_default(); + let node_id = log_id.map(|l| l.committed_leader_id().node_id).unwrap_or_default(); + let index = log_id.map(|l| l.index).unwrap_or_default(); + + format!("{:020}-{:020}-{:020}", term, node_id, index) + } } impl 
RaftStateMachine for StateMachineStore { @@ -227,7 +265,16 @@ impl RaftStateMachine for StateMachineStore { data: snapshot.into_inner(), }; - self.update_state_machine_(new_snapshot.clone()).await?; + self.update_state_machine_(new_snapshot).await?; + + Ok(()) + } + + async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> { + let new_snapshot = StoredSnapshot { + meta: snapshot.meta.clone(), + data: snapshot.snapshot.clone().into_inner(), + }; self.set_current_snapshot_(new_snapshot)?; diff --git a/examples/rocksstore/src/lib.rs b/examples/rocksstore/src/lib.rs index 22d9b8700..41838524f 100644 --- a/examples/rocksstore/src/lib.rs +++ b/examples/rocksstore/src/lib.rs @@ -32,6 +32,8 @@ use openraft::StorageError; use openraft::StoredMembership; use rand::Rng; use rocksdb::ColumnFamilyDescriptor; +use rocksdb::Direction; +use rocksdb::IteratorMode; use rocksdb::Options; use rocksdb::DB; use serde::Deserialize; @@ -115,6 +117,14 @@ impl RocksStateMachine { state_machine } + + fn order_preserved_log_id_string(log_id: Option<&LogId>) -> String { + let term = log_id.map(|l| l.committed_leader_id().term).unwrap_or_default(); + let node_id = log_id.map(|l| l.committed_leader_id().node_id).unwrap_or_default(); + let index = log_id.map(|l| l.index).unwrap_or_default(); + + format!("{:020}-{:020}-{:020}", term, node_id, index) + } } impl RaftSnapshotBuilder for RocksStateMachine { @@ -141,22 +151,14 @@ impl RaftSnapshotBuilder for RocksStateMachine { snapshot_id, }; - let snapshot = RocksSnapshot { + let snapshot = Snapshot { meta: meta.clone(), - data: data.clone(), + snapshot: Cursor::new(data), }; - let serialized_snapshot = serde_json::to_vec(&snapshot) - .map_err(|e| StorageError::write_snapshot(Some(meta.signature()), AnyError::new(&e)))?; + self.save_snapshot(&snapshot).await?; - self.db - .put_cf(self.db.cf_handle("sm_meta").unwrap(), "snapshot", serialized_snapshot) - .map_err(|e| StorageError::write_snapshot(Some(meta.signature()), 
AnyError::new(&e)))?; - - Ok(Snapshot { - meta, - snapshot: Cursor::new(data), - }) + Ok(snapshot) } } @@ -229,13 +231,33 @@ impl RaftStateMachine for RocksStateMachine { self.sm = updated_state_machine; - // Save snapshot + self.db.flush_wal(true).map_err(|e| StorageError::write_snapshot(Some(meta.signature()), &e))?; + Ok(()) + } + + async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> { + let last_log_id = snapshot.meta.last_log_id.as_ref(); + + let id_str = Self::order_preserved_log_id_string(last_log_id); + + let key = format!("snapshot-{id_str}"); + + let meta = snapshot.meta.clone(); + + let new_snapshot = RocksSnapshot { + meta: meta.clone(), + data: snapshot.snapshot.clone().into_inner(), + }; let serialized_snapshot = serde_json::to_vec(&new_snapshot) .map_err(|e| StorageError::write_snapshot(Some(meta.signature()), AnyError::new(&e)))?; self.db - .put_cf(self.db.cf_handle("sm_meta").unwrap(), "snapshot", serialized_snapshot) + .put_cf( + self.db.cf_handle("sm_meta").unwrap(), + key.as_bytes(), + serialized_snapshot, + ) .map_err(|e| StorageError::write_snapshot(Some(meta.signature()), AnyError::new(&e)))?; self.db.flush_wal(true).map_err(|e| StorageError::write_snapshot(Some(meta.signature()), &e))?; @@ -243,24 +265,36 @@ impl RaftStateMachine for RocksStateMachine { } async fn get_current_snapshot(&mut self) -> Result>, StorageError> { - let x = self - .db - .get_cf(self.db.cf_handle("sm_meta").unwrap(), "snapshot") - .map_err(|e| StorageError::write_snapshot(None, AnyError::new(&e)))?; + let mut last_snapshot_key = None; + + let it = self.db.iterator_cf( + self.db.cf_handle("sm_meta").unwrap(), + IteratorMode::From(b"snapshot-", Direction::Forward), + ); - let bytes = match x { - Some(x) => x, - None => return Ok(None), + for kv in it { + let (key, _value) = kv.map_err(|e| StorageError::read(&e))?; + if key.starts_with(b"snapshot-") { + last_snapshot_key = Some(key.to_vec()); + } else { + break; + } + } + let Some(key) = 
last_snapshot_key else { + return Ok(None); }; - let snapshot: RocksSnapshot = - serde_json::from_slice(&bytes).map_err(|e| StorageError::write_snapshot(None, AnyError::new(&e)))?; + let data = self + .db + .get_cf(self.db.cf_handle("sm_meta").unwrap(), &key) + .map_err(|e| StorageError::read(&e))? + .unwrap(); - let data = snapshot.data.clone(); + let snap: RocksSnapshot = serde_json::from_slice(&data).map_err(|e| StorageError::read_snapshot(None, &e))?; Ok(Some(Snapshot { - meta: snapshot.meta, - snapshot: Cursor::new(data), + meta: snap.meta, + snapshot: Cursor::new(snap.data), })) } } diff --git a/openraft/src/core/sm/worker.rs b/openraft/src/core/sm/worker.rs index d55beef0b..5e8c78771 100644 --- a/openraft/src/core/sm/worker.rs +++ b/openraft/src/core/sm/worker.rs @@ -124,7 +124,19 @@ where // GetSnapshot does not respond to RaftCore } Command::InstallFullSnapshot { io_id, snapshot } => { - tracing::info!("{}: install complete snapshot", func_name!()); + tracing::info!( + "{}: install snapshot step-1(save): save snapshot: {}", + func_name!(), + snapshot.meta + ); + + self.state_machine.save_snapshot(&snapshot).await?; + + tracing::info!( + "{}: install snapshot step-2(install): replace state machine with snapshot: {}", + func_name!(), + snapshot.meta + ); let meta = snapshot.meta.clone(); self.state_machine.install_snapshot(&meta, snapshot.snapshot).await?; diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index c7a5dc6c3..038076af2 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -282,12 +282,13 @@ where C: RaftTypeConfig tracing::info!("install_full_snapshot: meta:{:?}", meta); let snap_last_log_id = meta.last_log_id.clone(); + let committed = self.state.committed().cloned(); - if snap_last_log_id.as_ref() <= self.state.committed() { + if snap_last_log_id <= committed { tracing::info!( "No need to install 
snapshot; snapshot last_log_id({}) <= committed({})", snap_last_log_id.display(), - self.state.committed().display() + committed.display() ); return None; } @@ -309,7 +310,7 @@ where C: RaftTypeConfig if let Some(local) = local { if local != snap_last_log_id { // Conflict, delete all non-committed logs. - self.truncate_logs(self.state.committed().next_index()); + self.truncate_logs(committed.next_index()); } } diff --git a/openraft/src/storage/v2/raft_state_machine.rs b/openraft/src/storage/v2/raft_state_machine.rs index c2886976c..a19b5f65d 100644 --- a/openraft/src/storage/v2/raft_state_machine.rs +++ b/openraft/src/storage/v2/raft_state_machine.rs @@ -101,6 +101,11 @@ where C: RaftTypeConfig /// /// A snapshot created from an earlier call to `begin_receiving_snapshot` which provided the /// snapshot. + #[since( + version = "0.10.0", + date = "2025-04-04", + change = "no need to save snapshot in this method, implement [`Self::save_snapshot`] instead" + )] #[since(version = "0.10.0", change = "SnapshotData without Box")] async fn install_snapshot( &mut self, @@ -108,6 +113,37 @@ where C: RaftTypeConfig snapshot: C::SnapshotData, ) -> Result<(), StorageError>; + /// Inform state machine to save a snapshot. + /// + /// The saved snapshot should then be retrievable with [`Self::get_current_snapshot`]. + /// + /// This method is called when a snapshot has a higher last-log-id than the current one but + /// smaller than the last-applied-log-id of the state machine. In this case, the state machine + /// only needs to persist the provided new snapshot, without replacing the state machine's + /// current state(without calling [`Self::install_snapshot`]). 
+ /// + /// ### When this method is called + /// + /// When a snapshot needs to be both saved and used to replace the state machine, OpenRaft will: + /// - First call this method to persist the snapshot + /// - Then call [`Self::install_snapshot`] to replace the state machine with the new snapshot + /// + /// When a snapshot needs to be saved but is not newer than the state machine (e.g., when a + /// leader replicates an already built snapshot to a follower with an up-to-date state machine): + /// - Only this method will be called + /// + /// ### Implementation notes + /// + /// Before this method was added, [`Self::install_snapshot`] was responsible for replacing the + /// state machine and for saving the snapshot. + /// + /// By implementing this method, [`Self::install_snapshot`] no longer needs to save the + /// snapshot, since OpenRaft will always call this method first, then + /// [`Self::install_snapshot`] when a snapshot needs to be both saved and used to replace + /// the state machine. + #[since(version = "0.10.0")] + async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError>; + /// Get a readable handle to the current snapshot. /// /// ### implementation algorithm diff --git a/openraft/src/testing/log/suite.rs b/openraft/src/testing/log/suite.rs index b90e98358..86604c632 100644 --- a/openraft/src/testing/log/suite.rs +++ b/openraft/src/testing/log/suite.rs @@ -1333,15 +1333,19 @@ where ); tracing::info!("--- install snapshot on follower state machine"); - sm_f.install_snapshot(&ss1_cur.meta, ss1_cur.snapshot).await?; + sm_f.save_snapshot(&ss1_cur).await?; - tracing::info!("--- check correctness of installed snapshot"); + tracing::info!("--- check correctness of saved snapshot"); // ... 
by requesting whole snapshot let ss2 = sm_f.get_current_snapshot().await?.expect("uninitialized snapshot"); assert_eq!( ss2.meta, ss1.meta, "snapshot metadata not updated correctly on follower sm" ); + + // Replace the state machine with the snapshot + sm_f.install_snapshot(&ss1_cur.meta, ss1_cur.snapshot).await?; + // ... by checking smstore state assert_eq!(sm_f.applied_state().await?, snapshot_applied_state); Ok(()) diff --git a/stores/memstore/src/lib.rs b/stores/memstore/src/lib.rs index 18cc353a0..345a4b93f 100644 --- a/stores/memstore/src/lib.rs +++ b/stores/memstore/src/lib.rs @@ -285,8 +285,6 @@ impl RaftSnapshotBuilder for Arc { } } - let snapshot_size = data.len(); - let snapshot_idx = { let mut l = self.snapshot_idx.lock().unwrap(); *l += 1; @@ -305,22 +303,14 @@ impl RaftSnapshotBuilder for Arc { snapshot_id, }; - let snapshot = MemStoreSnapshot { + let snapshot = Snapshot { meta: meta.clone(), - data: data.clone(), + snapshot: Cursor::new(data.clone()), }; - { - let mut current_snapshot = self.current_snapshot.write().await; - *current_snapshot = Some(snapshot); - } + self.save_snapshot(&snapshot).await?; - tracing::info!(snapshot_size, "log compaction complete"); - - Ok(Snapshot { - meta, - snapshot: Cursor::new(data), - }) + Ok(snapshot) } } @@ -524,9 +514,26 @@ impl RaftStateMachine for Arc { *sm = new_sm; } - // Update current snapshot. - let mut current_snapshot = self.current_snapshot.write().await; - *current_snapshot = Some(new_snapshot); + Ok(()) + } + + async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> { + let stored = MemStoreSnapshot { + meta: snapshot.meta.clone(), + data: snapshot.snapshot.clone().into_inner(), + }; + + let mut current = self.current_snapshot.write().await; + + // Only save it if the new snapshot contains more recent data than the current snapshot. 
+ + let current_last = current.as_ref().and_then(|s| s.meta.last_log_id); + if stored.meta.last_log_id <= current_last { + return Ok(()); + } + + *current = Some(stored); + Ok(()) } diff --git a/tests/tests/client_api/t16_with_state_machine.rs b/tests/tests/client_api/t16_with_state_machine.rs index 8ae60e542..592ca4c71 100644 --- a/tests/tests/client_api/t16_with_state_machine.rs +++ b/tests/tests/client_api/t16_with_state_machine.rs @@ -126,6 +126,10 @@ async fn with_state_machine_wrong_sm_type() -> Result<()> { todo!() } + async fn save_snapshot(&mut self, _snapshot: &Snapshot) -> Result<(), StorageError> { + todo!() + } + async fn get_current_snapshot(&mut self) -> Result>, Err> { todo!() }