Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::domain::{competition::bad_tokens::Quality, eth},
crate::domain::{competition::bad_orders::Quality, eth},
dashmap::DashMap,
std::{
sync::Arc,
Expand Down
276 changes: 276 additions & 0 deletions crates/driver/src/domain/competition/bad_orders/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
use {
super::Quality,
crate::{
domain::competition::order::Uid,
infra::{observe::metrics, solver},
},
dashmap::DashMap,
std::{
sync::{Arc, Weak},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
},
};

/// Per-order bookkeeping used to decide whether an order counts as
/// "unsupported" and whether its entry may be garbage collected.
#[derive(Debug)]
struct OrderStatistics {
    /// Total number of settlement encoding attempts this order
    /// participated in.
    attempts: u32,
    /// How many of those attempts failed.
    fails: u32,
    /// When the order was last flagged as unsupported (i.e. frozen).
    /// `None` if the order was never flagged.
    flagged_unsupported_at: Option<Instant>,
    /// When an order was last seen in a solution. This
    /// timestamp is used to determine whether the order's
    /// metrics can be evicted from the cache to avoid bloat.
    last_seen_at: Instant,
}

/// Monitors orders to determine whether they are considered "unsupported" based
/// on the ratio of failing to total settlement encoding attempts. An order must
/// have participated in at least `REQUIRED_MEASUREMENTS` attempts to be
/// evaluated. If, at that point, the ratio of failures is greater than or equal
/// to `FAILURE_RATIO`, the order is considered unsupported.
#[derive(Clone)]
pub struct Detector {
    /// Ratio of failed to total attempts at or above which an order
    /// gets classified as unsupported.
    failure_ratio: f64,
    /// Minimum number of attempts before an order gets evaluated at all.
    required_measurements: u32,
    /// Per-order statistics; shared (via `Weak`) with the GC background task.
    counter: Arc<DashMap<Uid, OrderStatistics>>,
    /// If set, bad orders are only flagged and reported via metrics but
    /// `get_quality` always reports them as supported.
    log_only: bool,
    /// How long an order stays flagged as unsupported before it gets
    /// another chance.
    order_freeze_time: Duration,
    /// Name of the solver these statistics belong to (used as metrics label).
    solver: solver::Name,
}

impl Detector {
    /// Creates a new detector and spawns a background task which
    /// periodically evicts stale entries from the statistics cache.
    pub fn new(
        failure_ratio: f64,
        required_measurements: u32,
        log_only: bool,
        order_freeze_time: Duration,
        gc_interval: Duration,
        gc_max_age: Duration,
        solver: solver::Name,
    ) -> Self {
        let counter = Arc::new(DashMap::default());

        // The GC task only keeps a `Weak` reference so it shuts down
        // automatically once all `Detector` clones have been dropped.
        Self::spawn_gc_task(Arc::downgrade(&counter), gc_interval, gc_max_age);

        Self {
            failure_ratio,
            required_measurements,
            counter,
            log_only,
            order_freeze_time,
            solver,
        }
    }

    /// Returns the current quality assessment for the given order at the
    /// given point in time.
    pub fn get_quality(&self, order: &Uid, now: Instant) -> Quality {
        let Some(stats) = self.counter.get(order) else {
            return Quality::Unknown;
        };

        if stats
            .flagged_unsupported_at
            .is_some_and(|t| now.duration_since(t) > self.order_freeze_time)
        {
            // Sometimes orders only cause issues temporarily. If the order's
            // freeze period expired we pretend we don't have enough information
            // to give it another chance. If it still behaves badly it will get
            // frozen immediately.
            return Quality::Unknown;
        }

        match self.log_only {
            // In log-only mode orders are never excluded from the auction.
            true => Quality::Supported,
            false => self.quality_based_on_stats(&stats),
        }
    }

    /// Evaluates an order's quality purely based on its recorded statistics
    /// (ignores the freeze state and `log_only` mode).
    fn quality_based_on_stats(&self, stats: &OrderStatistics) -> Quality {
        if stats.attempts < self.required_measurements {
            return Quality::Unknown;
        }
        let observed_failure_ratio = f64::from(stats.fails) / f64::from(stats.attempts);
        match observed_failure_ratio >= self.failure_ratio {
            true => Quality::Unsupported,
            false => Quality::Supported,
        }
    }

    /// Updates the orders that participated in settlements by
    /// incrementing their attempt count.
    /// `failure` indicates whether the settlement was successful or not.
    pub fn update_orders(&self, orders: &[Uid], failure: bool) {
        let now = Instant::now();
        let mut new_unsupported_orders = vec![];
        orders.iter().for_each(|order| {
            let mut stats = self
                .counter
                .entry(*order)
                .and_modify(|counter| {
                    counter.attempts += 1;
                    counter.fails += u32::from(failure);
                    counter.last_seen_at = now;
                })
                .or_insert_with(|| OrderStatistics {
                    attempts: 1,
                    fails: u32::from(failure),
                    flagged_unsupported_at: None,
                    last_seen_at: now,
                });

            // order needs to be frozen as unsupported for a while
            if self.quality_based_on_stats(&stats) == Quality::Unsupported
                && stats
                    .flagged_unsupported_at
                    .is_none_or(|t| now.duration_since(t) > self.order_freeze_time)
            {
                new_unsupported_orders.push(order);
                stats.flagged_unsupported_at = Some(now);
            }
        });

        // Note that flagging happens even in `log_only` mode so operators
        // can observe which orders would have been excluded.
        if !new_unsupported_orders.is_empty() {
            tracing::debug!(
                orders = ?new_unsupported_orders,
                "mark order as unsupported"
            );
            metrics::get()
                .bad_orders_detected
                .with_label_values(&[&self.solver.0])
                .inc_by(new_unsupported_orders.len() as u64);
        }
    }

    /// Spawns a background task that periodically evicts items from the cache
    /// that are no longer relevant to avoid bloat.
    fn spawn_gc_task(
        cache: Weak<DashMap<Uid, OrderStatistics>>,
        interval: Duration,
        max_age: Duration,
    ) {
        tokio::task::spawn(async move {
            let mut interval = tokio::time::interval(interval);
            while let Some(cache) = cache.upgrade() {
                let now = Instant::now();
                let now_as_unix = current_unix_timestamp();

                // Evict entries for orders that already expired on-chain or
                // that have not been seen in a solution for `max_age`.
                cache.retain(|uid, stats| {
                    u64::from(uid.valid_to()) > now_as_unix
                        && now.duration_since(stats.last_seen_at) < max_age
                });
                interval.tick().await;
            }
            tracing::debug!("terminating gc task because cache was dropped");
        });
    }
}

/// Returns the number of whole seconds elapsed since the UNIX epoch.
fn current_unix_timestamp() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    // The system clock being set before 1970 is a broken-environment bug.
    since_epoch.unwrap().as_secs()
}

