Skip to content

Commit 476bdff

Browse files
authored
Post process settlements in parallel (#3853)
# Description Currently we post-process settlements (i.e. associate the tx with a solution proposed by a solver) serially. IIRC it was done that way because it was simpler and that step simply ran in a background task at the time. Since then this logic has been moved onto the hot path so all the time we spend there delays the creation of the next auction. Additionally with the introduction of combinatorial auctions it's now possible and relatively common to have multiple settlement transactions in the same block. Whenever we have multiple settlements in one block this PR should result in a significant performance uplift because we process multiple concurrently. # Changes replaced loop to post-process auctions serially with logic that first fetches ALL unprocessed settlements and then works on up to 10 of them concurrently. This is fine for the post processing logic (as opposed to the raw event indexing) because it's okay for post-processing to happen out of order. So if the we successfully post-process settlement `n+1` but not settlement `n` (e.g. due to a crash) the DB query would still just return all unprocessed events instead of all NEW unprocessed events. I also adjusted how retrying works in this code. Instead of returning some result enum (ok, invalid, nothing_to_do) to make the serial loop retry something we now have a function `retry_with_sleep` that simply retires a passed in future n times and returns an `Option` to indicate whether it was successful or not. This is used in 2 places: 1. fetching all outstanding settlements 2. post-processing individual outstanding settlement With that the new logic should be as robust as the old one while being IMO easier to reason about. ## How to test Not sure how to test the improvement specifically. Since this is a performance optimization and I don't really want to test the internals of the implementation having a new test that makes sure that a big number of settlements can be post-processed would be enough to test correctness and for the performance aspect we'd have to deploy this on the cluster. Regarding the performance I temporarily deployed it to staging and it produced the expected effect of reducing the spikiness that comes from multiple settlements needing to be post-processed in the same block. <img width="954" height="270" alt="Screenshot 2025-11-03 at 22 12 26" src="https://github.com/user-attachments/assets/b459fb76-bf44-40db-894e-46ffd3d67f7d" />
1 parent 914055c commit 476bdff

File tree

4 files changed

+90
-74
lines changed

4 files changed

+90
-74
lines changed

crates/autopilot/src/boundary/events/settlement.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ impl EventStoring<(GPv2SettlementEvents, Log)> for Indexer {
7575
database::settlements::delete(&mut transaction, from_block).await?;
7676
transaction.commit().await?;
7777

78-
self.settlement_observer.update().await;
78+
self.settlement_observer
79+
.post_process_outstanding_settlement_transactions()
80+
.await;
7981
Ok(())
8082
}
8183

@@ -84,7 +86,9 @@ impl EventStoring<(GPv2SettlementEvents, Log)> for Indexer {
8486
crate::database::events::append_events(&mut transaction, events).await?;
8587
transaction.commit().await?;
8688

87-
self.settlement_observer.update().await;
89+
self.settlement_observer
90+
.post_process_outstanding_settlement_transactions()
91+
.await;
8892
Ok(())
8993
}
9094
}

crates/autopilot/src/domain/settlement/observer.rs

Lines changed: 71 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use {
2020
},
2121
anyhow::{Context, Result, anyhow},
2222
ethrpc::alloy::conversions::IntoLegacy,
23+
futures::StreamExt,
24+
rand::Rng,
2325
std::time::Duration,
2426
};
2527

@@ -29,80 +31,68 @@ pub struct Observer {
2931
persistence: infra::Persistence,
3032
}
3133

