Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

132 changes: 93 additions & 39 deletions tycho-integration-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod metrics;
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
str::FromStr,
sync::{Arc, RwLock},
time::Duration,
};
Expand Down Expand Up @@ -35,6 +36,7 @@ use tycho_test::{
protocol_stream_processor::ProtocolStreamProcessor,
rfq_stream_processor::RFQStreamProcessor, StreamUpdate, UpdateType,
},
validation::{batch_validate_components, get_validator, Validator},
};

#[derive(Parser, Clone)]
Expand Down Expand Up @@ -313,10 +315,11 @@ async fn process_update(
metrics::record_protocol_sync_state(protocol, sync_state);
}

// Process updated states in parallel
let semaphore = Arc::new(Semaphore::new(cli.parallel_simulations as usize));
let mut tasks = Vec::new();
// Collect all components to process (both updated and stale) for batch validation
let mut components_to_process: Vec<(String, ProtocolComponent, Box<dyn ProtocolSim>)> =
Vec::new();

// Collect updated components
for (id, state) in update
.update
.states
Expand All @@ -332,55 +335,39 @@ async fn process_update(
match states.get(id) {
Some(comp) => comp.clone(),
None => {
warn!(id=%id, "Component not found in cached protocol pairs. Potential causes: \
there was an error decoding the component, the component was evicted from the cache, \
or the component was never added to the cache. Skipping...");
warn!(id=%id, "Component not found in cached protocol pairs. Skipping...");
continue;
}
}
}
UpdateType::Rfq => match update.update.new_pairs.get(id) {
Some(comp) => comp.clone(),
None => {
warn!(id=%id, "Component not found in update's new pairs. Potential cause: \
the `states` and `new_pairs` lists don't contain the same items. Skipping...");
warn!(id=%id, "Component not found in update's new pairs. Skipping...");
continue;
}
},
};
let block = block.clone();
let state_id = id.clone();
let state = state.clone_box();
let permit = semaphore
.clone()
.acquire_owned()
.await
.into_diagnostic()
.wrap_err("Failed to acquire permit")?;

let task = tokio::spawn(async move {
let simulation_id = generate_simulation_id(&component.protocol_system, &state_id);
let result =
process_state(&simulation_id, chain, component, &block, state_id, state).await;
drop(permit);
result
});
tasks.push(task);
components_to_process.push((id.clone(), component, state.clone_box()));
}