#[cfg(test)]
mod tests {
    use {super::*, crate::util::Bytes};

    /// Tests that an order only gets marked temporarily as unsupported.
    /// After the freeze period it will be allowed again.
    #[tokio::test]
    async fn unfreeze_bad_orders() {
        const FREEZE_DURATION: Duration = Duration::from_millis(50);
        // failure_ratio 0.5 with 2 required measurements: a single failure
        // out of two attempts already crosses the threshold.
        let detector = Detector::new(
            0.5,
            2,
            false,
            FREEZE_DURATION,
            Duration::from_hours(1),
            Duration::from_hours(1),
            solver::Name("mysolver".to_string()),
        );

        let order = Uid(Bytes([1; 56]));
        let order_quality = || detector.get_quality(&order, Instant::now());

        // order is reported as unknown while we don't have enough measurements
        assert_eq!(order_quality(), Quality::Unknown);
        detector.update_orders(&[order], true);
        assert_eq!(order_quality(), Quality::Unknown);
        detector.update_orders(&[order], true);

        // after we got enough measurements the order gets marked as bad
        assert_eq!(order_quality(), Quality::Unsupported);

        // after the freeze period is over the token gets reported as unknown again
        tokio::time::sleep(FREEZE_DURATION).await;
        assert_eq!(order_quality(), Quality::Unknown);

        // after an unfreeze another bad measurement is enough to freeze it again
        detector.update_orders(&[order], true);
        assert_eq!(order_quality(), Quality::Unsupported);
    }

