@@ -455,6 +455,10 @@ async fn handle_broadcasts(
455455 break ;
456456 }
457457 } ,
458+ _ = join_set. next( ) => {
459+ // drains the joinset
460+ continue ;
461+ } ,
458462 _ = bcast_interval. tick( ) => {
459463 Branch :: BroadcastDeadline
460464 } ,
@@ -557,10 +561,6 @@ async fn handle_broadcasts(
557561 }
558562 }
559563
560- while join_set. try_join_next ( ) . is_some ( ) {
561- // we're draining the join_set, even though it's not strictly necessary
562- }
563-
564564 let prev_rate_limited = rate_limited;
565565
566566 // start with local broadcasts, they're higher priority
@@ -572,6 +572,9 @@ async fn handle_broadcasts(
572572 let mut spawn_count = 0 ;
573573 let mut ring0_count = 0 ;
574574 for addr in members. ring0 ( agent. cluster_id ( ) ) {
575+ if join_set. len ( ) >= MAX_INFLIGHT_BROADCAST {
576+ break ;
577+ }
575578 ring0_count += 1 ;
576579
577580 match try_transmit_broadcast (
@@ -617,7 +620,7 @@ async fn handle_broadcasts(
617620 counter ! ( "corro.broadcast.spawn" , "type" => "local" ) . increment ( spawn_count) ;
618621 }
619622
620- if !rate_limited {
623+ if !rate_limited && !to_broadcast . is_empty ( ) && join_set . len ( ) < MAX_INFLIGHT_BROADCAST {
621624 let ( members_count, ring0_count) = {
622625 let members = agent. members ( ) . read ( ) ;
623626 let members_count = members. states . len ( ) ;
@@ -667,7 +670,10 @@ async fn handle_broadcasts(
667670 . choose_multiple (
668671 & mut rng,
669672 // prevent going over max count
670- cmp:: min ( choose_count, MAX_INFLIGHT_BROADCAST - join_set. len ( ) ) ,
673+ cmp:: min (
674+ choose_count,
675+ MAX_INFLIGHT_BROADCAST . saturating_sub ( join_set. len ( ) ) ,
676+ ) ,
671677 )
672678 } ;
673679
0 commit comments