Skip to content

Commit 605024e

Browse files
committed
Change: Add RaftStateMachine::save_snapshot() method
Previously, `RaftStateMachine::install_snapshot()` was responsible for two tasks: - Replacing the state machine with the given snapshot. - Persisting the snapshot to disk. This commit introduces a new method, `RaftStateMachine::save_snapshot()`, to handle the task of persisting snapshots. With this change: - `install_snapshot()` no longer needs to persist snapshots, though it can still do so without causing harm. - The responsibility for persisting snapshots is now clearly separated. - Part of #1333 Upgrade tip: - Implement snapshot persistence logic in the new `save_snapshot()` method. - It is recommended to remove snapshot persistence logic from `install_snapshot()` for better separation of concerns.
1 parent 50cb1cc commit 605024e

File tree

14 files changed

+364
-143
lines changed

14 files changed

+364
-143
lines changed

Diff for: cluster_benchmark/tests/benchmark/store.rs

+24-16
Original file line numberDiff line numberDiff line change
@@ -162,22 +162,14 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
162162
snapshot_id,
163163
};
164164

165-
let snapshot = StoredSnapshot {
166-
meta: meta.clone(),
167-
data: data.clone(),
165+
let snapshot = Snapshot {
166+
meta,
167+
snapshot: Cursor::new(data),
168168
};
169169

170-
{
171-
let mut current_snapshot = self.current_snapshot.write().await;
172-
*current_snapshot = Some(snapshot);
173-
}
174-
175-
tracing::info!(snapshot_size, "log compaction complete");
170+
self.save_snapshot(&snapshot).await?;
176171

177-
Ok(Snapshot {
178-
meta,
179-
snapshot: Cursor::new(data),
180-
})
172+
Ok(snapshot)
181173
}
182174
}
183175

@@ -304,9 +296,25 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
304296
*sm = new_sm;
305297
}
306298

307-
// Update current snapshot.
308-
let mut current_snapshot = self.current_snapshot.write().await;
309-
*current_snapshot = Some(new_snapshot);
299+
Ok(())
300+
}
301+
302+
async fn save_snapshot(&mut self, snapshot: &Snapshot<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
303+
let new_snapshot = StoredSnapshot {
304+
meta: snapshot.meta.clone(),
305+
data: snapshot.snapshot.clone().into_inner(),
306+
};
307+
308+
let mut current = self.current_snapshot.write().await;
309+
310+
// Only save it if the new snapshot contains more recent data than the current snapshot.
311+
312+
let current_last = current.as_ref().and_then(|s| s.meta.last_log_id);
313+
if new_snapshot.meta.last_log_id <= current_last {
314+
return Ok(());
315+
}
316+
317+
*current = Some(new_snapshot);
310318
Ok(())
311319
}
312320

Diff for: examples/raft-kv-memstore-grpc/src/store/mod.rs

+29-10
Original file line numberDiff line numberDiff line change
@@ -141,23 +141,42 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
141141
async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: SnapshotData) -> Result<(), StorageError> {
142142
tracing::info!("install snapshot");
143143

144-
let new_snapshot = StoredSnapshot {
145-
meta: meta.clone(),
146-
data: snapshot,
147-
};
148-
149144
// Update the state machine.
150145
{
151-
let d: pb::StateMachineData = prost::Message::decode(new_snapshot.data.as_ref())
152-
.map_err(|e| StorageError::read_snapshot(None, &e))?;
146+
let d: pb::StateMachineData =
147+
prost::Message::decode(snapshot.as_ref()).map_err(|e| StorageError::read_snapshot(None, &e))?;
153148

154149
let mut state_machine = self.state_machine.lock().unwrap();
155150
*state_machine = d;
156151
}
157152

158-
// Update current snapshot.
159-
let mut current_snapshot = self.current_snapshot.lock().unwrap();
160-
*current_snapshot = Some(new_snapshot);
153+
let snapshot = Snapshot {
154+
meta: meta.clone(),
155+
snapshot: Cursor::new(snapshot),
156+
};
157+
158+
self.save_snapshot(&snapshot).await?;
159+
160+
Ok(())
161+
}
162+
163+
async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> {
164+
let new_snapshot = StoredSnapshot {
165+
meta: snapshot.meta.clone(),
166+
data: snapshot.snapshot.clone().into_inner(),
167+
};
168+
169+
let mut current = self.current_snapshot.lock().unwrap();
170+
171+
// Only save it if the new snapshot contains more recent data than the current snapshot.
172+
173+
let current_last = current.as_ref().and_then(|s| s.meta.last_log_id);
174+
if new_snapshot.meta.last_log_id <= current_last {
175+
return Ok(());
176+
}
177+
178+
*current = Some(new_snapshot);
179+
161180
Ok(())
162181
}
163182

