Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions crates/relayer/src/event/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ impl<T> EventBus<T> {
}
}

// Remove all disconnected subscribers
for idx in disconnected {
// Remove all disconnected subscribers in reverse order.
// This is critical: removing in ascending order would shift indices
// and cause wrong elements to be removed.
for idx in disconnected.into_iter().rev() {
self.txs.remove(idx);
}
}
Expand Down Expand Up @@ -117,4 +119,38 @@ mod tests {

assert_eq!(counter(), 20);
}

#[test]
#[serial]
fn multiple_disconnected_subscribers() {
reset_counter();

let mut bus: EventBus<i32> = EventBus::new();

// Create 5 subscribers
let rx0 = bus.subscribe();
let rx1 = bus.subscribe();
let rx2 = bus.subscribe();
let rx3 = bus.subscribe();
let rx4 = bus.subscribe();

assert_eq!(bus.txs.len(), 5);

// Drop subscribers at indices 1 and 3 (non-contiguous)
// This tests that index removal works correctly
drop(rx1);
drop(rx3);

// Broadcast should succeed for remaining subscribers (rx0, rx2, rx4)
// and clean up disconnected ones
bus.broadcast(42);

// Verify bus state is correct after cleanup - disconnected senders removed
assert_eq!(bus.txs.len(), 3);

// Verify remaining subscribers received the message
assert_eq!(rx0.recv(), Ok(42));
assert_eq!(rx2.recv(), Ok(42));
assert_eq!(rx4.recv(), Ok(42));
}
}
2 changes: 1 addition & 1 deletion crates/relayer/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ fn process_batch<Chain: ChainHandle>(
trace!(
"skipping events for '{}': destination chain '{}' is not registered",
object.short_name(),
object.src_chain_id()
object.dst_chain_id()
);

continue;
Expand Down
17 changes: 14 additions & 3 deletions crates/relayer/src/worker/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,24 @@ fn handle_packet_cmd<ChainA: ChainHandle, ChainB: ChainHandle>(
WorkerCmd::IbcEvents { batch } if link.a_to_b.channel().ordering == Ordering::Ordered => {
let lowest_sequence = lowest_sequence(&batch.events);

let next_sequence = query_next_sequence_receive(
let next_sequence = match query_next_sequence_receive(
link.a_to_b.dst_chain(),
link.a_to_b.dst_port_id(),
link.a_to_b.dst_channel_id(),
QueryHeight::Specific(batch.height),
)
.ok();
) {
Ok(seq) => Some(seq),
Err(e) => {
// Log the error but continue - clearing will be triggered due to
// None < Some(x) being true, which is the safe fallback behavior
warn!(
"failed to query next_sequence_receive for ordered channel, \
will trigger clearing as fallback: {}",
e
);
None
}
};

if *should_clear_on_start || next_sequence < lowest_sequence {
handle_clear_packet(link, clear_interval, path, Some(batch.height), clear_limit)?;
Expand Down