Skip to content

Commit a6d7b68

Browse files
authored
feat(voyager): parallelize evm indexing (#3799)
2 parents b07be1e + f50724f commit a6d7b68

File tree

2 files changed

+89
-64
lines changed

2 files changed

+89
-64
lines changed

voyager/plugins/event-source/ethereum/src/call.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,23 @@ use unionlabs::primitives::H256;
77
#[model]
88
#[derive(Enumorph, SubsetOf)]
99
pub enum ModuleCall {
10+
FetchBlocks(FetchBlocks),
1011
FetchGetLogs(FetchGetLogs),
1112
MakeFullEvent(MakeFullEvent),
1213
}
1314

15+
/// Fetch a block at the specified height, requeuing a seq(wait(H+1), fetch(H+1)).
16+
#[model]
17+
pub struct FetchBlocks {
18+
pub block_number: u64,
19+
}
20+
1421
/// Fetch all events in `block_number` emitted by the `IBCHandler` via [`eth_getLogs`].
1522
///
1623
/// [`eth_getLogs`]: https://ethereum.org/en/developers/docs/apis/json-rpc/#[model]th_getlogs
1724
#[model]
1825
pub struct FetchGetLogs {
1926
pub block_number: u64,
20-
/// If set, only fetch blocks up to this range; otherwise indefinitely unfold.
21-
#[serde(default, skip_serializing_if = "Option::is_none")]
22-
pub up_to: Option<u64>,
2327
}
2428

2529
/// Construct a full ChainEvent from the given EVM event and associated metadata.

voyager/plugins/event-source/ethereum/src/main.rs

Lines changed: 82 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![warn(clippy::unwrap_used)]
22

3-
use std::collections::VecDeque;
3+
use std::{cmp::Ordering, collections::VecDeque};
44

55
use alloy::{
66
providers::{Provider, ProviderBuilder, RootProvider},
@@ -32,18 +32,17 @@ use unionlabs::{
3232
ErrorReporter,
3333
};
3434
use voyager_message::{
35-
call::Call,
35+
call::{Call, WaitForHeight},
3636
core::{ChainId, ClientInfo, IbcSpec},
3737
data::{ChainEvent, Data},
3838
into_value,
3939
module::{PluginInfo, PluginServer},
4040
DefaultCmd, ExtensionsExt, Plugin, PluginMessage, VoyagerClient, VoyagerMessage,
41-
FATAL_JSONRPC_ERROR_CODE,
4241
};
43-
use voyager_vm::{call, conc, data, defer, noop, now, pass::PassResult, seq, BoxDynError, Op};
42+
use voyager_vm::{call, conc, data, noop, pass::PassResult, seq, BoxDynError, Op};
4443

4544
use crate::{
46-
call::{FetchGetLogs, IbcEvents, MakeFullEvent, ModuleCall},
45+
call::{FetchBlocks, FetchGetLogs, IbcEvents, MakeFullEvent, ModuleCall},
4746
callback::ModuleCallback,
4847
};
4948

@@ -62,6 +61,8 @@ pub struct Module {
6261

6362
pub ibc_handler_address: H160,
6463

64+
pub chunk_block_fetch_size: u64,
65+
6566
pub provider: RootProvider<BoxTransport>,
6667
}
6768

@@ -74,10 +75,17 @@ pub struct Config {
7475
/// The address of the `IBCHandler` smart contract.
7576
pub ibc_handler_address: H160,
7677

78+
#[serde(default = "default_chunk_block_fetch_size")]
79+
pub chunk_block_fetch_size: u64,
80+
7781
/// The RPC endpoint for the execution chain.
7882
pub rpc_url: String,
7983
}
8084

85+
fn default_chunk_block_fetch_size() -> u64 {
86+
10
87+
}
88+
8189
impl Plugin for Module {
8290
type Call = ModuleCall;
8391
type Callback = ModuleCallback;
@@ -124,6 +132,7 @@ impl Module {
124132
Ok(Self {
125133
chain_id: ChainId::new(chain_id.to_string()),
126134
ibc_handler_address: config.ibc_handler_address,
135+
chunk_block_fetch_size: config.chunk_block_fetch_size,
127136
provider,
128137
})
129138
}
@@ -225,9 +234,8 @@ impl PluginServer<ModuleCall, ModuleCallback> for Module {
225234
Op::Call(Call::FetchBlocks(fetch)) if fetch.chain_id == self.chain_id => {
226235
call(PluginMessage::new(
227236
self.plugin_name(),
228-
ModuleCall::from(FetchGetLogs {
237+
ModuleCall::from(FetchBlocks {
229238
block_number: fetch.start_height.height(),
230-
up_to: None,
231239
}),
232240
))
233241
}
@@ -252,6 +260,13 @@ impl PluginServer<ModuleCall, ModuleCallback> for Module {
252260
#[instrument(skip_all, fields(chain_id = %self.chain_id))]
253261
async fn call(&self, e: &Extensions, msg: ModuleCall) -> RpcResult<Op<VoyagerMessage>> {
254262
match msg {
263+
ModuleCall::FetchBlocks(FetchBlocks { block_number }) => {
264+
self.fetch_blocks(e.try_get::<VoyagerClient>()?, block_number)
265+
.await
266+
}
267+
ModuleCall::FetchGetLogs(FetchGetLogs { block_number }) => {
268+
self.fetch_get_logs(block_number).await
269+
}
255270
ModuleCall::MakeFullEvent(MakeFullEvent {
256271
block_number,
257272
tx_hash,
@@ -260,52 +275,81 @@ impl PluginServer<ModuleCall, ModuleCallback> for Module {
260275
self.make_full_event(e.try_get::<VoyagerClient>()?, block_number, tx_hash, event)
261276
.await
262277
}
263-
ModuleCall::FetchGetLogs(FetchGetLogs {
264-
block_number,
265-
up_to,
266-
}) => {
267-
self.fetch_get_logs(e.try_get::<VoyagerClient>()?, block_number, up_to)
268-
.await
269-
}
270278
}
271279
}
272280
}
273281

274282
impl Module {
275-
#[instrument(skip_all, fields(%block_number, ?up_to))]
276-
async fn fetch_get_logs(
283+
#[instrument(skip_all, fields(%block_number))]
284+
async fn fetch_blocks(
277285
&self,
278286
voyager_client: &VoyagerClient,
279287
block_number: u64,
280-
up_to: Option<u64>,
281288
) -> RpcResult<Op<VoyagerMessage>> {
282-
if up_to.is_some_and(|up_to| up_to < block_number) {
283-
return Err(ErrorObject::owned(
284-
FATAL_JSONRPC_ERROR_CODE,
285-
"`up_to` must be either >= `block_number` or null",
286-
None::<()>,
287-
));
288-
}
289-
290289
let latest_height = voyager_client
291290
.query_latest_height(self.chain_id.clone(), true)
292-
.await?;
291+
.await?
292+
.height();
293293

294-
if latest_height.height() < block_number {
295-
debug!(block_number, "block is not yet finalized");
294+
info!(%latest_height, %block_number, "fetching blocks");
296295

297-
return Ok(seq([
298-
defer(now() + 1),
299-
call(Call::Plugin(PluginMessage::new(
296+
let continuation = |next_height: u64| {
297+
seq([
298+
// TODO: Make this a config param
299+
call(WaitForHeight {
300+
chain_id: self.chain_id.clone(),
301+
height: Height::new(next_height),
302+
finalized: true,
303+
}),
304+
call(PluginMessage::new(
300305
self.plugin_name(),
301-
ModuleCall::from(FetchGetLogs {
302-
block_number,
303-
up_to,
306+
ModuleCall::from(FetchBlocks {
307+
block_number: next_height,
304308
}),
305-
))),
306-
]));
309+
)),
310+
])
311+
};
312+
313+
match block_number.cmp(&latest_height) {
314+
// height < latest_height
315+
// fetch transactions on all blocks height..next_height (*exclusive* on the upper bound!)
316+
// and then queue the continuation starting at next_height
317+
Ordering::Equal | Ordering::Less => {
318+
let next_height = (latest_height - block_number)
319+
.clamp(1, self.chunk_block_fetch_size)
320+
+ block_number;
321+
322+
info!(
323+
from_height = block_number,
324+
to_height = next_height,
325+
"batch fetching blocks in range {block_number}..{next_height}"
326+
);
327+
328+
Ok(conc(
329+
(block_number..next_height)
330+
.map(|block_number| {
331+
call(PluginMessage::new(
332+
self.plugin_name(),
333+
ModuleCall::from(FetchGetLogs { block_number }),
334+
))
335+
})
336+
.chain([continuation(next_height)]),
337+
))
338+
}
339+
// height > latest_height
340+
Ordering::Greater => {
341+
warn!(
342+
"the latest finalized height ({latest_height}) \
343+
is less than the requested height ({block_number})"
344+
);
345+
346+
Ok(continuation(block_number))
347+
}
307348
}
349+
}
308350

351+
#[instrument(skip_all, fields(%block_number))]
352+
async fn fetch_get_logs(&self, block_number: u64) -> RpcResult<Op<VoyagerMessage>> {
309353
debug!("fetching logs in execution block");
310354

311355
let logs = self
@@ -330,7 +374,7 @@ impl Module {
330374
)
331375
})?;
332376

333-
info!("found {} logs", logs.len());
377+
info!(logs_count = logs.len(), "found logs");
334378

335379
let events = logs.into_iter().flat_map(|log| {
336380
let tx_hash = log
@@ -422,30 +466,7 @@ impl Module {
422466
})
423467
});
424468

425-
let next_fetch = match up_to {
426-
Some(up_to) => {
427-
if up_to > block_number {
428-
Some(call(Call::Plugin(PluginMessage::new(
429-
self.plugin_name(),
430-
ModuleCall::from(FetchGetLogs {
431-
block_number: block_number + 1,
432-
up_to: Some(up_to),
433-
}),
434-
))))
435-
} else {
436-
None
437-
}
438-
}
439-
None => Some(call(Call::Plugin(PluginMessage::new(
440-
self.plugin_name(),
441-
ModuleCall::from(FetchGetLogs {
442-
block_number: block_number + 1,
443-
up_to: None,
444-
}),
445-
)))),
446-
};
447-
448-
Ok(conc(next_fetch.into_iter().chain(events)))
469+
Ok(conc(events))
449470
}
450471

451472
#[instrument(skip_all, fields(%block_number, %tx_hash))]

0 commit comments

Comments
 (0)