Diff for: examples/raft-kv-memstore-network-v2/src/store.rs

+28-9
Original file line numberDiff line numberDiff line change
@@ -165,21 +165,40 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
165165
async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: SnapshotData) -> Result<(), StorageError> {
166166
tracing::info!("install snapshot");
167167

168-
let new_snapshot = StoredSnapshot {
169-
meta: meta.clone(),
170-
data: snapshot,
171-
};
172-
173168
// Update the state machine.
174169
{
175-
let updated_state_machine: StateMachineData = new_snapshot.data.clone();
170+
let updated_state_machine: StateMachineData = snapshot.clone();
176171
let mut state_machine = self.state_machine.lock().unwrap();
177172
*state_machine = updated_state_machine;
178173
}
179174

180-
// Update current snapshot.
181-
let mut current_snapshot = self.current_snapshot.lock().unwrap();
182-
*current_snapshot = Some(new_snapshot);
175+
let snapshot = Snapshot {
176+
meta: meta.clone(),
177+
snapshot: Cursor::new(snapshot),
178+
};
179+
180+
self.save_snapshot(&snapshot).await?;
181+
182+
Ok(())
183+
}
184+
185+
async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> {
186+
let new_snapshot = StoredSnapshot {
187+
meta: snapshot.meta.clone(),
188+
data: snapshot.snapshot.clone().into_inner(),
189+
};
190+
191+
let mut current = self.current_snapshot.lock().unwrap();
192+
193+
// Only save it if the new snapshot contains more recent data than the current snapshot.
194+
195+
let current_last = current.as_ref().and_then(|s| s.meta.last_log_id);
196+
if new_snapshot.meta.last_log_id <= current_last {
197+
return Ok(());
198+
}
199+
200+
*current = Some(new_snapshot);
201+
183202
Ok(())
184203
}
185204

Diff for: examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs

+25-14
Original file line numberDiff line numberDiff line change
@@ -123,20 +123,14 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
123123
snapshot_id: snapshot_id.clone(),
124124
};
125125

126-
let snapshot = StoredSnapshot {
127-
meta: meta.clone(),
128-
data: snapshot_id.clone(),
126+
let snapshot = Snapshot {
127+
meta,
128+
snapshot: snapshot_id,
129129
};
130130

131-
{
132-
let mut current_snapshot = self.current_snapshot.lock().unwrap();
133-
*current_snapshot = Some(snapshot);
134-
}
131+
self.save_snapshot(&snapshot).await?;
135132

136-
Ok(Snapshot {
137-
meta,
138-
snapshot: snapshot_id,
139-
})
133+
Ok(snapshot)
140134
}
141135
}
142136

@@ -201,9 +195,26 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
201195
*state_machine = updated_state_machine;
202196
}
203197

204-
// Update current snapshot.
205-
let mut current_snapshot = self.current_snapshot.lock().unwrap();
206-
*current_snapshot = Some(new_snapshot);
198+
Ok(())
199+
}
200+
201+
async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> {
202+
let new_snapshot = StoredSnapshot {
203+
meta: snapshot.meta.clone(),
204+
data: snapshot.snapshot.clone(),
205+
};
206+
207+
let mut current = self.current_snapshot.lock().unwrap();
208+
209+
// Only save it if the new snapshot contains more recent data than the current snapshot.
210+
211+
let current_last = current.as_ref().and_then(|s| s.meta.last_log_id);
212+
213+
if new_snapshot.meta.last_log_id <= current_last {
214+
return Ok(());
215+
}
216+
217+
*current = Some(new_snapshot);
207218
Ok(())
208219
}
209220

Diff for: examples/raft-kv-memstore-singlethreaded/src/store.rs

