diff --git a/Cargo.lock b/Cargo.lock index 1f90e055fbe..8e8aebf9bf7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5810,6 +5810,7 @@ dependencies = [ "thiserror 2.0.17", "time 0.3.44", "tokio", + "tokio-util", "tracing", "url", "vbs", diff --git a/crates/hotshot/task-impls/Cargo.toml b/crates/hotshot/task-impls/Cargo.toml index 230f1415c5d..dd4b7a10e84 100644 --- a/crates/hotshot/task-impls/Cargo.toml +++ b/crates/hotshot/task-impls/Cargo.toml @@ -43,6 +43,7 @@ tracing = { workspace = true } url = { workspace = true } vbs = { workspace = true } vec1 = { workspace = true } +tokio-util = { workspace = true } [lints] workspace = true diff --git a/crates/hotshot/task-impls/src/builder.rs b/crates/hotshot/task-impls/src/builder.rs index 2327df163e5..dd6a0e770ab 100644 --- a/crates/hotshot/task-impls/src/builder.rs +++ b/crates/hotshot/task-impls/src/builder.rs @@ -60,6 +60,7 @@ impl From for BuilderClientError { } /// Client for builder API +#[derive(Clone)] pub struct BuilderClient { /// Underlying surf_disco::Client for the legacy builder api client: Client, diff --git a/crates/hotshot/task-impls/src/transactions.rs b/crates/hotshot/task-impls/src/transactions.rs index 20e2711cc0b..661e4c57350 100644 --- a/crates/hotshot/task-impls/src/transactions.rs +++ b/crates/hotshot/task-impls/src/transactions.rs @@ -5,13 +5,13 @@ // along with the HotShot repository. If not, see . use std::{ + collections::HashMap, sync::Arc, time::{Duration, Instant}, }; use async_broadcast::{Receiver, Sender}; use async_trait::async_trait; -use futures::{stream::FuturesUnordered, StreamExt}; use hotshot_builder_api::v0_1::block_info::AvailableBlockInfo; use hotshot_task::task::TaskState; use hotshot_types::{ @@ -29,7 +29,12 @@ use hotshot_types::{ utils::{is_epoch_transition, is_last_block, ViewInner}, }; use hotshot_utils::anytrace::*; -use tokio::time::{sleep, timeout}; +use tokio::{ + spawn, + task::JoinSet, + time::{sleep, timeout}, +}; +use tokio_util::task::AbortOnDropHandle; use tracing::instrument; use vbs::version::{StaticVersionType, Version}; @@ -41,17 +46,6 @@ use crate::{ // Parameters for builder querying algorithm -/// Proportion of builders queried in first batch, dividend -const BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND: usize = 2; -/// Proportion of builders queried in the first batch, divisor -const BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR: usize = 3; -/// Time the first batch of builders has to respond -const BUILDER_MAIN_BATCH_CUTOFF: Duration = Duration::from_millis(700); -/// Multiplier for extra time to give to the second batch of builders -const BUILDER_ADDITIONAL_TIME_MULTIPLIER: f32 = 0.2; -/// Minimum amount of time allotted to both batches, cannot be cut shorter if the first batch -/// responds extremely fast. -const BUILDER_MINIMUM_QUERY_TIME: Duration = Duration::from_millis(300); /// Delay between re-tries on unsuccessful calls const RETRY_DELAY: Duration = Duration::from_millis(100); @@ -516,7 +510,7 @@ impl TransactionTaskState { match timeout( self.builder_timeout .saturating_sub(task_start_time.elapsed()), - self.block_from_builder(parent_comm, parent_view, &parent_comm_sig), + self.get_block(parent_comm, parent_view, &parent_comm_sig), ) .await { @@ -545,178 +539,207 @@ impl TransactionTaskState { None } - /// Query the builders for available blocks. Queries only fraction of the builders - /// based on the response time. - async fn get_available_blocks( + async fn get_block( &self, parent_comm: VidCommitment, view_number: TYPES::View, parent_comm_sig: &<::SignatureKey as SignatureKey>::PureAssembledSignatureType, - ) -> Vec<(AvailableBlockInfo, usize)> { - let tasks = self - .builder_clients - .iter() - .enumerate() - .map(|(builder_idx, client)| async move { - client - .available_blocks( - parent_comm, - view_number.u64(), - self.public_key.clone(), - parent_comm_sig, + ) -> anyhow::Result> { + // Create a `JoinSet` that joins tasks to get block information from all of the builder clients + let mut join_set = JoinSet::new(); + + // Create a map so we can later re-associate a task with its builder client + let mut task_to_client = HashMap::new(); + + // Spawn tasks to get block information from all of the builder clients simultaneously + for client in self.builder_clients.iter() { + // Clone the things we need in the closure + let public_key = self.public_key.clone(); + let parent_comm_sig = parent_comm_sig.clone(); + let client = client.clone(); + let client_clone = client.clone(); + + // Spawn the task to get block information from the builder client + let id = join_set + .spawn(async move { + Self::get_block_info_from_builder( + &client, + &public_key, + &parent_comm, + view_number, + &parent_comm_sig, ) .await - .map(move |blocks| { - blocks - .into_iter() - .map(move |block_info| (block_info, builder_idx)) - }) - }) - .collect::>(); - let mut results = Vec::with_capacity(self.builder_clients.len()); - let query_start = Instant::now(); - let threshold = (self.builder_clients.len() * BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND) - .div_ceil(BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR); - let mut tasks = tasks.take(threshold); - while let Some(result) = tasks.next().await { - results.push(result); - if query_start.elapsed() > BUILDER_MAIN_BATCH_CUTOFF { - break; - } + }) + .id(); + + // Add the task id to builder client mapping + task_to_client.insert(id, client_clone); } - let timeout = sleep(std::cmp::max( - query_start - .elapsed() - .mul_f32(BUILDER_ADDITIONAL_TIME_MULTIPLIER), - BUILDER_MINIMUM_QUERY_TIME.saturating_sub(query_start.elapsed()), - )); - futures::pin_mut!(timeout); - let mut tasks = tasks.into_inner().take_until(timeout); - while let Some(result) = tasks.next().await { - results.push(result); + + // We need this channel to deal with responses as they become completed. This is because the `JoinSet` doesn't + // return tasks in the order in which they completed if more than one was ready. In our scenario, if one fails, + // we still want to use the result from the next least latent builder. + let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel(); + let _join_task = AbortOnDropHandle::new(spawn(async move { + while let Some(result) = join_set.join_next_with_id().await { + let _ = sender.send(result); + } + })); + + // The first builder to return block information should be the closest/least latent. It doesn't include + // the actual block itself, so we need to ask for it + while let Some(result) = receiver.recv().await { + // Match on the result to get the block information + let (task_id, block_info) = match result { + Ok((task_id, Ok(block_info))) => (task_id, block_info), + Ok((_, Err(err))) => { + tracing::warn!("Failed to get block info from builder: {err:#}"); + continue; + }, + Err(err) => { + tracing::warn!("Failed to join task: {err:#}"); + continue; + }, + }; + + // Get the builder client from the map + let client = task_to_client + .get(&task_id) + .ok_or_else(|| anyhow::anyhow!("missing builder client for task"))?; + + // For each block info, + for block_info in block_info { + // Get the actual block from the builder + let block = match Self::get_block_from_builder( + client, + &self.public_key, + &self.private_key, + view_number, + &block_info, + ) + .await + { + Ok(block) => block, + Err(err) => { + tracing::warn!("Failed to get block from builder: {err:#}"); + continue; + }, + }; + + // If we got here, we successfully claimed a valid block + return Ok(block); + } } - results - .into_iter() - .filter_map(|result| result.ok()) - .flatten() - .collect::>() + + Err(anyhow::anyhow!("no blocks were successfully claimed")) } - /// Get a block from builder. - /// Queries the sufficiently fast builders for available blocks and chooses the one with the - /// best fee/byte ratio, re-trying with the next best one in case of failure. - /// - /// # Errors - /// If none of the builder reports any available blocks or claiming block fails for all of the - /// builders. - #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "block_from_builder", level = "error")] - async fn block_from_builder( - &self, - parent_comm: VidCommitment, + /// Get a block from the specified builder client. These blocks are validated and sorted by size in descending + /// order (so the largest block is first) + async fn get_block_info_from_builder( + client: &BuilderClientBase, + public_key: &TYPES::SignatureKey, + parent_comm: &VidCommitment, view_number: TYPES::View, parent_comm_sig: &<::SignatureKey as SignatureKey>::PureAssembledSignatureType, - ) -> Result> { - let mut available_blocks = self - .get_available_blocks(parent_comm, view_number, parent_comm_sig) - .await; - - available_blocks.sort_by(|(l, _), (r, _)| { - // We want the block with the highest fee per byte of data we're going to have to - // process, thus our comparison function is: - // (l.offered_fee / l.block_size) < (r.offered_fee / r.block_size) - // To avoid floating point math (which doesn't even have an `Ord` impl) we multiply - // through by the denominators to get - // l.offered_fee * r.block_size < r.offered_fee * l.block_size - // We cast up to u128 to avoid overflow. - (u128::from(l.offered_fee) * u128::from(r.block_size)) - .cmp(&(u128::from(r.offered_fee) * u128::from(l.block_size))) - }); + ) -> anyhow::Result>> { + // Get the available blocks from the builder + let mut available_blocks = client + .available_blocks( + *parent_comm, + view_number.u64(), + public_key.clone(), + parent_comm_sig, + ) + .await + .map_err(|e| anyhow::anyhow!("failed to get available blocks: {e:#}"))?; + // Return early if no blocks were available if available_blocks.is_empty() { - tracing::info!("No available blocks"); - bail!("No available blocks"); + return Err(anyhow::anyhow!("no blocks were available")); } - for (block_info, builder_idx) in available_blocks { - // Verify signature over chosen block. + // Retain only block info with valid signatures + available_blocks.retain(|block_info| { + // Validate the signature over the block info if !block_info.sender.validate_block_info_signature( &block_info.signature, block_info.block_size, block_info.offered_fee, &block_info.block_hash, ) { - tracing::warn!("Failed to verify available block info response message signature"); - continue; + tracing::warn!("Block info signature was invalid"); + return false; } + true + }); - let request_signature = match <::SignatureKey as SignatureKey>::sign( - &self.private_key, - block_info.block_hash.as_ref(), - ) { - Ok(request_signature) => request_signature, - Err(err) => { - tracing::error!(%err, "Failed to sign block hash"); - continue; - }, - }; - - let response = { - let client = &self.builder_clients[builder_idx]; + // Return early if none of them had valid signatures + if available_blocks.is_empty() { + anyhow::bail!("no valid block info was received"); + } - let (block, either_header_input) = futures::join! { - client.claim_block(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature), - client.claim_either_block_header_input(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature) - }; + // Sort the blocks by size in descending order so that larger blocks are first + available_blocks.sort_by(|a, b| b.block_size.cmp(&a.block_size)); - let block_data = match block { - Ok(block_data) => block_data, - Err(err) => { - tracing::warn!(%err, "Error claiming block data"); - continue; - }, - }; + // Return the information about the (first) largest block + Ok(available_blocks) + } - let Ok(either_header_input) = either_header_input - .inspect_err(|err| tracing::warn!(%err, "Error claiming header input")) - else { - continue; - }; + /// Get the actual block from the given builder + async fn get_block_from_builder( + client: &BuilderClientBase, + public_key: &TYPES::SignatureKey, + private_key: &::PrivateKey, + view_number: TYPES::View, + block_info: &AvailableBlockInfo, + ) -> anyhow::Result> { + // Sign the block hash that we're requesting + let request_signature = <::SignatureKey as SignatureKey>::sign( + private_key, + block_info.block_hash.as_ref(), + ) + .map_err(|err| anyhow::anyhow!("failed to sign block hash for claim request: {err:#}"))?; - let Some(header_input) = either_header_input - .validate_signature_and_get_input(block_info.offered_fee, &block_data.metadata) - else { - tracing::warn!( - "Failed to verify available new or legacy block header input data \ - response message signature" - ); - continue; - }; + // Claim both the block and the block header input + let (block, header_input) = futures::join! { + client.claim_block(block_info.block_hash.clone(), view_number.u64(), public_key.clone(), &request_signature), + client.claim_either_block_header_input(block_info.block_hash.clone(), view_number.u64(), public_key.clone(), &request_signature) + }; - // verify the signature over the message - if !block_data.validate_signature() { - tracing::warn!( - "Failed to verify available block data response message signature" - ); - continue; - } + // Get the block + let block = block.map_err(|err| anyhow::anyhow!("failed to claim block: {err:#}"))?; - let fee = BuilderFee { - fee_amount: block_info.offered_fee, - fee_account: header_input.sender, - fee_signature: header_input.fee_signature, - }; + // Get the block header input + let header_input = header_input + .map_err(|err| anyhow::anyhow!("failed to claim block header input: {err:#}"))?; - BuilderResponse { - fee, - block_payload: block_data.block_payload, - metadata: block_data.metadata, - } - }; + // Validate the signature of the header input + let Some(header_input) = + header_input.validate_signature_and_get_input(block_info.offered_fee, &block.metadata) + else { + anyhow::bail!("failed to validate header input signature"); + }; - return Ok(response); + // Validate the block's signature + if !block.validate_signature() { + anyhow::bail!("failed to validate block signature"); } - bail!("Couldn't claim a block from any of the builders"); + // Create the builder fee + let fee = BuilderFee { + fee_amount: block_info.offered_fee, + fee_account: header_input.sender, + fee_signature: header_input.fee_signature, + }; + + // Create and return the response + Ok(BuilderResponse { + fee, + block_payload: block.block_payload, + metadata: block.metadata, + }) } }