Skip to content

Commit 784c993

Browse files
committed
re-queue broadcast even if we didn't send it out
Signed-off-by: Somtochi Onyekwere <[email protected]>
1 parent 3e0626d commit 784c993

File tree

1 file changed

+16
-22
lines changed
  • crates/corro-agent/src/broadcast

1 file changed

+16
-22
lines changed

crates/corro-agent/src/broadcast/mod.rs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -723,8 +723,6 @@ async fn handle_broadcasts(
723723
)
724724
};
725725

726-
let pending_sent_instance = pending.sent_to.len();
727-
728726
let mut spawn_count = 0;
729727
trace!("broadcasting to: {:?}", broadcast_to);
730728
for addr in broadcast_to {
@@ -764,26 +762,22 @@ async fn handle_broadcasts(
764762

765763
counter!("corro.broadcast.spawn", "type" => "global").increment(spawn_count);
766764

767-
if pending_sent_instance != pending.sent_to.len() {
768-
// we've sent this to at least 1 member...
769-
770-
if let Some(send_count) = pending.send_count.checked_add(1) {
771-
trace!("send_count: {send_count}, max_transmissions: {max_transmissions}");
772-
pending.send_count = send_count;
773-
774-
if send_count < max_transmissions {
775-
debug!("queueing for re-send");
776-
idle_pendings.push(Box::pin(async move {
777-
// slow our send pace if we've been previously rate limited
778-
let sleep_ms_base = if prev_rate_limited { 500 } else { 100 };
779-
// send with increasing latency as we've already sent the updates out
780-
tokio::time::sleep(Duration::from_millis(
781-
sleep_ms_base * send_count as u64,
782-
))
783-
.await;
784-
pending
785-
}));
786-
}
765+
if let Some(send_count) = pending.send_count.checked_add(1) {
766+
trace!("send_count: {send_count}, max_transmissions: {max_transmissions}");
767+
pending.send_count = send_count;
768+
769+
if send_count < max_transmissions {
770+
debug!("queueing for re-send");
771+
idle_pendings.push(Box::pin(async move {
772+
// slow our send pace if we've been previously rate limited
773+
let sleep_ms_base = if prev_rate_limited { 500 } else { 100 };
774+
// send with increasing latency as we've already sent the updates out
775+
tokio::time::sleep(Duration::from_millis(
776+
sleep_ms_base * send_count as u64,
777+
))
778+
.await;
779+
pending
780+
}));
787781
}
788782
}
789783
}

0 commit comments

Comments
 (0)