Skip to content

Commit 123683b

Browse files
authored
fix(blocklock,randomness): replace wonky block poller with custom stream (#261)
1 parent 10bd9f0 commit 123683b

3 files changed

Lines changed: 75 additions & 17 deletions

File tree

bin/blocklock-agent/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ where
9898
decryption_sender_contract.provider().to_owned(),
9999
block_poll_interval,
100100
)
101-
.await?;
101+
.await;
102102
// Transform each stream into a stream of events
103103
let decryption_requested_stream =
104104
decryption_requested_stream.map(|(req, _)| ChainEvent::DecryptionRequested(req));

bin/randomness-agent/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ where
9999
signature_sender_contract.provider().to_owned(),
100100
block_poll_interval,
101101
)
102-
.await?;
102+
.await;
103103

104104
// Transform each stream into a stream of events
105105
let signature_requested_stream =

crates/dcipher-agents/src/utils.rs

Lines changed: 73 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,80 @@
1-
use alloy::primitives::BlockNumber;
1+
use alloy::primitives::U64;
22
use alloy::providers::Provider;
3-
use alloy::transports::TransportResult;
4-
use futures_util::Stream;
5-
use futures_util::StreamExt;
6-
7-
pub async fn block_poller<P>(
8-
provider: P,
9-
interval: std::time::Duration,
10-
) -> TransportResult<impl Stream<Item = BlockNumber> + Unpin>
3+
use alloy::rpc::client::{NoParams, RpcCall};
4+
use futures_util::{FutureExt, Stream};
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
8+
pub async fn block_poller<P>(provider: P, interval: std::time::Duration) -> BlockPoller
119
where
1210
P: Provider,
1311
{
14-
let mut block_watcher = provider.watch_full_blocks().await?;
15-
block_watcher.set_poll_interval(interval);
12+
BlockPoller::new(provider, interval)
13+
}
14+
15+
pub struct BlockPoller {
16+
/// The ticker
17+
ticker: tokio::time::Interval,
18+
19+
/// The cloneable rpc call used to fetch the latest block
20+
block_number_ref_call: RpcCall<NoParams, U64>,
21+
22+
/// A pending block number call
23+
block_number_pending_call: Option<RpcCall<NoParams, U64>>,
24+
}
25+
26+
impl BlockPoller {
27+
fn new(provider: impl Provider, poll_interval: std::time::Duration) -> Self {
28+
let block_number_call: RpcCall<NoParams, U64> =
29+
provider.client().request_noparams("eth_blockNumber");
30+
31+
let mut ticker = tokio::time::interval(poll_interval);
32+
// Reset ticker to ensure that it waits on the first poll since we set the fut block_number_fut
33+
// immediately
34+
ticker.reset();
35+
36+
Self {
37+
ticker,
38+
block_number_pending_call: Some(block_number_call.clone()),
39+
block_number_ref_call: block_number_call,
40+
}
41+
}
42+
}
43+
44+
impl Stream for BlockPoller {
45+
type Item = u64;
46+
47+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
48+
if let Some(block_number_fut) = &mut self.block_number_pending_call {
49+
// Pending block number call, poll that
50+
let Poll::Ready(maybe_block_number) = block_number_fut.poll_unpin(cx) else {
51+
return Poll::Pending;
52+
};
53+
54+
// Future completed
55+
self.block_number_pending_call = None;
56+
57+
// Ready to poll again, we might need to get the next block immediately
58+
cx.waker().wake_by_ref();
1659

17-
let block_stream = block_watcher
18-
.into_stream()
19-
.filter_map(|res| std::future::ready(res.ok().map(|b| b.header.number)));
60+
match maybe_block_number {
61+
Ok(block_number) => {
62+
Poll::Ready(Some(block_number.try_into().expect("U64 to fit in u64")))
63+
}
64+
Err(e) => {
65+
tracing::error!(error = ?e, "Poller failed to get latest block number");
66+
Poll::Pending
67+
}
68+
}
69+
} else {
70+
// No pending call, poll ticker
71+
if self.ticker.poll_tick(cx).is_ready() {
72+
self.block_number_pending_call = Some(self.block_number_ref_call.clone());
2073

21-
Ok(block_stream)
74+
// Ready to poll the rpc call
75+
cx.waker().wake_by_ref();
76+
}
77+
Poll::Pending
78+
}
79+
}
2280
}

0 commit comments

Comments
 (0)