    /// Tests that the GC task correctly evicts orders that are expired
    /// or have not been seen for the configured amount of time.
    ///
    /// NOTE(review): this test is timing-sensitive — the assertions rely on
    /// this test's interval staying phase-shifted by half a GC interval
    /// relative to the detector's GC task. Keep GC_INTERVAL comfortably
    /// larger than scheduling jitter if it ever flakes.
    #[tokio::test]
    async fn evict_outdated_entries() {
        const FREEZE_DURATION: Duration = Duration::from_millis(50);
        const GC_INTERVAL: Duration = Duration::from_millis(10);
        const GC_CYCLES_UNTIL_EVICTION: u32 = 5;
        let gc_max_age = GC_INTERVAL * GC_CYCLES_UNTIL_EVICTION;

        // this spawns a gc task that evicts entries from the cache
        let detector = Detector::new(
            0.5,
            2,
            false,
            FREEZE_DURATION,
            GC_INTERVAL,
            gc_max_age,
            solver::Name("mysolver".to_string()),
        );

        let long_valid_to = (current_unix_timestamp() + 1000) as u32;
        let short_valid_to = 0; // already expired -> evict on first GC run

        let long_order = Uid::from_parts(Default::default(), Default::default(), long_valid_to);
        let short_order = Uid::from_parts(Default::default(), Default::default(), short_valid_to);

        assert_eq!(detector.counter.len(), 0);
        detector.update_orders(&[long_order, short_order], true);
        assert_eq!(detector.counter.len(), 2);
        assert!(detector.counter.get(&long_order).is_some());
        assert!(detector.counter.get(&short_order).is_some());

        // The gc task and this test operate on an interval. In order to avoid
        // issues due to variance we wait half a GC interval to make sure
        // our assertions always happen in the middle between 2 GC runs.
        tokio::time::sleep(GC_INTERVAL / 2).await;
        let mut interval = tokio::time::interval(GC_INTERVAL);

        // after 1 GC cycle the expired order was evicted
        assert_eq!(detector.counter.len(), 1);
        assert!(detector.counter.get(&long_order).is_some());

        // (the first tick of a fresh tokio interval completes immediately,
        // so this loop advances GC_CYCLES_UNTIL_EVICTION - 2 full intervals)
        for _ in 0..(GC_CYCLES_UNTIL_EVICTION - 1) {
            interval.tick().await;
        }

        // order was still not evicted because the max age has not been reached yet
        assert_eq!(detector.counter.len(), 1);
        assert!(detector.counter.get(&long_order).is_some());

        // add another measurement to extend lifetime in cache
        detector.update_orders(&[long_order], true);

        // metrics are still in the cache after almost max_age * 2
        for _ in 0..=(GC_CYCLES_UNTIL_EVICTION - 1) {
            interval.tick().await;
        }
        assert_eq!(detector.counter.len(), 1);
        assert!(detector.counter.get(&long_order).is_some());

        // after one more GC cycle the order finally gets evicted
        interval.tick().await;
        assert_eq!(detector.counter.len(), 0);
    }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
//! This module implements logic to detect orders that
//! a solver is not able to support. The module supports
//! flagging individual tokens that are not supported outright.
//! A bad token could for example be one that forbids trading
//! with AMMs, only allows 1 transfer per transaction/block, or
//! was simply built with a buggy compiler which makes it incompatible
//! with the settlement contract (see <https://github.com/cowprotocol/services/pull/781>).
//!
//! Additionally there are some heuristics to detect when an
//! order itself is somehow broken or causes issues and slipped through
//! other detection mechanisms. One big error case is orders adjusting
//! debt positions in lending protocols. While pre-checks might correctly
//! detect that the EIP 1271 signature is valid the transfer of the token
//! would fail because the user's debt position is not collateralized enough.
//! In other words the bad order detection is a last fail-safe in case
//! we were not able to predict issues with orders and pre-emptively
//! filter them out of the auction.
use {
crate::domain::{competition::Auction, eth},
crate::domain::{
competition::{Auction, order::Uid},
eth,
},
futures::{StreamExt, stream::FuturesUnordered},
std::{collections::HashMap, fmt, time::Instant},
};
Expand Down Expand Up @@ -71,6 +91,12 @@ impl Detector {

let mut supported_orders: Vec<_> = supported_orders
.into_iter()
.filter(|order| {
self.metrics
.as_ref()
.map(|metrics| metrics.get_quality(&order.uid, now))
!= Some(Quality::Unsupported)
})
.filter_map(|order| {
let sell = self.get_token_quality(order.sell.token, now);
let buy = self.get_token_quality(order.buy.token, now);
Expand Down Expand Up @@ -123,16 +149,16 @@ impl Detector {
}

/// Updates the tokens quality metric for successful operation.
pub fn encoding_succeeded(&self, token_pairs: &[(eth::TokenAddress, eth::TokenAddress)]) {
pub fn encoding_succeeded(&self, orders: &[Uid]) {
if let Some(metrics) = &self.metrics {
metrics.update_tokens(token_pairs, false);
metrics.update_orders(orders, false);
}
}

/// Updates the tokens quality metric for failures.
pub fn encoding_failed(&self, token_pairs: &[(eth::TokenAddress, eth::TokenAddress)]) {
pub fn encoding_failed(&self, orders: &[Uid]) {
if let Some(metrics) = &self.metrics {
metrics.update_tokens(token_pairs, true);
metrics.update_orders(orders, true);
}
}

Expand All @@ -142,21 +168,10 @@ impl Detector {
Some(quality) => return *quality,
}

if let Some(Quality::Unsupported) = self
.simulation_detector
self.simulation_detector
.as_ref()
.map(|d| d.get_quality(&token, now))
{
return Quality::Unsupported;
}

if let Some(Quality::Unsupported) =
self.metrics.as_ref().map(|m| m.get_quality(&token, now))
{
return Quality::Unsupported;
}

Quality::Unknown
.unwrap_or(Quality::Unknown)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{
domain::competition::{
Order,
bad_tokens::{Quality, cache::Cache},
bad_orders::{Quality, cache::Cache},
order,
},
infra::{self, observe::metrics},
Expand Down Expand Up @@ -99,7 +99,7 @@ impl Detector {
Ok(TokenQuality::Bad { reason }) => {
tracing::debug!(reason, token=?sell_token.0, "cache token as unsupported");
// All solvers share the same cache for the simulation detector, so there is no need to specify the solver name here.
metrics::get().bad_tokens_detected.with_label_values(&["any", "simulation"]).inc();
metrics::get().bad_tokens_detected.inc();
inner
.cache
.update_quality(sell_token, false, now);
Expand Down
Loading
Loading