Skip to content

Commit 6f419d1

Browse files
committed
Change: Add: RaftStateMachine::save_snapshot()
Before this commit, `RaftStateMachine::install_snapshot()` is responsible to 1) replace the state machine with the provided snapshot and 2) persist the snapshot on disk. Since this commit, a new method `RaftStateMachine::save_snapshot()` is responsible to persist a snapshot. `install_snapshot()` does not have to do this. But doing this has no harm. - Part of #1333 Upgrade tip: Implement persisting snapshot in `save_snapshot()` and it is recommended to remove persisting snpashot from `install_snapshot()`.
1 parent 50cb1cc commit 6f419d1

File tree

12 files changed

+307
-124
lines changed

12 files changed

+307
-124
lines changed

Diff for: cluster_benchmark/tests/benchmark/store.rs

+24-16
Original file line numberDiff line numberDiff line change
@@ -162,22 +162,14 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
162162
snapshot_id,
163163
};
164164

165-
let snapshot = StoredSnapshot {
166-
meta: meta.clone(),
167-
data: data.clone(),
165+
let snapshot = Snapshot {
166+
meta,
167+
snapshot: Cursor::new(data),
168168
};
169169

170-
{
171-
let mut current_snapshot = self.current_snapshot.write().await;
172-
*current_snapshot = Some(snapshot);
173-
}
174-
175-
tracing::info!(snapshot_size, "log compaction complete");
170+
self.save_snapshot(&snapshot).await?;
176171

177-
Ok(Snapshot {
178-
meta,
179-
snapshot: Cursor::new(data),
180-
})
172+
Ok(snapshot)
181173
}
182174
}
183175

@@ -304,9 +296,25 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
304296
*sm = new_sm;
305297
}
306298

307-
// Update current snapshot.
308-
let mut current_snapshot = self.current_snapshot.write().await;
309-
*current_snapshot = Some(new_snapshot);
299+
Ok(())
300+
}
301+
302+
async fn save_snapshot(&mut self, snapshot: &Snapshot<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
303+
let new_snapshot = StoredSnapshot {
304+
meta: snapshot.meta.clone(),
305+
data: snapshot.snapshot.clone().into_inner(),
306+
};
307+
308+
let mut current = self.current_snapshot.write().await;
309+
310+
// Only save it if the new snapshot contains more recent data than the current snapshot.
311+
312+
let current_last = current.as_ref().and_then(|s| s.meta.last_log_id.clone());
313+
if new_snapshot.meta.last_log_id <= current_last {
314+
return Ok(());
315+
}
316+
317+
*current = Some(new_snapshot);
310318
Ok(())
311319
}
312320

Diff for: examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs

+25-14
Original file line numberDiff line numberDiff line change
@@ -123,20 +123,14 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
123123
snapshot_id: snapshot_id.clone(),
124124
};
125125

126-
let snapshot = StoredSnapshot {
127-
meta: meta.clone(),
128-
data: snapshot_id.clone(),
126+
let snapshot = Snapshot {
127+
meta,
128+
snapshot: snapshot_id,
129129
};
130130

131-
{
132-
let mut current_snapshot = self.current_snapshot.lock().unwrap();
133-
*current_snapshot = Some(snapshot);
134-
}
131+
self.save_snapshot(&snapshot).await?;
135132

136-
Ok(Snapshot {
137-
meta,
138-
snapshot: snapshot_id,
139-
})
133+
Ok(snapshot)
140134
}
141135
}
142136

@@ -201,9 +195,26 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
201195
*state_machine = updated_state_machine;
202196
}
203197

204-
// Update current snapshot.
205-
let mut current_snapshot = self.current_snapshot.lock().unwrap();
206-
*current_snapshot = Some(new_snapshot);
198+
Ok(())
199+
}
200+
201+
async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> {
202+
let new_snapshot = StoredSnapshot {
203+
meta: snapshot.meta.clone(),
204+
data: snapshot.snapshot.clone(),
205+
};
206+
207+
let mut current = self.current_snapshot.lock().unwrap();
208+
209+
// Only save it if the new snapshot contains more recent data than the current snapshot.
210+
211+
let current_last = current.as_ref().and_then(|s| s.meta.last_log_id.clone());
212+
213+
if new_snapshot.meta.last_log_id <= current_last {
214+
return Ok(());
215+
}
216+
217+
*current = Some(new_snapshot);
207218
Ok(())
208219
}
209220

Diff for: examples/raft-kv-memstore-singlethreaded/src/store.rs

+25-14
Original file line numberDiff line numberDiff line change
@@ -157,20 +157,14 @@ impl RaftSnapshotBuilder<TypeConfig> for Rc<StateMachineStore> {
157157
snapshot_id,
158158
};
159159

