Skip to content

Commit d63dae0

Browse files
authored
Merge branch 'main' into jemalloc-profiling-support
2 parents d5d5cfe + 476bdff commit d63dae0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+2333
-2461
lines changed

Cargo.lock

Lines changed: 98 additions & 52 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,13 @@ tower-http = "0.4"
101101
tracing-opentelemetry = "0.31"
102102
tracing-serde = "0.2"
103103
vergen = "8"
104+
walkdir = "2.5.0"
105+
quote = "1.0.41"
106+
syn = "2.0.108"
107+
prettyplease = "0.2.37"
108+
proc-macro2 = "1.0.103"
109+
alloy-sol-macro-input = "1.4.1"
110+
alloy-sol-macro-expander = "1.4.1"
104111

105112
[workspace.lints]
106113
clippy.cast_possible_wrap = "deny"

crates/autopilot/src/boundary/events/settlement.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl AlloyEventRetrieving for GPv2SettlementContract {
2828
Filter::new().address(self.address)
2929
}
3030

31-
fn provider(&self) -> &contracts::alloy::Provider {
31+
fn provider(&self) -> &alloy::providers::DynProvider {
3232
&self.provider
3333
}
3434
}
@@ -75,7 +75,9 @@ impl EventStoring<(GPv2SettlementEvents, Log)> for Indexer {
7575
database::settlements::delete(&mut transaction, from_block).await?;
7676
transaction.commit().await?;
7777

78-
self.settlement_observer.update().await;
78+
self.settlement_observer
79+
.post_process_outstanding_settlement_transactions()
80+
.await;
7981
Ok(())
8082
}
8183

@@ -84,7 +86,9 @@ impl EventStoring<(GPv2SettlementEvents, Log)> for Indexer {
8486
crate::database::events::append_events(&mut transaction, events).await?;
8587
transaction.commit().await?;
8688

87-
self.settlement_observer.update().await;
89+
self.settlement_observer
90+
.post_process_outstanding_settlement_transactions()
91+
.await;
8892
Ok(())
8993
}
9094
}

crates/autopilot/src/database/ethflow_events/event_retriever.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ impl EthFlowRefundRetriever {
2929
impl AlloyEventRetrieving for EthFlowRefundRetriever {
3030
type Event = CoWSwapEthFlow::CoWSwapEthFlowEvents;
3131

32-
fn provider(&self) -> &contracts::alloy::Provider {
32+
fn provider(&self) -> &alloy::providers::DynProvider {
3333
&self.web3.alloy
3434
}
3535

crates/autopilot/src/database/onchain_order_events/event_retriever.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl AlloyEventRetrieving for CoWSwapOnchainOrdersContract {
4242
]))
4343
}
4444

45-
fn provider(&self) -> &contracts::alloy::Provider {
45+
fn provider(&self) -> &alloy::providers::DynProvider {
4646
&self.web3.alloy
4747
}
4848
}

