Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/aws_utils/cloudwatch_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub const ROUTING_MS: &str = "RoutingMs";
pub const TX_SUCCEEDED_METRIC: &str = "TransactionSucceeded";
pub const TX_REVERTED_METRIC: &str = "TransactionReverted";
pub const TX_SUBMITTED_METRIC: &str = "TransactionSubmitted";
pub const ORDER_RECEIVED_METRIC: &str = "OrderReceived";
pub const ORDER_BID_METRIC: &str = "OrderBid";
pub const ORDER_FILLED_METRIC: &str = "OrderFilled";
pub const TX_STATUS_UNKNOWN_METRIC: &str = "TransactionStatusUnknown";
pub const LATEST_BLOCK: &str = "LatestBlock";
pub const EXECUTION_ATTEMPTED_METRIC: &str = "ExecutionAttempted";
Expand Down Expand Up @@ -86,6 +89,9 @@ pub enum CwMetrics {
TxSucceeded(u64),
TxReverted(u64),
TxSubmitted(u64),
OrderReceived(u64),
OrderBid(u64),
OrderFilled(u64),
TxStatusUnknown(u64),
LatestBlock(u64),
RevertCode(u64, String), // chain_id and revert code string
Expand All @@ -112,6 +118,9 @@ impl From<CwMetrics> for String {
CwMetrics::TxSucceeded(chain_id) => format!("{}-{}", chain_id, TX_SUCCEEDED_METRIC),
CwMetrics::TxReverted(chain_id) => format!("{}-{}", chain_id, TX_REVERTED_METRIC),
CwMetrics::TxSubmitted(chain_id) => format!("{}-{}", chain_id, TX_SUBMITTED_METRIC),
CwMetrics::OrderReceived(chain_id) => format!("{}-{}", chain_id, ORDER_RECEIVED_METRIC),
CwMetrics::OrderBid(chain_id) => format!("{}-{}", chain_id, ORDER_BID_METRIC),
CwMetrics::OrderFilled(chain_id) => format!("{}-{}", chain_id, ORDER_FILLED_METRIC),
CwMetrics::TxStatusUnknown(chain_id) => {
format!("{}-{}", chain_id, TX_STATUS_UNKNOWN_METRIC)
}
Expand Down
40 changes: 30 additions & 10 deletions src/executors/dutch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,20 @@ impl Executor<SubmitTxToMempool> for DutchExecutor {
.to_string()
.parse::<u64>()
.unwrap();
let metric_future = build_metric_future(
self.cloudwatch_client.clone(),
DimensionValue::V3Executor,
CwMetrics::TxSubmitted(chain_id),
1.0,
);
if let Some(metric_future) = metric_future {
// do not block current thread by awaiting in the background
send_metric!(metric_future);
}

let send_metric_if_some = |metric| {
if let Some(metric_future) = build_metric_future(
self.cloudwatch_client.clone(),
DimensionValue::V3Executor,
metric,
1.0,
) {
send_metric!(metric_future);
}
};

send_metric_if_some(CwMetrics::OrderBid(chain_id));
send_metric_if_some(CwMetrics::TxSubmitted(chain_id));

let tx_request_for_revert = action.tx.clone();
let tx = action.tx.build(&wallet).await?;
Expand Down Expand Up @@ -230,6 +234,22 @@ impl Executor<SubmitTxToMempool> for DutchExecutor {
}
}
}
else {
let send_metric_if_some = |metric| {
if let Some(metric_future) = build_metric_future(
self.cloudwatch_client.clone(),
DimensionValue::V3Executor,
metric,
1.0,
) {
send_metric!(metric_future);
}
};

send_metric_if_some(CwMetrics::OrderFilled(chain_id));
send_metric_if_some(CwMetrics::TxSucceeded(chain_id));
info!("Transaction succeeded");
}

(Some(receipt), status)
}
Expand Down
51 changes: 44 additions & 7 deletions src/executors/priority_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
build_metric_future, receipt_status_to_metric, revert_code_to_metric, CwMetrics, DimensionValue
},
executors::reactor_error_code::ReactorErrorCode,
shared::{get_nonce_with_retry, send_metric_with_order_hash, u256},
shared::{burn_nonce, get_nonce_with_retry, send_metric_with_order_hash, u256},
strategies::{keystore::KeyStore, types::SubmitTxToMempoolWithExecutionMetadata}
};
use crate::executors::reactor_error_code::get_revert_reason;
Expand All @@ -34,6 +34,7 @@ static GWEI_PER_ETH: U256 = u256!(1_000_000_000);
const QUOTE_ETH_LOG10_THRESHOLD: usize = 8;
// The number of bps to add to the base bid for each fallback bid
const DEFAULT_FALLBACK_BID_SCALE_FACTOR: u64 = 50;
const CONFIRMATION_TIMEOUT: u64 = 30;

