Skip to content

Commit 17821db

Browse files
nit
1 parent 7015088 commit 17821db

2 files changed

Lines changed: 38 additions & 7 deletions

File tree

runtime/src/utils/buffer/mod.rs

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1491,28 +1491,57 @@ mod tests {
14911491
let blob = RangeSyncBlob::new();
14921492
let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(8));
14931493

1494-
// A fresh writer has no buffered bytes or prior plain blob mutation, so sync is a no-op.
1494+
// A fresh writer preserves one sync barrier for mutations that predate wrapping.
14951495
writer.sync().await.unwrap();
14961496
let (durable, full_syncs, range_syncs) = blob.snapshot();
14971497
assert!(durable.is_empty());
1498-
assert_eq!(full_syncs, 0);
1498+
assert_eq!(full_syncs, 1);
14991499
assert_eq!(range_syncs, 0);
15001500

15011501
// The write remains entirely buffered, so sync can make just this range durable.
15021502
writer.write_at(0, b"abc").await.unwrap();
15031503
writer.sync().await.unwrap();
15041504

1505-
// No prior plain blob mutation required a full sync barrier.
1505+
// No prior plain blob mutation required another full sync barrier.
15061506
let (durable, full_syncs, range_syncs) = blob.snapshot();
15071507
assert_eq!(durable.as_slice(), b"abc");
1508-
assert_eq!(full_syncs, 0);
1508+
assert_eq!(full_syncs, 1);
15091509
assert_eq!(range_syncs, 1);
15101510

15111511
// The prior sync used write_at_sync, so there is still no pending full-sync barrier.
15121512
writer.sync().await.unwrap();
15131513
let (durable, full_syncs, range_syncs) = blob.snapshot();
15141514
assert_eq!(durable.as_slice(), b"abc");
1515-
assert_eq!(full_syncs, 0);
1515+
assert_eq!(full_syncs, 1);
1516+
assert_eq!(range_syncs, 1);
1517+
});
1518+
}
1519+
1520+
#[test_traced]
1521+
fn test_write_sync_persists_pre_wrapped_blob_mutation() {
1522+
let executor = deterministic::Runner::default();
1523+
executor.start(|context| async move {
1524+
let blob = RangeSyncBlob::new();
1525+
1526+
// Simulate a plain blob mutation before the writer wraps it.
1527+
blob.write_at(0, b"abc").await.unwrap();
1528+
1529+
let writer = Write::from_pooler(&context, blob.clone(), 3, NZUsize!(8));
1530+
writer.sync().await.unwrap();
1531+
1532+
// The first sync must use a full barrier to make the pre-wrapped write durable.
1533+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1534+
assert_eq!(durable.as_slice(), b"abc");
1535+
assert_eq!(full_syncs, 1);
1536+
assert_eq!(range_syncs, 0);
1537+
1538+
// After the barrier is clear, a buffered tip-only write can use range sync again.
1539+
writer.write_at(3, b"d").await.unwrap();
1540+
writer.sync().await.unwrap();
1541+
1542+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1543+
assert_eq!(durable.as_slice(), b"abcd");
1544+
assert_eq!(full_syncs, 1);
15161545
assert_eq!(range_syncs, 1);
15171546
});
15181547
}
@@ -1524,6 +1553,7 @@ mod tests {
15241553
let name = b"failed_range_sync";
15251554
let (blob, size) = context.open("partition", name).await.unwrap();
15261555
let writer = Write::from_pooler(&context, blob, size, NZUsize!(8));
1556+
writer.sync().await.unwrap();
15271557

15281558
// Keep the write buffered so sync attempts the clean `write_at_sync` path.
15291559
writer.write_at(0, b"abc").await.unwrap();
@@ -1580,6 +1610,7 @@ mod tests {
15801610
executor.start(|context| async move {
15811611
let blob = RangeSyncBlob::new();
15821612
let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(8));
1613+
writer.sync().await.unwrap();
15831614

15841615
// Establish already-durable data with a range sync.
15851616
writer.write_at(0, b"abcdef").await.unwrap();
@@ -1592,7 +1623,7 @@ mod tests {
15921623
// The resized contents require a full sync barrier to become durable.
15931624
let (durable, full_syncs, range_syncs) = blob.snapshot();
15941625
assert_eq!(durable.as_slice(), b"abcd");
1595-
assert_eq!(full_syncs, 1);
1626+
assert_eq!(full_syncs, 2);
15961627
assert_eq!(range_syncs, 1);
15971628
});
15981629
}

runtime/src/utils/buffer/write.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl<B: Blob> Write<B> {
126126
state: Arc::new(AsyncRwLock::new(State {
127127
blob,
128128
buffer: Buffer::new(size, capacity.get(), pool),
129-
needs_sync: false,
129+
needs_sync: true, // ensure pending writes on the wrapped blob are synced
130130
})),
131131
}
132132
}

0 commit comments

Comments
 (0)