Skip to content

Commit a7848b5

Browse files
[storage] fix rewind bugs (#1521)
Co-authored-by: Patrick O'Grady <me@patrickogrady.xyz>
1 parent 9dd9e2d commit a7848b5

11 files changed

Lines changed: 55 additions & 9 deletions

File tree

storage/fuzz/fuzz_targets/journal_operations.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ fn fuzz(input: FuzzInput) {
121121
JournalOperation::Rewind { size } => {
122122
if *size <= journal_size && *size >= oldest_retained_pos {
123123
journal.rewind(*size).await.unwrap();
124+
journal.sync().await.unwrap();
124125
journal_size = *size;
125126
}
126127
}

storage/src/adb/any/fixed/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ impl<
220220
let op_count = log_size - rewind_leaf_num;
221221
warn!(op_count, "rewinding over uncommitted log operations");
222222
log.rewind(rewind_leaf_num).await?;
223+
log.sync().await?;
223224
log_size = rewind_leaf_num;
224225
}
225226

storage/src/adb/any/fixed/sync.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ pub(crate) async fn init_journal<E: Storage + Metrics, A: CodecFixed<Cfg = ()>>(
214214
);
215215
journal.prune(lower_bound).await?;
216216
journal.rewind(upper_bound + 1).await?; // +1 because upper_bound is inclusive
217+
journal.sync().await?;
217218
journal
218219
};
219220
let journal_size = journal.size().await?;

