Skip to content

Commit 56983cc

Browse files
nits
1 parent 6ae3bdd commit 56983cc

1 file changed

Lines changed: 100 additions & 13 deletions

File tree

runtime/src/utils/buffer/paged/append.rs

Lines changed: 100 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -186,14 +186,15 @@ impl<B: Blob> Append<B> {
186186
}
187187

188188
let capacity = capacity_with_floor(capacity, cache_ref.page_size());
189+
let needs_sync = !invalid_data_found;
189190

190191
let (blob_state, partial_data) = match partial_page_state {
191192
Some((partial_page, crc_record)) => (
192193
BlobState {
193194
blob,
194195
current_page: pages - 1,
195196
partial_page_state: Some(crc_record),
196-
needs_sync: false,
197+
needs_sync,
197198
},
198199
Some(partial_page),
199200
),
@@ -202,7 +203,7 @@ impl<B: Blob> Append<B> {
202203
blob,
203204
current_page: pages,
204205
partial_page_state: None,
205-
needs_sync: false,
206+
needs_sync,
206207
},
207208
None,
208209
),
@@ -1740,21 +1741,28 @@ mod tests {
17401741
.await
17411742
.unwrap();
17421743

1743-
// A single buffered write with no prior dirty state can be made durable directly.
1744+
// A newly wrapped blob preserves one full barrier before range sync is used.
1745+
append.sync().await.unwrap();
1746+
let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1747+
assert_eq!(writes, 0);
1748+
assert_eq!(full_syncs, 1);
1749+
assert_eq!(range_syncs, 0);
1750+
1751+
// A single buffered write with no remaining dirty state can be made durable directly.
17441752
let data = b"hello world";
17451753
append.append(data).await.unwrap();
17461754
append.sync().await.unwrap();
17471755

17481756
let (_, writes, full_syncs, range_syncs) = blob.snapshot();
17491757
assert_eq!(writes, 1);
1750-
assert_eq!(full_syncs, 0);
1758+
assert_eq!(full_syncs, 1);
17511759
assert_eq!(range_syncs, 1);
17521760

17531761
// With no new writes and no pending full-sync barrier, sync has no work left.
17541762
append.sync().await.unwrap();
17551763
let (_, writes, full_syncs, range_syncs) = blob.snapshot();
17561764
assert_eq!(writes, 1);
1757-
assert_eq!(full_syncs, 0);
1765+
assert_eq!(full_syncs, 1);
17581766
assert_eq!(range_syncs, 1);
17591767

17601768
let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
@@ -1876,6 +1884,81 @@ mod tests {
18761884
});
18771885
}
18781886

