diff --git a/Cargo.lock b/Cargo.lock index 61d8454..593d3ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4679,6 +4679,7 @@ dependencies = [ "alloy-sol-types", "anyhow", "hex", + "serde", ] [[package]] diff --git a/crates/uniswapx-rs/Cargo.toml b/crates/uniswapx-rs/Cargo.toml index 3c4fa6a..dbf05d5 100644 --- a/crates/uniswapx-rs/Cargo.toml +++ b/crates/uniswapx-rs/Cargo.toml @@ -10,4 +10,5 @@ alloy-primitives = "0.8.20" alloy-sol-types = "0.8.20" alloy-dyn-abi = "0.8.20" anyhow = "1.0.70" -hex = "0.4.3" \ No newline at end of file +hex = "0.4.3" +serde = "1.0.168" \ No newline at end of file diff --git a/crates/uniswapx-rs/src/order.rs b/crates/uniswapx-rs/src/order.rs index b35ec3e..c0a5f0a 100644 --- a/crates/uniswapx-rs/src/order.rs +++ b/crates/uniswapx-rs/src/order.rs @@ -6,6 +6,7 @@ use alloy_primitives::I256; use alloy_primitives::U256; use alloy_sol_types::sol; use anyhow::Result; +use serde::Serialize; use crate::sol_math::MulDiv; @@ -151,6 +152,14 @@ pub enum Order { V3DutchOrder(V3DutchOrder), } +#[derive(Serialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum TradeType { + #[serde(rename = "exactIn")] + ExactIn, + #[serde(rename = "exactOut")] + ExactOut, +} + impl Order { pub fn encode(&self) -> Vec { match self { @@ -159,6 +168,39 @@ impl Order { Order::V3DutchOrder(order) => order.encode_inner(), } } + + pub fn trade_type(&self) -> TradeType { + match self { + Order::V2DutchOrder(order) => { + if order.baseOutputs.iter().any(|o| o.startAmount == o.endAmount) { + TradeType::ExactOut + } else { + TradeType::ExactIn + } + } + Order::PriorityOrder(order) => { + if order.outputs.iter().any(|o| o.mpsPerPriorityFeeWei == U256::from(0)) { + TradeType::ExactOut + } else { + TradeType::ExactIn + } + } + Order::V3DutchOrder(order) => { + if order.baseOutputs.iter().any( + |o| o.curve.relativeAmounts.len() == 0 || + o.curve.relativeAmounts.iter().all(|&x| x.eq(&I256::ZERO)) + ) { + TradeType::ExactOut + } else { + TradeType::ExactIn + } + } + } + } + + pub fn is_exact_output(&self) -> bool { + matches!(self.trade_type(), TradeType::ExactOut) + } } #[derive(Debug, Clone)] @@ -268,7 +310,7 @@ impl PriorityOrder { PriorityOrder::abi_encode(self) } - pub fn resolve(&self, block_number: u64, block_timestamp: u64, block_time_ms: u64, priority_fee: U256, ) -> OrderResolution { + pub fn resolve(&self, block_number: u64, block_timestamp: u64, block_time_ms: u64, priority_fee: U256, min_block_percentage_buffer: u64) -> OrderResolution { let block_time = block_time_ms / 1000; let next_block_timestamp = U256::from(block_timestamp) + U256::from(block_time); @@ -294,7 +336,7 @@ impl PriorityOrder { block_timestamp, block_time_ms ); - let time_buffer_ms = block_time_ms * 1300 / 1000; // TODO: fine tune + let time_buffer_ms = block_time_ms * min_block_percentage_buffer / 100; if U256::from(current_timestamp_ms() + time_buffer_ms).lt(&target_block_ms) { return OrderResolution::NotFillableYet(ResolvedOrder { input, outputs }); } diff --git a/src/aws_utils/cloudwatch_utils.rs b/src/aws_utils/cloudwatch_utils.rs index 589314b..d84a4cb 100644 --- a/src/aws_utils/cloudwatch_utils.rs +++ b/src/aws_utils/cloudwatch_utils.rs @@ -26,6 +26,9 @@ pub const EXECUTION_ATTEMPTED_METRIC: &str = "ExecutionAttempted"; pub const EXECUTION_SKIPPED_ALREADY_FILLED_METRIC: &str = "ExecutionSkippedAlreadyFilled"; pub const EXECUTION_SKIPPED_PAST_DEADLINE_METRIC: &str = "ExecutionSkippedPastDeadline"; pub const UNPROFITABLE_METRIC: &str = "Unprofitable"; +pub const TARGET_BLOCK_DELTA: &str = "TargetBlockDelta"; +pub const REVERT_CODE_METRIC: &str = "RevertCode"; + pub enum DimensionName { Service, } @@ -85,9 +88,12 @@ pub enum CwMetrics { TxSubmitted(u64), TxStatusUnknown(u64), LatestBlock(u64), + RevertCode(u64, String), // chain_id and revert code string /// Balance for individual address Balance(String), + // negative is too early, positive is too late + TargetBlockDelta(u64), } impl From for String { fn from(metric: CwMetrics) -> Self { @@ -111,6 +117,8 @@ impl From for String { } CwMetrics::Balance(val) => format!("Bal-{}", val), CwMetrics::LatestBlock(chain_id) => format!("{}-{}", chain_id, LATEST_BLOCK), + CwMetrics::TargetBlockDelta(chain_id) => format!("{}-{}", chain_id, TARGET_BLOCK_DELTA), + CwMetrics::RevertCode(chain_id, code) => format!("{}-{}-{}", chain_id, REVERT_CODE_METRIC, code), } } } @@ -167,6 +175,10 @@ pub fn receipt_status_to_metric(status: bool, chain_id: u64) -> CwMetrics { } } +pub fn revert_code_to_metric(chain_id: u64, revert_code: String) -> CwMetrics { + CwMetrics::RevertCode(chain_id, revert_code) +} + pub fn build_metric_future( cloudwatch_client: Option>, dimension_value: DimensionValue, diff --git a/src/collectors/uniswapx_order_collector.rs b/src/collectors/uniswapx_order_collector.rs index ae7d601..2a3cd8a 100644 --- a/src/collectors/uniswapx_order_collector.rs +++ b/src/collectors/uniswapx_order_collector.rs @@ -122,7 +122,7 @@ impl Collector for UniswapXOrderCollector { "Starting UniswapX order collector stream" ); - // stream that polls the UniswapX API every 5 seconds + // stream that polls the UniswapX API let stream = IntervalStream::new(tokio::time::interval(Duration::from_millis( POLL_INTERVAL_MS, ))) @@ -133,31 +133,45 @@ impl Collector for UniswapXOrderCollector { async move { tracing::debug!("Polling UniswapX API for new orders"); - let response = match client.get(url.clone()) - .header("x-api-key", api_key) - .send() - .await { - Ok(resp) => resp, - Err(e) => { - tracing::error!(error = %e, "Failed to fetch orders from UniswapX API"); - return Err(anyhow::anyhow!("Failed to fetch orders: {}", e)); + #[allow(unused_assignments)] + let mut last_error = None; + loop { + match client.get(url.clone()) + .header("x-api-key", api_key.clone()) + .send() + .await { + Ok(resp) => { + match resp.json::().await { + Ok(data) => { + tracing::debug!( + num_orders = data.orders.len(), + "Successfully fetched orders from UniswapX API" + ); + return Ok(data.orders); + }, + Err(e) => { + last_error = Some(e.to_string()); + tracing::warn!( + error = %e, + "Failed to parse UniswapX API response, retrying..." + ); + } + } + }, + Err(e) => { + last_error = Some(e.to_string()); + tracing::warn!( + error = %e, + "Failed to fetch orders from UniswapX API, retrying..." + ); + } } - }; - - let data = match response.json::().await { - Ok(data) => data, - Err(e) => { - tracing::error!(error = %e, "Failed to parse UniswapX API response"); - return Err(anyhow::anyhow!("Failed to parse response: {}", e)); + + if let Some(err) = last_error { + tracing::warn!(error = %err, "Error in order stream, retrying..."); + tokio::time::sleep(Duration::from_millis(1000)).await; } - }; - - tracing::debug!( - num_orders = data.orders.len(), - "Successfully fetched orders from UniswapX API" - ); - - Ok(data.orders) + } } }) .flat_map( @@ -268,6 +282,7 @@ mod tests { } else { encoded_order }; + tracing::info!("encoded_order: {:?}", encoded_order); let order_hex: Vec = hex::decode(encoded_order).unwrap(); let result = V2DutchOrder::decode_inner(&order_hex, false); diff --git a/src/collectors/uniswapx_route_collector.rs b/src/collectors/uniswapx_route_collector.rs index 1b4746b..3812627 100644 --- a/src/collectors/uniswapx_route_collector.rs +++ b/src/collectors/uniswapx_route_collector.rs @@ -7,7 +7,7 @@ use reqwest::header::ORIGIN; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::{Receiver, Sender}; use tracing::{error, info}; -use uniswapx_rs::order::{Order, ResolvedOrder}; +use uniswapx_rs::order::{Order, ResolvedOrder, TradeType}; use artemis_core::types::{Collector, CollectorStream}; use async_trait::async_trait; @@ -40,20 +40,12 @@ pub struct OrderBatchData { pub orders: Vec, pub chain_id: u64, pub amount_in: Uint<256, 4>, - pub amount_out_required: Uint<256, 4>, + pub amount_out: Uint<256, 4>, + pub amount_required: Uint<256, 4>, pub token_in: String, pub token_out: String, } -#[derive(Serialize, Debug)] -#[allow(dead_code)] -enum TradeType { - #[serde(rename = "exactIn")] - ExactIn, - #[serde(rename = "exactOut")] - ExactOut, -} - #[derive(Serialize, Debug)] #[serde(rename_all = "camelCase")] struct RoutingApiQuery { @@ -82,6 +74,16 @@ pub struct TokenInRoute { decimals: String, } +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +#[allow(dead_code)] +pub struct V4Route { + address: String, + token_in: TokenInRoute, + token_out: TokenInRoute, + fee: String, +} + #[derive(Clone, Debug, Deserialize)] #[serde(rename_all = "camelCase")] #[allow(dead_code)] @@ -104,6 +106,8 @@ pub struct V2Route { #[derive(Clone, Debug, Deserialize)] #[serde(tag = "type")] pub enum Route { + #[serde(rename = "v4-pool")] + V4(V4Route), #[serde(rename = "v3-pool")] V3(V3Route), #[serde(rename = "v2-pool")] @@ -128,6 +132,7 @@ pub struct RouteOrderParams { pub token_out: String, pub amount: String, pub recipient: String, + pub trade_type: TradeType, } #[derive(Clone, Debug)] @@ -177,13 +182,12 @@ impl UniswapXRouteCollector { params: RouteOrderParams, order_hash: String, ) -> Result { - // TODO: support exactOutput let query = RoutingApiQuery { token_in_address: resolve_address(params.token_in), token_out_address: resolve_address(params.token_out), token_in_chain_id: params.chain_id, token_out_chain_id: params.chain_id, - trade_type: TradeType::ExactIn, + trade_type: params.trade_type, amount: params.amount, recipient: params.recipient, slippage_tolerance: SLIPPAGE_TOLERANCE.to_string(), @@ -192,7 +196,7 @@ impl UniswapXRouteCollector { protocols: "v2,v3,v4,mixed".to_string(), }; - let query_string = serde_qs::to_string(&query).unwrap(); + let query_string = serde_qs::to_string(&query)?; let full_query = format!("{}?{}", ROUTING_API, query_string); info!("{} - full query: {}", order_hash, full_query); let client = reqwest::Client::new(); @@ -202,6 +206,7 @@ impl UniswapXRouteCollector { .get(format!("{}?{}", ROUTING_API, query_string)) .header(ORIGIN, "https://app.uniswap.org") .header("x-request-source", "uniswap-web") + .header("x-universal-router-version", "2.0") .send() .await .map_err(|e| anyhow!("Quote request failed with error: {}", e))?; @@ -218,10 +223,14 @@ impl UniswapXRouteCollector { } match response.status() { - StatusCode::OK => Ok(response - .json::() - .await - .map_err(|e| anyhow!("{} - Failed to parse response: {}", order_hash, e))?), + StatusCode::OK => { + let order_route = response + .json::() + .await + .map_err(|e| anyhow!("{} - Failed to parse response: {}", order_hash, e))?; + info!("{} - Received route: {:?}", order_hash, order_route); + Ok(order_route) + } StatusCode::BAD_REQUEST => Err(anyhow!( "{} - Bad request: {}", order_hash, @@ -292,20 +301,24 @@ impl Collector for UniswapXRouteCollector { for batch in all_requests { let order_hash = batch.orders[0].hash.clone(); - let OrderBatchData { token_in, token_out, amount_in, .. } = batch.clone(); + let OrderBatchData { token_in, token_out, amount_in, amount_out, .. } = batch.clone(); info!( - "{} - Routing order, token in: {}, token out: {}", + "{} - Routing order, token in: {}, token out: {}, amount in: {}, amount out: {}", order_hash, - token_in, token_out + token_in, token_out, amount_in, amount_out ); - let future = async move { let route_result = self.route_order(RouteOrderParams { chain_id: self.chain_id, token_in: token_in.clone(), token_out: token_out.clone(), - amount: amount_in.to_string(), + amount: if batch.orders[0].order.is_exact_output() { + amount_out.to_string() + } else { + amount_in.to_string() + }, recipient: self.executor_address.clone(), + trade_type: batch.orders[0].order.trade_type(), }, order_hash).await; (batch, route_result) }; diff --git a/src/executors/protect_executor.rs b/src/executors/protect_executor.rs index 9c7f056..32bfb22 100644 --- a/src/executors/protect_executor.rs +++ b/src/executors/protect_executor.rs @@ -1,9 +1,9 @@ -use alloy_primitives::{utils::format_units, U128}; -use std::sync::Arc; +use alloy_primitives::{utils::format_units, Address, U128}; +use std::{str::FromStr, sync::Arc}; use tracing::{info, warn}; use alloy::{ - network::{AnyNetwork, ReceiptResponse, TransactionBuilder}, + network::{AnyNetwork, EthereumWallet, ReceiptResponse, TransactionBuilder}, providers::{DynProvider, Provider}, rpc::types::TransactionReceipt, serde::WithOtherFields, @@ -17,13 +17,12 @@ use aws_sdk_cloudwatch::Client as CloudWatchClient; use crate::{ aws_utils::cloudwatch_utils::{ - build_metric_future, receipt_status_to_metric, CwMetrics, DimensionValue, - }, - executors::reactor_error_code::ReactorErrorCode, - send_metric, - strategies::keystore::KeyStore, + build_metric_future, receipt_status_to_metric, revert_code_to_metric, CwMetrics, DimensionValue + }, executors::reactor_error_code::{get_revert_reason, ReactorErrorCode}, send_metric, shared::get_nonce_with_retry, strategies::keystore::KeyStore }; +const GAS_LIMIT: u64 = 1_000_000; + /// An executor that sends transactions to the mempool. pub struct ProtectExecutor { client: Arc>, @@ -70,12 +69,12 @@ impl Executor for ProtectExecutor { } // Acquire a key from the key store - let (public_address, private_key) = self + let (addr, private_key) = self .key_store .acquire_key() .await .expect("Failed to acquire key"); - info!("Acquired key: {}", public_address); + info!("Acquired key: {}", addr); let chain_id = u64::from_str_radix( &action @@ -87,14 +86,23 @@ impl Executor for ProtectExecutor { ) .expect("Failed to parse chain ID"); - let wallet: PrivateKeySigner = private_key - .as_str() - .parse::() - .unwrap() - .with_chain_id(Some(chain_id)); - let address = wallet.address(); + + let wallet = EthereumWallet::from( + private_key + .as_str() + .parse::() + .unwrap() + .with_chain_id(Some(chain_id)), + ); + let address = Address::from_str(&addr).unwrap(); action.tx.set_from(address); + + // Retry up to 3 times to get the nonce. + let nonce = get_nonce_with_retry(&self.sender_client, address, "", 3).await?; + action.tx.set_nonce(nonce); + action.tx.set_gas_limit(GAS_LIMIT); + let gas_usage_result = self.client.estimate_gas(&action.tx).await.or_else(|err| { if let Some(raw) = &err.as_error_resp().unwrap().data { if let Ok(serde_value) = serde_json::from_str::(raw.get()) { @@ -182,7 +190,9 @@ impl Executor for ProtectExecutor { send_metric!(metric_future); } - let result = sender_client.send_transaction(action.tx).await; + let tx_request_for_revert = action.tx.clone(); + let tx = action.tx.build(&wallet).await?; + let result = sender_client.send_tx_envelope(tx).await; // Block on pending transaction getting confirmations let (receipt, status) = match result { @@ -199,6 +209,30 @@ impl Executor for ProtectExecutor { "receipt: tx_hash: {:?}, status: {}", receipt.transaction_hash, status, ); + + if !status && receipt.block_number.is_some() { + info!("Attempting to get revert reason"); + // Parse revert reason + match get_revert_reason(&self.sender_client, tx_request_for_revert, receipt.block_number.unwrap()).await { + Ok(reason) => { + info!("Revert reason: {}", reason); + let metric_future = build_metric_future( + self.cloudwatch_client.clone(), + DimensionValue::V3Executor, + revert_code_to_metric(chain_id, reason.to_string()), + 1.0, + ); + if let Some(metric_future) = metric_future { + // do not block current thread by awaiting in the background + send_metric!(metric_future); + } + } + Err(e) => { + info!("Failed to get revert reason - error: {:?}", e); + } + } + } + (Some(receipt), status) } Err(e) => { @@ -213,9 +247,9 @@ impl Executor for ProtectExecutor { } }; - match self.key_store.release_key(public_address.clone()).await { + match self.key_store.release_key(addr.clone()).await { Ok(_) => { - info!("Released key: {}", public_address); + info!("Released key: {}", addr); } Err(e) => { info!("Failed to release key: {}", e); diff --git a/src/executors/public_1559_executor.rs b/src/executors/public_1559_executor.rs index add8c87..d1db7be 100644 --- a/src/executors/public_1559_executor.rs +++ b/src/executors/public_1559_executor.rs @@ -1,26 +1,39 @@ use std::{str::FromStr, sync::Arc}; -use tracing::{info, warn}; +use tracing::{info, warn, debug}; use alloy::{ - eips::{BlockId, BlockNumberOrTag}, network::{ - AnyNetwork, EthereumWallet, ReceiptResponse, TransactionBuilder - }, primitives::{utils::format_units, Address, U256}, providers::{DynProvider, Provider}, rpc::types::TransactionRequest, serde::WithOtherFields, signers::{local::PrivateKeySigner, Signer} + eips::{BlockId, BlockNumberOrTag}, + network::{AnyNetwork, EthereumWallet, ReceiptResponse, TransactionBuilder}, + primitives::{utils::format_units, Address, U128, U256}, + providers::{DynProvider, Provider}, + rpc::types::TransactionRequest, + serde::WithOtherFields, + signers::{local::PrivateKeySigner, Signer}, }; use anyhow::{Context, Result}; use artemis_core::types::Executor; use async_trait::async_trait; use aws_sdk_cloudwatch::Client as CloudWatchClient; +use uniswapx_rs::order::BPS; use crate::{ aws_utils::cloudwatch_utils::{ - build_metric_future, receipt_status_to_metric, CwMetrics, DimensionValue, - }, executors::reactor_error_code::ReactorErrorCode, shared::send_metric_with_order_hash, strategies::{keystore::KeyStore, types::SubmitTxToMempoolWithExecutionMetadata} + build_metric_future, receipt_status_to_metric, revert_code_to_metric, CwMetrics, DimensionValue + }, + executors::reactor_error_code::ReactorErrorCode, + shared::{get_nonce_with_retry, send_metric_with_order_hash, u256}, + strategies::{keystore::KeyStore, types::SubmitTxToMempoolWithExecutionMetadata} }; use crate::executors::reactor_error_code::get_revert_reason; const GAS_LIMIT: u64 = 1_000_000; const MAX_RETRIES: u32 = 3; const TX_BACKOFF_MS: u64 = 0; // retry immediately +static QUOTE_BASED_PRIORITY_BID_BUFFER: U256 = u256!(2); +static GWEI_PER_ETH: U256 = u256!(1_000_000_000); +const QUOTE_ETH_LOG10_THRESHOLD: usize = 8; +// The number of bps to add to the base bid for each fallback bid +const DEFAULT_FALLBACK_BID_SCALE_FACTOR: u64 = 50; /// An executor that sends transactions to the public mempool. pub struct Public1559Executor { @@ -52,14 +65,42 @@ impl Public1559Executor { } } + fn increment_tx_metric( + &self, + order_hash: &Arc, + chain_id: u64, + outcome: &Result, + ) { + if let Some(cloudwatch_client) = &self.cloudwatch_client { + let metric = match outcome { + Ok(TransactionOutcome::Success(_)) => CwMetrics::TxSucceeded(chain_id), + Ok(TransactionOutcome::Failure(_)) | Ok(TransactionOutcome::RetryableFailure) => CwMetrics::TxReverted(chain_id), + Err(_) => CwMetrics::TxStatusUnknown(chain_id), + }; + + let metric_future = build_metric_future( + Some(cloudwatch_client.clone()), + DimensionValue::PriorityExecutor, + metric, + 1.0, + ); + if let Some(metric_future) = metric_future { + send_metric_with_order_hash!(&Arc::new(order_hash.to_string()), metric_future); + } + } + } + async fn send_transaction( &self, wallet: &EthereumWallet, tx_request: WithOtherFields, order_hash: &str, + chain_id: u64, + target_block: Option, ) -> Result { let tx_request_for_revert = tx_request.clone(); let tx = tx_request.build(wallet).await?; + info!("{} - Sending transaction to RPC", order_hash); let result = self.sender_client.send_tx_envelope(tx).await; match result { @@ -73,36 +114,59 @@ impl Public1559Executor { anyhow::anyhow!("{} - Error waiting for confirmations: {}", order_hash, e) }); + match receipt { Ok(receipt) => { + let target_block_delta: f64 = receipt.block_number.unwrap() as f64 - target_block.unwrap() as f64; + if let Some(target_block) = target_block { + info!("{} - target block delta: {}, target_block: {}, actual_block: {}", order_hash, target_block_delta, target_block, receipt.block_number.unwrap()); + } + let metric_future = build_metric_future( + self.cloudwatch_client.clone(), + DimensionValue::PriorityExecutor, + CwMetrics::TargetBlockDelta(chain_id), + target_block_delta as f64, + ); + if let Some(metric_future) = metric_future { + send_metric_with_order_hash!(&Arc::new(order_hash.to_string()), metric_future); + } let status = receipt.status(); info!( "{} - receipt: tx_hash: {:?}, status: {}", order_hash, receipt.transaction_hash, status, ); - if !status { + if !status && receipt.block_number.is_some() { info!("{} - Attempting to get revert reason", order_hash); // Parse revert reason - let revert_reason = get_revert_reason( - &self.sender_client, - tx_request_for_revert, - receipt.block_number.unwrap() - ).await; + match get_revert_reason(&self.sender_client, tx_request_for_revert, receipt.block_number.unwrap()).await { - if let Ok(reason) = revert_reason { - info!("{} - Revert reason: {}", order_hash, reason); - // Retry if the order isn't yet fillable - if matches!(reason, ReactorErrorCode::OrderNotFillable) { - return Ok(TransactionOutcome::RetryableFailure); + Ok(reason) => { + info!("{} - Revert reason: {}", order_hash, reason); + let metric_future = build_metric_future( + self.cloudwatch_client.clone(), + DimensionValue::PriorityExecutor, + revert_code_to_metric(chain_id, reason.to_string()), + 1.0, + ); + if let Some(metric_future) = metric_future { + // do not block current thread by awaiting in the background + send_metric_with_order_hash!(&Arc::new(order_hash.to_string()), metric_future); + } + // Retry if the order isn't yet fillable + if matches!(reason, ReactorErrorCode::OrderNotFillable) { + return Ok(TransactionOutcome::RetryableFailure); + } + else { + info!("{} - Order not fillable, returning failure", order_hash); + return Ok(TransactionOutcome::Failure(receipt.block_number)); + } } - else { - info!("{} - Order not fillable, returning failure", order_hash); - return Ok(TransactionOutcome::Failure(receipt.block_number)); + Err(e) => { + info!("{} - Failed to get revert reason - error: {:?}", order_hash, e); + Ok(TransactionOutcome::Failure(None)) } } - info!("{} - Failed to get revert reason - error: {:?}", order_hash, revert_reason.err().unwrap()); - Ok(TransactionOutcome::Failure(None)) } else { Ok(TransactionOutcome::Success(receipt.block_number)) } @@ -119,298 +183,503 @@ impl Public1559Executor { } } } + + fn get_bids_for_order( + &self, + action: &SubmitTxToMempoolWithExecutionMetadata, + order_hash: &str, + ) -> Vec> { + let mut bid_priority_fees: Vec> = vec![]; + + // priority fee at which we'd break even, meaning 100% of profit goes to user in the form of price improvement + if action.metadata.gas_use_estimate_quote > U256::from(0) { + let quote_based_priority_bid = action + .metadata + .calculate_priority_fee_from_gas_use_estimate(QUOTE_BASED_PRIORITY_BID_BUFFER); + if let Some(bid) = quote_based_priority_bid { + bid_priority_fees.push(Some(bid)); + debug!("{} - quote_based_priority_bid: {:?}", order_hash, bid); + } + } + + // If the quote is large in ETH, add more bids + // < 1e5 gwei = 1 fallback bid, 1e6 = 2 fallback bids, 1e7 = 3 fallback bids, etc. + let mut num_fallback_bids = 3; + if let Some(quote_eth) = action.metadata.quote_eth { + if quote_eth > U256::from(0) { + debug!("{} - Adding fallback bids based on quote size", order_hash); + let quote_in_gwei = "e_eth / GWEI_PER_ETH; + debug!("{} - quote_eth_gwei: {:?}", order_hash, quote_in_gwei); + + if quote_in_gwei > U256::from(0) { + let quote_gwei_log10 = quote_in_gwei.log10(); + debug!("{} - quote_gwei_log10: {:?}", order_hash, quote_gwei_log10); + if quote_gwei_log10 > QUOTE_ETH_LOG10_THRESHOLD { + num_fallback_bids += (quote_gwei_log10 - QUOTE_ETH_LOG10_THRESHOLD) as u64; + } + } + } + } + + // Each fallback bid is 10000 - BID_SCALE_FACTOR * 2^i + // If BID_SCALE_FACTOR = 50, then the bids are: + // 9950, 9900, 9800, 9600, 9200, ... + for i in 0..num_fallback_bids { + // Check if the shift would cause overflow or if the result would be negative + let bid_scale_factor = action.metadata.fallback_bid_scale_factor.unwrap_or(DEFAULT_FALLBACK_BID_SCALE_FACTOR); + let bid_reduction = U128::from(bid_scale_factor * (1 << i)); + if bid_reduction >= U128::from(BPS) { + // Stop generating more fallback bids + break; + } + + let bid_bps = U128::from(BPS) - bid_reduction; + let fallback_bid = action + .metadata + .calculate_priority_fee(bid_bps); + if let Some(bid) = fallback_bid { + bid_priority_fees.push(Some(bid)); + debug!("{} - fallback_bid_{}: {:?}", order_hash, i, bid); + } + } + + bid_priority_fees + } } #[async_trait] impl Executor for Public1559Executor { /// Send a transaction to the mempool. async fn execute(&self, mut action: SubmitTxToMempoolWithExecutionMetadata) -> Result<()> { + info!("{} - Executing transaction", action.metadata.order_hash); let order_hash = Arc::new(action.metadata.order_hash.clone()); - let chain_id_u64 = action - .execution - .tx - .chain_id() - .expect("Chain ID not found on transaction") - .to_string() - .parse::() - .unwrap(); - - let metric_future = build_metric_future( - self.cloudwatch_client.clone(), - DimensionValue::PriorityExecutor, - CwMetrics::ExecutionAttempted(chain_id_u64), - 1.0, - ); - if let Some(metric_future) = metric_future { - send_metric_with_order_hash!(&order_hash, metric_future); - } - - // Acquire a key from the key store - let (public_address, private_key) = self - .key_store - .acquire_key() - .await - .expect("Failed to acquire key"); - info!("{} - Acquired key: {}", order_hash, public_address); - - let chain_id = u64::from_str_radix( - &action + + // Initialize this variable outside the main logic so we can access it in the cleanup section + let mut public_address = None; + + // Use a closure to handle the main logic with ? operator for early returns + let result = async { + let chain_id_u64 = action .execution .tx .chain_id() .expect("Chain ID not found on transaction") - .to_string(), - 10, - ) - .expect("Failed to parse chain ID"); - - let wallet = EthereumWallet::from( - private_key - .as_str() - .parse::() - .unwrap() - .with_chain_id(Some(chain_id)), - ); - let address = Address::from_str(&public_address).unwrap(); + .to_string() + .parse::() + .unwrap(); + + let metric_future = build_metric_future( + self.cloudwatch_client.clone(), + DimensionValue::PriorityExecutor, + CwMetrics::ExecutionAttempted(chain_id_u64), + 1.0, + ); + if let Some(metric_future) = metric_future { + send_metric_with_order_hash!(&order_hash, metric_future); + } - action.execution.tx.set_from(address); + // Acquire a key from the key store + let (addr, private_key) = self + .key_store + .acquire_key() + .await + .context("Failed to acquire key")?; + + // Store the address for cleanup + public_address = Some(addr.clone()); + + info!("{} - Acquired key: {}", order_hash, addr); - // early return on OrderAlready filled - // always use 1_000_000 gas for now - let target_block = match action.metadata.target_block { - Some(b) => BlockId::Number(b.into()), - _ => BlockId::Number(BlockNumberOrTag::Latest), - }; + let chain_id = u64::from_str_radix( + &action + .execution + .tx + .chain_id() + .expect("Chain ID not found on transaction") + .to_string(), + 10, + ) + .expect("Failed to parse chain ID"); - info!( - "{} - target_block: {}", - order_hash, - target_block.as_u64().unwrap() - ); + let wallet = EthereumWallet::from( + private_key + .as_str() + .parse::() + .unwrap() + .with_chain_id(Some(chain_id)), + ); + let address = Address::from_str(&addr).unwrap(); - // estimate_gas always fails because of target block being a future block - /* - let gas_usage_result = self - .client - .estimate_gas(&action.execution.tx) - .await - .or_else(|err| { - if let Some(raw) = &err.as_error_resp().unwrap().data { - if let Ok(serde_value) = serde_json::from_str::(raw.get()) { - if let serde_json::Value::String(four_byte) = serde_value { - let error_code = ReactorErrorCode::from(four_byte.clone()); - match error_code { - ReactorErrorCode::OrderAlreadyFilled => { - info!( - "{} - Order already filled, skipping execution", - order_hash - ); - let metric_future = build_metric_future( - self.cloudwatch_client.clone(), - DimensionValue::PriorityExecutor, - CwMetrics::ExecutionSkippedAlreadyFilled(chain_id_u64), - 1.0, - ); - if let Some(metric_future) = metric_future { - send_metric_with_order_hash!(&order_hash, metric_future); - } - Err(anyhow::anyhow!("Order Already Filled")) - } - ReactorErrorCode::InvalidDeadline => { - info!( - "{} - Order past deadline, skipping execution", - order_hash - ); - let metric_future = build_metric_future( - self.cloudwatch_client.clone(), - DimensionValue::PriorityExecutor, - CwMetrics::ExecutionSkippedPastDeadline(chain_id_u64), - 1.0, - ); - if let Some(metric_future) = metric_future { - send_metric_with_order_hash!(&order_hash, metric_future); - } - Err(anyhow::anyhow!("Order Past Deadline")) - } - _ => Ok(GAS_LIMIT), - } - } else { - warn!("{} - Unexpected error data: {:?}", order_hash, serde_value); - Ok(GAS_LIMIT) - } - } else { - warn!("{} - Failed to parse error data: {:?}", order_hash, err); - Ok(GAS_LIMIT) - } - } else { - warn!("{} - Error estimating gas: {:?}", order_hash, err); - Ok(GAS_LIMIT) - } - }); + action.execution.tx.set_from(address); - let gas_usage = match gas_usage_result { - Ok(gas) => gas, - Err(e) => { - warn!("{} - Error getting gas usage: {}", order_hash, e); - // Release the key before returning - match self.key_store.release_key(public_address.clone()).await { - Ok(_) => { - info!("{} - Released key: {}", order_hash, public_address); - } - Err(release_err) => { - warn!("{} - Failed to release key: {}", order_hash, release_err); - } - } - return Err(e); - } - }; - */ - - let bid_priority_fee; - let base_fee = self - .client - .get_gas_price() - .await - .context("Error getting gas price: {}")?; - - if let Some(gas_bid_info) = action.execution.gas_bid_info { - // priority fee at which we'd break even, meaning 100% of profit goes to user in the form of price improvement - // TODO: use gas estimate here - bid_priority_fee = action - .metadata - .calculate_priority_fee(gas_bid_info.bid_percentage) - } else { - bid_priority_fee = Some(U256::from(50)); - } + // early return on OrderAlready filled + // always use 1_000_000 gas for now + let target_block = match action.metadata.target_block { + Some(b) => BlockId::Number(b.into()), + _ => BlockId::Number(BlockNumberOrTag::Latest), + }; - if bid_priority_fee.is_none() { info!( - "{} - No bid priority fee, indicating quote < amount_out_required; skipping", - order_hash + "{} - target_block: {}", + order_hash, + target_block.as_u64().unwrap() ); - // Release the key before returning - match self.key_store.release_key(public_address.clone()).await { - Ok(_) => { - info!("{} - Released key: {}", order_hash, public_address); - } - Err(release_err) => { - warn!("{} - Failed to release key: {}", order_hash, release_err); + + let base_fee = self + .client + .get_gas_price() + .await + .context("Error getting gas price: {}")?; + let bid_priority_fees = self.get_bids_for_order(&action, &order_hash); + + if bid_priority_fees.len() == 0 { + info!( + "{} - No bid priority fees, indicating quote < amount_out_required; skipping", + order_hash + ); + info!("{} - Quote < amount_out_required; skipping", order_hash); + return Err(anyhow::anyhow!("Quote < amount_out_required")); + } + + // Create a tx for each bid + let mut tx_requests: Vec> = Vec::new(); + for bid_priority_fee in bid_priority_fees.iter() { + if let Some(bid) = bid_priority_fee { + let mut tx_request = action.execution.tx.clone(); + let bid_priority_fee_128 = bid.to::(); + tx_request.set_gas_limit(GAS_LIMIT); + tx_request.set_max_fee_per_gas(base_fee + bid_priority_fee_128); + tx_request.set_max_priority_fee_per_gas(bid_priority_fee_128); + tx_requests.push(tx_request); } } - info!("{} - Quote < amount_out_required; skipping", order_hash); - return Err(anyhow::anyhow!("Quote < amount_out_required")); - } - let mut tx_request = action.execution.tx.clone(); - let bid_priority_fee_128 = bid_priority_fee.unwrap().to::(); - tx_request.set_gas_limit(GAS_LIMIT); - tx_request.set_max_fee_per_gas(base_fee + bid_priority_fee_128); - tx_request.set_max_priority_fee_per_gas(bid_priority_fee_128); + let sender_client = self.sender_client.clone(); + + // Retry up to 3 times to get the nonce. + let mut nonce = get_nonce_with_retry(&sender_client, address, &order_hash, 3).await?; - let sender_client = self.sender_client.clone(); + // Sort transactions by max_priority_fee_per_gas in descending order so that the highest bid is first + tx_requests.sort_by(|a, b| { + let a_fee = a.max_priority_fee_per_gas().unwrap(); + let b_fee = b.max_priority_fee_per_gas().unwrap(); + b_fee.cmp(&a_fee) + }); + + // Set unique nonces for each transaction + for tx_request in tx_requests.iter_mut() { + tx_request.set_nonce(nonce); + nonce += 1; + } + + info!("{} - Executing {} transactions in parallel from {:?}", order_hash, tx_requests.len(), address); - // Retry up to 3 times to get the nonce. - let nonce = { let mut attempts = 0; - loop { - match sender_client.get_transaction_count(address).await { - Ok(nonce) => break nonce, - Err(e) => { - if attempts < 2 { - attempts += 1; - } else { - return Err(anyhow::anyhow!( - "{} - Failed to get nonce after 3 attempts: {}", - order_hash, - e - )); + let mut success = false; + let mut block_number = None; + let mut retryable_failure = true; + + // Retry tx submission on retryable failures if none of the transactions succeeded + while attempts < MAX_RETRIES && !success && retryable_failure { + + let metric_future = build_metric_future( + self.cloudwatch_client.clone(), + DimensionValue::PriorityExecutor, + CwMetrics::TxSubmitted(chain_id), + 1.0, + ); + if let Some(metric_future) = metric_future { + send_metric_with_order_hash!(&Arc::new(order_hash.to_string()), metric_future); + } + + // Create futures for all transactions + let futures: Vec<_> = tx_requests.iter().map(|tx_request| { + self.send_transaction(&wallet, tx_request.clone(), &order_hash, chain_id_u64, target_block.as_u64()) + }).collect(); + + // Wait for all transactions to complete + let results = futures::future::join_all(futures).await; + + // Check results + retryable_failure = false; + for (i, result) in results.iter().enumerate() { + self.increment_tx_metric(&order_hash, chain_id, result); + match result { + Ok(TransactionOutcome::Success(result)) => { + success = true; + block_number = *result; + info!("{} - Transaction {} succeeded at block {}", order_hash, i, block_number.unwrap()); + break; + } + Ok(TransactionOutcome::Failure(result)) => { + if i == results.len() - 1 { + block_number = *result; + } + // Find the transaction that won the bid and compare the winning bid to our own bid + + } + Ok(TransactionOutcome::RetryableFailure) => { + retryable_failure = true; + // Continue to next attempt + } + Err(_) => { + // Continue to next attempt } } } - } - }; - tx_request.set_nonce(nonce); - info!("{} - Executing tx from {:?}", order_hash, address); - let mut attempts = 0; - - // Retry tx submission on retryable failures - let (block_number, status) = loop { - match self.send_transaction(&wallet, tx_request.clone(), &order_hash).await { - Ok(TransactionOutcome::Success(result)) => { - break (result, true); - } - Ok(TransactionOutcome::Failure(result)) => { - break (result, false); - } - Ok(TransactionOutcome::RetryableFailure) if attempts < MAX_RETRIES => { + if !success && attempts < MAX_RETRIES - 1 { attempts += 1; info!( - "{} - Order not fillable, retrying in {}ms (attempt {}/{})", + "{} - All transactions failed, retrying in {}ms (attempt {}/{})", order_hash, TX_BACKOFF_MS, attempts, MAX_RETRIES ); - tx_request.set_nonce(nonce + attempts as u64); + // Update nonces for next attempt + for tx_request in tx_requests.iter_mut() { + tx_request.set_nonce(nonce); + nonce += 1; + } tokio::time::sleep(tokio::time::Duration::from_millis(TX_BACKOFF_MS)).await; - continue; - } - Ok(TransactionOutcome::RetryableFailure) | Err(_) => { - break (None, false); } } - }; - // regardless of outcome, ensure we release the key - match self.key_store.release_key(public_address.clone()).await { - Ok(_) => { - info!("{} - Released key: {}", order_hash, public_address); - } - Err(e) => { - info!("{} - Failed to release key: {}", order_hash, e); - } - } - - // post key-release processing - if let Some(_) = &self.cloudwatch_client { - let metric_future = build_metric_future( - self.cloudwatch_client.clone(), - DimensionValue::PriorityExecutor, - receipt_status_to_metric(status, chain_id_u64), - 1.0, - ); - if let Some(metric_future) = metric_future { - // do not block current thread by awaiting in the background - send_metric_with_order_hash!(&order_hash, metric_future); - } - } - - if status { - let balance_eth = self - .client - .get_balance(address) - .await - .map_or_else(|_| None, |v| Some(format_units(v, "ether").unwrap())); - - // TODO: use if-let chains when it becomes stable https://github.com/rust-lang/rust/issues/53667 - // if let Some(balance_eth) = balance_eth && let Some(cw) = &self.cloudwatch_client { - if let Some(balance_eth) = balance_eth { - info!( - "{}- balance: {} at block {}", - order_hash, - balance_eth.clone(), - block_number.unwrap() - ); + // post key-release processing + if let Some(_) = &self.cloudwatch_client { let metric_future = build_metric_future( self.cloudwatch_client.clone(), DimensionValue::PriorityExecutor, - CwMetrics::Balance(format!("{:?}", address)), - balance_eth.parse::().unwrap_or(0.0), + receipt_status_to_metric(success, chain_id_u64), + 1.0, ); if let Some(metric_future) = metric_future { + // do not block current thread by awaiting in the background send_metric_with_order_hash!(&order_hash, metric_future); } } + + if success { + let balance_eth = self + .client + .get_balance(address) + .await + .map_or_else(|_| None, |v| Some(format_units(v, "ether").unwrap())); + + // TODO: use if-let chains when it becomes stable https://github.com/rust-lang/rust/issues/53667 + // if let Some(balance_eth) = balance_eth && let Some(cw) = &self.cloudwatch_client { + if let Some(balance_eth) = balance_eth { + info!( + "{}- balance: {} at block {}", + order_hash, + balance_eth.clone(), + block_number.unwrap() + ); + let metric_future = build_metric_future( + self.cloudwatch_client.clone(), + DimensionValue::PriorityExecutor, + CwMetrics::Balance(format!("{:?}", address)), + balance_eth.parse::().unwrap_or(0.0), + ); + if let Some(metric_future) = metric_future { + send_metric_with_order_hash!(&order_hash, metric_future); + } + } + } + + Ok(()) + }.await; + + // Ensure key is released if it was acquired + if let Some(addr) = public_address { + match self.key_store.release_key(addr.clone()).await { + Ok(_) => { + info!("{} - Released key: {}", order_hash, addr); + } + Err(e) => { + warn!("{} - Failed to release key: {}", order_hash, e); + } + } } + + result + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::{U256, U128, U64}; + use alloy::network::AnyNetwork; + use alloy::providers::{DynProvider, Provider, RootProvider}; + use alloy::rpc::types::TransactionRequest; + use crate::strategies::types::SubmitTxToMempoolWithExecutionMetadata; + use crate::strategies::priority_strategy::ExecutionMetadata; + use artemis_core::executors::mempool_executor::{SubmitTxToMempool, GasBidInfo}; + use std::sync::Arc; + + // Mock provider that implements the Provider trait + #[derive(Clone)] + struct MockProvider; + impl Provider for MockProvider { + fn root(&self) -> &RootProvider { + unimplemented!("Mock provider does not support root provider") + } + } + + // Helper function to create a test action + fn create_test_action( + quote_size: U256, + amount_required: U256, + gas_estimate: U256, + is_exact_output: bool, + target_block: Option, + ) -> SubmitTxToMempoolWithExecutionMetadata { + let action = SubmitTxToMempoolWithExecutionMetadata { + execution: SubmitTxToMempool { + tx: WithOtherFields::new(TransactionRequest::default()), + gas_bid_info: Some(GasBidInfo { + bid_percentage: U128::from(0), + total_profit: U128::from(0), + }), + }, + metadata: ExecutionMetadata { + quote: quote_size, + quote_eth: Some(quote_size), + exact_output: is_exact_output, + amount_required: amount_required, + gas_use_estimate_quote: gas_estimate, + order_hash: "test_hash".to_string(), + target_block: target_block.map(|b| U64::from(b)), + fallback_bid_scale_factor: Some(DEFAULT_FALLBACK_BID_SCALE_FACTOR), + }, + }; + action + } + + #[tokio::test] + async fn test_get_bids_for_order_small_quote() { + let executor = Public1559Executor::new( + Arc::new(DynProvider::new(MockProvider)), + Arc::new(DynProvider::new(MockProvider)), + Arc::new(KeyStore::new()), + None, + ); + + let action = create_test_action( + U256::from(9e17), // quote: 0.9 ETH + U256::from(8e17), // amount_required: 0.8 ETH + U256::from(100000), // gas_estimate: 100k gas + false, + None, + ); + + let bids = executor.get_bids_for_order(&action, "test_hash"); + assert_eq!(bids.len(), 1 + 3); // 1 quote-based bid + minimum 3 fallback bids + assert!(bids[0].is_some()); + } + + #[tokio::test] + async fn test_get_bids_for_order_large_quote() { + let executor = Public1559Executor::new( + Arc::new(DynProvider::new(MockProvider)), + Arc::new(DynProvider::new(MockProvider)), + Arc::new(KeyStore::new()), + None, + ); + + let action = create_test_action( + U256::from(1000e18), // quote: 1000 ETH + U256::from(900e18), // amount_required: 900 ETH + U256::from(100000), // gas_estimate: 100k gas + false, + None, + ); + + let bids = executor.get_bids_for_order(&action, "test_hash"); + assert_eq!(bids.len(), 1 + 3 + 4); // 1 quote-based bid + 3 fallback bids + 4 additional fallback bids + } + + #[tokio::test] + async fn test_get_bids_for_order_exact_output() { + let executor = Public1559Executor::new( + Arc::new(DynProvider::new(MockProvider)), + Arc::new(DynProvider::new(MockProvider)), + Arc::new(KeyStore::new()), + None, + ); + + let action = create_test_action( + U256::from(7e18), // quote: 7 ETH + U256::from(8e18), // amount_required: 8 ETH + U256::from(1000), // gas_estimate: 1k gas + true, + None, + ); + + let bids = executor.get_bids_for_order(&action, "test_hash"); + assert_eq!(bids.len(), 1 + 3 + 1); // 1 quote-based bid + minimum 3 fallback bids + 1 additional fallback bid + assert!(bids[0].is_some()); + } + + #[tokio::test] + async fn test_get_bids_for_order_no_gas_estimate() { + let executor = Public1559Executor::new( + Arc::new(DynProvider::new(MockProvider)), + Arc::new(DynProvider::new(MockProvider)), + Arc::new(KeyStore::new()), + None, + ); + + let action = create_test_action( + U256::from(2e17), // quote: 0.2 ETH + U256::from(1e17), // amount_required: 0.1 ETH + U256::from(0), // gas_estimate: 0 gas + false, + None, + ); + + let bids = executor.get_bids_for_order(&action, "test_hash"); + assert_eq!(bids.len(), 3); // 3 fallback bids should still be generated + } + + #[tokio::test] + async fn test_get_bids_for_order_too_little_quote() { + let executor = Public1559Executor::new( + Arc::new(DynProvider::new(MockProvider)), + Arc::new(DynProvider::new(MockProvider)), + Arc::new(KeyStore::new()), + None, + ); + + let action = create_test_action( + U256::from(2e17), // quote: 0.2 ETH + U256::from(3e17), // amount_required: 0.3 ETH + U256::from(10000), // gas_estimate: 10000 gas + false, + None, + ); + + let bids = executor.get_bids_for_order(&action, "test_hash"); + assert_eq!(bids.len(), 0); + } + + #[tokio::test] + async fn test_extremely_large_quote() { + let executor = Public1559Executor::new( + Arc::new(DynProvider::new(MockProvider)), + Arc::new(DynProvider::new(MockProvider)), + Arc::new(KeyStore::new()), + None, + ); + + let action = create_test_action( + U256::from(1e30), // quote: 1e12 ETH + U256::from(1e29), // amount_required: 1e11 ETH + U256::from(100000), // gas_estimate: 100k gas + false, + None, + ); - Ok(()) + let bids = executor.get_bids_for_order(&action, "test_hash"); + assert_eq!(bids.len(), 1 + 8); // 1 quote-based bid + max of 8 additional fallback bids (based on underflow check) } } \ No newline at end of file diff --git a/src/executors/reactor_error_code.rs b/src/executors/reactor_error_code.rs index 547a61e..6d897ff 100644 --- a/src/executors/reactor_error_code.rs +++ b/src/executors/reactor_error_code.rs @@ -14,6 +14,8 @@ pub enum ReactorErrorCode { OrderAlreadyFilled, InsufficientETH, InsufficientToken, + NativeTransferFailed, + AllowanceExpired, Unknown, } @@ -22,12 +24,21 @@ impl From for ReactorErrorCode { fn from(s: String) -> Self { // Remove quotes and whitespace before matching let cleaned = s.trim().trim_matches('\"'); - match cleaned { + // Take first 10 chars (including 0x) if longer + let code = if cleaned.len() > 10 { + &cleaned[..10] + } else { + cleaned + }; + + match code { "0xc6035520" => ReactorErrorCode::OrderNotFillable, "0xee3b3d4b" => ReactorErrorCode::OrderAlreadyFilled, "0x769d11e4" => ReactorErrorCode::InvalidDeadline, "0x6a12f104" => ReactorErrorCode::InsufficientETH, "0x675cae38" => ReactorErrorCode::InsufficientToken, + "0xf4b3b1bc" => ReactorErrorCode::NativeTransferFailed, + "0xd81b2f2e" => ReactorErrorCode::AllowanceExpired, _ => ReactorErrorCode::Unknown, } } @@ -41,6 +52,8 @@ impl std::fmt::Display for ReactorErrorCode { ReactorErrorCode::OrderAlreadyFilled => "OrderAlreadyFilled", ReactorErrorCode::InsufficientETH => "InsufficientETH", ReactorErrorCode::InsufficientToken => "InsufficientToken", + ReactorErrorCode::NativeTransferFailed => "NativeTransferFailed", + ReactorErrorCode::AllowanceExpired => "AllowanceExpired", ReactorErrorCode::Unknown => "Unknown", }; write!(f, "{}", s) diff --git a/src/main.rs b/src/main.rs index 797ebad..b310bba 100644 --- a/src/main.rs +++ b/src/main.rs @@ -69,8 +69,19 @@ pub struct Args { pub aws_secret_arn: Option, /// Percentage of profit to pay in gas. - #[arg(long, required = true)] - pub bid_percentage: u128, + #[arg(long, required = false)] + pub bid_percentage: Option, + + /// Determines how aggressive to scale the fallback bids + /// 100 (default) = 1% of the profit + #[arg(long, required = false)] + pub fallback_bid_scale_factor: Option, + + /// Minimum block percentage buffer for priority orders. + /// This determines how much time to wait before the target block to submit the fill transaction. + /// Example: 120 = 120% of the block time which would be 2.4 seconds with a block time of 2 seconds. + #[arg(long, required = false)] + pub min_block_percentage_buffer: Option, /// Private key for sending txs. #[arg(long, required = true)] @@ -233,6 +244,8 @@ async fn main() -> Result<()> { let config = Config { bid_percentage: args.bid_percentage, + fallback_bid_scale_factor: args.fallback_bid_scale_factor, + min_block_percentage_buffer: args.min_block_percentage_buffer, executor_address: args.executor_address, }; diff --git a/src/shared.rs b/src/shared.rs index 742aa89..f307089 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -1,3 +1,7 @@ +use std::sync::Arc; + +use alloy::{network::AnyNetwork, providers::{DynProvider, Provider}}; +use alloy_primitives::Address; use serde::Deserialize; macro_rules! send_metric_with_order_hash { @@ -11,7 +15,14 @@ macro_rules! send_metric_with_order_hash { }; } +macro_rules! u256 { + ($($limb:expr),*) => { + alloy_primitives::Uint::from_limbs([$($limb, 0, 0, 0),*]) + }; +} + pub(crate) use send_metric_with_order_hash; +pub(crate) use u256; #[derive(Clone, Debug, Deserialize)] #[serde(tag = "type")] @@ -35,3 +46,30 @@ pub struct RouteInfo { #[serde(rename = "methodParameters")] pub method_parameters: MethodParameters, } + + +pub async fn get_nonce_with_retry( + sender_client: &Arc>, + address: Address, + order_hash: &str, + max_attempts: u32, +) -> Result { + let mut attempts = 0; + loop { + match sender_client.get_transaction_count(address).await { + Ok(nonce) => break Ok(nonce), + Err(e) => { + if attempts < max_attempts - 1 { + attempts += 1; + } else { + return Err(anyhow::anyhow!( + "{} - Failed to get nonce after {} attempts: {}", + order_hash, + max_attempts, + e + )); + } + } + } + } +} \ No newline at end of file diff --git a/src/strategies/dutchv3_strategy.rs b/src/strategies/dutchv3_strategy.rs index a54e411..c306df3 100644 --- a/src/strategies/dutchv3_strategy.rs +++ b/src/strategies/dutchv3_strategy.rs @@ -79,7 +79,9 @@ impl UniswapXDutchV3Fill { Self { client, executor_address: config.executor_address, - bid_percentage: config.bid_percentage, + bid_percentage: config + .bid_percentage + .expect("Config missing bid_percentage: cannot initialize UniswapXDutchV3Fill"), last_block_number: 0, last_block_timestamp: 0, open_orders: HashMap::new(), @@ -164,7 +166,7 @@ impl UniswapXDutchV3Fill { let OrderBatchData { orders, - amount_out_required, + amount_required, .. } = &event.request; @@ -178,12 +180,14 @@ impl UniswapXDutchV3Fill { return vec![]; } + let amount_required_u256 = U256::from_str_radix(&amount_required.to_string(), 10).ok(); + info!("Quote: {:?}, Amount required: {:?}", event.route.quote_gas_adjusted, amount_required_u256); if let Some(profit) = self.get_profit_eth(event) { info!( "Sending trade: num trades: {} routed quote: {}, batch needs: {}, profit: {} wei", filtered_orders.len(), event.route.quote_gas_adjusted, - amount_out_required, + amount_required.to_string(), profit ); let signed_orders = self @@ -206,10 +210,22 @@ impl UniswapXDutchV3Fill { // Must be able to cover min gas cost let sender_address = Address::from_str(&self.sender_address).unwrap(); req.set_from(sender_address); - let gas_usage = self.client.estimate_gas(&req).await.unwrap_or_else(|err| { - info!("Error estimating gas: {}", err); - 1_000_000 - }); + let gas_usage = self.client.estimate_gas(&req).await.map_or_else( + |err| { + info!("Error estimating gas: {}", err); + if err.to_string().contains("execution reverted") { + None + } else { + Some(1_000_000) + } + }, + Some, + ); + + if gas_usage.is_none() { + return vec![]; + } + let gas_usage = gas_usage.unwrap(); // Get the current min gas price let min_gas_price = self .get_arbitrum_min_gas_price(self.client.clone()) @@ -219,7 +235,7 @@ impl UniswapXDutchV3Fill { // gas price at which we'd break even, meaning 100% of profit goes to validator let breakeven_gas_price = profit / U256::from(gas_usage); // gas price corresponding to bid percentage - let bid_gas_price = breakeven_gas_price + let bid_gas_price: Uint<256, 4> = breakeven_gas_price .mul(U256::from(self.bid_percentage)) .div(U256::from(100)); if bid_gas_price < min_gas_price { @@ -294,7 +310,7 @@ impl UniswapXDutchV3Fill { fn get_order_batches(&self) -> HashMap { let mut order_batches: HashMap = HashMap::new(); - // group orders by token in and token out + // group orders by token in, token out, and order type (exact_in or exact_out) self.open_orders .iter() .filter(|(_, order_data)| !self.processing_orders.contains(&order_data.hash)) @@ -311,6 +327,11 @@ impl UniswapXDutchV3Fill { .iter() .fold(Uint::from(0), |sum, output| sum.wrapping_add(output.amount)); + let amount_required = if order_data.order.is_exact_output() { + amount_in + } else { + amount_out + }; // insert new order and update total amount out if let std::collections::hash_map::Entry::Vacant(e) = order_batches.entry(token_in_token_out.clone()) @@ -318,7 +339,8 @@ impl UniswapXDutchV3Fill { e.insert(OrderBatchData { orders: vec![order_data.clone()], amount_in, - amount_out_required: amount_out, + amount_out, + amount_required, token_in: order_data.resolved.input.token.clone(), token_out: order_data.resolved.outputs[0].token.clone(), chain_id: self.chain_id, @@ -327,11 +349,12 @@ impl UniswapXDutchV3Fill { let order_batch_data = order_batches.get_mut(&token_in_token_out).unwrap(); order_batch_data.orders.push(order_data.clone()); order_batch_data.amount_in = order_batch_data.amount_in.wrapping_add(amount_in); - order_batch_data.amount_out_required = order_batch_data - .amount_out_required - .wrapping_add(amount_out); + order_batch_data.amount_required = order_batch_data + .amount_required + .wrapping_add(amount_required); } }); + order_batches } diff --git a/src/strategies/priority_strategy.rs b/src/strategies/priority_strategy.rs index 38c9c23..4ca00ac 100644 --- a/src/strategies/priority_strategy.rs +++ b/src/strategies/priority_strategy.rs @@ -36,7 +36,7 @@ use tokio::sync::{ RwLock, }; use tracing::{debug, error, info, warn}; -use uniswapx_rs::order::{Order, OrderResolution, PriorityOrder, MPS}; +use uniswapx_rs::order::{Order, OrderResolution, PriorityOrder, BPS, MPS}; use super::types::{Action, Event}; @@ -57,42 +57,102 @@ fn get_block_time_ms(chain_id: u64) -> u64 { pub struct ExecutionMetadata { // amount of quote token we can get pub quote: U256, + pub quote_eth: Option, + // whether the order is an exact output order + pub exact_output: bool, // amount of quote token needed to fill the order - pub amount_out_required: U256, + pub amount_required: U256, + pub gas_use_estimate_quote: U256, pub order_hash: String, pub target_block: Option, + pub fallback_bid_scale_factor: Option, } impl ExecutionMetadata { pub fn new( quote: U256, - amount_out_required: U256, + quote_eth: Option, + exact_output: bool, + amount_required: U256, + gas_use_estimate_quote: U256, order_hash: &str, target_block: Option, + fallback_bid_scale_factor: Option, ) -> Self { Self { quote, - amount_out_required, + quote_eth, + exact_output, + amount_required, + gas_use_estimate_quote, order_hash: order_hash.to_owned(), target_block, + fallback_bid_scale_factor, } } - pub fn calculate_priority_fee(&self, bid_percentage: U128) -> Option { - if self.quote.le(&self.amount_out_required) { + pub fn calculate_priority_fee(&self, bid_bps: U128) -> Option { + // exact_out: quote must be less than amount_in_required + // exact_in: quote must be greater than amount_out_required + if (self.exact_output && self.quote.ge(&self.amount_required)) || + (!self.exact_output && self.quote.le(&self.amount_required)) { + info!("{} - quote is not less than amount_required, skipping", self.order_hash); return None; } - let profit_quote = self.quote.saturating_sub(self.amount_out_required); + // exact_out: profit = amount_in_required - quote + // exact_in: profit = quote - amount_out_required + let profit_quote = if self.exact_output { + self.amount_required.saturating_sub(self.quote) + } else { + self.quote.saturating_sub(self.amount_required) + }; + info!("{} - amount_required: {:?}", self.order_hash, self.amount_required); + info!("{} - quote: {:?}", self.order_hash, self.quote); + info!("{} - profit_quote: {:?}", self.order_hash, profit_quote); + info!("{} - bid_bps: {:?}", self.order_hash, bid_bps); let mps_of_improvement = profit_quote .saturating_mul(U256::from(MPS)) - .checked_div(self.amount_out_required)?; + .checked_div(self.amount_required)?; + info!("{} - mps_of_improvement: {:?}", self.order_hash, mps_of_improvement); let priority_fee = mps_of_improvement - .checked_mul(U256::from(bid_percentage))? - .checked_div(U256::from(100))?; + .checked_mul(U256::from(bid_bps))? + .checked_div(U256::from(BPS))?; + info!("{} - priority_fee: {:?}", self.order_hash, priority_fee); Some(priority_fee) } + + // Uses the gas_use_estimate_quote to calculate the maximum priority fee we can bid + // @param gas_buffer: The buffer to multiply the gas use estimate by + pub fn calculate_priority_fee_from_gas_use_estimate(&self, gas_buffer: U256) -> Option { + let gas_with_buffer = U256::from(self.gas_use_estimate_quote).checked_mul(gas_buffer)?; + + // exact_out: quote must be less than amount_in_required - gas_with_buffer + // exact_in: quote must be greater than amount_out_required + gas_with_buffer + if (self.exact_output && self.quote.ge(&self.amount_required.checked_sub(gas_with_buffer)?)) || + (!self.exact_output && self.quote.le(&self.amount_required.checked_add(gas_with_buffer)?)) { + return None; + } + + // exact_out: profit = amount_in_required - gas - quote + // exact_in: profit = quote - gas - amount_out_required + let profit_quote = if self.exact_output { + self.amount_required + .saturating_sub(gas_with_buffer) + .saturating_sub(self.quote) + } else { + self.quote + .saturating_sub(gas_with_buffer) + .saturating_sub(self.amount_required) + }; + + let mps_of_improvement = profit_quote + .saturating_mul(U256::from(MPS)) + .checked_div(self.amount_required)?; + + Some(mps_of_improvement) + } } /// Strategy for filling UniswapX Priority Orders @@ -112,8 +172,8 @@ pub struct UniswapXPriorityFill { cloudwatch_client: Option>, /// executor address executor_address: String, - /// Amount of profits to bid in gas - bid_percentage: u128, + min_block_percentage_buffer: Option, + fallback_bid_scale_factor: Option, last_block_number: RwLock, last_block_timestamp: RwLock, // map of new order hashes to order data @@ -142,7 +202,8 @@ impl UniswapXPriorityFill { client, cloudwatch_client, executor_address: config.executor_address, - bid_percentage: config.bid_percentage, + min_block_percentage_buffer: config.min_block_percentage_buffer, + fallback_bid_scale_factor: config.fallback_bid_scale_factor, last_block_number: RwLock::new(0), last_block_timestamp: RwLock::new(0), new_orders: Arc::new(DashMap::new()), @@ -209,6 +270,7 @@ impl UniswapXPriorityFill { *self.last_block_timestamp.read().await, get_block_time_ms(self.chain_id), Uint::from(0), + self.min_block_percentage_buffer.unwrap_or(100) ); let order_status = match resolved_order { OrderResolution::Expired | OrderResolution::Invalid => OrderStatus::Done, @@ -270,9 +332,10 @@ impl UniswapXPriorityFill { encoded_order: None, route: event.route.clone(), }; + info!("{} - Received {} order", order_hash, if order_data.order.is_exact_output() { "exact_out" } else { "exact_in" }); if let Some(route) = &order_data.route { if !route.method_parameters.calldata.is_empty() { - info!("{} - Received cached route for order", order_hash); + info!("{} - Received cached route for order with quote: {}", order_hash, route.quote); } } self.new_orders.insert(order_hash.clone(), order_data.clone()); @@ -415,16 +478,18 @@ impl UniswapXPriorityFill { fn get_order_batch(&self, order_data: &OrderData) -> OrderBatchData { let amount_in: Uint<256, 4> = order_data.resolved.input.amount; + info!("{} - outputs: {:?}", order_data.hash, order_data.resolved.outputs); let amount_out = order_data .resolved .outputs .iter() .fold(Uint::from(0), |sum, output| sum.wrapping_add(output.amount)); - + info!("{} - amount_out: {:?}", order_data.hash, amount_out); OrderBatchData { orders: vec![order_data.clone()], amount_in, - amount_out_required: amount_out, + amount_out, + amount_required: if order_data.order.is_exact_output() { amount_in } else { amount_out }, token_in: order_data.resolved.input.token.clone(), token_out: order_data.resolved.outputs[0].token.clone(), chain_id: self.chain_id, @@ -466,27 +531,22 @@ impl UniswapXPriorityFill { /// - we return the data needed to calculate the maximum MPS of improvement we can offer from our quote and the order specs fn get_execution_metadata( &self, - RoutedOrder { - request, - route, - target_block, - .. - }: &RoutedOrder, + routed_order: &RoutedOrder, ) -> Option { - let quote = U256::from_str_radix(&route.quote, 10).ok()?; - let amount_out_required = - U256::from_str_radix(&request.amount_out_required.to_string(), 10).ok()?; - if quote.le(&amount_out_required) { - info!("{} - Quote is less than amount out required", request.orders[0].hash); - return None; - } - + let quote = U256::from_str_radix(&routed_order.route.quote, 10).ok()?; + let amount_required = + U256::from_str_radix(&routed_order.request.amount_required.to_string(), 10).ok()?; + info!("{} - quote_eth: {:?}", routed_order.request.orders[0].hash, self.get_quote_eth(&routed_order)); Some({ ExecutionMetadata { quote, - amount_out_required, - order_hash: request.orders[0].hash.clone(), - target_block: target_block.map(|b| U64::from(b)), + quote_eth: self.get_quote_eth(&routed_order), + exact_output: routed_order.request.orders[0].order.is_exact_output(), + amount_required, + gas_use_estimate_quote: U256::from_str_radix(&routed_order.route.gas_use_estimate_quote, 10).ok()?, + order_hash: routed_order.request.orders[0].hash.clone(), + target_block: routed_order.target_block.map(|b| U64::from(b)), + fallback_bid_scale_factor: self.fallback_bid_scale_factor.clone(), } }) } @@ -651,16 +711,20 @@ impl UniswapXPriorityFill { self.processing_orders.insert(order_hash.clone(), order_data.value().clone()); } + // If EXACT_OUT, quote should be less than amount_required + let quote = U256::from_str_radix(&order_data.route.as_ref().unwrap().quote, 10).unwrap(); + if order_data.order.is_exact_output() && quote.ge(&order_data.resolved.input.amount) { + info!("{} - Quote indicates more input than swapper is willing to give, skipping", order_hash); + continue; + } + // If EXACT_IN, quote should be greater than amount_required + else if !order_data.order.is_exact_output() && quote.le(&order_data.resolved.outputs[0].amount) { + info!("{} - Quote indicates less output than swapper is willing to receive, skipping", order_hash); + continue; + } + let routed_order = RoutedOrder { - request: OrderBatchData { - orders: vec![order_data.value().clone()], - amount_in: order_data.resolved.input.amount, - amount_out_required: order_data.resolved.outputs.iter() - .fold(Uint::from(0), |sum, output| sum.wrapping_add(output.amount)), - token_in: order_data.resolved.input.token.clone(), - token_out: order_data.resolved.outputs[0].token.clone(), - chain_id: self.chain_id, - }, + request: self.get_order_batch(order_data.value()), route: OrderRoute { quote: order_data.route.as_ref().unwrap().quote.clone(), quote_gas_adjusted: order_data.route.as_ref().unwrap().quote_gas_adjusted.clone(), @@ -694,8 +758,8 @@ impl UniswapXPriorityFill { execution: SubmitTxToMempool { tx: fill_tx_request.clone(), gas_bid_info: Some(GasBidInfo { - bid_percentage: U128::from(self.bid_percentage), - // this field is not used for priority orders + // these fields are not used for priority orders + bid_percentage: U128::from(0), total_profit: U128::from(0), }), }, @@ -734,3 +798,383 @@ impl UniswapXPriorityFill { actions } } + +#[cfg(test)] +mod tests { + use super::*; + + + #[test] + fn test_calculate_priority_fee_exact_in() { + // Test case 1: Normal case with profit + let metadata = ExecutionMetadata::new( + U256::from(1000), // quote + Some(U256::from(1000)), // quote_eth + false, + U256::from(800), // amount_out_required + U256::from(50), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + let bid_bps = U128::from(5000); // 50% + let result = metadata.calculate_priority_fee(bid_bps); + assert!(result.is_some()); + // profit = 200 + // mps_improvement = (200 * MPS) / 800 = 2_500_000 + // priority_fee = (2_500_000 * 5000) / MPS = 1_250_000 + assert_eq!(result.unwrap(), U256::from(1_250_000)); + + // Test case 2: Quote equals amount required (no profit) + let metadata = ExecutionMetadata::new( + U256::from(800), // quote equals amount_out_required + Some(U256::from(800)), // quote_eth + false, + U256::from(800), // amount_out_required + U256::from(50), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + let result = metadata.calculate_priority_fee(bid_bps); + assert!(result.is_none()); + + // Test case 3: Quote less than amount required + let metadata = ExecutionMetadata::new( + U256::from(700), // quote less than amount_out_required + Some(U256::from(700)), // quote_eth + false, + U256::from(800), // amount_out_required + U256::from(50), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + let result = metadata.calculate_priority_fee(bid_bps); + assert!(result.is_none()); + + // Test case 4: Minimal profit case + let metadata = ExecutionMetadata::new( + U256::from(801), // quote just above amount_out_required + Some(U256::from(801)), // quote_eth + false, + U256::from(800), // amount_out_required + U256::from(50), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + let result = metadata.calculate_priority_fee(bid_bps); + assert!(result.is_some()); + // profit = 1 + // mps_improvement = (1 * MPS) / 800 = 12,500 + // priority_fee = (12,500 * 5000) / MPS = 6,250 + assert_eq!(result.unwrap(), U256::from(6_250)); + + // Test case 5: Zero bid_bps + let metadata = ExecutionMetadata::new( + U256::from(1000), // quote + Some(U256::from(1000)), // quote_eth + false, + U256::from(800), // amount_out_required + U256::from(50), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + let zero_bid_bps = U128::from(0); + let result = metadata.calculate_priority_fee(zero_bid_bps); + assert!(result.is_some()); + assert_eq!(result.unwrap(), U256::from(0)); + } + + + #[test] + fn test_calculate_priority_fee_exact_out() { + // Test case 1: Normal case with profit + let metadata = ExecutionMetadata::new( + U256::from(800), // quote + Some(U256::from(800)), // quote_eth + true, // exact_output = true + U256::from(1000), // amount_in_required + U256::from(50), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + let bid_bps = U128::from(5000); // 50% + let result = metadata.calculate_priority_fee(bid_bps); + assert!(result.is_some()); + // profit = 200 + // mps_improvement = (200 * MPS) / 1000 = 2_000_000 + // priority_fee = (2_000_000 * 5000) / BPS = 1_000_000 + assert_eq!(result.unwrap(), U256::from(1_000_000)); + + // Test case 2: Quote equals amount required (no profit) + let metadata = ExecutionMetadata::new( + U256::from(1000), // quote equals amount_in_required + Some(U256::from(1000)), // quote_eth + true, // exact_output = true + U256::from(1000), // amount_in_required + U256::from(50), // gas_use_estimate_quote + "test_hash", + None, + None + ); + + let result = metadata.calculate_priority_fee(bid_bps); + assert!(result.is_none()); + + // Test case 3: Quote greater than amount required + let metadata = ExecutionMetadata::new( + U256::from(1100), // quote greater than amount_in_required + Some(U256::from(1100)), // quote_eth + true, // exact_output = true + U256::from(1000), // amount_in_required + U256::from(50), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + let result = metadata.calculate_priority_fee(bid_bps); + assert!(result.is_none()); + + // Test case 4: Minimal profit case + let metadata = ExecutionMetadata::new( + U256::from(999), // quote just below amount_in_required + Some(U256::from(999)), // quote_eth + true, // exact_output = true + U256::from(1000), // amount_in_required + U256::from(50), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + let result = metadata.calculate_priority_fee(bid_bps); + assert!(result.is_some()); + // profit = 1 + // mps_improvement = (1 * MPS) / 1000 = 10,000 + // priority_fee = (10,000 * 5000) / BPS = 5,000 + assert_eq!(result.unwrap(), U256::from(5_000)); + + // Test case 5: Zero bid_bps + let metadata = ExecutionMetadata::new( + U256::from(800), // quote + Some(U256::from(800)), // quote_eth + true, // exact_output = true + U256::from(1000), // amount_in_required + U256::from(50), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + let zero_bid_bps = U128::from(0); + let result = metadata.calculate_priority_fee(zero_bid_bps); + assert!(result.is_some()); + assert_eq!(result.unwrap(), U256::from(0)); + } + + #[test] + fn test_calculate_priority_fee_from_gas_use_estimate_exact_in() { + // Test case 1: Normal case with profit + let metadata = ExecutionMetadata::new( + U256::from(1000), // quote + Some(U256::from(1000)), // quote_eth + false, + U256::from(800), // amount_out_required + U256::from(50), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + // With gas buffer of 2x + let mut gas_buffer = U256::from(2); + let result = metadata.calculate_priority_fee_from_gas_use_estimate(gas_buffer); + assert!(result.is_some()); + // profit = 100, MPS = 10_000_000 + // bid: (100 * 10_000_000) / 800 = 1_250_000 + assert_eq!(result.unwrap(), U256::from(1_250_000)); + + // Test case 2: No profit after gas and required amount + let metadata = ExecutionMetadata::new( + U256::from(1000), // quote + Some(U256::from(1000)), // quote_eth + false, + U256::from(100), // amount_out_required + U256::from(1000000000), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + // With gas buffer of 1x + gas_buffer = U256::from(1); + let result = metadata.calculate_priority_fee_from_gas_use_estimate(gas_buffer); + assert!(result.is_none()); + + + // Test case 3: 1 profit after gas and required amount + let metadata = ExecutionMetadata::new( + U256::from(1000), // quote + Some(U256::from(1000)), // quote_eth + false, + U256::from(900), // amount_out_required + U256::from(99), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + // With gas buffer of 1x + gas_buffer = U256::from(1); + let result = metadata.calculate_priority_fee_from_gas_use_estimate(gas_buffer); + assert!(result.is_some()); + // profit = 1, MPS = 10_000_000 + // bid: (1 * 10_000_000) / 900 = 11,111 + assert_eq!(result.unwrap(), U256::from(11_111)); + + // Test case 4: Quote less than required amount plus gas + let metadata = ExecutionMetadata::new( + U256::from(1000), // quote + Some(U256::from(1000)), // quote_eth + false, + U256::from(900), // amount_out_required + U256::from(200), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + // With gas buffer of 1x (900 + 200 > 1000) + gas_buffer = U256::from(1); + let result = metadata.calculate_priority_fee_from_gas_use_estimate(gas_buffer); + assert!(result.is_none()); + + // Test case 5: Edge case with zero gas estimate + let metadata = ExecutionMetadata::new( + U256::from(1000), // quote + Some(U256::from(1000)), // quote_eth + false, + U256::from(800), // amount_out_required + U256::from(0), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + // With gas buffer of 2x + gas_buffer = U256::from(2); + let result = metadata.calculate_priority_fee_from_gas_use_estimate(gas_buffer); + assert!(result.is_some()); + // profit = 2000, MPS = 10_000_000 + // bid: (2000 * 10_000_000) / 800 = 2_500_000 + assert_eq!(result.unwrap(), U256::from(2_500_000)); + } + + #[test] + fn test_calculate_priority_fee_from_gas_use_estimate_exact_out() { + // Test case 1: Normal case with profit + let metadata = ExecutionMetadata::new( + U256::from(800), // quote + Some(U256::from(800)), // quote_eth + true, // exact_output = true + U256::from(1000), // amount_in_required + U256::from(50), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + // With gas buffer of 2x + let mut gas_buffer = U256::from(2); + let result = metadata.calculate_priority_fee_from_gas_use_estimate(gas_buffer); + assert!(result.is_some()); + // profit = 100 + // bid: (100 * MPS) / 1000 = 1_000_000 + assert_eq!(result.unwrap(), U256::from(1_000_000)); + + // Test case 2: No profit after gas and required amount + let metadata = ExecutionMetadata::new( + U256::from(999), // quote just below required + Some(U256::from(999)), // quote_eth + true, // exact_output = true + U256::from(1000), // amount_in_required + U256::from(2), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + // With gas buffer of 1x (999 > 1000 - 2) + gas_buffer = U256::from(1); + let result = metadata.calculate_priority_fee_from_gas_use_estimate(gas_buffer); + assert!(result.is_none()); + + // Test case 3: 1 wei profit after gas and required amount + let metadata = ExecutionMetadata::new( + U256::from(900), // quote + Some(U256::from(900)), // quote_eth + true, // exact_output = true + U256::from(1000), // amount_in_required + U256::from(99), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + // With gas buffer of 1x (900 < 1000 - 99) + gas_buffer = U256::from(1); + let result = metadata.calculate_priority_fee_from_gas_use_estimate(gas_buffer); + assert!(result.is_some()); + // profit = 1 + // bid: (1 * MPS) / 1000 = 10,000 + assert_eq!(result.unwrap(), U256::from(10_000)); + + // Test case 4: Quote greater than required amount minus gas + let metadata = ExecutionMetadata::new( + U256::from(900), // quote + Some(U256::from(900)), // quote_eth + true, // exact_output = true + U256::from(1000), // amount_in_required + U256::from(200), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + // With gas buffer of 1x (900 > 1000 - 200) + gas_buffer = U256::from(1); + let result = metadata.calculate_priority_fee_from_gas_use_estimate(gas_buffer); + assert!(result.is_none()); + + // Test case 5: Edge case with zero gas estimate + let metadata = ExecutionMetadata::new( + U256::from(800), // quote + Some(U256::from(800)), // quote_eth + true, // exact_output = true + U256::from(1000), // amount_in_required + U256::from(0), // gas_use_estimate_quote + "test_hash", + None, + None, + ); + + // With gas buffer of 2x + gas_buffer = U256::from(2); + let result = metadata.calculate_priority_fee_from_gas_use_estimate(gas_buffer); + assert!(result.is_some()); + // profit = 200 + // bid: (200 * MPS) / 1000 = 2_000_000 + assert_eq!(result.unwrap(), U256::from(2_000_000)); + } +} diff --git a/src/strategies/shared.rs b/src/strategies/shared.rs index ec2e306..4bf89da 100644 --- a/src/strategies/shared.rs +++ b/src/strategies/shared.rs @@ -114,12 +114,23 @@ pub trait UniswapXStrategy { fn get_profit_eth(&self, RoutedOrder { request, route, .. }: &RoutedOrder) -> Option { let quote = U256::from_str_radix(&route.quote, 10).ok()?; - let amount_out_required = - U256::from_str_radix(&request.amount_out_required.to_string(), 10).ok()?; - if quote.le(&amount_out_required) { - return None; - } - let profit_quote = quote.saturating_sub(amount_out_required); + let amount_required = + U256::from_str_radix(&request.amount_required.to_string(), 10).ok()?; + + // exact_out: quote must be less than amount_in_required + // exact_in: quote must be greater than amount_out_required + if (request.orders.first().unwrap().order.is_exact_output() && quote.ge(&amount_required)) || + (!request.orders.first().unwrap().order.is_exact_output() && quote.le(&amount_required)) { + return None; + } + + // exact_out: profit = amount_in_required - quote + // exact_in: profit = quote - amount_out_required + let profit_quote = if request.orders.first().unwrap().order.is_exact_output() { + amount_required.saturating_sub(quote) + } else { + quote.saturating_sub(amount_required) + }; if request.token_out.to_lowercase() == WETH_ADDRESS.to_lowercase() { return Some(profit_quote); @@ -133,6 +144,35 @@ pub trait UniswapXStrategy { .checked_div(U256::from_str_radix(&route.gas_use_estimate_quote, 10).ok()?) } + /// Converts the quote amount to ETH equivalent value + /// + /// For WETH output tokens, returns the quote directly since it's already in ETH. + /// For non-WETH output tokens, converts using the following formula: + /// quote_eth = quote * gas_wei / gas_in_quote + /// + /// # Arguments + /// * `request` - The order request containing token information + /// * `route` - The route containing quote and gas estimates + /// + /// # Returns + /// * `Some(U256)` - The quote value in ETH + /// * `None` - If any conversion fails or division by zero would occur + fn get_quote_eth(&self, RoutedOrder { request, route, .. }: &RoutedOrder) -> Option { + let quote = U256::from_str_radix(&route.quote, 10).ok()?; + + // If output token is WETH, quote is already in ETH + if request.token_out.to_lowercase() == WETH_ADDRESS.to_lowercase() { + return Some(quote); + } + + let gas_use_eth = U256::from_str_radix(&route.gas_use_estimate, 10) + .ok()? + .saturating_mul(U256::from_str_radix(&route.gas_price_wei, 10).ok()?); + quote + .saturating_mul(gas_use_eth) + .checked_div(U256::from_str_radix(&route.gas_use_estimate_quote, 10).ok()?) + } + /// Get the minimum gas price on Arbitrum /// https://docs.arbitrum.io/build-decentralized-apps/precompiles/reference#arbgasinfo async fn get_arbitrum_min_gas_price( diff --git a/src/strategies/types.rs b/src/strategies/types.rs index 6352a9f..446a0d2 100644 --- a/src/strategies/types.rs +++ b/src/strategies/types.rs @@ -31,8 +31,10 @@ pub enum Action { /// Configuration for variables we need to pass to the strategy. #[derive(Debug, Clone)] pub struct Config { - pub bid_percentage: u128, + pub bid_percentage: Option, pub executor_address: String, + pub min_block_percentage_buffer: Option, + pub fallback_bid_scale_factor: Option, } #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] diff --git a/src/strategies/uniswapx_strategy.rs b/src/strategies/uniswapx_strategy.rs index c0721f5..edf6fbf 100644 --- a/src/strategies/uniswapx_strategy.rs +++ b/src/strategies/uniswapx_strategy.rs @@ -74,7 +74,9 @@ impl UniswapXUniswapFill { Self { client, executor_address: config.executor_address, - bid_percentage: config.bid_percentage, + bid_percentage: config + .bid_percentage + .expect("Config missing bid_percentage: cannot initialize UniswapXUniswapFill"), last_block_number: 0, last_block_timestamp: 0, open_orders: HashMap::new(), @@ -149,7 +151,7 @@ impl UniswapXUniswapFill { let OrderBatchData { // orders, orders, - amount_out_required, + amount_required: amount_out_required, .. } = &event.request; @@ -277,6 +279,11 @@ impl UniswapXUniswapFill { .iter() .fold(Uint::from(0), |sum, output| sum.wrapping_add(output.amount)); + let amount_required = if order_data.order.is_exact_output() { + amount_in + } else { + amount_out + }; // insert new order and update total amount out if let std::collections::hash_map::Entry::Vacant(e) = order_batches.entry(token_in_token_out.clone()) @@ -284,7 +291,8 @@ impl UniswapXUniswapFill { e.insert(OrderBatchData { orders: vec![order_data.clone()], amount_in, - amount_out_required: amount_out, + amount_out, + amount_required, token_in: order_data.resolved.input.token.clone(), token_out: order_data.resolved.outputs[0].token.clone(), chain_id: self.chain_id, @@ -293,9 +301,9 @@ impl UniswapXUniswapFill { let order_batch_data = order_batches.get_mut(&token_in_token_out).unwrap(); order_batch_data.orders.push(order_data.clone()); order_batch_data.amount_in = order_batch_data.amount_in.wrapping_add(amount_in); - order_batch_data.amount_out_required = order_batch_data - .amount_out_required - .wrapping_add(amount_out); + order_batch_data.amount_required = order_batch_data + .amount_required + .wrapping_add(amount_required); } }); order_batches