storage/src/adb/any/variable/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Codec, H: CHasher, T: Translato
225225
locations_size, "rewinding misaligned locations map"
226226
);
227227
self.locations.rewind(mmr_leaves).await?;
228+
self.locations.sync().await?;
228229
} else if mmr_leaves > locations_size {
229230
warn!(mmr_leaves, locations_size, "rewinding misaligned mmr");
230231
self.mmr.pop((mmr_leaves - locations_size) as usize).await?;
@@ -347,6 +348,7 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Codec, H: CHasher, T: Translato
347348
// Pop any MMR elements that are ahead of the last log commit point.
348349
if mmr_leaves > self.log_size {
349350
self.locations.rewind(self.log_size).await?;
351+
self.locations.sync().await?;
350352

351353
let op_count = mmr_leaves - self.log_size;
352354
warn!(op_count, "popping uncommitted MMR operations");

storage/src/adb/immutable/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Codec, H: CHasher, T: Translato
297297
locations_size, "rewinding misaligned locations map"
298298
);
299299
locations.rewind(mmr_leaves).await?;
300+
locations.sync().await?;
300301
}
301302
if mmr_leaves > locations_size {
302303
warn!(mmr_leaves, locations_size, "rewinding misaligned mmr");
@@ -391,6 +392,7 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Codec, H: CHasher, T: Translato
391392
// Pop any MMR elements that are ahead of the last log commit point.
392393
if mmr_leaves > log_size {
393394
locations.rewind(log_size).await?;
395+
locations.sync().await?;
394396

395397
let op_count = mmr_leaves - log_size;
396398
warn!(op_count, "popping uncommitted MMR operations");

storage/src/adb/keyless.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ impl<E: Storage + Clock + Metrics, V: Codec, H: CHasher> Keyless<E, V, H> {
146146
locations_size, "rewinding misaligned locations journal"
147147
);
148148
locations.rewind(mmr_leaves).await?;
149+
locations.sync().await?;
149150
locations_size = mmr_leaves;
150151
} else if mmr_leaves > locations_size {
151152
warn!(mmr_leaves, locations_size, "rewinding misaligned mmr");
@@ -195,6 +196,7 @@ impl<E: Storage + Clock + Metrics, V: Codec, H: CHasher> Keyless<E, V, H> {
195196
"rewinding to last commit point"
196197
);
197198
locations.rewind(last_commit_loc + 1).await?;
199+
locations.sync().await?;
198200
mmr.pop((locations_size - last_commit_loc - 1) as usize)
199201
.await?;
200202
locations_size = last_commit_loc + 1;
@@ -203,16 +205,19 @@ impl<E: Storage + Clock + Metrics, V: Codec, H: CHasher> Keyless<E, V, H> {
203205
let rewind_point = rewind_point.expect("no rewind point found");
204206
let section = rewind_point.0 / cfg.log_items_per_section.get();
205207
log.rewind_to_offset(section, rewind_point.1).await?;
208+
log.sync(section).await?;
206209
}
207210
} else if locations_size > 0 {
208211
warn!(
209212
old_size = locations_size,
210213
"no commit point found, rewinding to start"
211214
);
212215
locations.rewind(0).await?;
216+
locations.sync().await?;
213217
mmr.pop(locations_size as usize).await?;
214218
locations_size = 0;
215219
log.rewind_section(0, 0).await?;
220+
log.sync(0).await?;
216221
}
217222

218223
Ok(Self {

storage/src/freezer/storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,7 @@ impl<E: Storage + Metrics + Clock, K: Array, V: Codec> Freezer<E, K, V> {
578578

579579
// Rewind the journal to the committed section and offset
580580
journal.rewind(checkpoint.section, checkpoint.size).await?;
581+
journal.sync(checkpoint.section).await?;
581582

582583
// Resize table if needed
583584
let expected_table_len = Self::table_offset(checkpoint.table_size);

storage/src/journal/fixed.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -362,11 +362,14 @@ impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> Journal<E, A> {
362362
Ok(item_pos)
363363
}
364364

365-
/// Rewind the journal to the given `size`. Returns [Error::MissingBlob] if the rewind
366-
/// point precedes the oldest retained element point. The journal is not synced after rewinding.
365+
/// Rewind the journal to the given `size`. Returns [Error::MissingBlob] if the rewind point
366+
/// precedes the oldest retained element point. The journal is not synced after rewinding.
367367
///
368-
/// Note that this operation is not atomic, but it will always leave the journal in a consistent
369-
/// state in the event of failure since blobs are always removed from newest to oldest.
368+
/// # Warnings
369+
///
370+
/// * This operation is not guaranteed to survive restarts until sync is called.
371+
/// * This operation is not atomic, but it will always leave the journal in a consistent state
372+
/// in the event of failure since blobs are always removed from newest to oldest.
370373
pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
371374
match size.cmp(&self.size().await?) {
372375
std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),

storage/src/journal/variable.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,13 +652,25 @@ impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
652652
}
653653

654654
/// Rewinds the journal to the given `section` and `offset`, removing any data beyond it.
655+
///
656+
/// # Warnings
657+
///
658+
/// * This operation is not guaranteed to survive restarts until sync is called.
659+
/// * This operation is not atomic, but it will always leave the journal in a consistent state
660+
/// in the event of failure since blobs are always removed in reverse order of section.
655661
pub async fn rewind_to_offset(&mut self, section: u64, offset: u32) -> Result<(), Error> {
656662
self.rewind(section, offset as u64 * ITEM_ALIGNMENT).await
657663
}
658664

659665
/// Rewinds the journal to the given `section` and `size`.
660666
///
661667
/// This removes any data beyond the specified `section` and `size`.
668+
///
669+
/// # Warnings
670+
///
671+
/// * This operation is not guaranteed to survive restarts until sync is called.
672+
/// * This operation is not atomic, but it will always leave the journal in a consistent state
673+
/// in the event of failure since blobs are always removed in reverse order of section.
662674
pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
663675
self.prune_guard(section, false)?;
664676

@@ -707,6 +719,10 @@ impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
707719
/// Rewinds the `section` to the given `size`.
708720
///
709721
/// Unlike [Self::rewind], this method does not modify anything other than the given `section`.
722+
///
723+
/// # Warning
724+
///
725+
/// This operation is not guaranteed to survive restarts until sync is called.
710726
pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
711727
self.prune_guard(section, false)?;
712728

storage/src/mmr/journaled.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ impl<E: RStorage + Clock + Metrics, H: CHasher> Mmr<E, H> {
266266
orphaned_leaf = Some(item);
267267
}
268268
journal.rewind(last_valid_size).await?;
269+
journal.sync().await?;
269270
journal_size = last_valid_size
270271
}
271272

@@ -483,7 +484,8 @@ impl<E: RStorage + Clock + Metrics, H: CHasher> Mmr<E, H> {
483484
}
484485

485486
/// Pop the given number of elements from the tip of the MMR assuming they exist, and otherwise
486-
/// return Empty or ElementPruned errors.
487+
/// return Empty or ElementPruned errors. The backing journal is synced to disk before
488+
/// returning.
487489
///
488490
/// # Warning
489491
///
@@ -522,6 +524,7 @@ impl<E: RStorage + Clock + Metrics, H: CHasher> Mmr<E, H> {
522524
}
523525

524526
self.journal.rewind(new_size).await?;
527+
self.journal.sync().await?;
525528
self.journal_size = new_size;
526529

527530
// Reset the mem_mmr to one of the new_size in the "prune_all" state.
@@ -561,10 +564,6 @@ impl<E: RStorage + Clock + Metrics, H: CHasher> Mmr<E, H> {
561564
/// Process all batched updates and sync the MMR to disk. If a `pool` is available, then it will be
562565
/// used to parallelize the sync.
563566
pub async fn sync(&mut self, h: &mut impl Hasher<H>) -> Result<(), Error> {
564-
if self.size() == 0 {
565-
return Ok(());
566-
}
567-
568567
// Write the nodes cached in the memory-resident MMR to the journal.
569568
self.mem_mmr.sync(h);
570569

@@ -852,6 +851,20 @@ mod tests {
852851
assert!(mmr.prune_to_pos(&mut hasher, 0).await.is_ok());
853852
assert!(mmr.sync(&mut hasher).await.is_ok());
854853
assert!(matches!(mmr.pop(1).await, Err(Error::Empty)));
854+
855+
mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
856+
assert_eq!(mmr.size(), 1);
857+
mmr.sync(&mut hasher).await.unwrap();
858+
assert!(mmr.get_node(0).await.is_ok());
859+
assert!(mmr.pop(1).await.is_ok());
860+
assert_eq!(mmr.size(), 0);
861+
mmr.sync(&mut hasher).await.unwrap();
862+
863+
let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
864+
.await
865+
.unwrap();
866+
assert_eq!(mmr.size(), 0);
867+
855868
mmr.destroy().await.unwrap();
856869
});
857870
}

0 commit comments

Comments
 (0)