Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kms-connector/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion kms-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ gw-listener.path = "crates/gw-listener"
kms-worker.path = "crates/kms-worker"
tx-sender.path = "crates/tx-sender"
connector-utils.path = "crates/utils"
fhevm_gateway_bindings = { git = "https://github.com/zama-ai/fhevm.git", tag = "v0.10.0", default-features = false }
fhevm_gateway_bindings = { git = "https://github.com/zama-ai/fhevm.git", tag = "v0.10.3", default-features = false }
kms-grpc = { git = "https://github.com/zama-ai/kms.git", tag = "v0.12.4", default-features = true }
bc2wrap = { git = "https://github.com/zama-ai/kms.git", tag = "v0.12.4", default-features = true }
tfhe = "=1.4.0-alpha.3"
Expand Down
126 changes: 104 additions & 22 deletions kms-connector/crates/gw-listener/src/core/gw_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use crate::{
use alloy::{
contract::{Event, EventPoller},
network::Ethereum,
primitives::LogData,
providers::Provider,
rpc::types::{Filter, Log},
sol_types::SolEvent,
};
use anyhow::anyhow;
Expand All @@ -28,7 +30,7 @@ use std::time::Duration;
use tokio::{select, task::JoinSet, time::timeout};
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use tracing::{error, info, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

/// Struct monitoring and storing Gateway's events.
Expand Down Expand Up @@ -151,23 +153,24 @@ where
async fn subscribe_inner<E>(
&self,
event_type: EventType,
mut event_filter: Event<&'_ P, E>,
event_filter: Event<&'_ P, E>,
poll_interval: Duration,
) -> anyhow::Result<()>
where
E: Into<GatewayEventKind> + SolEvent + Send + Sync + 'static,
{
let mut last_block_polled = self.get_last_block_polled(event_type).await?;
if let Some(from_block) = last_block_polled {
event_filter = event_filter.from_block(from_block);
}
let mut event_poller = event_filter
.watch()
.await
.map_err(|e| anyhow!("Failed to subscribe to {event_type} events: {e}"))?;
event_poller.poller = event_poller.poller.with_poll_interval(poll_interval);
info!("✓ Subscribed to {event_type} events");

self.catchup_past_events::<E>(&mut last_block_polled, event_type)
.await
.map_err(|e| anyhow!("Failed to catch up past {event_type} events: {e}"))?;

select! {
_ = self.process_events(event_type, event_poller, &mut last_block_polled) => (),
_ = self.cancel_token.cancelled() => info!("{event_type} subscription cancelled..."),
Expand All @@ -183,41 +186,115 @@ where
Ok(())
}

/// Catches events created before the event filter using `eth_getFilterLogs`.
async fn catchup_past_events<E>(
&self,
last_block_polled: &mut Option<u64>,
event_type: EventType,
) -> anyhow::Result<()>
where
E: Into<GatewayEventKind> + SolEvent + Send + Sync + 'static,
{
let catchup_from_block = match last_block_polled {
None => {
info!(
"No previously polled block for {event_type}; skipping catchup of past events."
);
return Ok(());
}
Some(block) => *block,
};

let contract_address = match event_type {
EventType::PublicDecryptionRequest | EventType::UserDecryptionRequest => {
self.decryption_contract.address()
}
_ => self.kms_generation_contract.address(),
};

let filter = Filter::new()
.address(*contract_address)
.event_signature(E::SIGNATURE_HASH)
.from_block(catchup_from_block);
let provider = self.decryption_contract.provider();

info!("Catching up {event_type} from {catchup_from_block}...");
let mut event_count = 0;
let event_filter_id = provider.new_filter(&filter).await?;
let past_events = provider
.get_filter_logs(event_filter_id)
.await?
.into_iter()
.map(|log| {
decode_log::<E>(&log).map(|event| {
event_count += 1;
(event, log)
})
});

for event in past_events {
self.spawn_event_handling(event_type, event, last_block_polled)
.await;
}

info!(
"Successfully caught {event_count} {event_type} events from block {catchup_from_block}!"
);
if let Err(e) = provider.uninstall_filter(event_filter_id).await {
warn!("Failed to uninstall {event_type} event catchup filter: {e}");
}
Ok(())
}

/// Event processing loop.
async fn process_events<E>(
&self,
event_type: EventType,
event_poller: EventPoller<E>,
last_block: &mut Option<u64>,
last_block_polled: &mut Option<u64>,
) where
E: Into<GatewayEventKind> + SolEvent + Send + Sync + 'static,
{
let mut events = event_poller.into_stream();
loop {
info!("Waiting for next {event_type}...");
match events.next().await {
Some(Ok((event, log))) => {
*last_block = log.block_number;
EVENT_RECEIVED_COUNTER
.with_label_values(&[event_type.as_str()])
.inc();

let db = self.db_pool.clone();
spawn_with_limit(handle_gateway_event(db, event.into(), log.block_number))
.await;
}
Some(Err(err)) => {
error!("Error while listening for {event_type} events: {err}");
EVENT_RECEIVED_ERRORS
.with_label_values(&[event_type.as_str()])
.inc();
continue;
Some(event) => {
self.spawn_event_handling(event_type, event, last_block_polled)
.await
}
None => break error!("Alloy Provider was dropped for {event_type}"),
}
}
}

async fn spawn_event_handling<E>(
&self,
event_type: EventType,
event: alloy::sol_types::Result<(E, Log)>,
last_block: &mut Option<u64>,
) where
E: Into<GatewayEventKind> + SolEvent + Send + Sync + 'static,
{
match event {
Ok((event, log)) => {
*last_block = log.block_number;
EVENT_RECEIVED_COUNTER
.with_label_values(&[event_type.as_str()])
.inc();

let db = self.db_pool.clone();
spawn_with_limit(handle_gateway_event(db, event.into(), log.block_number)).await;
}
Err(err) => {
error!("Error while listening for {event_type} events: {err}");
EVENT_RECEIVED_ERRORS
.with_label_values(&[event_type.as_str()])
.inc();
}
}
}

/// Get the last block polled from config or DB.
async fn get_last_block_polled(&self, event_type: EventType) -> anyhow::Result<Option<u64>> {
let last_block_polled = match self.config.from_block_number {
Expand Down Expand Up @@ -282,6 +359,11 @@ async fn handle_gateway_event(
}
}

fn decode_log<E: SolEvent>(log: &Log) -> alloy::sol_types::Result<E> {
let log_data: &LogData = log.as_ref();
E::decode_raw_log(log_data.topics().iter().copied(), &log_data.data)
}

impl GatewayListener<GatewayProvider> {
/// Creates a new `GatewayListener` instance from a valid `Config`.
pub async fn from_config(
Expand Down