// Select states that were not updated in this block to test simulation and execution
// Collect stale components (not updated in this block)
let selected_ids = {
let current_state = tycho_state
.read()
.map_err(|e| miette!("Failed to acquire write lock on Tycho state: {e}"))?;

let mut all_selected_ids = Vec::new();

// Add component IDs from always_test_components that are not in the current update
for component_id in &cli.always_test_components {
if !update.update.states.keys().contains(component_id)
// Ensure that the component exists in the Tycho DB
&& current_state.components.contains_key(component_id)
if !update
.update
.states
.keys()
.contains(component_id) &&
current_state
.components
.contains_key(component_id)
{
all_selected_ids.push(component_id.clone());
}
Expand All @@ -390,7 +377,6 @@ async fn process_update(
.component_ids_by_protocol
.values()
{
// Filter out IDs that are in the current update or already in all_selected_ids
let available_ids: Vec<_> = component_ids
.iter()
.filter(|id| {
Expand All @@ -412,13 +398,13 @@ async fn process_update(
all_selected_ids
};

for id in selected_ids {
for id in &selected_ids {
let (component, state) = {
let current_state = tycho_state
.read()
.map_err(|e| miette!("Failed to acquire read lock on Tycho state: {e}"))?;

match (current_state.components.get(&id), current_state.states.get(&id)) {
match (current_state.components.get(id), current_state.states.get(id)) {
(Some(comp), Some(state)) => (comp.clone(), state.clone()),
(None, _) => {
error!(id=%id, "Component not found in saved protocol components.");
Expand All @@ -430,10 +416,79 @@ async fn process_update(
}
}
};
components_to_process.push((id.clone(), component, state.clone_box()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put all of this component selecting in one big method? I think it fits nicely together now!

}

// Collect components that implement Validator for batch validation
let mut validator_components: Vec<(
&dyn Validator,
tycho_common::Bytes,
Vec<tycho_common::models::token::Token>,
String, // protocol_system
)> = Vec::new();

for (id, component, state) in &components_to_process {
let component_id = tycho_common::Bytes::from_str(id)
.unwrap_or_else(|_| tycho_common::Bytes::from(id.as_bytes()));

if let Some(validator) = get_validator(&component.protocol_system, state.as_ref()) {
validator_components.push((
validator,
component_id,
component.tokens.clone(),
component.protocol_system.clone(),
));
}
}

// Batch validate all components of this block in a single call
if !validator_components.is_empty() {
// Extract just the validator data (without protocol_system) for batch_validate_components
// TODO do this neater
let validator_data: Vec<_> = validator_components
.iter()
.map(|(validator, id, tokens, _protocol)| (*validator, id.clone(), tokens.clone()))
.collect();

let results =
batch_validate_components(&cli.rpc_url, &validator_data, block.header.number).await;

for (i, result) in results.iter().enumerate() {
let component_id = &validator_components[i].1;
let protocol = &validator_components[i].3;
match result {
Ok(passed) => {
if *passed {
info!(
component_id = %component_id,
"State validation passed"
);
} else {
error!(
component_id = %component_id,
"State validation failed"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we log here the delta message? 🤔

);
metrics::record_validation_failure(protocol);
}
}
Err(e) => {
error!(
component_id = %component_id,
error = %e,
"Error validating component"
);
metrics::record_validation_failure(protocol);
}
}
}
}

// Process all components (updated and stale) in parallel
let semaphore = Arc::new(Semaphore::new(cli.parallel_simulations as usize));
let mut tasks = Vec::new();

for (id, component, state) in components_to_process {
let block = block.clone();
let state_id = id.clone();
let state = state.clone_box();
let permit = semaphore
.clone()
.acquire_owned()
Expand All @@ -442,9 +497,8 @@ async fn process_update(
.wrap_err("Failed to acquire permit")?;

let task = tokio::spawn(async move {
let simulation_id = generate_simulation_id(&component.protocol_system, &state_id);
let result =
process_state(&simulation_id, chain, component, &block, state_id, state).await;
let simulation_id = generate_simulation_id(&component.protocol_system, &id);
let result = process_state(&simulation_id, chain, component, &block, id, state).await;
drop(permit);
result
});
Expand Down
13 changes: 13 additions & 0 deletions tycho-integration-test/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ pub fn initialize_metrics() {
"tycho_integration_protocol_update_block_delay_blocks",
"Number of blocks behind current that protocol updates are received"
);
describe_counter!(
"tycho_integration_validation_failures_total",
"Total number of failed state validations"
);
}

/// Record the duration between block timestamp and component receipt
Expand Down Expand Up @@ -162,6 +166,15 @@ pub fn record_protocol_update_block_delay(block_delay: u64) {
histogram!("tycho_integration_protocol_update_block_delay_blocks").record(block_delay as f64);
}

/// Record a failed validation
pub fn record_validation_failure(protocol: &str) {
counter!(
"tycho_integration_validation_failures_total",
"protocol" => protocol.to_string(),
)
.increment(1);
}

/// Creates and runs the Prometheus metrics exporter using Actix Web.
/// Returns a JoinHandle that should be awaited to detect server failures.
pub async fn create_metrics_exporter(port: u16) -> Result<tokio::task::JoinHandle<Result<()>>> {
Expand Down
2 changes: 2 additions & 0 deletions tycho-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ edition = "2021"
alloy = { workspace = true, features = ["rpc-types-eth", "sol-types"] }
alloy-chains = "0.2.6"
alloy-rpc-types-trace = "1.0.38"
async-trait = "0.1.88"
colored = "3.0.0"
dotenv = { workspace = true }
futures = { workspace = true }
hex = "0.4.3"
itertools = { workspace = true }
lru = "0.16.1"
num-bigint = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions tycho-test/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod execution;
pub mod rpc_tools;
pub mod stream_processor;
pub mod validation;
pub use rpc_tools::RPCTools;
Loading