32-
enum IndexSuccess {
33-
NothingToDo,
34-
IndexedSettlement,
35-
SkippedInvalidTransaction,
36-
}
37-
3834
impl Observer {
3935
/// Creates a new Observer and asynchronously schedules the first update
4036
/// run.
4137
pub fn new(eth: infra::Ethereum, persistence: infra::Persistence) -> Self {
4238
Self { eth, persistence }
4339
}
4440

45-
/// Fetches all the available missing data needed for bookkeeping.
46-
/// This needs to get called after indexing a new settlement event
47-
/// since this code needs that data to already be present in the DB.
48-
pub async fn update(&self) {
49-
const MAX_RETRIES: usize = 5;
50-
let mut attempts = 0;
51-
while attempts < MAX_RETRIES {
52-
match self.single_update().await {
53-
Ok(IndexSuccess::IndexedSettlement) => {
54-
tracing::debug!("on settlement event updater ran and processed event");
55-
}
56-
Ok(IndexSuccess::SkippedInvalidTransaction) => {
57-
tracing::warn!("stored default values for unindexable transaction");
58-
}
59-
Ok(IndexSuccess::NothingToDo) => {
60-
tracing::debug!("on settlement event updater ran without update");
41+
/// Post processes all outstanding settlements. This involves decoding the
42+
/// settlement details from the transaction and associating it with a
43+
/// solution proposed by a solver for the auction specified at the end of
44+
/// the transaction call data. If no solution can be found a dummy mapping
45+
/// gets saved to mark the settlement as processed. This can happen when a
46+
/// solver submits a solution despite not winning or if the settlement
47+
/// belongs to an auction that was arbitrated in another environment (i.e.
48+
/// prod vs. staging).
49+
pub async fn post_process_outstanding_settlement_transactions(&self) {
50+
let settlements =
51+
match Self::retry_with_sleep(|| self.persistence.get_settlements_without_auction())
52+
.await
53+
{
54+
Ok(settlements) => settlements,
55+
Err(errs) => {
56+
tracing::warn!(?errs, "failed to fetch unprocessed settlements");
6157
return;
6258
}
63-
Err(err) => {
64-
tracing::debug!(?err, "encountered retryable error");
65-
// wait a little to give temporary errors a chance to resolve themselves
66-
const TEMP_ERROR_BACK_OFF: Duration = Duration::from_millis(100);
67-
tokio::time::sleep(TEMP_ERROR_BACK_OFF).await;
68-
attempts += 1;
69-
continue;
70-
}
71-
}
59+
};
7260

73-
// everything worked fine -> reset our attempts for the next settlement
74-
attempts = 0;
61+
if settlements.is_empty() {
62+
tracing::debug!("no unprocessed settlements found");
63+
return;
7564
}
76-
}
77-
78-
/// Update database for settlement events that have not been processed yet.
79-
///
80-
/// Returns whether an update was performed.
81-
async fn single_update(&self) -> Result<IndexSuccess> {
82-
// Find a settlement event that has not been processed yet.
83-
let Some(event) = self
84-
.persistence
85-
.get_settlement_without_auction()
86-
.await
87-
.context("failed to fetch unprocessed tx from DB")?
88-
else {
89-
return Ok(IndexSuccess::NothingToDo);
90-
};
9165

92-
tracing::debug!(tx = ?event.transaction, "found unprocessed settlement");
66+
// On mainnet it's common to have multiple settlements in the
67+
// same block. So even if we process every block immediately,
68+
// we should still post-process multiple settlements concurrently.
69+
const MAX_CONCURRENCY: usize = 10;
70+
futures::stream::iter(settlements)
71+
.for_each_concurrent(MAX_CONCURRENCY, |settlement| async move {
72+
tracing::debug!(tx = ?settlement.transaction, "start post processing of settlement");
73+
match Self::retry_with_sleep(|| self.post_process_settlement(settlement)).await {
74+
Ok(_) => tracing::debug!(
75+
tx = ?settlement.transaction,
76+
"successfully post-processed settlement"
77+
),
78+
Err(errs) => tracing::warn!(
79+
tx = ?settlement.transaction,
80+
?errs,
81+
"gave up on post-processing settlement"
82+
),
83+
}
84+
})
85+
.await;
86+
}
9387

88+
async fn post_process_settlement(&self, settlement: eth::SettlementEvent) -> Result<()> {
9489
let settlement_data = self
95-
.fetch_auction_data_for_transaction(event.transaction)
90+
.fetch_auction_data_for_transaction(settlement.transaction)
9691
.await?;
9792
self.persistence
98-
.save_settlement(event, settlement_data.as_ref())
93+
.save_settlement(settlement, settlement_data.as_ref())
9994
.await
100-
.context("failed to update settlement")?;
101-
102-
match settlement_data {
103-
None => Ok(IndexSuccess::SkippedInvalidTransaction),
104-
Some(_) => Ok(IndexSuccess::IndexedSettlement),
105-
}
95+
.context("failed to update settlement")
10696
}
10797

10898
/// Inspects the calldata of the transaction, decodes the arguments, and
@@ -179,4 +169,28 @@ impl Observer {
179169
}
180170
}
181171
}
172+
173+
async fn retry_with_sleep<F, OK, ERR>(future: impl Fn() -> F) -> Result<OK, Vec<ERR>>
174+
where
175+
F: Future<Output = Result<OK, ERR>>,
176+
ERR: std::fmt::Debug,
177+
{
178+
const MAX_RETRIES: usize = 5;
179+
180+
let mut errors = Vec::new();
181+
let mut tries = 0;
182+
while tries < MAX_RETRIES {
183+
match future().await {
184+
Ok(res) => return Ok(res),
185+
Err(err) => {
186+
errors.push(err);
187+
tries += 1;
188+
// wait a little to give temporary errors a chance to resolve themselves
189+
let timeout_with_jitter = 50u64 + rand::thread_rng().gen_range(0..=50);
190+
tokio::time::sleep(Duration::from_millis(timeout_with_jitter)).await;
191+
}
192+
}
193+
}
194+
Err(errors)
195+
}
182196
}

crates/autopilot/src/infra/persistence/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -594,17 +594,18 @@ impl Persistence {
594594

595595
/// Returns the oldest settlement event for which the accociated auction is
596596
/// not yet populated in the database.
597-
pub async fn get_settlement_without_auction(
597+
pub async fn get_settlements_without_auction(
598598
&self,
599-
) -> Result<Option<domain::eth::SettlementEvent>, DatabaseError> {
599+
) -> Result<Vec<domain::eth::SettlementEvent>, DatabaseError> {
600600
let _timer = Metrics::get()
601601
.database_queries
602602
.with_label_values(&["get_settlement_without_auction"])
603603
.start_timer();
604604

605605
let mut ex = self.postgres.pool.acquire().await?;
606-
let event = database::settlements::get_settlement_without_auction(&mut ex)
606+
let events = database::settlements::get_settlements_without_auction(&mut ex)
607607
.await?
608+
.into_iter()
608609
.map(|event| {
609610
let event = domain::eth::SettlementEvent {
610611
block: u64::try_from(event.block_number)
@@ -615,8 +616,8 @@ impl Persistence {
615616
};
616617
Ok::<_, DatabaseError>(event)
617618
})
618-
.transpose()?;
619-
Ok(event)
619+
.collect::<Result<Vec<_>, _>>()?;
620+
Ok(events)
620621
}
621622

622623
/// Returns the trade events that are associated with the settlement event

crates/database/src/settlements.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,16 @@ pub struct SettlementEvent {
3131
}
3232

3333
#[instrument(skip_all)]
34-
pub async fn get_settlement_without_auction(
34+
pub async fn get_settlements_without_auction(
3535
ex: &mut PgConnection,
36-
) -> Result<Option<SettlementEvent>, sqlx::Error> {
36+
) -> Result<Vec<SettlementEvent>, sqlx::Error> {
3737
const QUERY: &str = r#"
3838
SELECT block_number, log_index, tx_hash
3939
FROM settlements
4040
WHERE auction_id IS NULL
4141
ORDER BY block_number ASC
42-
LIMIT 1
4342
"#;
44-
sqlx::query_as(QUERY).fetch_optional(ex).await
43+
sqlx::query_as(QUERY).fetch_all(ex).await
4544
}
4645

4746
#[instrument(skip_all)]
@@ -195,10 +194,8 @@ mod tests {
195194
.await
196195
.unwrap();
197196

198-
let settlement = get_settlement_without_auction(&mut db)
199-
.await
200-
.unwrap()
201-
.unwrap();
197+
let settlements = get_settlements_without_auction(&mut db).await.unwrap();
198+
let settlement = &settlements[0];
202199

203200
assert_eq!(settlement.block_number, event.block_number);
204201
assert_eq!(settlement.log_index, event.log_index);
@@ -207,8 +204,8 @@ mod tests {
207204
.await
208205
.unwrap();
209206

210-
let settlement = get_settlement_without_auction(&mut db).await.unwrap();
207+
let settlement = get_settlements_without_auction(&mut db).await.unwrap();
211208

212-
assert!(settlement.is_none());
209+
assert!(settlement.is_empty());
213210
}
214211
}

0 commit comments

Comments
 (0)