Skip to content

Commit dd4b4d9

Browse files
authored
Merge 'core/mvcc: properly complete checkpoint in case of error' from Nikita Sivukhin
Call `self.mvstore.storage.on_checkpoint_end` in `cleanup_after_external_io_error` to properly clean up the checkpoint in case of an IO error. Reviewed-by: Mikaël Francoeur (@LeMikaelF) Closes #7413
2 parents 17b772d + 44e50d0 commit dd4b4d9

6 files changed

Lines changed: 24 additions & 9 deletions

File tree

core/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1721,7 +1721,7 @@ impl Connection {
17211721
Ok(TransitionResult::Done(result)) => return Ok(result),
17221722
Ok(TransitionResult::Io(iocompletions)) => {
17231723
if let Err(err) = iocompletions.wait(io.as_ref()) {
1724-
ckpt_sm.cleanup_after_external_io_error();
1724+
ckpt_sm.cleanup_after_external_io_error(err.clone())?;
17251725
return Err(err);
17261726
}
17271727
}

core/mvcc/database/checkpoint_state_machine.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,10 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
688688
/// Cleanup path for I/O errors that happen while waiting on completions outside
689689
/// of `step()`. This mirrors `step()` error handling and also resets pager/WAL
690690
/// checkpoint bookkeeping.
691-
pub fn cleanup_after_external_io_error(&mut self) {
691+
pub fn cleanup_after_external_io_error(&mut self, err: LimboError) -> Result<()> {
692+
// run storage cleanup within proper checkpoint context (e.g. pager has pending read/write txn)
693+
let result = self.mvstore.storage.on_checkpoint_end(Err(err));
694+
692695
if self.lock_states.pager_write_tx {
693696
self.pager.rollback_tx(self.connection.as_ref());
694697
if self.update_transaction_state {
@@ -716,6 +719,8 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
716719
self.checkpoint_lock.unlock();
717720
self.lock_states.blocking_checkpoint_lock_held = false;
718721
}
722+
723+
result
719724
}
720725

721726
/// Returns all checkpointable [RowVersion]s for that `table_id`
@@ -1558,6 +1563,7 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
15581563
*special_write,
15591564
)
15601565
};
1566+
tracing::debug!("WriteRow: num_columns={num_columns}, table_id={table_id:?}, special_write={special_write:?}");
15611567

15621568
// Handle CREATE TABLE / DROP TABLE / CREATE INDEX / DROP INDEX ops
15631569
if let Some(special_write) = special_write {
@@ -1703,6 +1709,8 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
17031709
})
17041710
};
17051711

1712+
tracing::debug!("WriteRow: resolved root page: root_page={root_page}");
1713+
17061714
// If a table was created, it now has a real root page allocated for it, but the 'root_page' field in the sqlite_schema record is still the table id.
17071715
// So we need to rewrite the row version to use the real root page.
17081716
if let Some(SpecialWrite::BTreeCreate {
@@ -2255,9 +2263,11 @@ impl<Clock: LogicalClock> StateTransition for CheckpointStateMachine<Clock> {
22552263
let res = self.step_inner(&());
22562264
match res {
22572265
Err(ref err) => {
2258-
self.mvstore.storage.on_checkpoint_end(Err(err.clone()))?;
22592266
tracing::debug!("Error in checkpoint state machine: {err}");
2260-
self.cleanup_after_external_io_error();
2267+
// `cleanup_after_external_io_error` already emits the paired
2268+
// `on_checkpoint_end(Err(..))`, so don't call it here too — doing both
2269+
// double-fires the hook for a single failure.
2270+
self.cleanup_after_external_io_error(err.clone())?;
22612271
res
22622272
}
22632273
Ok(TransitionResult::Done(ref result)) => {

core/mvcc/database/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1394,10 +1394,13 @@ pub struct DeleteRowStateMachine {
13941394
impl<Clock: LogicalClock> CommitStateMachine<Clock> {
13951395
pub(crate) fn cleanup_mvcc_checkpoint_state(&mut self) {
13961396
if let CommitState::Checkpoint { state_machine } = &mut self.state {
1397-
state_machine
1397+
let _ = state_machine
13981398
.lock()
13991399
.inner_mut()
1400-
.cleanup_after_external_io_error();
1400+
.cleanup_after_external_io_error(LimboError::InternalError(
1401+
"mvcc: cleanup_unfinished_commit".to_string(),
1402+
))
1403+
.inspect_err(|e| tracing::error!("cleanup_after_external_io_error failed: {e}"));
14011404
}
14021405
}
14031406

core/mvcc/database/tests.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2852,7 +2852,9 @@ fn test_checkpoint_resamples_boundary_before_starting() {
28522852
mvcc_store.durable_txid_max.load(Ordering::SeqCst),
28532853
update_ts
28542854
);
2855-
interrupted_checkpoint.cleanup_after_external_io_error();
2855+
interrupted_checkpoint
2856+
.cleanup_after_external_io_error(LimboError::Interrupt)
2857+
.unwrap();
28562858

28572859
let mut finished = false;
28582860
for _ in 0..50_000 {

core/storage/pager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6124,7 +6124,7 @@ mod ptrmap_tests {
61246124
target_idx,
61256125
PendingRead {
61266126
page: synthetic_page.clone(),
6127-
disk_read: Some(stub_disk_read.clone()),
6127+
disk_read: Some(stub_disk_read),
61286128
},
61296129
);
61306130

core/vdbe/execute.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -616,7 +616,7 @@ pub fn op_checkpoint(
616616
Ok(TransitionResult::Done(result)) => break result,
617617
Ok(TransitionResult::Io(iocompletions)) => {
618618
if let Err(err) = iocompletions.wait(pager.io.as_ref()) {
619-
ckpt_sm.cleanup_after_external_io_error();
619+
ckpt_sm.cleanup_after_external_io_error(err.clone())?;
620620
return Err(err);
621621
}
622622
}

0 commit comments

Comments
 (0)