Skip to content

Commit 810cd14

Browse files
Merge pull request #408 from propeller-heads/cluster-test/dc/ENG-5103-test-stale-components
feat(cluster-test): Test stale components
2 parents 198ba33 + 61e270b commit 810cd14

File tree

5 files changed

+168
-40
lines changed

5 files changed

+168
-40
lines changed

src/evm/protocol/uniswap_v4/state.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{any::Any, collections::HashMap};
1+
use std::{any::Any, collections::HashMap, fmt};
22

33
use alloy::primitives::{Address, Sign, I256, U256};
44
use num_bigint::BigUint;
@@ -40,7 +40,7 @@ use crate::evm::protocol::{
4040
vm::constants::EXTERNAL_ACCOUNT,
4141
};
4242

43-
#[derive(Clone, Debug)]
43+
#[derive(Clone)]
4444
pub struct UniswapV4State {
4545
liquidity: u128,
4646
sqrt_price: U256,
@@ -51,6 +51,19 @@ pub struct UniswapV4State {
5151
pub hook: Option<Box<dyn HookHandler>>,
5252
}
5353

54+
impl fmt::Debug for UniswapV4State {
55+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56+
f.debug_struct("UniswapV4State")
57+
.field("liquidity", &self.liquidity)
58+
.field("sqrt_price", &self.sqrt_price)
59+
.field("fees", &self.fees)
60+
.field("tick", &self.tick)
61+
.field("ticks", &self.ticks)
62+
.field("tick_spacing", &self.tick_spacing)
63+
.finish_non_exhaustive()
64+
}
65+
}
66+
5467
impl PartialEq for UniswapV4State {
5568
fn eq(&self, other: &Self) -> bool {
5669
match (&self.hook, &other.hook) {

src/evm/protocol/vm/state.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
use std::{
33
any::Any,
44
collections::{HashMap, HashSet},
5-
fmt::Debug,
5+
fmt::{self, Debug},
66
str::FromStr,
77
};
88

@@ -34,7 +34,7 @@ use crate::evm::{
3434
},
3535
};
3636

37-
#[derive(Clone, Debug)]
37+
#[derive(Clone)]
3838
pub struct EVMPoolState<D: EngineDatabaseInterface + Clone + Debug>
3939
where
4040
<D as DatabaseRef>::Error: Debug,
@@ -70,6 +70,23 @@ where
7070
adapter_contract: TychoSimulationContract<D>,
7171
}
7272

73+
impl<D> Debug for EVMPoolState<D>
74+
where
75+
D: EngineDatabaseInterface + Clone + Debug,
76+
<D as DatabaseRef>::Error: Debug,
77+
<D as EngineDatabaseInterface>::Error: Debug,
78+
{
79+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80+
f.debug_struct("EVMPoolState")
81+
.field("id", &self.id)
82+
.field("tokens", &self.tokens)
83+
.field("balances", &self.balances)
84+
.field("involved_contracts", &self.involved_contracts)
85+
.field("contract_balances", &self.contract_balances)
86+
.finish_non_exhaustive()
87+
}
88+
}
89+
7390
impl<D> EVMPoolState<D>
7491
where
7592
D: EngineDatabaseInterface + Clone + Debug + 'static,

src/rfq/protocols/bebop/state.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{any::Any, collections::HashMap};
1+
use std::{any::Any, collections::HashMap, fmt};
22

33
use async_trait::async_trait;
44
use num_bigint::BigUint;
@@ -19,14 +19,23 @@ use crate::rfq::{
1919
protocols::bebop::{client::BebopClient, models::BebopPriceData},
2020
};
2121

22-
#[derive(Debug, Clone)]
22+
#[derive(Clone)]
2323
pub struct BebopState {
2424
pub base_token: Token,
2525
pub quote_token: Token,
2626
pub price_data: BebopPriceData,
2727
pub client: BebopClient,
2828
}
2929

