Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions crates/driver/src/domain/competition/bad_orders/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
use {
super::Quality,
crate::{
domain::competition::order,
infra::{observe::metrics, solver},
},
dashmap::DashMap,
std::{
sync::Arc,
time::{Duration, Instant},
},
};

#[derive(Default, Debug)]
struct OrderStatistics {
attempts: u32,
fails: u32,
flagged_unsupported_at: Option<Instant>,
}

/// Monitors orders to determine whether they are considered "unsupported" based
/// on the ratio of failing to total settlement encoding attempts. An order must
/// have participated in at least `REQUIRED_MEASUREMENTS` settlement attempts to
/// be evaluated. If, at that point, the ratio of failures is greater than or
/// equal to `FAILURE_RATIO`, the order is considered unsupported.
///
/// This detector tracks settlement simulation failures at the order level
/// rather than the token level, avoiding the problem of banning good tokens due
/// to solver-specific issues or bad solutions.
#[derive(Clone)]
pub struct Detector {
failure_ratio: f64,
required_measurements: u32,
counter: Arc<DashMap<order::Uid, OrderStatistics>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order needs to be dropped from this cache once it is executed. We shouldn't accumulate all the failed orders here indefinitely.

log_only: bool,
order_freeze_time: Duration,
solver: solver::Name,
}

impl Detector {
pub fn new(
failure_ratio: f64,
required_measurements: u32,
log_only: bool,
order_freeze_time: Duration,
solver: solver::Name,
) -> Self {
Self {
failure_ratio,
required_measurements,
counter: Default::default(),
log_only,
order_freeze_time,
solver,
}
}

pub fn get_quality(&self, uid: &order::Uid, now: Instant) -> Quality {
let Some(stats) = self.counter.get(uid) else {
return Quality::Unknown;
};

if stats
.flagged_unsupported_at
.is_some_and(|t| now.duration_since(t) > self.order_freeze_time)
{
// Sometimes orders only cause issues temporarily (e.g., insufficient balance
// that gets topped up later). If the order's freeze period expired we pretend
// we don't have enough information to give it another chance. If it still
// behaves badly it will get frozen immediately.
return Quality::Unknown;
}

match self.log_only {
true => Quality::Supported,
false => self.quality_based_on_stats(&stats),
}
}

fn quality_based_on_stats(&self, stats: &OrderStatistics) -> Quality {
if stats.attempts < self.required_measurements {
return Quality::Unknown;
}
let order_failure_ratio = f64::from(stats.fails) / f64::from(stats.attempts);
match order_failure_ratio >= self.failure_ratio {
true => Quality::Unsupported,
false => Quality::Supported,
}
}

/// Updates the orders that participated in settlements by
/// incrementing their attempt count.
/// `failure` indicates whether the settlement encoding/simulation was
/// successful or not.
pub fn update_orders(&self, order_uids: &[order::Uid], failure: bool) {
let now = Instant::now();
let mut new_unsupported_orders = vec![];

for uid in order_uids {
let mut stats = self
.counter
.entry(*uid)
.and_modify(|counter| {
counter.attempts += 1;
counter.fails += u32::from(failure);
})
.or_insert_with(|| OrderStatistics {
attempts: 1,
fails: u32::from(failure),
flagged_unsupported_at: None,
});
Comment on lines +103 to +111
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And we shouldn't accumulate successful orders here for sure.


// order needs to be frozen as unsupported for a while
if self.quality_based_on_stats(&stats) == Quality::Unsupported
&& stats
.flagged_unsupported_at
.is_none_or(|t| now.duration_since(t) > self.order_freeze_time)
{
new_unsupported_orders.push(*uid);
stats.flagged_unsupported_at = Some(now);
}
}

if !new_unsupported_orders.is_empty() {
tracing::debug!(
orders = ?new_unsupported_orders,
"mark orders as unsupported"
);
metrics::get()
.bad_orders_detected
.with_label_values(&[&self.solver.0, "metrics"])
.inc_by(new_unsupported_orders.len() as u64);
}
}
}

