Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
super::{OnchainOrderCustomData, OnchainOrderParsing},
crate::database::events::log_to_event_index,
alloy::rpc::types::Log,
alloy::{eips::BlockNumberOrTag, rpc::types::Log},
anyhow::{Context, Result, anyhow},
contracts::alloy::{
CoWSwapOnchainOrders::CoWSwapOnchainOrders::{
Expand All @@ -19,14 +19,14 @@ use {
orders::{ExecutionTime, Interaction, Order},
},
ethrpc::{
AlloyProvider,
Web3,
block_stream::{BlockNumberHash, block_number_to_block_number_hash},
},
hex_literal::hex,
sqlx::{PgPool, types::BigDecimal},
std::{collections::HashMap, convert::TryInto},
tracing::instrument,
web3::types::U64,
};

// 4c84c1c8 is the identifier of the following function:
Expand Down Expand Up @@ -146,12 +146,12 @@ fn convert_to_quote_id_and_user_valid_to(
}

async fn settlement_deployment_block_number_hash(
web3: &Web3,
provider: &AlloyProvider,
chain_id: u64,
) -> Result<BlockNumberHash> {
let block_number =
GPv2Settlement::deployment_block(&chain_id).context("no deployment block configured")?;
block_number_to_block_number_hash(web3, U64::from(block_number).into())
block_number_to_block_number_hash(provider, BlockNumberOrTag::Number(block_number))
.await
.context("Deployment block not found")
}
Expand Down Expand Up @@ -261,19 +261,25 @@ async fn find_indexing_start_block(
.context("failed to read last indexed block from db")?;

if last_indexed_block > 0 {
return block_number_to_block_number_hash(web3, U64::from(last_indexed_block).into())
.await
.map(Some)
.context("failed to fetch block");
return block_number_to_block_number_hash(
&web3.alloy,
BlockNumberOrTag::Number(last_indexed_block),
)
.await
.map(Some)
.context("failed to fetch block");
}
if let Some(start_block) = fallback_start_block {
return block_number_to_block_number_hash(web3, start_block.into())
.await
.map(Some)
.context("failed to fetch fallback indexing start block");
return block_number_to_block_number_hash(
&web3.alloy,
BlockNumberOrTag::Number(start_block),
)
.await
.map(Some)
.context("failed to fetch fallback indexing start block");
}
if let Some(chain_id) = settlement_fallback_chain_id {
return settlement_deployment_block_number_hash(web3, chain_id)
return settlement_deployment_block_number_hash(&web3.alloy, chain_id)
.await
.map(Some)
.context("failed to fetch settlement deployment block");
Expand Down
5 changes: 3 additions & 2 deletions crates/autopilot/src/database/onchain_order_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
super::{Metrics as DatabaseMetrics, Postgres, events::bytes_to_order_uid},
crate::database::events::log_to_event_index,
alloy::{
eips::BlockNumberOrTag,
primitives::{Address, TxHash, U256},
rpc::types::Log,
},
Expand Down Expand Up @@ -64,7 +65,6 @@ use {
},
sqlx::PgConnection,
std::{collections::HashMap, sync::Arc},
web3::types::U64,
};

pub struct OnchainOrderParser<EventData: Send + Sync, EventRow: Send + Sync> {
Expand Down Expand Up @@ -396,7 +396,8 @@ async fn get_block_numbers_of_events(
.into_iter()
.map(|block_number| async move {
let timestamp =
timestamp_of_block_in_seconds(web3, U64::from(block_number).into()).await?;
timestamp_of_block_in_seconds(&web3.alloy, BlockNumberOrTag::Number(block_number))
.await?;
Ok((block_number, timestamp))
});
let block_number_timestamp_pair: Vec<anyhow::Result<(u64, u32)>> =
Expand Down
5 changes: 3 additions & 2 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ use {
shutdown_controller::ShutdownController,
solvable_orders::SolvableOrdersCache,
},
alloy::eips::BlockNumberOrTag,
chain::Chain,
clap::Parser,
contracts::alloy::{BalancerV2Vault, GPv2Settlement, IUniswapV3Factory, InstanceExt, WETH9},
ethcontract::{BlockNumber, H160},
ethcontract::H160,
ethrpc::{
Web3,
alloy::conversions::{IntoAlloy, IntoLegacy},
Expand Down Expand Up @@ -428,7 +429,7 @@ pub async fn run(args: Arguments, shutdown_controller: ShutdownController) {

let skip_event_sync_start = if args.skip_event_sync {
Some(
block_number_to_block_number_hash(&web3, BlockNumber::Latest)
block_number_to_block_number_hash(&web3.alloy, BlockNumberOrTag::Latest)
.await
.expect("Failed to fetch latest block"),
)
Expand Down
2 changes: 1 addition & 1 deletion crates/cow-amm/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Registry {
address: factory,
};
let event_handler = EventHandler::new(
Arc::new(self.web3.clone()),
Arc::new(self.web3.alloy.clone()),
AlloyEventRetriever(indexer),
storage,
None,
Expand Down
21 changes: 16 additions & 5 deletions crates/e2e/tests/e2e/ethflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ async fn eth_flow_tx(web3: Web3) {
let quote: OrderQuoteResponse = test_submit_quote(&services, &quote_request).await;

let valid_to = chrono::offset::Utc::now().timestamp() as u32
+ timestamp_of_current_block_in_seconds(&web3).await.unwrap()
+ timestamp_of_current_block_in_seconds(&web3.alloy)
.await
.unwrap()
+ 3600;
let ethflow_order =
ExtendedEthFlowOrder::from_quote(&quote, valid_to).include_slippage_bps(300);
Expand Down Expand Up @@ -272,7 +274,9 @@ async fn eth_flow_without_quote(web3: Web3) {
services.start_protocol(solver).await;

let valid_to = chrono::offset::Utc::now().timestamp() as u32
+ timestamp_of_current_block_in_seconds(&web3).await.unwrap()
+ timestamp_of_current_block_in_seconds(&web3.alloy)
.await
.unwrap()
+ 3600;
let ethflow_order = ExtendedEthFlowOrder(EthflowOrder {
buy_token: dai.address().into_legacy(),
Expand Down Expand Up @@ -321,7 +325,10 @@ async fn eth_flow_indexing_after_refund(web3: Web3) {
services.start_protocol(solver).await;

// Create an order that only exists to be cancelled.
let valid_to = timestamp_of_current_block_in_seconds(&web3).await.unwrap() + 60;
let valid_to = timestamp_of_current_block_in_seconds(&web3.alloy)
.await
.unwrap()
+ 60;
let dummy_order = ExtendedEthFlowOrder::from_quote(
&test_submit_quote(
&services,
Expand Down Expand Up @@ -355,7 +362,9 @@ async fn eth_flow_indexing_after_refund(web3: Web3) {
let receiver = H160([0x42; 20]);
let sell_amount = to_wei(1);
let valid_to = chrono::offset::Utc::now().timestamp() as u32
+ timestamp_of_current_block_in_seconds(&web3).await.unwrap()
+ timestamp_of_current_block_in_seconds(&web3.alloy)
.await
.unwrap()
+ 60;
let ethflow_order = ExtendedEthFlowOrder::from_quote(
&test_submit_quote(
Expand Down Expand Up @@ -871,7 +880,9 @@ async fn eth_flow_zero_buy_amount(web3: Web3) {

let place_order = async |trader: TestAccount, buy_amount: u64| {
let valid_to = chrono::offset::Utc::now().timestamp() as u32
+ timestamp_of_current_block_in_seconds(&web3).await.unwrap()
+ timestamp_of_current_block_in_seconds(&web3.alloy)
.await
.unwrap()
+ 3600;
let ethflow_order = ExtendedEthFlowOrder(EthflowOrder {
buy_token: dai.address().into_legacy(),
Expand Down
5 changes: 4 additions & 1 deletion crates/e2e/tests/e2e/refunder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ async fn refunder_tx(web3: Web3) {
let quote_response = services.submit_quote(&quote).await.unwrap();

let validity_duration = 600;
let valid_to = timestamp_of_current_block_in_seconds(&web3).await.unwrap() + validity_duration;
let valid_to = timestamp_of_current_block_in_seconds(&web3.alloy)
.await
.unwrap()
+ validity_duration;
// Accounting for slippage is necessary for the order to be picked up by the
// refunder
let ethflow_order =
Expand Down
33 changes: 33 additions & 0 deletions crates/ethrpc/src/alloy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,39 @@ fn rpc(url: &str) -> RpcClient {
.http(url.parse().unwrap())
}

/// Creates an unbuffered [`RpcClient`] from the given URL with
/// [`LabelingLayer`] and [`InstrumentationLayer`] but WITHOUT
/// [`BatchCallLayer`].
///
/// This is useful for components that need to avoid batching (e.g., block
/// stream polling on high-frequency chains).
fn unbuffered_rpc(url: &str) -> RpcClient {
ClientBuilder::default()
.layer(LabelingLayer {
label: "main_unbuffered".into(),
})
.layer(InstrumentationLayer)
.http(url.parse().unwrap())
}

/// Creates an unbuffered provider for the given URL and label.
///
/// Unlike [`provider()`], this does not include batching.
/// Useful for read-only operations like block polling.
///
/// Returns a copy of the [`MutWallet`] so the caller can modify it later.
pub fn unbuffered_provider(url: &str) -> (AlloyProvider, MutWallet) {
let rpc = unbuffered_rpc(url);
let wallet = MutWallet::default();
let provider = ProviderBuilder::new()
.wallet(wallet.clone())
.with_simple_nonce_management()
.connect_client(rpc)
.erased();

(provider, wallet)
}

/// Creates a provider with the provided URL and an empty [`MutWallet`].
///
/// Returns a copy of the [`MutWallet`] so the caller can modify it later.
Expand Down
Loading
Loading