/// An executor that sends transactions to the public mempool.
pub struct PriorityExecutor {
Expand Down Expand Up @@ -106,13 +107,18 @@ impl PriorityExecutor {
match result {
Ok(tx) => {
info!("{} - Waiting for confirmations", order_hash);
let receipt = tx
.with_required_confirmations(0)
.get_receipt()
.await
.map_err(|e| {
let receipt = match tokio::time::timeout(
std::time::Duration::from_secs(CONFIRMATION_TIMEOUT),
tx.with_required_confirmations(0).get_receipt()
).await {
Ok(receipt_result) => receipt_result.map_err(|e| {
anyhow::anyhow!("{} - Error waiting for confirmations: {}", order_hash, e)
});
}),
Err(_) => {
warn!("{} - Timed out waiting for transaction receipt", order_hash);
return Ok(TransactionOutcome::Failure(None));
}
};


match receipt {
Expand Down Expand Up @@ -179,6 +185,17 @@ impl PriorityExecutor {
}
Err(e) => {
warn!("{} - Error sending transaction: {}", order_hash, e);
// If the nonce is already used, burn the nonce for the next transaction
if e.to_string().contains("replacement transaction underpriced") {
info!("{} - Nonce already used, burning nonce for next transaction", order_hash);
burn_nonce(
&self.sender_client,
wallet,
tx_request_for_revert.from.unwrap(),
tx_request_for_revert.nonce.unwrap(),
order_hash
).await?;
}
Ok(TransactionOutcome::Failure(None))
}
}
Expand Down Expand Up @@ -356,6 +373,7 @@ impl Executor<SubmitTxToMempoolWithExecutionMetadata> for PriorityExecutor {

// Retry up to 3 times to get the nonce.
let mut nonce = get_nonce_with_retry(&self.client, address, &order_hash, 3).await?;
info!("{} - Nonce: {}", order_hash, nonce);

// Sort transactions by max_priority_fee_per_gas in descending order so that the highest bid is first
tx_requests.sort_by(|a, b| {
Expand All @@ -370,6 +388,15 @@ impl Executor<SubmitTxToMempoolWithExecutionMetadata> for PriorityExecutor {
nonce += 1;
}

let metric_future = build_metric_future(
self.cloudwatch_client.clone(),
DimensionValue::PriorityExecutor,
CwMetrics::OrderBid(chain_id_u64),
1.0,
);
if let Some(metric_future) = metric_future {
send_metric_with_order_hash!(&order_hash, metric_future);
}
info!("{} - Executing {} transactions in parallel from {:?}", order_hash, tx_requests.len(), address);

let mut attempts = 0;
Expand Down Expand Up @@ -406,6 +433,16 @@ impl Executor<SubmitTxToMempoolWithExecutionMetadata> for PriorityExecutor {
Ok(TransactionOutcome::Success(result)) => {
success = true;
block_number = *result;

let metric_future = build_metric_future(
self.cloudwatch_client.clone(),
DimensionValue::PriorityExecutor,
CwMetrics::OrderFilled(chain_id_u64),
1.0,
);
if let Some(metric_future) = metric_future {
send_metric_with_order_hash!(&order_hash, metric_future);
}
info!("{} - Transaction {} succeeded at block {}", order_hash, i, block_number.unwrap());
break;
}
Expand Down
3 changes: 3 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,12 @@ async fn main() -> Result<()> {
));
client = Some(wss_provider.clone());
sender_client = Some(wss_provider.clone());
info!("Using WSS provider: {}", wss);
}

// Initialize HTTP provider if specified
if let Some(http_endpoint) = args.http {
info!("Using HTTP provider: {}", http_endpoint);
let http_client = ClientBuilder::default().http(http_endpoint.parse()?);
let http_provider = Arc::new(DynProvider::<AnyNetwork>::new(
ProviderBuilder::new()
Expand Down Expand Up @@ -291,6 +293,7 @@ async fn main() -> Result<()> {
OrderType::DutchV3 => {
let uniswapx_strategy = UniswapXDutchV3Fill::new(
client.clone().unwrap(),
cloudwatch_client.clone(),
config.clone(),
batch_sender,
route_receiver,
Expand Down
77 changes: 75 additions & 2 deletions src/shared.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::sync::Arc;

use alloy::{network::AnyNetwork, providers::{DynProvider, Provider}};
use alloy_primitives::Address;
use alloy::{network::{AnyNetwork, EthereumWallet, TransactionBuilder, ReceiptResponse}, providers::{DynProvider, Provider}, rpc::types::TransactionRequest, serde::WithOtherFields};
use alloy_primitives::{Address, U256};
use serde::Deserialize;

const NONCE_BURN_GAS_MULTIPLIER: u128 = 10;
const NONCE_BURN_PRIORITY_FEE: u128 = 100000; // 0.0001 gwei
const ETH_TRANSFER_GAS: u64 = 21000;

macro_rules! send_metric_with_order_hash {
($order_hash: expr, $future: expr) => {
let hash = Arc::clone($order_hash);
Expand Down Expand Up @@ -72,4 +76,73 @@ pub async fn get_nonce_with_retry(
}
}
}
}

/// @notice Burns a specific nonce by sending a 0 ETH transaction to self with a high gas price.
/// @dev This function is used to invalidate a nonce by creating a dummy transaction.
/// @param provider The Ethereum provider used to send the transaction.
/// @param wallet The wallet used to sign the transaction.
/// @param address The address whose nonce will be burned.
/// @param nonce The specific nonce to burn.
/// @param order_hash A string identifier for logging and tracing purposes.
/// @return Returns Ok(()) if the transaction is sent and confirmed, or an error otherwise.
pub async fn burn_nonce(
provider: &Arc<DynProvider<AnyNetwork>>,
wallet: &EthereumWallet,
address: Address,
nonce: u64,
order_hash: &str,
) -> Result<(), anyhow::Error> {
let base_fee = provider
.get_gas_price()
.await?;

// Create a dummy transaction that sends 0 ETH to self with high gas price
let tx_request = WithOtherFields::new(TransactionRequest {
from: Some(address),
to: Some(address.into()),
value: Some(U256::ZERO),
nonce: Some(nonce),
gas: Some(ETH_TRANSFER_GAS), // Standard ETH transfer gas
gas_price: Some(base_fee * NONCE_BURN_GAS_MULTIPLIER),
max_fee_per_gas: Some(base_fee * NONCE_BURN_GAS_MULTIPLIER),
max_priority_fee_per_gas: Some(NONCE_BURN_PRIORITY_FEE),
..Default::default()
});

// Sign and send the transaction
let tx = tx_request.build(wallet).await?;
let result = provider.send_tx_envelope(tx).await;

match result {
Ok(tx) => {
tracing::info!("{} - Waiting for confirmations", order_hash);
let receipt = tx
.with_required_confirmations(0)
.get_receipt()
.await
.map_err(|e| {
anyhow::anyhow!("{} - Error waiting for confirmations: {}", order_hash, e)
});

match receipt {
Ok(receipt) => {
let status = receipt.status();
tracing::info!(
"{} - Nonce burn: tx_hash: {:?}, status: {}",
order_hash, receipt.transaction_hash, status,
);
Ok(())
}
Err(e) => {
tracing::error!("{} - Error burning nonce: {}", order_hash, e);
return Err(anyhow::anyhow!("{} - Error burning nonce: {}", order_hash, e));
}
}
}
Err(e) => {
tracing::error!("{} - Error sending nonce burn transaction: {}", order_hash, e);
return Err(anyhow::anyhow!("{} - Error sending nonce burn transaction: {}", order_hash, e));
}
}
}
Loading