@@ -16,8 +16,8 @@ const MAX_FRAME_SIZE: u32 = 1024 * 1024;
1616// Handshake timeout per message (increased for large McEliece keys)
1717const HANDSHAKE_TIMEOUT : Duration = Duration :: from_secs ( 30 ) ;
1818
19- // Frame read timeout (30 seconds )
20- const FRAME_READ_TIMEOUT : Duration = Duration :: from_secs ( 30 ) ;
19+ // Frame read timeout (must exceed heartbeat interval to allow pongs during idle periods )
20+ const FRAME_READ_TIMEOUT : Duration = Duration :: from_secs ( 60 ) ;
2121use super :: {
2222 ConnectionError , ConnectionManager , Frame , FrameCodec , FrameError , FrameType , HandshakeContext ,
2323 HandshakeError , Protocol ,
@@ -987,13 +987,10 @@ impl SourceContainer {
987987
988988 /// Spawns background task to send periodic heartbeat pings
989989 async fn spawn_heartbeat_task ( & self ) -> Result < ( ) , SourceError > {
990- if !self . config . reconnection_enabled {
991- return Ok ( ( ) ) ;
992- }
993-
994990 let interval = Duration :: from_millis ( self . config . heartbeat_interval_ms ) ;
995991 let timeout_duration = Duration :: from_millis ( self . config . heartbeat_timeout_ms ) ;
996992 let max_missed = self . config . max_missed_pongs ;
993+ let reconnection_enabled = self . config . reconnection_enabled ;
997994
998995 let state = self . heartbeat_state . clone ( ) ;
999996 let container = Arc :: new ( self . clone ( ) ) ;
@@ -1036,8 +1033,9 @@ impl SourceContainer {
10361033 if total >= max_missed {
10371034 tracing:: error!( "Heartbeat failed: {} consecutive timeouts" , total) ;
10381035 health. set_status( HealthStatus :: Unhealthy ) . await ;
1039- // Trigger reconnection
1040- container. reconnection_needed. store( true , Ordering :: SeqCst ) ;
1036+ if reconnection_enabled {
1037+ container. reconnection_needed. store( true , Ordering :: SeqCst ) ;
1038+ }
10411039 } else if total >= max_missed / 2 {
10421040 health. set_status( HealthStatus :: Degraded ) . await ;
10431041 }
@@ -1100,6 +1098,7 @@ impl SourceContainer {
11001098 conn_id
11011099 ) ;
11021100 self . unregister_connection ( conn_id) . await ;
1101+ self . connection_manager . remove_connection ( conn_id) . await ;
11031102 return Err ( SourceError :: ConfigError (
11041103 "Channel closed while waiting for CONNECT_ACK" . to_string ( ) ,
11051104 ) ) ;
@@ -1111,17 +1110,20 @@ impl SourceContainer {
11111110 fin_frame. set_fin ( ) ;
11121111 let _ = self . send_frame ( fin_frame) . await ;
11131112 self . unregister_connection ( conn_id) . await ;
1113+ self . connection_manager . remove_connection ( conn_id) . await ;
11141114 return Ok ( ( ) ) ; // Clean exit
11151115 }
11161116 } ;
11171117
11181118 if ack_frame. frame_type ( ) != FrameType :: ConnectAck {
11191119 self . unregister_connection ( conn_id) . await ;
1120+ self . connection_manager . remove_connection ( conn_id) . await ;
11201121 return Err ( SourceError :: ConfigError ( "Expected CONNECT_ACK" . to_string ( ) ) ) ;
11211122 }
11221123
11231124 if ack_frame. payload ( ) . is_empty ( ) || ack_frame. payload ( ) [ 0 ] == 0 {
11241125 self . unregister_connection ( conn_id) . await ;
1126+ self . connection_manager . remove_connection ( conn_id) . await ;
11251127 return Err ( SourceError :: ConfigError ( "Connection rejected" . to_string ( ) ) ) ;
11261128 }
11271129
@@ -1186,9 +1188,13 @@ impl SourceContainer {
11861188 }
11871189 }
11881190
1189- // Cleanup: unregister connection from demux registry
1191+ // Cleanup connection from registry and manager
11901192 self_clone. unregister_connection ( conn_clone. id ( ) ) . await ;
11911193 conn_clone. set_state ( super :: ConnectionState :: Closed ) . await ;
1194+ self_clone
1195+ . connection_manager
1196+ . remove_connection ( conn_clone. id ( ) )
1197+ . await ;
11921198 } ) ;
11931199
11941200 Ok ( ( ) )
0 commit comments