Skip to content

Commit 64aa9d7

Browse files
fix(kms-connector): gw-listener catchup fix (#1599)
fix(kms-connector): gw-listener catchup fix (#1590)
1 parent 63fcb82 commit 64aa9d7

File tree

3 files changed

+106
-24
lines changed

3 files changed

+106
-24
lines changed

kms-connector/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

kms-connector/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ gw-listener.path = "crates/gw-listener"
1818
kms-worker.path = "crates/kms-worker"
1919
tx-sender.path = "crates/tx-sender"
2020
connector-utils.path = "crates/utils"
21-
fhevm_gateway_bindings = { git = "https://github.com/zama-ai/fhevm.git", tag = "v0.10.0", default-features = false }
21+
fhevm_gateway_bindings = { git = "https://github.com/zama-ai/fhevm.git", tag = "v0.10.3", default-features = false }
2222
kms-grpc = { git = "https://github.com/zama-ai/kms.git", tag = "v0.12.4", default-features = true }
2323
bc2wrap = { git = "https://github.com/zama-ai/kms.git", tag = "v0.12.4", default-features = true }
2424
tfhe = "=1.4.0-alpha.3"

kms-connector/crates/gw-listener/src/core/gw_listener.rs

Lines changed: 104 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ use crate::{
99
use alloy::{
1010
contract::{Event, EventPoller},
1111
network::Ethereum,
12+
primitives::LogData,
1213
providers::Provider,
14+
rpc::types::{Filter, Log},
1315
sol_types::SolEvent,
1416
};
1517
use anyhow::anyhow;
@@ -28,7 +30,7 @@ use std::time::Duration;
2830
use tokio::{select, task::JoinSet, time::timeout};
2931
use tokio_stream::StreamExt;
3032
use tokio_util::sync::CancellationToken;
31-
use tracing::{error, info};
33+
use tracing::{error, info, warn};
3234
use tracing_opentelemetry::OpenTelemetrySpanExt;
3335

3436
/// Struct monitoring and storing Gateway's events.
@@ -151,23 +153,24 @@ where
151153
async fn subscribe_inner<E>(
152154
&self,
153155
event_type: EventType,
154-
mut event_filter: Event<&'_ P, E>,
156+
event_filter: Event<&'_ P, E>,
155157
poll_interval: Duration,
156158
) -> anyhow::Result<()>
157159
where
158160
E: Into<GatewayEventKind> + SolEvent + Send + Sync + 'static,
159161
{
160162
let mut last_block_polled = self.get_last_block_polled(event_type).await?;
161-
if let Some(from_block) = last_block_polled {
162-
event_filter = event_filter.from_block(from_block);
163-
}
164163
let mut event_poller = event_filter
165164
.watch()
166165
.await
167166
.map_err(|e| anyhow!("Failed to subscribe to {event_type} events: {e}"))?;
168167
event_poller.poller = event_poller.poller.with_poll_interval(poll_interval);
169168
info!("✓ Subscribed to {event_type} events");
170169

170+
self.catchup_past_events::<E>(&mut last_block_polled, event_type)
171+
.await
172+
.map_err(|e| anyhow!("Failed to catch up past {event_type} events: {e}"))?;
173+
171174
select! {
172175
_ = self.process_events(event_type, event_poller, &mut last_block_polled) => (),
173176
_ = self.cancel_token.cancelled() => info!("{event_type} subscription cancelled..."),
@@ -183,41 +186,115 @@ where
183186
Ok(())
184187
}
185188

189+
/// Catches events created before the event filter using `eth_getFilterLogs`.
190+
async fn catchup_past_events<E>(
191+
&self,
192+
last_block_polled: &mut Option<u64>,
193+
event_type: EventType,
194+
) -> anyhow::Result<()>
195+
where
196+
E: Into<GatewayEventKind> + SolEvent + Send + Sync + 'static,
197+
{
198+
let catchup_from_block = match last_block_polled {
199+
None => {
200+
info!(
201+
"No previously polled block for {event_type}; skipping catchup of past events."
202+
);
203+
return Ok(());
204+
}
205+
Some(block) => *block,
206+
};
207+
208+
let contract_address = match event_type {
209+
EventType::PublicDecryptionRequest | EventType::UserDecryptionRequest => {
210+
self.decryption_contract.address()
211+
}
212+
_ => self.kms_generation_contract.address(),
213+
};
214+
215+
let filter = Filter::new()
216+
.address(*contract_address)
217+
.event_signature(E::SIGNATURE_HASH)
218+
.from_block(catchup_from_block);
219+
let provider = self.decryption_contract.provider();
220+
221+
info!("Catching up {event_type} from {catchup_from_block}...");
222+
let mut event_count = 0;
223+
let event_filter_id = provider.new_filter(&filter).await?;
224+
let past_events = provider
225+
.get_filter_logs(event_filter_id)
226+
.await?
227+
.into_iter()
228+
.map(|log| {
229+
decode_log::<E>(&log).map(|event| {
230+
event_count += 1;
231+
(event, log)
232+
})
233+
});
234+
235+
for event in past_events {
236+
self.spawn_event_handling(event_type, event, last_block_polled)
237+
.await;
238+
}
239+
240+
info!(
241+
"Successfully caught {event_count} {event_type} events from block {catchup_from_block}!"
242+
);
243+
if let Err(e) = provider.uninstall_filter(event_filter_id).await {
244+
warn!("Failed to uninstall {event_type} event catchup filter: {e}");
245+
}
246+
Ok(())
247+
}
248+
186249
/// Event processing loop.
187250
async fn process_events<E>(
188251
&self,
189252
event_type: EventType,
190253
event_poller: EventPoller<E>,
191-
last_block: &mut Option<u64>,
254+
last_block_polled: &mut Option<u64>,
192255
) where
193256
E: Into<GatewayEventKind> + SolEvent + Send + Sync + 'static,
194257
{
195258
let mut events = event_poller.into_stream();
196259
loop {
197260
info!("Waiting for next {event_type}...");
198261
match events.next().await {
199-
Some(Ok((event, log))) => {
200-
*last_block = log.block_number;
201-
EVENT_RECEIVED_COUNTER
202-
.with_label_values(&[event_type.as_str()])
203-
.inc();
204-
205-
let db = self.db_pool.clone();
206-
spawn_with_limit(handle_gateway_event(db, event.into(), log.block_number))
207-
.await;
208-
}
209-
Some(Err(err)) => {
210-
error!("Error while listening for {event_type} events: {err}");
211-
EVENT_RECEIVED_ERRORS
212-
.with_label_values(&[event_type.as_str()])
213-
.inc();
214-
continue;
262+
Some(event) => {
263+
self.spawn_event_handling(event_type, event, last_block_polled)
264+
.await
215265
}
216266
None => break error!("Alloy Provider was dropped for {event_type}"),
217267
}
218268
}
219269
}
220270

271+
async fn spawn_event_handling<E>(
272+
&self,
273+
event_type: EventType,
274+
event: alloy::sol_types::Result<(E, Log)>,
275+
last_block: &mut Option<u64>,
276+
) where
277+
E: Into<GatewayEventKind> + SolEvent + Send + Sync + 'static,
278+
{
279+
match event {
280+
Ok((event, log)) => {
281+
*last_block = log.block_number;
282+
EVENT_RECEIVED_COUNTER
283+
.with_label_values(&[event_type.as_str()])
284+
.inc();
285+
286+
let db = self.db_pool.clone();
287+
spawn_with_limit(handle_gateway_event(db, event.into(), log.block_number)).await;
288+
}
289+
Err(err) => {
290+
error!("Error while listening for {event_type} events: {err}");
291+
EVENT_RECEIVED_ERRORS
292+
.with_label_values(&[event_type.as_str()])
293+
.inc();
294+
}
295+
}
296+
}
297+
221298
/// Get the last block polled from config or DB.
222299
async fn get_last_block_polled(&self, event_type: EventType) -> anyhow::Result<Option<u64>> {
223300
let last_block_polled = match self.config.from_block_number {
@@ -282,6 +359,11 @@ async fn handle_gateway_event(
282359
}
283360
}
284361

362+
fn decode_log<E: SolEvent>(log: &Log) -> alloy::sol_types::Result<E> {
363+
let log_data: &LogData = log.as_ref();
364+
E::decode_raw_log(log_data.topics().iter().copied(), &log_data.data)
365+
}
366+
285367
impl GatewayListener<GatewayProvider> {
286368
/// Creates a new `GatewayListener` instance from a valid `Config`.
287369
pub async fn from_config(

0 commit comments

Comments
 (0)