160-
let snapshot = StoredSnapshot {
161-
meta: meta.clone(),
162-
data: data.clone(),
160+
let snapshot = Snapshot {
161+
meta,
162+
snapshot: Cursor::new(data),
163163
};
164164

165-
{
166-
let mut current_snapshot = self.current_snapshot.borrow_mut();
167-
*current_snapshot = Some(snapshot);
168-
}
165+
self.save_snapshot(&snapshot).await?;
169166

170-
Ok(Snapshot {
171-
meta,
172-
snapshot: Cursor::new(data),
173-
})
167+
Ok(snapshot)
174168
}
175169
}
176170

@@ -238,9 +232,26 @@ impl RaftStateMachine<TypeConfig> for Rc<StateMachineStore> {
238232
*state_machine = updated_state_machine;
239233
}
240234

241-
// Update current snapshot.
242-
let mut current_snapshot = self.current_snapshot.borrow_mut();
243-
*current_snapshot = Some(new_snapshot);
235+
Ok(())
236+
}
237+
238+
async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> {
239+
let new_snapshot = StoredSnapshot {
240+
meta: snapshot.meta.clone(),
241+
data: snapshot.snapshot.clone().into_inner(),
242+
};
243+
244+
let mut current = self.current_snapshot.borrow_mut();
245+
246+
// Only save it if the new snapshot contains more recent data than the current snapshot.
247+
248+
let current_last = current.as_ref().and_then(|s| s.meta.last_log_id.clone());
249+
if new_snapshot.meta.last_log_id <= current_last {
250+
return Ok(());
251+
}
252+
253+
*current = Some(new_snapshot);
254+
244255
Ok(())
245256
}
246257

Diff for: examples/raft-kv-memstore/src/store/mod.rs

+23-16
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,6 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
9898
let last_applied_log = state_machine.last_applied_log;
9999
let last_membership = state_machine.last_membership.clone();
100100

101-
// Lock the current snapshot before releasing the lock on the state machine, to avoid a race
102-
// condition on the written snapshot
103-
let mut current_snapshot = self.current_snapshot.write().await;
104101
drop(state_machine);
105102

106103
let snapshot_idx = self.snapshot_idx.fetch_add(1, Ordering::Relaxed) + 1;
@@ -116,17 +113,14 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
116113
snapshot_id,
117114
};
118115

119-
let snapshot = StoredSnapshot {
116+
let snapshot = Snapshot {
120117
meta: meta.clone(),
121-
data: data.clone(),
118+
snapshot: Cursor::new(data.clone()),
122119
};
123120

124-
*current_snapshot = Some(snapshot);
121+
self.save_snapshot(&snapshot).await?;
125122

126-
Ok(Snapshot {
127-
meta,
128-
snapshot: Cursor::new(data),
129-
})
123+
Ok(snapshot)
130124
}
131125
}
132126

@@ -203,13 +197,26 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
203197
let mut state_machine = self.state_machine.write().await;
204198
*state_machine = updated_state_machine;
205199

206-
// Lock the current snapshot before releasing the lock on the state machine, to avoid a race
207-
// condition on the written snapshot
208-
let mut current_snapshot = self.current_snapshot.write().await;
209-
drop(state_machine);
200+
Ok(())
201+
}
202+
203+
async fn save_snapshot(&mut self, snapshot: &Snapshot<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
204+
let new_snapshot = StoredSnapshot {
205+
meta: snapshot.meta.clone(),
206+
data: snapshot.snapshot.clone().into_inner(),
207+
};
208+
209+
let mut current = self.current_snapshot.write().await;
210+
211+
// Only save it if the new snapshot contains more recent data than the current snapshot.
212+
213+
let current_last = current.as_ref().and_then(|s| s.meta.last_log_id.clone());
214+
if new_snapshot.meta.last_log_id <= current_last {
215+
return Ok(());
216+
}
217+
218+
*current = Some(new_snapshot);
210219

211-
// Update current snapshot.
212-
*current_snapshot = Some(new_snapshot);
213220
Ok(())
214221
}
215222

Diff for: examples/raft-kv-rocksdb/src/store.rs

+62-15
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ use openraft::RaftSnapshotBuilder;
1313
use openraft_rocksstore::log_store::RocksLogStore;
1414
use rocksdb::ColumnFamily;
1515
use rocksdb::ColumnFamilyDescriptor;
16+
use rocksdb::Direction;
17+
use rocksdb::IteratorMode;
1618
use rocksdb::Options;
1719
use rocksdb::DB;
1820
use serde::Deserialize;
@@ -100,17 +102,14 @@ impl RaftSnapshotBuilder<TypeConfig> for StateMachineStore {
100102
snapshot_id,
101103
};
102104

