@@ -62,6 +62,13 @@ use super::{
6262use crate :: engines:: hbbft:: hbbft_message_memorium:: HbbftMessageDispatcher ;
6363use std:: { ops:: Deref , sync:: atomic:: Ordering } ;
6464
65+ // Internal representation for storing deferred outgoing consensus messages.
66+ struct StoredOutgoingMessage {
67+ block_number : BlockNumber ,
68+ data : Vec < u8 > ,
69+ recipients : Vec < H512 > ,
70+ }
71+
// Shorthand for an hbbft message addressed to a specific set of validators.
type TargetedMessage = hbbft::TargetedMessage<Message, NodeId>;
6673
6774/// A message sent between validators that is part of Honey Badger BFT or the block sealing process.
@@ -104,6 +111,13 @@ pub struct HoneyBadgerBFT {
104111 early_epoch_manager : Mutex < Option < HbbftEarlyEpochEndManager > > ,
105112 hbbft_engine_cache : Mutex < HbbftEngineCache > ,
106113 delayed_hbbft_join : AtomicBool ,
114+
115+ // When true, outgoing consensus messages are deferred and stored for later delivery.
116+ defer_outgoing_messages : AtomicBool ,
117+ // Storage for deferred outgoing messages ready to be delivered later.
118+ stored_outgoing_messages : Mutex < Vec < StoredOutgoingMessage > > ,
119+ // Phoenix recovery protocol: ensure we reset HoneyBadger only once before resuming sending.
120+ phoenix_reset_performed : AtomicBool ,
107121}
108122
109123struct TransitionHandler {
@@ -177,6 +191,8 @@ const ENGINE_DELAYED_UNITL_SYNCED_TOKEN: TimerToken = 3;
// Some operations are not timing-critical but are rather expensive;
// those are handled by this slow-ticking timer.
const ENGINE_VALIDATOR_CANDIDATE_ACTIONS: TimerToken = 4;
// Periodic check of the current Phoenix recovery protocol phase.
const ENGINE_PHOENIX_CHECK: TimerToken = 5;
180196
181197impl TransitionHandler {
182198 fn handle_shutdown_on_missing_block_import (
@@ -362,6 +378,11 @@ impl IoHandler<()> for TransitionHandler {
362378
363379 // io.channel()
364380 // io.register_stream()
381+
382+ io. register_timer ( ENGINE_PHOENIX_CHECK , Duration :: from_secs ( 10 ) )
383+ . unwrap_or_else (
384+ |e| warn ! ( target: "consensus" , "ENGINE_PHOENIX_CHECK Timer failed: {}." , e) ,
385+ ) ;
365386 }
366387
367388 fn timeout ( & self , io : & IoContext < ( ) > , timer : TimerToken ) {
@@ -449,11 +470,131 @@ impl IoHandler<()> for TransitionHandler {
449470 if let Err ( err) = self . engine . do_validator_engine_actions ( ) {
450471 error ! ( target: "consensus" , "do_validator_engine_actions failed: {:?}" , err) ;
451472 }
473+ } else if timer == ENGINE_PHOENIX_CHECK {
474+ self . engine . handle_phoenix_recovery_protocol ( ) ;
452475 }
453476 }
454477}
455478
456479impl HoneyBadgerBFT {
    // Phoenix recovery protocol parameters.
    // Start deferring outgoing messages and reset HoneyBadger after this many
    // seconds without a new block being imported.
    const PHOENIX_DEFER_AFTER_SECS: i64 = 600;
    // Added on top of PHOENIX_DEFER_AFTER_SECS for each try after the first,
    // to incrementally back off between block creation attempts.
    // If 'n' is the current try, starting at 0, then:
    // Try(0): PHOENIX_DEFER_AFTER_SECS
    // Try(n): Try(n-1) + PHOENIX_DEFER_AFTER_SECS + n * PHOENIX_DEFER_INCREMENT_SECS
    const PHOENIX_DEFER_INCREMENT_SECS: i64 = 120;
    // Resume sending and deliver deferred messages this many seconds after the
    // start of the current deferring window.
    const PHOENIX_RESUME_AFTER_SECS: i64 = 120;
    // Timeout for trying to acquire the hbbft_state write lock when resetting
    // HoneyBadger, in milliseconds.
    const PHOENIX_LOCK_TIMEOUT_MS: u64 = 100;
493+
494+ /// Phoenix recovery protocol
495+ /// Called periodically to detect stalls and perform a controlled recovery.
496+ /// Retry recovery every n*PHOENIX_DEFER_AFTER_SECS by deferring and resetting,
497+ /// and resume sending messages every n*PHOENIX_DEFER_AFTER_SECS + PHOENIX_RESUME_AFTER_SECS.
498+ fn handle_phoenix_recovery_protocol ( & self ) {
499+ if let Some ( client) = self . client_arc ( ) {
500+ // Skip if still syncing.
501+ if self . is_syncing ( & client) {
502+ return ;
503+ }
504+
505+ match client. block_header ( BlockId :: Latest ) {
506+ Some ( h) => {
507+ let ts = h. timestamp ( ) as i64 ;
508+ let now_ts = unix_now_secs ( ) as i64 ;
509+ let diff_secs = now_ts - ts;
510+
511+ let defer_after = Self :: PHOENIX_DEFER_AFTER_SECS ;
512+ let resume_after = Self :: PHOENIX_RESUME_AFTER_SECS ;
513+
514+ if diff_secs >= defer_after {
515+ // Determine the current recovery cycle index n and its boundaries using
516+ // increasing delays per try. The cumulative start time S_n is defined as:
517+ // S_0 = PHOENIX_DEFER_AFTER_SECS
518+ // S_n = S_{n-1} + PHOENIX_DEFER_AFTER_SECS + n * PHOENIX_DEFER_INCREMENT_SECS
519+ let mut n: i64 = 0 ;
520+ let mut cycle_start: i64 = defer_after; // S_0
521+ let next_cycle_start: i64 ;
522+ loop {
523+ let incr = defer_after + ( n + 1 ) * Self :: PHOENIX_DEFER_INCREMENT_SECS ;
524+ let candidate_next = cycle_start + incr; // S_{n+1}
525+ if diff_secs >= candidate_next {
526+ n += 1 ;
527+ cycle_start = candidate_next; // advance to next cycle
528+ continue ;
529+ } else {
530+ next_cycle_start = candidate_next;
531+ break ;
532+ }
533+ }
534+ let cycle_resume = cycle_start + resume_after;
535+
536+ if diff_secs < cycle_resume {
537+ // We are within the deferring window of the current cycle.
538+ self . set_defer_outgoing_messages ( true ) ;
539+
540+ // Ensure we reset the HoneyBadger instance only once during a deferring window.
541+ if !self . phoenix_reset_performed . load ( Ordering :: SeqCst ) {
542+ match self . hbbft_state . try_write_for ( Duration :: from_millis (
543+ Self :: PHOENIX_LOCK_TIMEOUT_MS ,
544+ ) ) {
545+ Some ( mut state) => {
546+ if state. reset_honeybadger ( ) . is_some ( ) {
547+ // Also reset the sealing protocol state to avoid mixing signatures
548+ // from the previous block creation attempt.
549+ self . sealing . write ( ) . clear ( ) ;
550+
551+ self . phoenix_reset_performed
552+ . store ( true , Ordering :: SeqCst ) ;
553+ warn ! ( target: "consensus" , "Phoenix Protocol: Deferred outgoing messages, reset HoneyBadger and cleared sealing state ({}s since last block; cycle n={}, window [{}..{}))" , diff_secs, n, cycle_start, cycle_resume) ;
554+ } else {
555+ warn ! ( target: "consensus" , "Phoenix Protocol: Deferred outgoing messages but failed to reset HoneyBadger ({}s since last block; cycle n={}, window [{}..{}))" , diff_secs, n, cycle_start, cycle_resume) ;
556+ }
557+ }
558+ None => {
559+ warn ! ( target: "consensus" , "Phoenix Protocol: Could not acquire hbbft_state lock to reset HoneyBadger while deferring messages ({}s since last block; cycle n={}, window [{}..{}))" , diff_secs, n, cycle_start, cycle_resume) ;
560+ }
561+ }
562+ }
563+ } else if diff_secs < next_cycle_start {
564+ // We are in the resume window of the current cycle.
565+ if self . phoenix_reset_performed . load ( Ordering :: SeqCst ) {
566+ self . set_defer_outgoing_messages ( false ) ;
567+ self . deliver_stored_outgoing_messages ( ) ;
568+ // Allow the next cycle to perform a reset again if needed.
569+ self . phoenix_reset_performed . store ( false , Ordering :: SeqCst ) ;
570+ warn ! ( target: "consensus" , "Phoenix Protocol: Resumed sending and delivered deferred messages ({}s since last block; cycle n={}, resume @ {}, next_cycle @ {})" , diff_secs, n, cycle_resume, next_cycle_start) ;
571+ } else {
572+ warn ! ( target: "consensus" , "Phoenix Protocol: Expecting block to be generated ({}s since last block; cycle n={}, resume @ {}, next_cycle @ {})" , diff_secs, n, cycle_resume, next_cycle_start) ;
573+ }
574+ }
575+ } else {
576+ // A new block has been imported while recovery protocol was active.
577+ // Clean up recovery state: stop deferring and deliver any stored messages.
578+ if self . defer_outgoing_messages . load ( Ordering :: SeqCst )
579+ || self . phoenix_reset_performed . load ( Ordering :: SeqCst )
580+ {
581+ self . set_defer_outgoing_messages ( false ) ;
582+ self . deliver_stored_outgoing_messages ( ) ;
583+ self . phoenix_reset_performed . store ( false , Ordering :: SeqCst ) ;
584+ warn ! ( target: "consensus" , "Phoenix Protocol: Cleaned up recovery state after new block ({}s since last block < defer threshold {})" , diff_secs, defer_after) ;
585+ }
586+ }
587+
588+ // Always log the latest timestamp and diff for visibility.
589+ trace ! ( target: "consensus" , "Phoenix Protocol: latest block timestamp: {} (diff_secs: {})" , h. timestamp( ) , diff_secs) ;
590+ }
591+ None => {
592+ error ! ( target: "consensus" , "Phoenix Protocol: No latest block header available." ) ;
593+ }
594+ }
595+ }
596+ }
597+
457598 /// Creates an instance of the Honey Badger BFT Engine.
458599 pub fn new ( params : HbbftParams , machine : EthereumMachine ) -> Result < Arc < Self > , Error > {
459600 let is_unit_test = params. is_unit_test . unwrap_or ( false ) ;
@@ -493,6 +634,10 @@ impl HoneyBadgerBFT {
493634 early_epoch_manager : Mutex :: new ( None ) ,
494635 hbbft_engine_cache : Mutex :: new ( HbbftEngineCache :: new ( ) ) ,
495636 delayed_hbbft_join : AtomicBool :: new ( false ) ,
637+
638+ defer_outgoing_messages : AtomicBool :: new ( false ) ,
639+ stored_outgoing_messages : Mutex :: new ( Vec :: new ( ) ) ,
640+ phoenix_reset_performed : AtomicBool :: new ( false ) ,
496641 } ) ;
497642
498643 if !engine. params . is_unit_test . unwrap_or ( false ) {
@@ -753,27 +898,44 @@ impl HoneyBadgerBFT {
753898 for m in messages {
754899 let ser =
755900 rmp_serde:: to_vec ( & m. message ) . expect ( "Serialization of consensus message failed" ) ;
901+
902+ // Determine recipients based on the target, excluding ourselves.
903+ let mut recipients: Vec < H512 > = Vec :: new ( ) ;
756904 match m. target {
757905 Target :: Nodes ( set) => {
758- trace ! ( target: "consensus" , "Dispatching message {:?} to {:?}" , m. message, set) ;
759906 for node_id in set. into_iter ( ) . filter ( |p| p != net_info. our_id ( ) ) {
760- let block_num = m. message . block_number ( ) ;
761- trace ! ( target: "consensus" , "Sending message to {} for block #{} " , node_id. 0 , block_num) ;
762- client. send_consensus_message ( block_num, ser. clone ( ) , Some ( node_id. 0 ) ) ;
907+ recipients. push ( node_id. 0 ) ;
763908 }
764909 }
765910 Target :: AllExcept ( set) => {
766- trace ! ( target: "consensus" , "Dispatching exclusive message {:?} to all except {:?}" , m. message, set) ;
767911 for node_id in net_info
768912 . all_ids ( )
769913 . filter ( |p| ( p != & net_info. our_id ( ) && !set. contains ( p) ) )
770914 {
771- let block_num = m. message . block_number ( ) ;
772- trace ! ( target: "consensus" , "Sending exclusive message to {} for block #{}" , node_id. 0 , block_num) ;
773- client. send_consensus_message ( block_num, ser. clone ( ) , Some ( node_id. 0 ) ) ;
915+ recipients. push ( node_id. 0 ) ;
774916 }
775917 }
776918 }
919+
920+ let block_number = m. message . block_number ( ) ;
921+
922+ if self . defer_outgoing_messages . load ( Ordering :: SeqCst ) {
923+ // Store for deferred delivery
924+ warn ! ( target: "consensus" , "Phoenix Protocol: Storing message for deferred sending for block #{} " , block_number) ;
925+ self . stored_outgoing_messages
926+ . lock ( )
927+ . push ( StoredOutgoingMessage {
928+ block_number,
929+ data : ser,
930+ recipients,
931+ } ) ;
932+ } else {
933+ // Send immediately
934+ for node in recipients {
935+ trace ! ( target: "consensus" , "Sending message to {} for block #{} " , node, block_number) ;
936+ client. send_consensus_message ( block_number, ser. clone ( ) , Some ( node) ) ;
937+ }
938+ }
777939 }
778940 }
779941
@@ -819,6 +981,38 @@ impl HoneyBadgerBFT {
819981 self . process_output ( client, step. output , network_info) ;
820982 }
821983
984+ /// Enables or disables deferring of outgoing consensus messages.
985+ pub fn set_defer_outgoing_messages ( & self , defer : bool ) {
986+ self . defer_outgoing_messages . store ( defer, Ordering :: SeqCst ) ;
987+ }
988+
989+ /// Deliver all stored outgoing consensus messages immediately.
990+ /// If no client is registered yet, the messages remain stored.
991+ pub fn deliver_stored_outgoing_messages ( & self ) {
992+ let client = match self . client_arc ( ) {
993+ Some ( c) => c,
994+ None => {
995+ warn ! ( target: "consensus" , "deliver_stored_outgoing_messages: No client available; keeping messages deferred." ) ;
996+ return ;
997+ }
998+ } ;
999+
1000+ let mut stored = self . stored_outgoing_messages . lock ( ) ;
1001+ if stored. is_empty ( ) {
1002+ return ;
1003+ }
1004+ let mut messages: Vec < StoredOutgoingMessage > = Vec :: with_capacity ( stored. len ( ) ) ;
1005+ std:: mem:: swap ( & mut * stored, & mut messages) ;
1006+ drop ( stored) ;
1007+
1008+ for msg in messages. into_iter ( ) {
1009+ for node in msg. recipients . iter ( ) {
1010+ trace ! ( target: "consensus" , "Delivering deferred message to {}" , node) ;
1011+ client. send_consensus_message ( msg. block_number , msg. data . clone ( ) , Some ( * node) ) ;
1012+ }
1013+ }
1014+ }
1015+
8221016 /// Conditionally joins the current hbbft epoch if the number of received
8231017 /// contributions exceeds the maximum number of tolerated faulty nodes.
8241018 fn join_hbbft_epoch ( & self ) -> Result < ( ) , EngineError > {
0 commit comments