diff --git a/kms-connector/Cargo.lock b/kms-connector/Cargo.lock index 54b7f1bcda..06f2d19052 100644 --- a/kms-connector/Cargo.lock +++ b/kms-connector/Cargo.lock @@ -2982,7 +2982,7 @@ dependencies = [ [[package]] name = "fhevm_gateway_bindings" version = "0.1.0-rc14" -source = "git+https://github.com/zama-ai/fhevm.git?tag=v0.10.0#df9e61f99f957b76130651f9deebd45e186ba356" +source = "git+https://github.com/zama-ai/fhevm.git?tag=v0.10.3#edb4c81c4d45f557faab569055957dd21991223d" dependencies = [ "alloy", "serde", diff --git a/kms-connector/Cargo.toml b/kms-connector/Cargo.toml index 973347636c..42aa1940f6 100644 --- a/kms-connector/Cargo.toml +++ b/kms-connector/Cargo.toml @@ -18,7 +18,7 @@ gw-listener.path = "crates/gw-listener" kms-worker.path = "crates/kms-worker" tx-sender.path = "crates/tx-sender" connector-utils.path = "crates/utils" -fhevm_gateway_bindings = { git = "https://github.com/zama-ai/fhevm.git", tag = "v0.10.0", default-features = false } +fhevm_gateway_bindings = { git = "https://github.com/zama-ai/fhevm.git", tag = "v0.10.3", default-features = false } kms-grpc = { git = "https://github.com/zama-ai/kms.git", tag = "v0.12.4", default-features = true } bc2wrap = { git = "https://github.com/zama-ai/kms.git", tag = "v0.12.4", default-features = true } tfhe = "=1.4.0-alpha.3" diff --git a/kms-connector/crates/gw-listener/src/core/gw_listener.rs b/kms-connector/crates/gw-listener/src/core/gw_listener.rs index 25704379a1..3f019cb521 100644 --- a/kms-connector/crates/gw-listener/src/core/gw_listener.rs +++ b/kms-connector/crates/gw-listener/src/core/gw_listener.rs @@ -9,7 +9,9 @@ use crate::{ use alloy::{ contract::{Event, EventPoller}, network::Ethereum, + primitives::LogData, providers::Provider, + rpc::types::{Filter, Log}, sol_types::SolEvent, }; use anyhow::anyhow; @@ -28,7 +30,7 @@ use std::time::Duration; use tokio::{select, task::JoinSet, time::timeout}; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; -use tracing::{error, info}; +use tracing::{error, info, warn}; use tracing_opentelemetry::OpenTelemetrySpanExt; /// Struct monitoring and storing Gateway's events. @@ -151,16 +153,13 @@ where async fn subscribe_inner( &self, event_type: EventType, - mut event_filter: Event<&'_ P, E>, + event_filter: Event<&'_ P, E>, poll_interval: Duration, ) -> anyhow::Result<()> where E: Into + SolEvent + Send + Sync + 'static, { let mut last_block_polled = self.get_last_block_polled(event_type).await?; - if let Some(from_block) = last_block_polled { - event_filter = event_filter.from_block(from_block); - } let mut event_poller = event_filter .watch() .await @@ -168,6 +167,10 @@ where event_poller.poller = event_poller.poller.with_poll_interval(poll_interval); info!("✓ Subscribed to {event_type} events"); + self.catchup_past_events::(&mut last_block_polled, event_type) + .await + .map_err(|e| anyhow!("Failed to catch up past {event_type} events: {e}"))?; + select! { _ = self.process_events(event_type, event_poller, &mut last_block_polled) => (), _ = self.cancel_token.cancelled() => info!("{event_type} subscription cancelled..."), @@ -183,12 +186,72 @@ where Ok(()) } + /// Catches events created before the event filter using `eth_getFilterLogs`. + async fn catchup_past_events( + &self, + last_block_polled: &mut Option, + event_type: EventType, + ) -> anyhow::Result<()> + where + E: Into + SolEvent + Send + Sync + 'static, + { + let catchup_from_block = match last_block_polled { + None => { + info!( + "No previously polled block for {event_type}; skipping catchup of past events." + ); + return Ok(()); + } + Some(block) => *block, + }; + + let contract_address = match event_type { + EventType::PublicDecryptionRequest | EventType::UserDecryptionRequest => { + self.decryption_contract.address() + } + _ => self.kms_generation_contract.address(), + }; + + let filter = Filter::new() + .address(*contract_address) + .event_signature(E::SIGNATURE_HASH) + .from_block(catchup_from_block); + let provider = self.decryption_contract.provider(); + + info!("Catching up {event_type} from {catchup_from_block}..."); + let mut event_count = 0; + let event_filter_id = provider.new_filter(&filter).await?; + let past_events = provider + .get_filter_logs(event_filter_id) + .await? + .into_iter() + .map(|log| { + decode_log::(&log).map(|event| { + event_count += 1; + (event, log) + }) + }); + + for event in past_events { + self.spawn_event_handling(event_type, event, last_block_polled) + .await; + } + + info!( + "Successfully caught {event_count} {event_type} events from block {catchup_from_block}!" + ); + if let Err(e) = provider.uninstall_filter(event_filter_id).await { + warn!("Failed to uninstall {event_type} event catchup filter: {e}"); + } + Ok(()) + } + /// Event processing loop. async fn process_events( &self, event_type: EventType, event_poller: EventPoller, - last_block: &mut Option, + last_block_polled: &mut Option, ) where E: Into + SolEvent + Send + Sync + 'static, { @@ -196,28 +259,42 @@ where loop { info!("Waiting for next {event_type}..."); match events.next().await { - Some(Ok((event, log))) => { - *last_block = log.block_number; - EVENT_RECEIVED_COUNTER - .with_label_values(&[event_type.as_str()]) - .inc(); - - let db = self.db_pool.clone(); - spawn_with_limit(handle_gateway_event(db, event.into(), log.block_number)) - .await; - } - Some(Err(err)) => { - error!("Error while listening for {event_type} events: {err}"); - EVENT_RECEIVED_ERRORS - .with_label_values(&[event_type.as_str()]) - .inc(); - continue; + Some(event) => { + self.spawn_event_handling(event_type, event, last_block_polled) + .await } None => break error!("Alloy Provider was dropped for {event_type}"), } } } + async fn spawn_event_handling( + &self, + event_type: EventType, + event: alloy::sol_types::Result<(E, Log)>, + last_block: &mut Option, + ) where + E: Into + SolEvent + Send + Sync + 'static, + { + match event { + Ok((event, log)) => { + *last_block = log.block_number; + EVENT_RECEIVED_COUNTER + .with_label_values(&[event_type.as_str()]) + .inc(); + + let db = self.db_pool.clone(); + spawn_with_limit(handle_gateway_event(db, event.into(), log.block_number)).await; + } + Err(err) => { + error!("Error while listening for {event_type} events: {err}"); + EVENT_RECEIVED_ERRORS + .with_label_values(&[event_type.as_str()]) + .inc(); + } + } + } + /// Get the last block polled from config or DB. async fn get_last_block_polled(&self, event_type: EventType) -> anyhow::Result> { let last_block_polled = match self.config.from_block_number { @@ -282,6 +359,11 @@ async fn handle_gateway_event( } } +fn decode_log(log: &Log) -> alloy::sol_types::Result { + let log_data: &LogData = log.as_ref(); + E::decode_raw_log(log_data.topics().iter().copied(), &log_data.data) +} + impl GatewayListener { /// Creates a new `GatewayListener` instance from a valid `Config`. pub async fn from_config(