#[cfg(test)]
mod tests {
use {super::*, crate::util::Bytes};

fn test_uid(value: u8) -> order::Uid {
order::Uid(Bytes([value; 56]))
}

/// Tests that an order only gets marked temporarily as unsupported.
/// After the freeze period it will be allowed again.
#[tokio::test]
async fn unfreeze_bad_orders() {
const FREEZE_DURATION: Duration = Duration::from_millis(50);
let detector = Detector::new(
0.5,
2,
false,
FREEZE_DURATION,
solver::Name("mysolver".to_string()),
);

let order_a = test_uid(1);
let order_b = test_uid(2);
let order_quality = || detector.get_quality(&order_a, Instant::now());

// order is reported as unknown while we don't have enough measurements
assert_eq!(order_quality(), Quality::Unknown);
detector.update_orders(&[order_a, order_b], true);
assert_eq!(order_quality(), Quality::Unknown);
detector.update_orders(&[order_a, order_b], true);

// after we got enough measurements the order gets marked as bad
assert_eq!(order_quality(), Quality::Unsupported);

// after the freeze period is over the order gets reported as unknown again
tokio::time::sleep(FREEZE_DURATION).await;
assert_eq!(order_quality(), Quality::Unknown);

// after an unfreeze another bad measurement is enough to freeze it again
detector.update_orders(&[order_a, order_b], true);
assert_eq!(order_quality(), Quality::Unsupported);
}

#[test]
fn different_orders_tracked_independently() {
let detector = Detector::new(
0.5,
2,
false,
Duration::from_secs(60),
solver::Name("mysolver".to_string()),
);

let order_a = test_uid(1);
let order_b = test_uid(2);

// order_a fails twice
detector.update_orders(&[order_a], true);
detector.update_orders(&[order_a], true);

// order_b succeeds twice
detector.update_orders(&[order_b], false);
detector.update_orders(&[order_b], false);

let now = Instant::now();
assert_eq!(detector.get_quality(&order_a, now), Quality::Unsupported);
assert_eq!(detector.get_quality(&order_b, now), Quality::Supported);
}
}
99 changes: 99 additions & 0 deletions crates/driver/src/domain/competition/bad_orders/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use {
crate::domain::competition::{Auction, order},
std::fmt,
};

pub mod metrics;

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Quality {
/// Order is likely to produce working solutions when included.
Supported,
/// Order will likely produce failing solutions when included.
/// This can have many reasons:
/// * order-specific issues (bad pre/post interactions, signature problems)
/// * insufficient balance or approval
/// * order targeting problematic tokens
/// * malicious or buggy order parameters
Unsupported,
/// The detection strategy does not have enough data to make an informed
/// decision.
Unknown,
}

#[derive(Default)]
pub struct Detector {
metrics: Option<metrics::Detector>,
}

impl Detector {
/// Creates a new detector without any detection mechanisms enabled.
pub fn new() -> Self {
Self { metrics: None }
}

/// Enables detection of unsupported orders based on settlement simulation
/// failure heuristics.
pub fn with_metrics_detector(&mut self, detector: metrics::Detector) -> &mut Self {
self.metrics = Some(detector);
self
}

/// Removes all unsupported orders from the auction.
pub fn filter_unsupported_orders_in_auction(&self, mut auction: Auction) -> Auction {
let now = std::time::Instant::now();

// reuse the original allocation
let all_orders = std::mem::take(&mut auction.orders);
let mut removed_uids = Vec::new();

let supported_orders: Vec<_> = all_orders
.into_iter()
.filter_map(|order| {
let quality = self.get_order_quality(&order.uid, now);
match quality {
Quality::Supported | Quality::Unknown => Some(order),
Quality::Unsupported => {
removed_uids.push(order.uid);
None
}
}
})
.collect();

auction.orders = supported_orders;
if !removed_uids.is_empty() {
tracing::debug!(orders = ?removed_uids, "ignored orders flagged as unsupported");
}

auction
}

/// Updates the order quality metric for successful settlements.
pub fn encoding_succeeded(&self, order_uids: &[order::Uid]) {
if let Some(metrics) = &self.metrics {
metrics.update_orders(order_uids, false);
}
}

/// Updates the order quality metric for failed settlements.
pub fn encoding_failed(&self, order_uids: &[order::Uid]) {
if let Some(metrics) = &self.metrics {
metrics.update_orders(order_uids, true);
}
}

fn get_order_quality(&self, uid: &order::Uid, now: std::time::Instant) -> Quality {
if let Some(Quality::Unsupported) = self.metrics.as_ref().map(|m| m.get_quality(uid, now)) {
return Quality::Unsupported;
}

Quality::Unknown
}
}

