@@ -5,7 +5,9 @@ use crate::{
55 status:: { handle_error, Status , StatusSender } ,
66 sv1:: {
77 downstream:: downstream:: Downstream ,
8- sv1_server:: { channel:: Sv1ServerChannelState , KEEPALIVE_JOB_ID_DELIMITER } ,
8+ sv1_server:: {
9+ channel:: Sv1ServerChannelState , is_mining_authorize, KEEPALIVE_JOB_ID_DELIMITER ,
10+ } ,
911 } ,
1012 utils:: AGGREGATED_CHANNEL_ID ,
1113} ;
@@ -41,7 +43,7 @@ use stratum_apps::{
4143 } ,
4244 sv2_to_sv1:: { build_sv1_notify_from_sv2, build_sv1_set_difficulty_from_sv2_target} ,
4345 } ,
44- sv1_api:: { json_rpc , server_to_client, utils:: HexU32Be , IsServer } ,
46+ sv1_api:: { server_to_client, utils:: HexU32Be , IsServer } ,
4547 } ,
4648 task_manager:: TaskManager ,
4749 utils:: types:: { ChannelId , DownstreamId , Hashrate , RequestId , SharesPerMinute } ,
@@ -346,9 +348,11 @@ impl Sv1Server {
346348 return Ok ( ( ) ) ;
347349 }
348350
351+ let is_authorize = is_mining_authorize ( & downstream_message) ;
352+
349353 let response = self
350354 . clone ( )
351- . handle_message ( Some ( downstream_id) , downstream_message. clone ( ) ) ;
355+ . handle_message ( Some ( downstream_id) , downstream_message) ;
352356
353357 match response {
354358 Ok ( Some ( response_msg) ) => {
@@ -370,13 +374,11 @@ impl Sv1Server {
370374 } ) ?;
371375
372376 // Check if this was an authorize message and handle sv1 handshake completion
373- if let json_rpc:: Message :: StandardRequest ( request) = & downstream_message {
374- if request. method == "mining.authorize" {
375- info ! ( "Down: Handling mining.authorize after handshake completion" ) ;
376- if let Err ( e) = downstream. handle_sv1_handshake_completion ( ) . await {
377- error ! ( "Down: Failed to handle handshake completion: {:?}" , e) ;
378- return Err ( TproxyError :: disconnect ( e, downstream_id) ) ;
379- }
377+ if is_authorize {
378+ info ! ( "Down: Handling mining.authorize after handshake completion" ) ;
379+ if let Err ( e) = downstream. handle_sv1_handshake_completion ( ) . await {
380+ error ! ( "Down: Failed to handle handshake completion: {:?}" , e) ;
381+ return Err ( TproxyError :: disconnect ( e, downstream_id) ) ;
380382 }
381383 }
382384 }
@@ -570,11 +572,11 @@ impl Sv1Server {
570572 . map_err ( TproxyError :: shutdown) ?;
571573
572574 // Process all queued messages now that channel is established
573- if let Ok ( queued_messages) = downstream. downstream_data . safe_lock ( |d| {
574- let messages = d . queued_sv1_handshake_messages . clone ( ) ;
575- d . queued_sv1_handshake_messages . clear ( ) ;
576- messages
577- } ) {
575+ let queued_messages = downstream
576+ . downstream_data
577+ . safe_lock ( |d| std :: mem :: take ( & mut d . queued_sv1_handshake_messages ) )
578+ . ok ( ) ;
579+ if let Some ( queued_messages ) = queued_messages {
578580 if !queued_messages. is_empty ( ) {
579581 info ! (
580582 "Processing {} queued Sv1 messages for downstream {}" ,
@@ -586,23 +588,50 @@ impl Sv1Server {
586588 downstream
587589 . processing_queued_sv1_handshake_responses
588590 . store ( true , Ordering :: SeqCst ) ;
591+ let downstream_sv1_sender = downstream
592+ . downstream_channel_state
593+ . downstream_sv1_sender
594+ . clone ( ) ;
589595
590596 for message in queued_messages {
591- if let Ok ( Some ( response_msg) ) =
592- self . clone ( ) . handle_message ( Some ( downstream_id) , message)
593- {
594- self . sv1_server_channel_state
595- . sv1_server_to_downstream_sender
596- . send ( (
597- m. channel_id ,
598- Some ( downstream_id) ,
599- response_msg. into ( ) ,
600- ) )
601- . map_err ( |_| {
602- TproxyError :: shutdown (
603- TproxyErrorKind :: ChannelErrorSender ,
604- )
605- } ) ?;
597+ let is_authorize = is_mining_authorize ( & message) ;
598+ let response =
599+ self . clone ( ) . handle_message ( Some ( downstream_id) , message) ;
600+ match response {
601+ Ok ( Some ( response_msg) ) => {
602+ downstream_sv1_sender. send ( response_msg. into ( ) ) . await
603+ . map_err ( |e| {
604+ error ! (
605+ "Down: Failed to send message to downstream: {e:?}"
606+ ) ;
607+ TproxyError :: disconnect (
608+ TproxyErrorKind :: ChannelErrorSender , downstream_id
609+ )
610+ } ) ?;
611+
612+ if is_authorize {
613+ info ! ( "Down: Handling mining.authorize after upstream channel is open" ) ;
614+ if let Err ( e) =
615+ downstream. handle_sv1_handshake_completion ( ) . await
616+ {
617+ error ! (
618+ "Down: Failed to handle handshake completion: {:?}" ,
619+ e
620+ ) ;
621+ return Err ( TproxyError :: disconnect (
622+ e,
623+ downstream_id,
624+ ) ) ;
625+ }
626+ }
627+ }
628+ Ok ( None ) => {
629+ // Message was handled but no response needed
630+ }
631+ Err ( e) => {
632+ error ! ( "Down: Error handling downstream message: {:?}" , e) ;
633+ return Err ( TproxyError :: disconnect ( e, downstream_id) ) ;
634+ }
606635 }
607636 }
608637 }
0 commit comments