30+
impl fmt::Debug for BebopState {
31+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32+
f.debug_struct("BebopState")
33+
.field("base_token", &self.base_token)
34+
.field("quote_token", &self.quote_token)
35+
.finish_non_exhaustive()
36+
}
37+
}
38+
3039
impl BebopState {
3140
pub fn new(
3241
base_token: Token,

src/rfq/protocols/hashflow/state.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{any::Any, collections::HashMap};
1+
use std::{any::Any, collections::HashMap, fmt};
22

33
use async_trait::async_trait;
44
use num_bigint::BigUint;
@@ -19,16 +19,25 @@ use crate::rfq::{
1919
protocols::hashflow::{client::HashflowClient, models::HashflowMarketMakerLevels},
2020
};
2121

22-
#[derive(Debug, Clone)]
22+
#[derive(Clone)]
2323
pub struct HashflowState {
2424
pub base_token: Token,
2525
pub quote_token: Token,
2626
pub levels: HashflowMarketMakerLevels,
27-
#[allow(dead_code)]
2827
pub market_maker: String,
2928
pub client: HashflowClient,
3029
}
3130

31+
impl fmt::Debug for HashflowState {
32+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33+
f.debug_struct("HashflowState")
34+
.field("base_token", &self.base_token)
35+
.field("quote_token", &self.quote_token)
36+
.field("market_maker", &self.market_maker)
37+
.finish_non_exhaustive()
38+
}
39+
}
40+
3241
impl HashflowState {
3342
pub fn new(
3443
base_token: Token,

tycho-integration-test/src/main.rs

Lines changed: 111 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@ mod metrics;
33
mod stream_processor;
44

55
use std::{
6-
collections::HashMap,
6+
collections::{HashMap, HashSet},
77
fmt::Debug,
8-
num::NonZeroUsize,
98
sync::{Arc, RwLock},
109
time::Duration,
1110
};
@@ -21,10 +20,10 @@ use alloy_chains::NamedChain;
2120
use clap::Parser;
2221
use dotenv::dotenv;
2322
use itertools::Itertools;
24-
use lru::LruCache;
2523
use miette::{miette, IntoDiagnostic, NarratableReportHandler, WrapErr};
2624
use num_bigint::BigUint;
2725
use num_traits::{ToPrimitive, Zero};
26+
use rand::prelude::IndexedRandom;
2827
use tokio::sync::Semaphore;
2928
use tracing::{error, info, warn};
3029
use tracing_subscriber::EnvFilter;
@@ -98,10 +97,14 @@ struct Cli {
9897
#[arg(long, default_value_t = 5, value_parser = clap::value_parser!(u8).range(1..))]
9998
parallel_simulations: u8,
10099

101-
/// Maximum number of simulations to run per protocol update
100+
/// Maximum number of simulations (of updated states) to run per update
102101
#[arg(long, default_value_t = 10, value_parser = clap::value_parser!(u16).range(1..))]
103102
max_simulations: u16,
104103

104+
/// Maximum number of simulations (of stale states) to run per update per protocol
105+
#[arg(long, default_value_t = 10, value_parser = clap::value_parser!(u16).range(1..))]
106+
max_simulations_stale: u16,
107+
105108
/// The RFQ stream will skip messages for this duration (in seconds) after processing a message
106109
#[arg(long, default_value_t = 600)]
107110
skip_messages_duration: u64,
@@ -120,6 +123,13 @@ impl Debug for Cli {
120123
}
121124
}
122125

126+
#[derive(Default)]
127+
struct TychoState {
128+
states: HashMap<String, Box<dyn ProtocolSim>>,
129+
components: HashMap<String, ProtocolComponent>,
130+
component_ids_by_protocol: HashMap<String, HashSet<String>>,
131+
}
132+
123133
#[tokio::main]
124134
async fn main() -> miette::Result<()> {
125135
miette::set_hook(Box::new(|_| Box::new(NarratableReportHandler::new())))?;
@@ -192,13 +202,7 @@ async fn run(cli: Cli) -> miette::Result<()> {
192202
}
193203
}
194204

195-
// Assuming a ProtocolComponent instance can be around 1KB (2 tokens, 2 contract_ids, 6 static
196-
// attributes) 250,000 entries would use 250MB of memory.
197-
// In a 25min test, the cache increased at a rate of ~2 items/minute, or ~3k items/day, so it
198-
// would take ~80 days to get full and start dropping the least used items.
199-
let protocol_pairs = Arc::new(RwLock::new(LruCache::new(
200-
NonZeroUsize::new(250_000).ok_or_else(|| miette!("Invalid NonZeroUsize"))?,
201-
)));
205+
let tycho_state = Arc::new(RwLock::new(TychoState::default()));
202206

203207
// Process streams updates
204208
info!("Waiting for first protocol update...");
@@ -214,15 +218,15 @@ async fn run(cli: Cli) -> miette::Result<()> {
214218

215219
let cli = cli.clone();
216220
let rpc_tools = rpc_tools.clone();
217-
let protocol_pairs = protocol_pairs.clone();
221+
let tycho_state = tycho_state.clone();
218222
let permit = semaphore
219223
.clone()
220224
.acquire_owned()
221225
.await
222226
.into_diagnostic()
223227
.wrap_err("Failed to acquire permit")?;
224228
tokio::spawn(async move {
225-
if let Err(e) = process_update(cli, chain, rpc_tools, protocol_pairs, &update).await {
229+
if let Err(e) = process_update(cli, chain, rpc_tools, tycho_state, &update).await {
226230
warn!("{}", format_error_chain(&e));
227231
}
228232
drop(permit);
@@ -236,7 +240,7 @@ async fn process_update(
236240
cli: Arc<Cli>,
237241
chain: Chain,
238242
rpc_tools: RPCTools,
239-
protocol_pairs: Arc<RwLock<LruCache<String, ProtocolComponent>>>,
243+
tycho_state: Arc<RwLock<TychoState>>,
240244
update: &StreamUpdate,
241245
) -> miette::Result<()> {
242246
info!(
@@ -264,21 +268,24 @@ async fn process_update(
264268

265269
if let UpdateType::Protocol = update.update_type {
266270
{
267-
let mut pairs = protocol_pairs
271+
let mut current_state = tycho_state
268272
.write()
269-
.map_err(|e| miette!("Failed to acquire write lock on protocol pairs: {e}"))?;
270-
let prev_size = pairs.len();
273+
.map_err(|e| miette!("Failed to acquire write lock on Tycho state: {e}"))?;
271274
for (id, comp) in update.update.new_pairs.iter() {
272-
pairs.put(id.clone(), comp.clone());
273-
}
274-
let new_size = pairs.len();
275-
let cap = pairs.cap().get();
276-
if new_size != prev_size {
277-
info!(size=%new_size, capacity=%cap, "Protocol components cache updated");
275+
current_state
276+
.components
277+
.insert(id.clone(), comp.clone());
278+
current_state
279+
.component_ids_by_protocol
280+
.entry(comp.protocol_system.clone())
281+
.or_insert_with(HashSet::new)
282+
.insert(id.clone());
278283
}
279-
if new_size == cap {
280-
warn!(size=%new_size, capacity=%cap, "Protocol components cache reached capacity, \
281-
least recently used items will be evicted on new insertions");
284+
for (id, state) in update.update.states.iter() {
285+
// this overwrites existing entries
286+
current_state
287+
.states
288+
.insert(id.clone(), state.clone());
282289
}
283290
}
284291
// Record block processing latency
@@ -312,7 +319,7 @@ async fn process_update(
312319
metrics::record_protocol_sync_state(protocol, sync_state);
313320
}
314321

315-
// Process states in parallel
322+
// Process updated states in parallel
316323
let semaphore = Arc::new(Semaphore::new(cli.parallel_simulations as usize));
317324
let mut tasks = Vec::new();
318325

@@ -324,10 +331,11 @@ async fn process_update(
324331
{
325332
let component = match update.update_type {
326333
UpdateType::Protocol => {
327-
let mut pairs = protocol_pairs
328-
.write()
329-
.map_err(|e| miette!("Failed to acquire read lock on protocol pairs: {e}"))?;
330-
match pairs.get(id) {
334+
let states = &tycho_state
335+
.read()
336+
.map_err(|e| miette!("Failed to acquire read lock on Tycho state: {e}"))?
337+
.components;
338+
match states.get(id) {
331339
Some(comp) => comp.clone(),
332340
None => {
333341
warn!(id=%id, "Component not found in cached protocol pairs. Potential causes: \
@@ -366,6 +374,76 @@ async fn process_update(
366374
tasks.push(task);
367375
}
368376

377+
// Select states that were not updated in this block to test simulation and execution
378+
let selected_ids = {
379+
let current_state = tycho_state
380+
.read()
381+
.map_err(|e| miette!("Failed to acquire write lock on Tycho state: {e}"))?;
382+
383+
let mut all_selected_ids = Vec::new();
384+
for component_ids in current_state
385+
.component_ids_by_protocol
386+
.values()
387+
{
388+
// Filter out IDs that are in the current update
389+
let available_ids: Vec<_> = component_ids
390+
.iter()
391+
.filter(|id| !update.update.states.keys().contains(id))
392+
.cloned()
393+
.collect();
394+
395+
let protocol_selected_ids: Vec<_> = available_ids
396+
.choose_multiple(
397+
&mut rand::rng(),
398+
(cli.max_simulations_stale as usize).min(available_ids.len()),
399+
)
400+
.cloned()
401+
.collect();
402+
403+
all_selected_ids.extend(protocol_selected_ids);
404+
}
405+
all_selected_ids
406+
};
407+
408+
for id in selected_ids {
409+
let (component, state) = {
410+
let current_state = tycho_state
411+
.read()
412+
.map_err(|e| miette!("Failed to acquire read lock on Tycho state: {e}"))?;
413+
414+
match (current_state.components.get(&id), current_state.states.get(&id)) {
415+
(Some(comp), Some(state)) => (comp.clone(), state.clone()),
416+
(None, _) => {
417+
error!(id=%id, "Component not found in saved protocol components.");
418+
continue;
419+
}
420+
(_, None) => {
421+
error!(id=%id, "State not found in saved protocol states");
422+
continue;
423+
}
424+
}
425+
};
426+
427+
let block = block.clone();
428+
let state_id = id.clone();
429+
let state = state.clone_box();
430+
let permit = semaphore
431+
.clone()
432+
.acquire_owned()
433+
.await
434+
.into_diagnostic()
435+
.wrap_err("Failed to acquire permit")?;
436+
437+
let task = tokio::spawn(async move {
438+
let simulation_id = generate_simulation_id(&component.protocol_system, &state_id);
439+
let result =
440+
process_state(&simulation_id, chain, component, &block, state_id, state).await;
441+
drop(permit);
442+
result
443+
});
444+
tasks.push(task);
445+
}
446+
369447
let mut block_execution_info = HashMap::new();
370448

371449
for task in tasks {
@@ -504,6 +582,7 @@ async fn process_state(
504582
token_in = %token_in.address,
505583
token_out = %token_out.address,
506584
error = %format_error_chain(&e),
585+
state = ?state,
507586
"Get limits operation failed: {}", format_error_chain(&e)
508587
);
509588
metrics::record_get_limits_failure(&component.protocol_system);
@@ -548,6 +627,7 @@ async fn process_state(
548627
token_out = %token_out.address,
549628
amount_in = %amount_in,
550629
error = %format_error_chain(&e),
630+
state = ?state,
551631
"Get amount out operation failed: {}", format_error_chain(&e)
552632
);
553633
metrics::record_get_amount_out_failure(&component.protocol_system);

0 commit comments

Comments
 (0)