impl fmt::Debug for Detector {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Detector").finish()
}
}
22 changes: 18 additions & 4 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use {
};

pub mod auction;
pub mod bad_orders;
pub mod bad_tokens;
pub mod order;
mod pre_processing;
Expand Down Expand Up @@ -68,6 +69,7 @@ pub struct Competition {
/// Cached solutions with the most recent solutions at the front.
pub settlements: Mutex<VecDeque<Settlement>>,
pub bad_tokens: Arc<bad_tokens::Detector>,
pub bad_orders: Arc<bad_orders::Detector>,
fetcher: Arc<pre_processing::DataAggregator>,
settle_queue: mpsc::Sender<SettleRequest>,
order_sorting_strategies: Vec<Arc<dyn sorting::SortingStrategy>>,
Expand All @@ -83,6 +85,7 @@ impl Competition {
simulator: Simulator,
mempools: Mempools,
bad_tokens: Arc<bad_tokens::Detector>,
bad_orders: Arc<bad_orders::Detector>,
fetcher: Arc<DataAggregator>,
order_sorting_strategies: Vec<Arc<dyn sorting::SortingStrategy>>,
) -> Arc<Self> {
Expand All @@ -98,6 +101,7 @@ impl Competition {
settlements: Default::default(),
settle_queue: settle_sender,
bad_tokens,
bad_orders,
fetcher,
order_sorting_strategies,
});
Expand Down Expand Up @@ -242,6 +246,7 @@ impl Competition {
.map(|solution| async move {
let id = solution.id().clone();
let token_pairs = solution.token_pairs();
let order_uids = solution.order_uids();
observe::encoding(&id);
let settlement = solution
.encode(
Expand All @@ -251,19 +256,21 @@ impl Competition {
self.solver.solver_native_token(),
)
.await;
(id, token_pairs, settlement)
(id, token_pairs, order_uids, settlement)
})
.collect::<FuturesUnordered<_>>()
.filter_map(|(id, token_pairs, result)| async move {
.filter_map(|(id, token_pairs, order_uids, result)| async move {
match result {
Ok(solution) => {
self.bad_tokens.encoding_succeeded(&token_pairs);
self.bad_orders.encoding_succeeded(&order_uids);
Some(solution)
}
// don't report on errors coming from solution merging
Err(_err) if id.solutions().len() > 1 => None,
Err(err) => {
self.bad_tokens.encoding_failed(&token_pairs);
self.bad_orders.encoding_failed(&order_uids);
observe::encoding_failed(self.solver.name(), &id, &err);
notify::encoding_failed(&self.solver, auction.id(), &id, &err);
None
Expand Down Expand Up @@ -767,9 +774,16 @@ impl Competition {
if !self.solver.config().flashloans_enabled {
auction.orders.retain(|o| o.app_data.flashloan().is_none());
}
self.bad_tokens
// First filter by bad tokens (simulation-based detection)
auction = self
.bad_tokens
.filter_unsupported_orders_in_auction(auction)
.await
.await;
// Then filter by bad orders (metrics-based detection)
auction = self
.bad_orders
.filter_unsupported_orders_in_auction(auction);
auction
}
}

Expand Down
Loading
Loading