Skip to content

Commit 4647ca0

Browse files
fix(kms-connector): listen from latest block if catchup fail (#1734)
* fix(kms-connector): listen from latest block if catchup fail * chore(kms-connector): add tests
1 parent 96e9391 commit 4647ca0

File tree

1 file changed

+29
-5
lines changed

1 file changed

+29
-5
lines changed

kms-connector/crates/gw-listener/src/core/gw_listener.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,10 @@ where
167167
event_poller.poller = event_poller.poller.with_poll_interval(poll_interval);
168168
info!("✓ Subscribed to {event_type} events");
169169

170-
self.catchup_past_events::<E>(&mut last_block_polled, event_type)
170+
let _ = self
171+
.catchup_past_events::<E>(&mut last_block_polled, event_type)
171172
.await
172-
.map_err(|e| anyhow!("Failed to catch up past {event_type} events: {e}"))?;
173+
.inspect_err(|e| warn!("Failed to catch up past {event_type} events: {e}"));
173174

174175
select! {
175176
_ = self.process_events(event_type, event_poller, &mut last_block_polled) => (),
@@ -407,7 +408,7 @@ mod tests {
407408
#[timeout(Duration::from_secs(90))]
408409
#[tokio::test]
409410
async fn test_reset_filter_stops_listener() {
410-
let (_test_instance, asserter, gw_listener) = test_setup().await;
411+
let (_test_instance, asserter, gw_listener) = test_setup(None).await;
411412

412413
asserter.push_failure(ErrorPayload {
413414
code: -32000,
@@ -418,11 +419,31 @@ mod tests {
418419
gw_listener.subscribe(EventType::KeygenRequest).await;
419420
}
420421

422+
#[rstest::rstest]
423+
#[timeout(Duration::from_secs(90))]
424+
#[tokio::test]
425+
async fn test_failed_catchup_does_not_stop_listener() {
426+
let (mut test_instance, asserter, gw_listener) = test_setup(Some(0)).await;
427+
428+
asserter.push_failure(ErrorPayload {
429+
code: -32002,
430+
message: "request timed out".into(),
431+
data: None,
432+
});
433+
434+
let event_type = EventType::KeygenRequest;
435+
tokio::spawn(gw_listener.subscribe(event_type));
436+
test_instance.wait_for_log("Failed to catch up").await;
437+
test_instance
438+
.wait_for_log(&format!("Waiting for next {event_type}"))
439+
.await;
440+
}
441+
421442
#[rstest::rstest]
422443
#[timeout(Duration::from_secs(90))]
423444
#[tokio::test]
424445
async fn test_listener_ended_by_end_of_any_task() {
425-
let (mut test_instance, _asserter, gw_listener) = test_setup().await;
446+
let (mut test_instance, _asserter, gw_listener) = test_setup(None).await;
426447

427448
// Will stop because some subcription tasks will not be able to init their event filter
428449
gw_listener.start().await;
@@ -438,7 +459,9 @@ mod tests {
438459
RootProvider,
439460
>;
440461

441-
async fn test_setup() -> (TestInstance, Asserter, GatewayListener<MockProvider>) {
462+
async fn test_setup(
463+
from_block_number: Option<u64>,
464+
) -> (TestInstance, Asserter, GatewayListener<MockProvider>) {
442465
let test_instance = TestInstanceBuilder::db_setup().await.unwrap();
443466

444467
// Create a mocked `alloy::Provider`
@@ -452,6 +475,7 @@ mod tests {
452475
let config = Config {
453476
decryption_polling: Duration::from_millis(500),
454477
key_management_polling: Duration::from_millis(500),
478+
from_block_number,
455479
..Default::default()
456480
};
457481
let listener = GatewayListener::new(

0 commit comments

Comments
 (0)