crates/autopilot/src/database/onchain_order_events/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -600,9 +600,9 @@ fn convert_onchain_order_placement(
600600
// executed fast (we don't want to reserve the user's ETH for too long)
601601
if quote.as_ref().is_ok_and(|quote| {
602602
!order_data.within_market(QuoteAmounts {
603-
sell: quote.sell_amount.into_legacy(),
604-
buy: quote.buy_amount.into_legacy(),
605-
fee: quote.fee_amount.into_legacy(),
603+
sell: quote.sell_amount,
604+
buy: quote.buy_amount,
605+
fee: quote.fee_amount,
606606
})
607607
}) {
608608
tracing::debug!(%order_uid, ?owner, "order is outside market price");
@@ -786,7 +786,7 @@ mod test {
786786
super::*,
787787
crate::database::Config,
788788
alloy::primitives::U256,
789-
contracts::alloy::{CoWSwapOnchainOrders, InstanceExt},
789+
contracts::alloy::CoWSwapOnchainOrders,
790790
database::{byte_array::ByteArray, onchain_broadcasted_orders::OnchainOrderPlacement},
791791
ethcontract::H160,
792792
ethrpc::Web3,

crates/autopilot/src/domain/settlement/observer.rs

Lines changed: 71 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use {
2020
},
2121
anyhow::{Context, Result, anyhow},
2222
ethrpc::alloy::conversions::IntoLegacy,
23+
futures::StreamExt,
24+
rand::Rng,
2325
std::time::Duration,
2426
};
2527

@@ -29,80 +31,68 @@ pub struct Observer {
2931
persistence: infra::Persistence,
3032
}
3133

32-
enum IndexSuccess {
33-
NothingToDo,
34-
IndexedSettlement,
35-
SkippedInvalidTransaction,
36-
}
37-
3834
impl Observer {
3935
/// Creates a new Observer and asynchronously schedules the first update
4036
/// run.
4137
pub fn new(eth: infra::Ethereum, persistence: infra::Persistence) -> Self {
4238
Self { eth, persistence }
4339
}
4440

45-
/// Fetches all the available missing data needed for bookkeeping.
46-
/// This needs to get called after indexing a new settlement event
47-
/// since this code needs that data to already be present in the DB.
48-
pub async fn update(&self) {
49-
const MAX_RETRIES: usize = 5;
50-
let mut attempts = 0;
51-
while attempts < MAX_RETRIES {
52-
match self.single_update().await {
53-
Ok(IndexSuccess::IndexedSettlement) => {
54-
tracing::debug!("on settlement event updater ran and processed event");
55-
}
56-
Ok(IndexSuccess::SkippedInvalidTransaction) => {
57-
tracing::warn!("stored default values for unindexable transaction");
58-
}
59-
Ok(IndexSuccess::NothingToDo) => {
60-
tracing::debug!("on settlement event updater ran without update");
41+
/// Post processes all outstanding settlements. This involves decoding the
42+
/// settlement details from the transaction and associating it with a
43+
/// solution proposed by a solver for the auction specified at the end of
44+
/// the transaction call data. If no solution can be found a dummy mapping
45+
/// gets saved to mark the settlement as processed. This can happen when a
46+
/// solver submits a solution despite not winning or if the settlement
47+
/// belongs to an auction that was arbitrated in another environment (i.e.
48+
/// prod vs. staging).
49+
pub async fn post_process_outstanding_settlement_transactions(&self) {
50+
let settlements =
51+
match Self::retry_with_sleep(|| self.persistence.get_settlements_without_auction())
52+
.await
53+
{
54+
Ok(settlements) => settlements,
55+
Err(errs) => {
56+
tracing::warn!(?errs, "failed to fetch unprocessed settlements");
6157
return;
6258
}
63-
Err(err) => {
64-
tracing::debug!(?err, "encountered retryable error");
65-
// wait a little to give temporary errors a chance to resolve themselves
66-
const TEMP_ERROR_BACK_OFF: Duration = Duration::from_millis(100);
67-
tokio::time::sleep(TEMP_ERROR_BACK_OFF).await;
68-
attempts += 1;
69-
continue;
70-
}
71-
}
59+
};
7260

73-
// everything worked fine -> reset our attempts for the next settlement
74-
attempts = 0;
61+
if settlements.is_empty() {
62+
tracing::debug!("no unprocessed settlements found");
63+
return;
7564
}
76-
}
77-
78-
/// Update database for settlement events that have not been processed yet.
79-
///
80-
/// Returns whether an update was performed.
81-
async fn single_update(&self) -> Result<IndexSuccess> {
82-
// Find a settlement event that has not been processed yet.
83-
let Some(event) = self
84-
.persistence
85-
.get_settlement_without_auction()
86-
.await
87-
.context("failed to fetch unprocessed tx from DB")?
88-
else {
89-
return Ok(IndexSuccess::NothingToDo);
90-
};
9165

92-
tracing::debug!(tx = ?event.transaction, "found unprocessed settlement");
66+
// On mainnet it's common to have multiple settlements in the
67+
// same block. So even if we process every block immediately,
68+
// we should still post-process multiple settlements concurrently.
69+
const MAX_CONCURRENCY: usize = 10;
70+
futures::stream::iter(settlements)
71+
.for_each_concurrent(MAX_CONCURRENCY, |settlement| async move {
72+
tracing::debug!(tx = ?settlement.transaction, "start post processing of settlement");
73+
match Self::retry_with_sleep(|| self.post_process_settlement(settlement)).await {
74+
Ok(_) => tracing::debug!(
75+
tx = ?settlement.transaction,
76+
"successfully post-processed settlement"
77+
),
78+
Err(errs) => tracing::warn!(
79+
tx = ?settlement.transaction,
80+
?errs,
81+
"gave up on post-processing settlement"
82+
),
83+
}
84+
})
85+
.await;
86+
}
9387

88+
async fn post_process_settlement(&self, settlement: eth::SettlementEvent) -> Result<()> {
9489
let settlement_data = self
95-
.fetch_auction_data_for_transaction(event.transaction)
90+
.fetch_auction_data_for_transaction(settlement.transaction)
9691
.await?;
9792
self.persistence
98-
.save_settlement(event, settlement_data.as_ref())
93+
.save_settlement(settlement, settlement_data.as_ref())
9994
.await
100-
.context("failed to update settlement")?;
101-
102-
match settlement_data {
103-
None => Ok(IndexSuccess::SkippedInvalidTransaction),
104-
Some(_) => Ok(IndexSuccess::IndexedSettlement),
105-
}
95+
.context("failed to update settlement")
10696
}
10797

10898
/// Inspects the calldata of the transaction, decodes the arguments, and
@@ -179,4 +169,28 @@ impl Observer {
179169
}
180170
}
181171
}
172+
173+
async fn retry_with_sleep<F, OK, ERR>(future: impl Fn() -> F) -> Result<OK, Vec<ERR>>
174+
where
175+
F: Future<Output = Result<OK, ERR>>,
176+
ERR: std::fmt::Debug,
177+
{
178+
const MAX_RETRIES: usize = 5;
179+
180+
let mut errors = Vec::new();
181+
let mut tries = 0;
182+
while tries < MAX_RETRIES {
183+
match future().await {
184+
Ok(res) => return Ok(res),
185+
Err(err) => {
186+
errors.push(err);
187+
tries += 1;
188+
// wait a little to give temporary errors a chance to resolve themselves
189+
let timeout_with_jitter = 50u64 + rand::thread_rng().gen_range(0..=50);
190+
tokio::time::sleep(Duration::from_millis(timeout_with_jitter)).await;
191+
}
192+
}
193+
}
194+
Err(errors)
195+
}
182196
}

crates/autopilot/src/infra/blockchain/contracts.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use {
66
GPv2AllowListAuthentication,
77
GPv2Settlement,
88
HooksTrampoline,
9-
InstanceExt,
109
WETH9,
1110
support::Balances,
1211
},

crates/autopilot/src/infra/persistence/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -594,17 +594,18 @@ impl Persistence {
594594

595595
/// Returns the oldest settlement event for which the accociated auction is
596596
/// not yet populated in the database.
597-
pub async fn get_settlement_without_auction(
597+
pub async fn get_settlements_without_auction(
598598
&self,
599-
) -> Result<Option<domain::eth::SettlementEvent>, DatabaseError> {
599+
) -> Result<Vec<domain::eth::SettlementEvent>, DatabaseError> {
600600
let _timer = Metrics::get()
601601
.database_queries
602602
.with_label_values(&["get_settlement_without_auction"])
603603
.start_timer();
604604

605605
let mut ex = self.postgres.pool.acquire().await?;
606-
let event = database::settlements::get_settlement_without_auction(&mut ex)
606+
let events = database::settlements::get_settlements_without_auction(&mut ex)
607607
.await?
608+
.into_iter()
608609
.map(|event| {
609610
let event = domain::eth::SettlementEvent {
610611
block: u64::try_from(event.block_number)
@@ -615,8 +616,8 @@ impl Persistence {
615616
};
616617
Ok::<_, DatabaseError>(event)
617618
})
618-
.transpose()?;
619-
Ok(event)
619+
.collect::<Result<Vec<_>, _>>()?;
620+
Ok(events)
620621
}
621622

622623
/// Returns the trade events that are associated with the settlement event

crates/autopilot/src/run.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use {
2727
alloy::eips::BlockNumberOrTag,
2828
chain::Chain,
2929
clap::Parser,
30-
contracts::alloy::{BalancerV2Vault, GPv2Settlement, IUniswapV3Factory, InstanceExt, WETH9},
30+
contracts::alloy::{BalancerV2Vault, GPv2Settlement, IUniswapV3Factory, WETH9},
3131
ethcontract::H160,
3232
ethrpc::{
3333
Web3,

0 commit comments

Comments
 (0)