@@ -761,10 +761,10 @@ impl<S: SlotClock> ReprocessQueue<S> {
761761 let reconstruction_deadline_millis =
762762 ( slot_duration * RECONSTRUCTION_DEADLINE . 0 ) / RECONSTRUCTION_DEADLINE . 1 ;
763763 let reconstruction_deadline = Duration :: from_millis ( reconstruction_deadline_millis) ;
764- if let Some ( seconds_from_current_slot ) =
765- self . slot_clock . seconds_from_current_slot_start ( )
764+ if let Some ( duration_from_current_slot ) =
765+ self . slot_clock . millis_from_current_slot_start ( )
766766 && let Some ( current_slot) = self . slot_clock . now ( )
767- && seconds_from_current_slot >= reconstruction_deadline
767+ && duration_from_current_slot >= reconstruction_deadline
768768 && current_slot == request. slot
769769 {
770770 // If we are at least `reconstruction_deadline` seconds into the current slot,
@@ -1227,4 +1227,116 @@ mod tests {
12271227 // The entry for the block root should be gone.
12281228 assert ! ( queue. awaiting_lc_updates_per_parent_root. is_empty( ) ) ;
12291229 }
1230+
1231+ async fn test_reconstruction_immediate_at_deadline ( slot_duration_secs : u64 ) {
1232+ let config = BeaconProcessorConfig :: default ( ) ;
1233+ let ( ready_work_tx, _) = mpsc:: channel :: < ReadyWork > ( config. max_scheduled_work_queue_len ) ;
1234+ let ( _, reprocess_work_rx) =
1235+ mpsc:: channel :: < ReprocessQueueMessage > ( config. max_scheduled_work_queue_len ) ;
1236+ let slot_clock = Arc :: new ( testing_slot_clock ( slot_duration_secs) ) ;
1237+ let mut queue = ReprocessQueue :: new ( ready_work_tx, reprocess_work_rx, slot_clock) ;
1238+
1239+ let slot_duration = queue. slot_clock . slot_duration ( ) ;
1240+ let reconstruction_deadline_millis = ( slot_duration. as_millis ( ) as u64
1241+ * RECONSTRUCTION_DEADLINE . 0 )
1242+ / RECONSTRUCTION_DEADLINE . 1 ;
1243+ let reconstruction_deadline = Duration :: from_millis ( reconstruction_deadline_millis) ;
1244+
1245+ // Advance time to just after the deadline
1246+ advance_time (
1247+ & queue. slot_clock ,
1248+ reconstruction_deadline + Duration :: from_millis ( 10 ) ,
1249+ )
1250+ . await ;
1251+
1252+ let current_slot = queue. slot_clock . now ( ) . unwrap ( ) ;
1253+ let block_root = Hash256 :: repeat_byte ( 0xaa ) ;
1254+
1255+ // Queue a reconstruction for the current slot after the deadline
1256+ let reconstruction_request = QueuedColumnReconstruction {
1257+ block_root,
1258+ slot : current_slot,
1259+ process_fn : Box :: pin ( async { } ) ,
1260+ } ;
1261+ queue. handle_message ( InboundEvent :: Msg (
1262+ ReprocessQueueMessage :: DelayColumnReconstruction ( reconstruction_request) ,
1263+ ) ) ;
1264+
1265+ assert_eq ! ( queue. queued_column_reconstructions. len( ) , 1 ) ;
1266+
1267+ // Should be immediately ready (0 delay since we're past deadline)
1268+ let ready_msg = queue. next ( ) . await . unwrap ( ) ;
1269+ assert ! ( matches!(
1270+ ready_msg,
1271+ InboundEvent :: ReadyColumnReconstruction ( _)
1272+ ) ) ;
1273+
1274+ if let InboundEvent :: ReadyColumnReconstruction ( reconstruction) = ready_msg {
1275+ assert_eq ! ( reconstruction. block_root, block_root) ;
1276+ queue. handle_message ( InboundEvent :: ReadyColumnReconstruction ( reconstruction) ) ;
1277+ }
1278+
1279+ assert ! ( queue. queued_column_reconstructions. is_empty( ) ) ;
1280+ }
1281+
1282+ /// Tests that column reconstruction queued after the deadline is triggered immediately
1283+ /// on mainnet (12s slots).
1284+ ///
1285+ /// When a reconstruction for the current slot is queued after the reconstruction deadline
1286+ /// (1/4 of slot duration = 3s for mainnet), it should be processed immediately with 0 delay.
1287+ #[ tokio:: test]
1288+ async fn column_reconstruction_immediate_processing_at_deadline_mainnet ( ) {
1289+ tokio:: time:: pause ( ) ;
1290+ test_reconstruction_immediate_at_deadline ( 12 ) . await ;
1291+ }
1292+
1293+ /// Tests that column reconstruction queued after the deadline is triggered immediately
1294+ /// on Gnosis (5s slots).
1295+ ///
1296+ /// When a reconstruction for the current slot is queued after the reconstruction deadline
1297+ /// (1/4 of slot duration = 1.25s for Gnosis), it should be processed immediately with 0 delay.
1298+ #[ tokio:: test]
1299+ async fn column_reconstruction_immediate_processing_at_deadline_gnosis ( ) {
1300+ tokio:: time:: pause ( ) ;
1301+ test_reconstruction_immediate_at_deadline ( 5 ) . await ;
1302+ }
1303+
1304+ /// Tests that column reconstruction uses the standard delay when queued before the deadline.
1305+ ///
1306+ /// When a reconstruction for the current slot is queued before the deadline, it should wait
1307+ /// for the standard QUEUED_RECONSTRUCTION_DELAY (150ms) before being triggered.
1308+ #[ tokio:: test]
1309+ async fn column_reconstruction_uses_standard_delay ( ) {
1310+ tokio:: time:: pause ( ) ;
1311+
1312+ let mut queue = test_queue ( ) ;
1313+ let current_slot = queue. slot_clock . now ( ) . unwrap ( ) ;
1314+ let block_root = Hash256 :: repeat_byte ( 0xcc ) ;
1315+
1316+ // Queue a reconstruction at the start of the slot (before deadline)
1317+ let reconstruction_request = QueuedColumnReconstruction {
1318+ block_root,
1319+ slot : current_slot,
1320+ process_fn : Box :: pin ( async { } ) ,
1321+ } ;
1322+ queue. handle_message ( InboundEvent :: Msg (
1323+ ReprocessQueueMessage :: DelayColumnReconstruction ( reconstruction_request) ,
1324+ ) ) ;
1325+
1326+ assert_eq ! ( queue. queued_column_reconstructions. len( ) , 1 ) ;
1327+
1328+ // Advance time by QUEUED_RECONSTRUCTION_DELAY
1329+ advance_time ( & queue. slot_clock , QUEUED_RECONSTRUCTION_DELAY ) . await ;
1330+
1331+ // Should be ready after the standard delay
1332+ let ready_msg = queue. next ( ) . await . unwrap ( ) ;
1333+ assert ! ( matches!(
1334+ ready_msg,
1335+ InboundEvent :: ReadyColumnReconstruction ( _)
1336+ ) ) ;
1337+
1338+ if let InboundEvent :: ReadyColumnReconstruction ( reconstruction) = ready_msg {
1339+ assert_eq ! ( reconstruction. block_root, block_root) ;
1340+ }
1341+ }
12301342}
0 commit comments