103-
let snapshot = StoredSnapshot {
104-
meta: meta.clone(),
105-
data: kv_json.clone(),
105+
let snapshot = Snapshot {
106+
meta,
107+
snapshot: Cursor::new(kv_json),
106108
};
107109

108-
self.set_current_snapshot_(snapshot)?;
110+
self.save_snapshot(&snapshot).await?;
109111

110-
Ok(Snapshot {
111-
meta,
112-
snapshot: Cursor::new(kv_json),
113-
})
112+
Ok(snapshot)
114113
}
115114
}
116115

@@ -146,18 +145,49 @@ impl StateMachineStore {
146145
Ok(())
147146
}
148147

148+
/// List all snapshots in the db, returns the last
149149
fn get_current_snapshot_(&self) -> StorageResult<Option<StoredSnapshot>> {
150-
Ok(self
151-
.db
152-
.get_cf(self.store(), b"snapshot")
153-
.map_err(|e| StorageError::read(&e))?
154-
.and_then(|v| serde_json::from_slice(&v).ok()))
150+
let mut last_snapshot_key = None;
151+
152+
let it = self.db.iterator_cf(self.store(), IteratorMode::From(b"snapshot-", Direction::Forward));
153+
154+
for kv in it {
155+
let (key, _value) = kv.map_err(|e| StorageError::read(&e))?;
156+
if key.starts_with(b"snapshot-") {
157+
last_snapshot_key = Some(key.to_vec());
158+
} else {
159+
break;
160+
}
161+
}
162+
let Some(key) = last_snapshot_key else {
163+
return Ok(None);
164+
};
165+
166+
let data = self.db.get_cf(self.store(), &key).map_err(|e| StorageError::read(&e))?.unwrap();
167+
168+
let snap: StoredSnapshot = serde_json::from_slice(&data).map_err(|e| StorageError::read_snapshot(None, &e))?;
169+
170+
Ok(Some(snap))
155171
}
156172

173+
/// Save snapshot by last-log-id and when reading, get the last one as the current.
174+
///
175+
/// So that writing an old one won't overwrite the newer one.
176+
/// In a real world application, the old ones should be cleaned.
157177
fn set_current_snapshot_(&self, snap: StoredSnapshot) -> StorageResult<()> {
178+
let last_log_id = snap.meta.last_log_id.as_ref();
179+
180+
let id_str = Self::order_preserved_log_id_string(last_log_id);
181+
182+
let key = format!("snapshot-{id_str}");
158183
self.db
159-
.put_cf(self.store(), b"snapshot", serde_json::to_vec(&snap).unwrap().as_slice())
184+
.put_cf(
185+
self.store(),
186+
key.as_bytes(),
187+
serde_json::to_vec(&snap).unwrap().as_slice(),
188+
)
160189
.map_err(|e| StorageError::write_snapshot(Some(snap.meta.signature()), &e))?;
190+
161191
self.flush(ErrorSubject::Snapshot(Some(snap.meta.signature())), ErrorVerb::Write)?;
162192
Ok(())
163193
}
@@ -170,6 +200,14 @@ impl StateMachineStore {
170200
fn store(&self) -> &ColumnFamily {
171201
self.db.cf_handle("store").unwrap()
172202
}
203+
204+
fn order_preserved_log_id_string(log_id: Option<&LogId>) -> String {
205+
let term = log_id.map(|l| l.committed_leader_id().term).unwrap_or_default();
206+
let node_id = log_id.map(|l| l.committed_leader_id().node_id).unwrap_or_default();
207+
let index = log_id.map(|l| l.index).unwrap_or_default();
208+
209+
format!("{:020}-{:020}-{:020}", term, node_id, index)
210+
}
173211
}
174212

175213
impl RaftStateMachine<TypeConfig> for StateMachineStore {
@@ -227,7 +265,16 @@ impl RaftStateMachine<TypeConfig> for StateMachineStore {
227265
data: snapshot.into_inner(),
228266
};
229267

230-
self.update_state_machine_(new_snapshot.clone()).await?;
268+
self.update_state_machine_(new_snapshot).await?;
269+
270+
Ok(())
271+
}
272+
273+
async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> {
274+
let new_snapshot = StoredSnapshot {
275+
meta: snapshot.meta.clone(),
276+
data: snapshot.snapshot.clone().into_inner(),
277+
};
231278

232279
self.set_current_snapshot_(new_snapshot)?;
233280

0 commit comments

Comments
 (0)