1887+
#[test_traced("DEBUG")]
1888+
fn test_recreated_sync_preserves_replay_plain_flush_barrier() {
1889+
let executor = deterministic::Runner::default();
1890+
executor.start(|context: deterministic::Context| async move {
1891+
let blob = SyncTrackingBlob::new();
1892+
let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1893+
let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
1894+
.await
1895+
.unwrap();
1896+
1897+
append.append(b"replayed").await.unwrap();
1898+
let mut replay = append.replay(NZUsize!(1024)).await.unwrap();
1899+
assert!(replay.ensure(b"replayed".len()).await.unwrap());
1900+
assert_eq!(replay.remaining(), b"replayed".len());
1901+
assert_eq!(replay.chunk(), b"replayed");
1902+
drop(replay);
1903+
drop(append);
1904+
1905+
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1906+
assert!(durable.is_empty());
1907+
assert_eq!(writes, 1);
1908+
assert_eq!(full_syncs, 0);
1909+
assert_eq!(range_syncs, 0);
1910+
1911+
let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1912+
let reopened = Append::new(blob.clone(), blob.size(), BUFFER_SIZE, cache_ref)
1913+
.await
1914+
.unwrap();
1915+
assert_eq!(reopened.size().await, b"replayed".len() as u64);
1916+
reopened.sync().await.unwrap();
1917+
1918+
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1919+
assert_eq!(durable.len(), blob.size() as usize);
1920+
assert_eq!(writes, 1);
1921+
assert_eq!(full_syncs, 1);
1922+
assert_eq!(range_syncs, 0);
1923+
});
1924+
}
1925+
1926+
#[test_traced("DEBUG")]
1927+
fn test_recreated_sync_skips_barrier_after_invalid_truncation() {
1928+
let executor = deterministic::Runner::default();
1929+
executor.start(|context: deterministic::Context| async move {
1930+
let blob = SyncTrackingBlob::new();
1931+
let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1932+
let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
1933+
.await
1934+
.unwrap();
1935+
append.sync().await.unwrap();
1936+
append.append(b"valid").await.unwrap();
1937+
append.sync().await.unwrap();
1938+
drop(append);
1939+
1940+
blob.write_at(blob.size(), b"junk").await.unwrap();
1941+
1942+
let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1943+
let reopened = Append::new(blob.clone(), blob.size(), BUFFER_SIZE, cache_ref)
1944+
.await
1945+
.unwrap();
1946+
assert_eq!(reopened.size().await, b"valid".len() as u64);
1947+
1948+
let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1949+
assert_eq!(writes, 2);
1950+
assert_eq!(full_syncs, 2);
1951+
assert_eq!(range_syncs, 1);
1952+
1953+
reopened.sync().await.unwrap();
1954+
1955+
let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1956+
assert_eq!(writes, 2);
1957+
assert_eq!(full_syncs, 2);
1958+
assert_eq!(range_syncs, 1);
1959+
});
1960+
}
1961+
18791962
#[test_traced("DEBUG")]
18801963
fn test_sync_batches_split_protected_writes_with_full_sync() {
18811964
let executor = deterministic::Runner::default();
@@ -1885,6 +1968,7 @@ mod tests {
18851968
let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
18861969
.await
18871970
.unwrap();
1971+
append.sync().await.unwrap();
18881972

18891973
// Establish a persisted partial page with one authoritative CRC slot.
18901974
append.append(b"abc").await.unwrap();
@@ -1897,7 +1981,7 @@ mod tests {
18971981

18981982
let (_, writes, full_syncs, range_syncs) = blob.snapshot();
18991983
assert_eq!(writes, 3);
1900-
assert_eq!(full_syncs, 1);
1984+
assert_eq!(full_syncs, 2);
19011985
assert_eq!(range_syncs, 1);
19021986

19031987
// On the next extension, the protected slot is the second CRC, so only the prefix
@@ -1907,7 +1991,7 @@ mod tests {
19071991

19081992
let (_, writes, full_syncs, range_syncs) = blob.snapshot();
19091993
assert_eq!(writes, 4);
1910-
assert_eq!(full_syncs, 1);
1994+
assert_eq!(full_syncs, 2);
19111995
assert_eq!(range_syncs, 2);
19121996

19131997
let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
@@ -3473,6 +3557,7 @@ mod tests {
34733557
let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
34743558
.await
34753559
.unwrap();
3560+
append.sync().await.unwrap();
34763561

34773562
let data = vec![5u8; PAGE_SIZE.get() as usize];
34783563
append.append(&data).await.unwrap();
@@ -3484,7 +3569,7 @@ mod tests {
34843569

34853570
let (_, writes, full_syncs, range_syncs) = blob.snapshot();
34863571
assert_eq!(writes, 4);
3487-
assert_eq!(full_syncs, 0);
3572+
assert_eq!(full_syncs, 1);
34883573
assert_eq!(range_syncs, 4);
34893574
});
34903575
}
@@ -3498,6 +3583,7 @@ mod tests {
34983583
let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
34993584
.await
35003585
.unwrap();
3586+
append.sync().await.unwrap();
35013587

35023588
let data = vec![9u8; PAGE_SIZE.get() as usize * 2];
35033589
append.append(&data).await.unwrap();
@@ -3510,7 +3596,7 @@ mod tests {
35103596

35113597
let (_, writes, full_syncs, range_syncs) = blob.snapshot();
35123598
assert_eq!(writes, 4);
3513-
assert_eq!(full_syncs, 1);
3599+
assert_eq!(full_syncs, 2);
35143600
assert_eq!(range_syncs, 3);
35153601

35163602
// Once the resize barrier is cleared, the next single flush can use range sync again.
@@ -3519,7 +3605,7 @@ mod tests {
35193605

35203606
let (_, writes, full_syncs, range_syncs) = blob.snapshot();
35213607
assert_eq!(writes, 5);
3522-
assert_eq!(full_syncs, 1);
3608+
assert_eq!(full_syncs, 2);
35233609
assert_eq!(range_syncs, 4);
35243610

35253611
let mut expected = data[..50].to_vec();
@@ -3538,9 +3624,10 @@ mod tests {
35383624
let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
35393625
.await
35403626
.unwrap();
3627+
append.sync().await.unwrap();
35413628

3542-
// Start with two durable full pages. The initial sync can persist them with one
3543-
// range-sync write.
3629+
// Start with two durable full pages. After clearing the wrapper barrier, the data sync
3630+
// can persist them with one range-sync write.
35443631
let page_size = PAGE_SIZE.get() as usize;
35453632
let data = vec![11u8; page_size * 2];
35463633
append.append(&data).await.unwrap();
@@ -3553,7 +3640,7 @@ mod tests {
35533640
// Only the resize needs a full sync, no additional writes are emitted by the shrink.
35543641
let (_, writes, full_syncs, range_syncs) = blob.snapshot();
35553642
assert_eq!(writes, 1);
3556-
assert_eq!(full_syncs, 1);
3643+
assert_eq!(full_syncs, 2);
35573644
assert_eq!(range_syncs, 1);
35583645

35593646
let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));

0 commit comments

Comments
 (0)