@@ -935,7 +935,7 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
935935 /// Note that this operation may NOT be atomic, however it's guaranteed not to leave gaps in the
936936 /// event of failure as items are always pruned in order from oldest to newest.
937937 pub async fn prune ( & self , min_item_pos : u64 ) -> Result < bool , Error > {
938- let mut inner = self . inner . write ( ) . await ;
938+ let inner = self . inner . write ( ) . await ;
939939
940940 // Calculate the section that would contain min_item_pos
941941 let target_section = min_item_pos / self . items_per_blob ;
@@ -946,7 +946,15 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
946946 // Cap to tail section. The tail section is guaranteed to exist by our invariant.
947947 let min_section = std:: cmp:: min ( target_section, tail_section) ;
948948
949- let pruned = inner. journal . prune ( min_section) . await ?;
949+ // Unlink old sections while allowing concurrent readers to use existing open handles.
950+ let inner_ref = inner. downgrade_to_upgradable ( ) ;
951+ let pruned = inner_ref. journal . unlink_before ( min_section) . await ?;
952+ if !pruned {
953+ return Ok ( false ) ;
954+ }
955+
956+ let mut inner = inner_ref. upgrade ( ) . await ;
957+ let pruned = inner. journal . commit_prune ( min_section) ;
950958
951959 // After pruning, update pruning_boundary to the start of the oldest remaining section
952960 if pruned {
@@ -1120,14 +1128,19 @@ impl<E: Context, A: CodecFixedShared> crate::journal::authenticated::Inner<E> fo
11201128mod tests {
11211129 use super :: * ;
11221130 use commonware_cryptography:: { sha256:: Digest , Hasher as _, Sha256 } ;
1123- use commonware_macros:: test_traced;
1131+ use commonware_macros:: { select , test_traced} ;
11241132 use commonware_runtime:: {
11251133 deterministic:: { self , Context } ,
1126- Blob , BufferPooler , Error as RuntimeError , Metrics as _, Runner , Storage , Supervisor as _,
1134+ Blob , BufferPooler , Clock as _, Error as RuntimeError , Metrics as _, Runner , Spawner as _,
1135+ Storage , Supervisor as _,
11271136 } ;
1128- use commonware_utils:: { NZUsize , NZU16 , NZU64 } ;
1137+ use commonware_utils:: { sync :: Notify , NZUsize , NZU16 , NZU64 } ;
11291138 use futures:: { pin_mut, StreamExt } ;
1130- use std:: num:: NonZeroU16 ;
1139+ use std:: {
1140+ num:: NonZeroU16 ,
1141+ sync:: Arc ,
1142+ time:: { Duration , SystemTime } ,
1143+ } ;
11311144
11321145 const PAGE_SIZE : NonZeroU16 = NZU16 ! ( 44 ) ;
11331146 const PAGE_CACHE_SIZE : NonZeroUsize = NZUsize ! ( 3 ) ;
@@ -1158,6 +1171,142 @@ mod tests {
11581171 }
11591172 }
11601173
1174+ /// Coordinates a test pause after a target section is removed from storage.
1175+ struct RemoveBlocker {
1176+ target : Vec < u8 > ,
1177+ removed : Notify ,
1178+ release : Notify ,
1179+ }
1180+
1181+ impl RemoveBlocker {
1182+ /// Create a blocker for removal of the given section.
1183+ fn new ( section : u64 ) -> Self {
1184+ Self {
1185+ target : section. to_be_bytes ( ) . to_vec ( ) ,
1186+ removed : Notify :: new ( ) ,
1187+ release : Notify :: new ( ) ,
1188+ }
1189+ }
1190+ }
1191+
1192+ struct BlockingContext {
1193+ inner : Context ,
1194+ blocker : Arc < RemoveBlocker > ,
1195+ }
1196+
1197+ impl BlockingContext {
1198+ /// Wrap a deterministic context and pause removal of the blocker's target section.
1199+ fn new ( inner : Context , blocker : Arc < RemoveBlocker > ) -> Self {
1200+ Self { inner, blocker }
1201+ }
1202+ }
1203+
1204+ impl commonware_runtime:: Supervisor for BlockingContext {
1205+ fn name ( & self ) -> commonware_runtime:: Name {
1206+ self . inner . name ( )
1207+ }
1208+
1209+ fn child ( & self , label : & ' static str ) -> Self {
1210+ Self {
1211+ inner : self . inner . child ( label) ,
1212+ blocker : self . blocker . clone ( ) ,
1213+ }
1214+ }
1215+
1216+ fn with_attribute ( self , key : & ' static str , value : impl std:: fmt:: Display ) -> Self {
1217+ Self {
1218+ inner : self . inner . with_attribute ( key, value) ,
1219+ blocker : self . blocker ,
1220+ }
1221+ }
1222+ }
1223+
1224+ impl commonware_runtime:: Metrics for BlockingContext {
1225+ fn register <
1226+ N : Into < String > ,
1227+ H : Into < String > ,
1228+ M : commonware_runtime:: telemetry:: metrics:: Metric ,
1229+ > (
1230+ & self ,
1231+ name : N ,
1232+ help : H ,
1233+ metric : M ,
1234+ ) -> commonware_runtime:: telemetry:: metrics:: Registered < M > {
1235+ self . inner . register ( name, help, metric)
1236+ }
1237+
1238+ fn encode ( & self ) -> String {
1239+ self . inner . encode ( )
1240+ }
1241+ }
1242+
1243+ impl governor:: clock:: Clock for BlockingContext {
1244+ type Instant = SystemTime ;
1245+
1246+ fn now ( & self ) -> Self :: Instant {
1247+ self . inner . current ( )
1248+ }
1249+ }
1250+
1251+ impl governor:: clock:: ReasonablyRealtime for BlockingContext { }
1252+
1253+ impl commonware_runtime:: Clock for BlockingContext {
1254+ fn current ( & self ) -> SystemTime {
1255+ self . inner . current ( )
1256+ }
1257+
1258+ fn sleep (
1259+ & self ,
1260+ duration : Duration ,
1261+ ) -> impl std:: future:: Future < Output = ( ) > + Send + ' static {
1262+ self . inner . sleep ( duration)
1263+ }
1264+
1265+ fn sleep_until (
1266+ & self ,
1267+ deadline : SystemTime ,
1268+ ) -> impl std:: future:: Future < Output = ( ) > + Send + ' static {
1269+ self . inner . sleep_until ( deadline)
1270+ }
1271+ }
1272+
1273+ impl BufferPooler for BlockingContext {
1274+ fn network_buffer_pool ( & self ) -> & commonware_runtime:: BufferPool {
1275+ self . inner . network_buffer_pool ( )
1276+ }
1277+
1278+ fn storage_buffer_pool ( & self ) -> & commonware_runtime:: BufferPool {
1279+ self . inner . storage_buffer_pool ( )
1280+ }
1281+ }
1282+
1283+ impl Storage for BlockingContext {
1284+ type Blob = <Context as Storage >:: Blob ;
1285+
1286+ async fn open_versioned (
1287+ & self ,
1288+ partition : & str ,
1289+ name : & [ u8 ] ,
1290+ versions : std:: ops:: RangeInclusive < u16 > ,
1291+ ) -> Result < ( Self :: Blob , u64 , u16 ) , RuntimeError > {
1292+ self . inner . open_versioned ( partition, name, versions) . await
1293+ }
1294+
1295+ async fn remove ( & self , partition : & str , name : Option < & [ u8 ] > ) -> Result < ( ) , RuntimeError > {
1296+ let block = name. is_some_and ( |name| name == self . blocker . target . as_slice ( ) ) ;
1297+ let result = self . inner . remove ( partition, name) . await ;
1298+ if block {
1299+ self . blocker . removed . notify_one ( ) ;
1300+ self . blocker . release . notified ( ) . await ;
1301+ }
1302+ result
1303+ }
1304+
1305+ async fn scan ( & self , partition : & str ) -> Result < Vec < Vec < u8 > > , RuntimeError > {
1306+ self . inner . scan ( partition) . await
1307+ }
1308+ }
1309+
11611310 #[ test_traced]
11621311 fn test_fixed_journal_init_conflicting_partitions ( ) {
11631312 let executor = deterministic:: Runner :: default ( ) ;
@@ -1400,6 +1549,91 @@ mod tests {
14001549 } ) ;
14011550 }
14021551
1552+ #[ test_traced]
1553+ fn test_fixed_journal_reads_during_prune_unlink ( ) {
1554+ let executor = deterministic:: Runner :: default ( ) ;
1555+ executor. start ( |context| async move {
1556+ let blocker = Arc :: new ( RemoveBlocker :: new ( 0 ) ) ;
1557+ let journal_context = BlockingContext :: new ( context. child ( "journal" ) , blocker. clone ( ) ) ;
1558+ let cfg = test_cfg ( & journal_context, NZU64 ! ( 2 ) ) ;
1559+ let journal = Arc :: new (
1560+ Journal :: < _ , Digest > :: init ( journal_context. child ( "inner" ) , cfg)
1561+ . await
1562+ . expect ( "failed to initialize journal" ) ,
1563+ ) ;
1564+
1565+ for i in 0 ..6 {
1566+ let pos = journal. append ( & test_digest ( i) ) . await . unwrap ( ) ;
1567+ assert_eq ! ( pos, i) ;
1568+ }
1569+ journal. sync ( ) . await . unwrap ( ) ;
1570+
1571+ let prune = context. child ( "prune" ) . spawn ( {
1572+ let journal = journal. clone ( ) ;
1573+ |_| async move { journal. prune ( 4 ) . await }
1574+ } ) ;
1575+
1576+ select ! {
1577+ _ = blocker. removed. notified( ) => { } ,
1578+ _ = context. sleep( Duration :: from_secs( 1 ) ) => panic!( "prune did not unlink section" ) ,
1579+ }
1580+
1581+ let reader = journal. reader ( ) ;
1582+ pin_mut ! ( reader) ;
1583+ let reader = select ! {
1584+ reader = reader => reader,
1585+ _ = context. sleep( Duration :: from_secs( 1 ) ) => {
1586+ panic!( "reader blocked while prune was unlinking" )
1587+ } ,
1588+ } ;
1589+
1590+ assert_eq ! ( reader. read( 0 ) . await . unwrap( ) , test_digest( 0 ) ) ;
1591+ assert_eq ! ( reader. read( 4 ) . await . unwrap( ) , test_digest( 4 ) ) ;
1592+ drop ( reader) ;
1593+
1594+ blocker. release . notify_one ( ) ;
1595+ assert ! ( prune. await . unwrap( ) . unwrap( ) ) ;
1596+
1597+ assert_eq ! ( journal. pruning_boundary( ) . await , 4 ) ;
1598+ assert_eq ! ( journal. test_oldest_section( ) . await , Some ( 2 ) ) ;
1599+ assert ! ( matches!( journal. read( 0 ) . await , Err ( Error :: ItemPruned ( 0 ) ) ) ) ;
1600+ assert_eq ! ( journal. read( 4 ) . await . unwrap( ) , test_digest( 4 ) ) ;
1601+ } ) ;
1602+ }
1603+
1604+ #[ test_traced]
1605+ fn test_fixed_journal_prune_remove_failure_reopens_contiguous ( ) {
1606+ let executor = deterministic:: Runner :: default ( ) ;
1607+ executor. start ( |context| async move {
1608+ let cfg = test_cfg ( & context, NZU64 ! ( 2 ) ) ;
1609+ let journal = Journal :: < _ , Digest > :: init ( context. child ( "first" ) , cfg. clone ( ) )
1610+ . await
1611+ . expect ( "failed to initialize journal" ) ;
1612+
1613+ for i in 0 ..6 {
1614+ let pos = journal. append ( & test_digest ( i) ) . await . unwrap ( ) ;
1615+ assert_eq ! ( pos, i) ;
1616+ }
1617+ journal. sync ( ) . await . unwrap ( ) ;
1618+
1619+ let fault_cfg = context. storage_fault_config ( ) ;
1620+ * fault_cfg. write ( ) = deterministic:: FaultConfig :: default ( ) . remove ( 1.0 ) ;
1621+ let err = journal. prune ( 4 ) . await . expect_err ( "prune should fail" ) ;
1622+ assert ! ( matches!( err, Error :: Runtime ( RuntimeError :: Io ( _) ) ) ) ;
1623+ drop ( journal) ;
1624+
1625+ * fault_cfg. write ( ) = deterministic:: FaultConfig :: default ( ) ;
1626+ let journal = Journal :: < _ , Digest > :: init ( context. child ( "second" ) , cfg)
1627+ . await
1628+ . expect ( "failed to re-initialize journal" ) ;
1629+ assert_eq ! ( journal. bounds( ) . await , 0 ..6 ) ;
1630+ assert_eq ! ( journal. test_oldest_section( ) . await , Some ( 0 ) ) ;
1631+ for i in 0 ..6 {
1632+ assert_eq ! ( journal. read( i) . await . unwrap( ) , test_digest( i) ) ;
1633+ }
1634+ } ) ;
1635+ }
1636+
14031637 /// Append a lot of data to make sure we exercise page cache paging boundaries.
14041638 #[ test_traced]
14051639 fn test_fixed_journal_append_a_lot_of_data ( ) {
0 commit comments