@@ -62,8 +62,7 @@ use crate::{
6262 channel_manager:: downstream_message_handler:: RouteMessageTo ,
6363 config:: JobDeclaratorClientConfig ,
6464 downstream:: Downstream ,
65- error:: { self , JDCError , JDCErrorKind , JDCResult } ,
66- status:: { handle_error, Status , StatusSender } ,
65+ error:: { self , Action , JDCError , JDCErrorKind , JDCResult , LoopControl } ,
6766 utils:: {
6867 AtomicUpstreamState , DownstreamChannelJobId , DownstreamMessage , PendingChannelRequest ,
6968 SharesOrderedByDiff , UpstreamState ,
@@ -271,6 +270,49 @@ pub struct ChannelManager {
271270
272271#[ cfg_attr( not( test) , hotpath:: measure_all) ]
273272impl ChannelManager {
273+ fn handle_error_action (
274+ & self ,
275+ context : & str ,
276+ e : & JDCError < error:: ChannelManager > ,
277+ cancellation_token : & CancellationToken ,
278+ fallback_token : & CancellationToken ,
279+ ) -> LoopControl {
280+ match e. action {
281+ Action :: Log => {
282+ warn ! (
283+ error_kind = ?e. kind,
284+ "{context} returned a log-only error"
285+ ) ;
286+ LoopControl :: Continue
287+ }
288+ Action :: Fallback => {
289+ warn ! (
290+ error_kind = ?e. kind,
291+ "{context} requested fallback"
292+ ) ;
293+ fallback_token. cancel ( ) ;
294+ LoopControl :: Break
295+ }
296+ Action :: Shutdown => {
297+ warn ! (
298+ error_kind = ?e. kind,
299+ "{context} requested shutdown"
300+ ) ;
301+ cancellation_token. cancel ( ) ;
302+ LoopControl :: Break
303+ }
304+ Action :: Disconnect ( downstream_id) => {
305+ warn ! (
306+ downstream_id,
307+ error_kind = ?e. kind,
308+ "{context} requested downstream disconnect"
309+ ) ;
310+ self . remove_downstream ( downstream_id) ;
311+ LoopControl :: Continue
312+ }
313+ }
314+ }
315+
274316 /// Constructor method used to instantiate the Channel Manager
275317 #[ allow( clippy:: too_many_arguments) ]
276318 pub async fn new (
@@ -434,7 +476,6 @@ impl ChannelManager {
434476 task_manager : Arc < TaskManager > ,
435477 cancellation_token : CancellationToken ,
436478 fallback_coordinator : FallbackCoordinator ,
437- status_sender : Sender < Status > ,
438479 channel_manager_sender : Sender < ( DownstreamId , Mining < ' static > , Option < Vec < Tlv > > ) > ,
439480 supported_extensions : Vec < u16 > ,
440481 required_extensions : Vec < u16 > ,
@@ -493,7 +534,6 @@ impl ChannelManager {
493534 let this = Arc :: clone( & this) ;
494535 let cancellation_token_inner = cancellation_token. clone( ) ;
495536 let fallback_coordinator_inner = fallback_coordinator. clone( ) ;
496- let status_sender_inner = status_sender. clone( ) ;
497537 let channel_manager_sender_inner = channel_manager_sender. clone( ) ;
498538 let task_manager_inner = task_manager_clone. clone( ) ;
499539 let supported_extensions_inner = supported_extensions. clone( ) ;
@@ -526,8 +566,7 @@ impl ChannelManager {
526566 Some ( group_channel) => group_channel,
527567 None => {
528568 error!( "Failed to bootstrap group channel - disconnecting downstream {downstream_id}" ) ;
529- let error = JDCError :: <error:: ChannelManager >:: shutdown( JDCErrorKind :: CouldNotInitiateSystem ) ;
530- handle_error( & StatusSender :: ChannelManager ( status_sender_inner) , error) . await ;
569+ cancellation_token_inner. cancel( ) ;
531570 return ;
532571 }
533572 } ;
@@ -558,8 +597,8 @@ impl ChannelManager {
558597 . start(
559598 cancellation_token_inner,
560599 fallback_coordinator_inner,
561- status_sender_inner,
562600 task_manager_inner,
601+ move |downstream_id| this. remove_downstream( downstream_id)
563602 )
564603 . await ;
565604 } ) ;
@@ -587,19 +626,22 @@ impl ChannelManager {
587626 self ,
588627 cancellation_token : CancellationToken ,
589628 fallback_coordinator : FallbackCoordinator ,
590- status_sender : Sender < Status > ,
591629 task_manager : Arc < TaskManager > ,
592630 coinbase_outputs : Vec < TxOut > ,
593631 ) {
594- let status_sender = StatusSender :: ChannelManager ( status_sender) ;
595-
596632 // Serialize coinbase outputs before moving into async block
597633 // todo: should we really be serializing here?
598634 let serialized_coinbase_outputs = consensus:: serialize ( & coinbase_outputs) ;
599635
600636 if let Err ( e) = self . coinbase_output_constraints ( coinbase_outputs) . await {
601637 error ! ( error = ?e, "Failed to send CoinbaseOutputConstraints message to TP" ) ;
602- handle_error ( & status_sender, e) . await ;
638+ if let Action :: Shutdown = e. action {
639+ warn ! (
640+ error_kind = ?e. kind,
641+ "CoinbaseOutputConstraints requested shutdown; cancelling global token"
642+ ) ;
643+ cancellation_token. cancel ( ) ;
644+ }
603645 return ;
604646 }
605647
@@ -610,7 +652,6 @@ impl ChannelManager {
610652
611653 // get the cancellation token that signals fallback
612654 let fallback_token = fallback_coordinator. token ( ) ;
613-
614655 let cm = self . clone ( ) ;
615656 let vd = self . clone ( ) ;
616657 let vardiff_future = vd. run_vardiff_loop ( ) ;
@@ -638,31 +679,51 @@ impl ChannelManager {
638679 res = cm_jds. handle_jds_message( ) => {
639680 if let Err ( e) = res {
640681 error!( error = ?e, "Error handling JDS message" ) ;
641- if handle_error( & status_sender, e) . await {
682+ if let LoopControl :: Break = cm. handle_error_action(
683+ "ChannelManager::handle_jds_message" ,
684+ & e,
685+ & cancellation_token,
686+ & fallback_token,
687+ ) {
642688 break ;
643689 }
644690 }
645691 }
646692 res = cm_pool. handle_pool_message_frame( ) => {
647693 if let Err ( e) = res {
648694 error!( error = ?e, "Error handling Pool message" ) ;
649- if handle_error( & status_sender, e) . await {
695+ if let LoopControl :: Break = cm. handle_error_action(
696+ "ChannelManager::handle_pool_message_frame" ,
697+ & e,
698+ & cancellation_token,
699+ & fallback_token,
700+ ) {
650701 break ;
651702 }
652703 }
653704 }
654705 res = cm_template. handle_template_provider_message( ) => {
655706 if let Err ( e) = res {
656707 error!( error = ?e, "Error handling Template Receiver message" ) ;
657- if handle_error( & status_sender, e) . await {
708+ if let LoopControl :: Break = cm. handle_error_action(
709+ "ChannelManager::handle_template_provider_message" ,
710+ & e,
711+ & cancellation_token,
712+ & fallback_token,
713+ ) {
658714 break ;
659715 }
660716 }
661717 }
662718 res = cm_downstreams. handle_downstream_message( ) => {
663719 if let Err ( e) = res {
664720 error!( error = ?e, "Error handling Downstreams message" ) ;
665- if handle_error( & status_sender, e) . await {
721+ if let LoopControl :: Break = cm. handle_error_action(
722+ "ChannelManager::handle_downstream_message" ,
723+ & e,
724+ & cancellation_token,
725+ & fallback_token,
726+ ) {
666727 break ;
667728 }
668729 }
@@ -680,12 +741,11 @@ impl ChannelManager {
680741 // Given a `downstream_id`, this method:
681742 // 1. Removes the corresponding downstream from the `downstream` map.
682743 #[ allow( clippy:: result_large_err) ]
683- pub fn remove_downstream (
684- & mut self ,
685- downstream_id : DownstreamId ,
686- ) -> JDCResult < ( ) , error:: ChannelManager > {
744+ pub fn remove_downstream ( & self , downstream_id : DownstreamId ) {
687745 self . channel_manager_data . super_safe_lock ( |cm_data| {
688- cm_data. downstream . remove ( & downstream_id) ;
746+ if let Some ( downstream) = cm_data. downstream . remove ( & downstream_id) {
747+ downstream. downstream_cancellation_token . cancel ( ) ;
748+ }
689749 cm_data
690750 . downstream_channel_id_and_job_id_to_template_id
691751 . retain ( |key, _| key. downstream_id != downstream_id) ;
@@ -696,7 +756,6 @@ impl ChannelManager {
696756 self . channel_manager_channel
697757 . downstream_sender
698758 . super_safe_lock ( |map| map. remove ( & downstream_id) ) ;
699- Ok ( ( ) )
700759 }
701760
702761 /// Handles messages received from the JDS subsystem.
0 commit comments