Skip to content

Commit fa0170f

Browse files
authored
fix(coprocessor): fhevm-listener, fix disconnect/reconnect bug (#217)
* fix(coprocessor): fhevm-listener, last_valid_block update to work on reconnect * fix(coprocessor): fhevm-listner, disable alloy reconnection retry alloy skips some events while reconnecting
1 parent e2def6d commit fa0170f

File tree

5 files changed

+12
-32
lines changed

5 files changed

+12
-32
lines changed

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/fhevm-listener/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ rustls = { workspace = true }
2626
serde = { workspace = true }
2727
sqlx = { workspace = true }
2828
tokio = { workspace = true }
29-
humantime = { workspace = true }
30-
3129

3230
# local dependencies
3331
fhevm-engine-common = { path = "../fhevm-engine-common" }

coprocessor/fhevm-engine/fhevm-listener/src/cmd/mod.rs

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use alloy::rpc::types::{BlockNumberOrTag, Filter, Log};
1616
use alloy_sol_types::SolEventInterface;
1717

1818
use clap::Parser;
19-
use humantime::parse_duration;
2019

2120
use rustls;
2221

@@ -76,12 +75,6 @@ pub struct Args {
7675
help = "Initial block time, refined on each block"
7776
)]
7877
pub initial_block_time: u64,
79-
80-
#[arg(long, default_value = "1000000")]
81-
pub provider_max_retries: u32,
82-
83-
#[arg(long, default_value = "4s", value_parser = parse_duration)]
84-
pub provider_retry_interval: Duration,
8578
}
8679

8780
type RProvider = FillProvider<
@@ -98,9 +91,6 @@ type RProvider = FillProvider<
9891
// TODO: to merge with Levent works
9992
struct InfiniteLogIter {
10093
url: String,
101-
max_retries: u32,
102-
retry_interval: Duration,
103-
10494
block_time: u64, /* A default value that is refined with real-time
10595
* events data */
10696
no_block_immediate_recheck: bool,
@@ -149,8 +139,6 @@ impl InfiniteLogIter {
149139
current_event: None,
150140
last_block_event_count: 0,
151141
last_block_recheck_planned: 0,
152-
max_retries: args.provider_max_retries,
153-
retry_interval: args.provider_retry_interval,
154142
}
155143
}
156144

@@ -249,8 +237,7 @@ impl InfiniteLogIter {
249237
let mut retry = 20;
250238
loop {
251239
let ws = WsConnect::new(&self.url)
252-
.with_max_retries(self.max_retries)
253-
.with_retry_interval(self.retry_interval);
240+
.with_max_retries(0); // disabled, alloy skips events
254241

255242
match ProviderBuilder::new().connect_ws(ws).await {
256243
Ok(provider) => {
@@ -391,12 +378,6 @@ impl InfiniteLogIter {
391378
let Some(current_event) = &self.current_event else {
392379
return None;
393380
};
394-
if let Some(block_number) = current_event.block_number {
395-
// we subtract one because the current block is on going
396-
self.last_valid_block = Some(
397-
block_number.max(self.last_valid_block.unwrap_or_default()) - 1,
398-
);
399-
}
400381
self.last_block_event_count += 1;
401382
return self.current_event.clone();
402383
}
@@ -478,11 +459,14 @@ pub async fn main(args: Args) {
478459
if let Some(block_number) = log.block_number {
479460
if block_error_event_fthe == 0 {
480461
if let Some(ref mut db) = db {
481-
db.mark_prev_block_as_valid(
462+
let last_valid_block = db.mark_prev_block_as_valid(
482463
&log_iter.current_event,
483464
&log_iter.prev_event,
484465
)
485466
.await;
467+
if last_valid_block.is_some() {
468+
log_iter.last_valid_block = last_valid_block;
469+
}
486470
}
487471
} else {
488472
eprintln!(

coprocessor/fhevm-engine/fhevm-listener/src/database/tfhe_event_propagate.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -285,26 +285,26 @@ impl Database {
285285
&mut self,
286286
opt_event: &Option<alloy_rpc_types::Log>,
287287
opt_prev_event: &Option<alloy_rpc_types::Log>,
288-
) {
288+
) -> Option<u64> {
289289
let Some(prev_event) = opt_prev_event else {
290-
return;
290+
return None;
291291
};
292292
let Some(event) = opt_event else {
293-
return;
293+
return None;
294294
};
295295
if prev_event.block_number == event.block_number {
296-
return;
296+
return None;
297297
}
298298
let prev_event = if prev_event.block_number < event.block_number {
299299
event
300300
} else {
301301
prev_event
302302
};
303303
let Some(block_number) = prev_event.block_number else {
304-
return;
304+
return None;
305305
};
306306
let Some(block_hash) = prev_event.block_hash else {
307-
return;
307+
return Some(block_number); // but cannot write to db
308308
};
309309
let _ = sqlx::query!(
310310
r#"
@@ -318,6 +318,7 @@ impl Database {
318318
)
319319
.execute(&self.pool)
320320
.await;
321+
return Some(block_number);
321322
}
322323

323324
pub async fn read_last_valid_block(&mut self) -> Option<i64> {

coprocessor/fhevm-engine/fhevm-listener/tests/integration_test.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,6 @@ async fn test_listener_restart() -> Result<(), anyhow::Error> {
168168
start_at_block: None,
169169
end_at_block: None,
170170
catchup_margin: 5,
171-
provider_max_retries: 5,
172-
provider_retry_interval: Duration::from_secs(1),
173171
};
174172

175173
// Start listener in background task

0 commit comments

Comments
 (0)