@@ -935,7 +935,7 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
935935 /// Note that this operation may NOT be atomic, however it's guaranteed not to leave gaps in the
936936 /// event of failure as items are always pruned in order from oldest to newest.
937937 pub async fn prune ( & self , min_item_pos : u64 ) -> Result < bool , Error > {
938- let mut inner = self . inner . write ( ) . await ;
938+ let inner = self . inner . write ( ) . await ;
939939
940940 // Calculate the section that would contain min_item_pos
941941 let target_section = min_item_pos / self . items_per_blob ;
@@ -946,7 +946,15 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
946946 // Cap to tail section. The tail section is guaranteed to exist by our invariant.
947947 let min_section = std:: cmp:: min ( target_section, tail_section) ;
948948
949- let pruned = inner. journal . prune ( min_section) . await ?;
949+ // Unlink old sections while allowing concurrent readers to use existing open handles.
950+ let inner_ref = inner. downgrade_to_upgradable ( ) ;
951+ let pruned = inner_ref. journal . unlink_before ( min_section) . await ?;
952+ if !pruned {
953+ return Ok ( false ) ;
954+ }
955+
956+ let mut inner = inner_ref. upgrade ( ) . await ;
957+ let pruned = inner. journal . commit_prune ( min_section) ;
950958
951959 // After pruning, update pruning_boundary to the start of the oldest remaining section
952960 if pruned {
@@ -1120,14 +1128,19 @@ impl<E: Context, A: CodecFixedShared> crate::journal::authenticated::Inner<E> fo
11201128mod tests {
11211129 use super :: * ;
11221130 use commonware_cryptography:: { sha256:: Digest , Hasher as _, Sha256 } ;
1123- use commonware_macros:: test_traced;
1131+ use commonware_macros:: { select , test_traced} ;
11241132 use commonware_runtime:: {
11251133 deterministic:: { self , Context } ,
1126- Blob , BufferPooler , Error as RuntimeError , Metrics as _, Runner , Storage , Supervisor as _,
1134+ Blob , BufferPooler , Clock as _, Error as RuntimeError , Metrics as _, Runner , Spawner as _,
1135+ Storage , Supervisor as _,
11271136 } ;
1128- use commonware_utils:: { NZUsize , NZU16 , NZU64 } ;
1137+ use commonware_utils:: { sync :: Notify , NZUsize , NZU16 , NZU64 } ;
11291138 use futures:: { pin_mut, StreamExt } ;
1130- use std:: num:: NonZeroU16 ;
1139+ use std:: {
1140+ num:: NonZeroU16 ,
1141+ sync:: Arc ,
1142+ time:: { Duration , SystemTime } ,
1143+ } ;
11311144
11321145 const PAGE_SIZE : NonZeroU16 = NZU16 ! ( 44 ) ;
11331146 const PAGE_CACHE_SIZE : NonZeroUsize = NZUsize ! ( 3 ) ;
@@ -1158,6 +1171,141 @@ mod tests {
11581171 }
11591172 }
11601173
1174+ struct RemoveBlocker {
1175+ target : Vec < u8 > ,
1176+ removed : Notify ,
1177+ release : Notify ,
1178+ }
1179+
1180+ impl RemoveBlocker {
1181+ /// Create a blocker for removal of the given section.
1182+ fn new ( section : u64 ) -> Self {
1183+ Self {
1184+ target : section. to_be_bytes ( ) . to_vec ( ) ,
1185+ removed : Notify :: new ( ) ,
1186+ release : Notify :: new ( ) ,
1187+ }
1188+ }
1189+ }
1190+
1191+ struct BlockingContext {
1192+ inner : Context ,
1193+ blocker : Arc < RemoveBlocker > ,
1194+ }
1195+
1196+ impl BlockingContext {
1197+ /// Wrap a deterministic context and pause removal of the blocker's target section.
1198+ fn new ( inner : Context , blocker : Arc < RemoveBlocker > ) -> Self {
1199+ Self { inner, blocker }
1200+ }
1201+ }
1202+
1203+ impl commonware_runtime:: Supervisor for BlockingContext {
1204+ fn name ( & self ) -> commonware_runtime:: Name {
1205+ self . inner . name ( )
1206+ }
1207+
1208+ fn child ( & self , label : & ' static str ) -> Self {
1209+ Self {
1210+ inner : self . inner . child ( label) ,
1211+ blocker : self . blocker . clone ( ) ,
1212+ }
1213+ }
1214+
1215+ fn with_attribute ( self , key : & ' static str , value : impl std:: fmt:: Display ) -> Self {
1216+ Self {
1217+ inner : self . inner . with_attribute ( key, value) ,
1218+ blocker : self . blocker ,
1219+ }
1220+ }
1221+ }
1222+
1223+ impl commonware_runtime:: Metrics for BlockingContext {
1224+ fn register <
1225+ N : Into < String > ,
1226+ H : Into < String > ,
1227+ M : commonware_runtime:: telemetry:: metrics:: Metric ,
1228+ > (
1229+ & self ,
1230+ name : N ,
1231+ help : H ,
1232+ metric : M ,
1233+ ) -> commonware_runtime:: telemetry:: metrics:: Registered < M > {
1234+ self . inner . register ( name, help, metric)
1235+ }
1236+
1237+ fn encode ( & self ) -> String {
1238+ self . inner . encode ( )
1239+ }
1240+ }
1241+
1242+ impl governor:: clock:: Clock for BlockingContext {
1243+ type Instant = SystemTime ;
1244+
1245+ fn now ( & self ) -> Self :: Instant {
1246+ self . inner . current ( )
1247+ }
1248+ }
1249+
1250+ impl governor:: clock:: ReasonablyRealtime for BlockingContext { }
1251+
1252+ impl commonware_runtime:: Clock for BlockingContext {
1253+ fn current ( & self ) -> SystemTime {
1254+ self . inner . current ( )
1255+ }
1256+
1257+ fn sleep (
1258+ & self ,
1259+ duration : Duration ,
1260+ ) -> impl std:: future:: Future < Output = ( ) > + Send + ' static {
1261+ self . inner . sleep ( duration)
1262+ }
1263+
1264+ fn sleep_until (
1265+ & self ,
1266+ deadline : SystemTime ,
1267+ ) -> impl std:: future:: Future < Output = ( ) > + Send + ' static {
1268+ self . inner . sleep_until ( deadline)
1269+ }
1270+ }
1271+
1272+ impl BufferPooler for BlockingContext {
1273+ fn network_buffer_pool ( & self ) -> & commonware_runtime:: BufferPool {
1274+ self . inner . network_buffer_pool ( )
1275+ }
1276+
1277+ fn storage_buffer_pool ( & self ) -> & commonware_runtime:: BufferPool {
1278+ self . inner . storage_buffer_pool ( )
1279+ }
1280+ }
1281+
1282+ impl Storage for BlockingContext {
1283+ type Blob = <Context as Storage >:: Blob ;
1284+
1285+ async fn open_versioned (
1286+ & self ,
1287+ partition : & str ,
1288+ name : & [ u8 ] ,
1289+ versions : std:: ops:: RangeInclusive < u16 > ,
1290+ ) -> Result < ( Self :: Blob , u64 , u16 ) , RuntimeError > {
1291+ self . inner . open_versioned ( partition, name, versions) . await
1292+ }
1293+
1294+ async fn remove ( & self , partition : & str , name : Option < & [ u8 ] > ) -> Result < ( ) , RuntimeError > {
1295+ let block = name. is_some_and ( |name| name == self . blocker . target . as_slice ( ) ) ;
1296+ let result = self . inner . remove ( partition, name) . await ;
1297+ if block {
1298+ self . blocker . removed . notify_one ( ) ;
1299+ self . blocker . release . notified ( ) . await ;
1300+ }
1301+ result
1302+ }
1303+
1304+ async fn scan ( & self , partition : & str ) -> Result < Vec < Vec < u8 > > , RuntimeError > {
1305+ self . inner . scan ( partition) . await
1306+ }
1307+ }
1308+
11611309 #[ test_traced]
11621310 fn test_fixed_journal_init_conflicting_partitions ( ) {
11631311 let executor = deterministic:: Runner :: default ( ) ;
@@ -1400,6 +1548,91 @@ mod tests {
14001548 } ) ;
14011549 }
14021550
1551+ #[ test_traced]
1552+ fn test_fixed_journal_reads_during_prune_unlink ( ) {
1553+ let executor = deterministic:: Runner :: default ( ) ;
1554+ executor. start ( |context| async move {
1555+ let blocker = Arc :: new ( RemoveBlocker :: new ( 0 ) ) ;
1556+ let journal_context = BlockingContext :: new ( context. child ( "journal" ) , blocker. clone ( ) ) ;
1557+ let cfg = test_cfg ( & journal_context, NZU64 ! ( 2 ) ) ;
1558+ let journal = Arc :: new (
1559+ Journal :: < _ , Digest > :: init ( journal_context. child ( "inner" ) , cfg)
1560+ . await
1561+ . expect ( "failed to initialize journal" ) ,
1562+ ) ;
1563+
1564+ for i in 0 ..6 {
1565+ let pos = journal. append ( & test_digest ( i) ) . await . unwrap ( ) ;
1566+ assert_eq ! ( pos, i) ;
1567+ }
1568+ journal. sync ( ) . await . unwrap ( ) ;
1569+
1570+ let prune = context. child ( "prune" ) . spawn ( {
1571+ let journal = journal. clone ( ) ;
1572+ |_| async move { journal. prune ( 4 ) . await }
1573+ } ) ;
1574+
1575+ select ! {
1576+ _ = blocker. removed. notified( ) => { } ,
1577+ _ = context. sleep( Duration :: from_secs( 1 ) ) => panic!( "prune did not unlink section" ) ,
1578+ }
1579+
1580+ let reader = journal. reader ( ) ;
1581+ pin_mut ! ( reader) ;
1582+ let reader = select ! {
1583+ reader = reader => reader,
1584+ _ = context. sleep( Duration :: from_secs( 1 ) ) => {
1585+ panic!( "reader blocked while prune was unlinking" )
1586+ } ,
1587+ } ;
1588+
1589+ assert_eq ! ( reader. read( 0 ) . await . unwrap( ) , test_digest( 0 ) ) ;
1590+ assert_eq ! ( reader. read( 4 ) . await . unwrap( ) , test_digest( 4 ) ) ;
1591+ drop ( reader) ;
1592+
1593+ blocker. release . notify_one ( ) ;
1594+ assert ! ( prune. await . unwrap( ) . unwrap( ) ) ;
1595+
1596+ assert_eq ! ( journal. pruning_boundary( ) . await , 4 ) ;
1597+ assert_eq ! ( journal. test_oldest_section( ) . await , Some ( 2 ) ) ;
1598+ assert ! ( matches!( journal. read( 0 ) . await , Err ( Error :: ItemPruned ( 0 ) ) ) ) ;
1599+ assert_eq ! ( journal. read( 4 ) . await . unwrap( ) , test_digest( 4 ) ) ;
1600+ } ) ;
1601+ }
1602+
1603+ #[ test_traced]
1604+ fn test_fixed_journal_prune_remove_failure_reopens_contiguous ( ) {
1605+ let executor = deterministic:: Runner :: default ( ) ;
1606+ executor. start ( |context| async move {
1607+ let cfg = test_cfg ( & context, NZU64 ! ( 2 ) ) ;
1608+ let journal = Journal :: < _ , Digest > :: init ( context. child ( "first" ) , cfg. clone ( ) )
1609+ . await
1610+ . expect ( "failed to initialize journal" ) ;
1611+
1612+ for i in 0 ..6 {
1613+ let pos = journal. append ( & test_digest ( i) ) . await . unwrap ( ) ;
1614+ assert_eq ! ( pos, i) ;
1615+ }
1616+ journal. sync ( ) . await . unwrap ( ) ;
1617+
1618+ let fault_cfg = context. storage_fault_config ( ) ;
1619+ * fault_cfg. write ( ) = deterministic:: FaultConfig :: default ( ) . remove ( 1.0 ) ;
1620+ let err = journal. prune ( 4 ) . await . expect_err ( "prune should fail" ) ;
1621+ assert ! ( matches!( err, Error :: Runtime ( RuntimeError :: Io ( _) ) ) ) ;
1622+ drop ( journal) ;
1623+
1624+ * fault_cfg. write ( ) = deterministic:: FaultConfig :: default ( ) ;
1625+ let journal = Journal :: < _ , Digest > :: init ( context. child ( "second" ) , cfg)
1626+ . await
1627+ . expect ( "failed to re-initialize journal" ) ;
1628+ assert_eq ! ( journal. bounds( ) . await , 0 ..6 ) ;
1629+ assert_eq ! ( journal. test_oldest_section( ) . await , Some ( 0 ) ) ;
1630+ for i in 0 ..6 {
1631+ assert_eq ! ( journal. read( i) . await . unwrap( ) , test_digest( i) ) ;
1632+ }
1633+ } ) ;
1634+ }
1635+
14031636 /// Append a lot of data to make sure we exercise page cache paging boundaries.
14041637 #[ test_traced]
14051638 fn test_fixed_journal_append_a_lot_of_data ( ) {
0 commit comments