+25-14
Original file line numberDiff line numberDiff line change
@@ -157,20 +157,14 @@ impl RaftSnapshotBuilder<TypeConfig> for Rc<StateMachineStore> {
157157
snapshot_id,
158158
};
159159

160-
let snapshot = StoredSnapshot {
161-
meta: meta.clone(),
162-
data: data.clone(),
160+
let snapshot = Snapshot {
161+
meta,
162+
snapshot: Cursor::new(data),
163163
};
164164

165-
{
166-
let mut current_snapshot = self.current_snapshot.borrow_mut();
167-
*current_snapshot = Some(snapshot);
168-
}
165+
self.save_snapshot(&snapshot).await?;
169166

170-
Ok(Snapshot {
171-
meta,
172-
snapshot: Cursor::new(data),
173-
})
167+
Ok(snapshot)
174168
}
175169
}
176170

@@ -238,9 +232,26 @@ impl RaftStateMachine<TypeConfig> for Rc<StateMachineStore> {
238232
*state_machine = updated_state_machine;
239233
}
240234

241-
// Update current snapshot.
242-
let mut current_snapshot = self.current_snapshot.borrow_mut();
243-
*current_snapshot = Some(new_snapshot);
235+
Ok(())
236+
}
237+
238+
async fn save_snapshot(&mut self, snapshot: &Snapshot) -> Result<(), StorageError> {
239+
let new_snapshot = StoredSnapshot {
240+
meta: snapshot.meta.clone(),
241+
data: snapshot.snapshot.clone().into_inner(),
242+
};
243+
244+
let mut current = self.current_snapshot.borrow_mut();
245+
246+
// Only save it if the new snapshot contains more recent data than the current snapshot.
247+
248+
let current_last = current.as_ref().and_then(|s| s.meta.last_log_id);
249+
if new_snapshot.meta.last_log_id <= current_last {
250+
return Ok(());
251+
}
252+
253+
*current = Some(new_snapshot);
254+
244255
Ok(())
245256
}
246257

Diff for: examples/raft-kv-memstore/src/store/mod.rs

+23-16
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,6 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
9898
let last_applied_log = state_machine.last_applied_log;
9999
let last_membership = state_machine.last_membership.clone();
100100

101-
// Lock the current snapshot before releasing the lock on the state machine, to avoid a race
102-
// condition on the written snapshot
103-
let mut current_snapshot = self.current_snapshot.write().await;
104101
drop(state_machine);
105102

106103
let snapshot_idx = self.snapshot_idx.fetch_add(1, Ordering::Relaxed) + 1;
@@ -116,17 +113,14 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
116113
snapshot_id,
117114
};
118115

119-
let snapshot = StoredSnapshot {
116+
let snapshot = Snapshot {
120117
meta: meta.clone(),
121-
data: data.clone(),
118+
snapshot: Cursor::new(data.clone()),
122119
};
123120

124-
*current_snapshot = Some(snapshot);
121+
self.save_snapshot(&snapshot).await?;
125122

126-
Ok(Snapshot {
127-
meta,
128-
snapshot: Cursor::new(data),
129-
})
123+
Ok(snapshot)
130124
}
131125
}
132126

@@ -203,13 +197,26 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
203197
let mut state_machine = self.state_machine.write().await;
204198
*state_machine = updated_state_machine;
205199

206-
// Lock the current snapshot before releasing the lock on the state machine, to avoid a race
207-
// condition on the written snapshot
208-
let mut current_snapshot = self.current_snapshot.write().await;
209-
drop(state_machine);
200+
Ok(())
201+
}
202+
203+
async fn save_snapshot(&mut self, snapshot: &Snapshot<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
204+
let new_snapshot = StoredSnapshot {
205+
meta: snapshot.meta.clone(),
206+
data: snapshot.snapshot.clone().into_inner(),
207+
};
208+
209+
let mut current = self.current_snapshot.write().await;
210+
211+
// Only save it if the new snapshot contains more recent data than the current snapshot.
212+
213+
let current_last = current.as_ref().and_then(|s| s.meta.last_log_id);
214+
if new_snapshot.meta.last_log_id <= current_last {
215+
return Ok(());
216+
}
217+
218+
*current = Some(new_snapshot);
210219

211-
// Update current snapshot.
212-
*current_snapshot = Some(new_snapshot);
213220
Ok(())
214221
}
215222

0 commit comments

Comments
 (0)