diff --git a/src/protocol/libp2p/bitswap/config.rs b/src/protocol/libp2p/bitswap/config.rs
index 1156aeca..b5ce71a4 100644
--- a/src/protocol/libp2p/bitswap/config.rs
+++ b/src/protocol/libp2p/bitswap/config.rs
@@ -30,9 +30,13 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
 /// IPFS Bitswap protocol name as a string.
 pub const PROTOCOL_NAME: &str = "/ipfs/bitswap/1.2.0";
 
-/// Maximum Size for `/ipfs/bitswap/1.2.0` substream payload. Note this is bigger than 2 MiB max
-/// block size to account for protobuf message overhead.
-const MAX_PAYLOAD_SIZE: usize = 2_100_000;
+/// Maximum size of an `/ipfs/bitswap/1.2.0` substream message. Includes enough room for protobuf
+/// overhead. Enforced at the transport level.
+pub const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024;
+
+/// Maximum combined size of all blocks in a single Bitswap message. Enforced at the
+/// application protocol level.
+pub const MAX_BATCH_SIZE: usize = 2 * 1024 * 1024;
 
 /// Bitswap configuration.
 #[derive(Debug)]
@@ -61,7 +65,7 @@ impl Config {
                 cmd_rx,
                 event_tx,
                 protocol: ProtocolName::from(PROTOCOL_NAME),
-                codec: ProtocolCodec::UnsignedVarint(Some(MAX_PAYLOAD_SIZE)),
+                codec: ProtocolCodec::UnsignedVarint(Some(MAX_MESSAGE_SIZE)),
             },
             BitswapHandle::new(event_rx, cmd_tx),
         )
diff --git a/src/protocol/libp2p/bitswap/mod.rs b/src/protocol/libp2p/bitswap/mod.rs
index fea1cc54..9c4cac9c 100644
--- a/src/protocol/libp2p/bitswap/mod.rs
+++ b/src/protocol/libp2p/bitswap/mod.rs
@@ -31,6 +31,7 @@ use crate::{
     PeerId,
 };
 
+use bytes::Bytes;
 use cid::{Cid, Version};
 use prost::Message;
 use tokio::sync::mpsc::{Receiver, Sender};
@@ -40,7 +41,7 @@ pub use config::Config;
 pub use handle::{BitswapCommand, BitswapEvent, BitswapHandle, ResponseType};
 pub use schema::bitswap::{wantlist::WantType, BlockPresenceType};
 use std::{
-    collections::{hash_map::Entry, HashMap, HashSet},
+    collections::{hash_map::Entry, vec_deque::Drain, HashMap, HashSet, VecDeque},
     time::Duration,
 };
 
@@ -535,16 +536,102 @@ async fn send_request(substream: &mut Substream, cids: Vec<(Cid, WantType)>) -> Result<(), Error> {
 }
 
 async fn send_response(substream: &mut Substream, entries: Vec<ResponseType>) -> Result<(), Error> {
-    let mut response = schema::bitswap::Message {
-        // `wantlist` field must always be present. This is what the official Kubo
-        // IPFS implementation does.
+    // Send presences in a separate message to not deal with them when batching blocks below.
+    if let Some((message, cid_count)) =
+        presences_message(entries.iter().filter_map(|entry| match entry {
+            ResponseType::Presence { cid, presence } => Some((*cid, *presence)),
+            ResponseType::Block { .. } => None,
+        }))
+    {
+        if message.len() <= config::MAX_MESSAGE_SIZE {
+            tracing::trace!(
+                target: LOG_TARGET,
+                cid_count,
+                "sending Bitswap presence message",
+            );
+            match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
+                Err(_) => return Err(Error::Timeout),
+                Ok(Err(e)) => return Err(Error::SubstreamError(e)),
+                Ok(Ok(())) => {}
+            }
+        } else {
+            // This should never happen in practice, but log a warning if the presence message
+            // exceeded [`config::MAX_MESSAGE_SIZE`].
+            tracing::warn!(
+                target: LOG_TARGET,
+                size = message.len(),
+                max_size = config::MAX_MESSAGE_SIZE,
+                "outgoing Bitswap presence message exceeded max size",
+            );
+        }
+    }
+
+    // Send blocks in batches of up to [`config::MAX_BATCH_SIZE`] bytes.
+    let mut blocks = entries
+        .into_iter()
+        .filter_map(|entry| match entry {
+            ResponseType::Block { cid, block } => Some((cid, block)),
+            ResponseType::Presence { .. } => None,
+        })
+        .collect::<VecDeque<_>>();
+
+    while let Some(batch) = extract_next_batch(&mut blocks, config::MAX_BATCH_SIZE) {
+        if let Some((message, block_count)) = blocks_message(batch) {
+            if message.len() <= config::MAX_MESSAGE_SIZE {
+                tracing::trace!(
+                    target: LOG_TARGET,
+                    block_count,
+                    "sending Bitswap blocks message",
+                );
+                match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
+                    Err(_) => return Err(Error::Timeout),
+                    Ok(Err(e)) => return Err(Error::SubstreamError(e)),
+                    Ok(Ok(())) => {}
+                }
+            } else {
+                // This should never happen in practice, but log a warning if the blocks message
+                // exceeded [`config::MAX_MESSAGE_SIZE`].
+                tracing::warn!(
+                    target: LOG_TARGET,
+                    size = message.len(),
+                    max_size = config::MAX_MESSAGE_SIZE,
+                    "outgoing Bitswap blocks message exceeded max size",
+                );
+            }
+        }
+    }
+
+    Ok(())
+}
+
+fn presences_message(
+    presences: impl IntoIterator<Item = (Cid, BlockPresenceType)>,
+) -> Option<(Bytes, usize)> {
+    let message = schema::bitswap::Message {
+        // Set `wantlist` to not cause a null pointer dereference in older versions of Kubo.
         wantlist: Some(Default::default()),
+        block_presences: presences
+            .into_iter()
+            .map(|(cid, presence)| schema::bitswap::BlockPresence {
+                cid: cid.to_bytes(),
+                r#type: presence as i32,
+            })
+            .collect(),
         ..Default::default()
     };
 
-    for entry in entries {
-        match entry {
-            ResponseType::Block { cid, block } => {
+    let count = message.block_presences.len();
+
+    (count > 0).then(|| (message.encode_to_vec().into(), count))
+}
+
+fn blocks_message(blocks: impl IntoIterator<Item = (Cid, Vec<u8>)>) -> Option<(Bytes, usize)> {
+    let message = schema::bitswap::Message {
+        // Set `wantlist` to not cause a null pointer dereference in older versions of Kubo.
+        wantlist: Some(Default::default()),
+        payload: blocks
+            .into_iter()
+            .map(|(cid, block)| {
                 let prefix = Prefix {
                     version: cid.version(),
                     codec: cid.codec(),
@@ -553,24 +640,180 @@
                 }
                 .to_bytes();
 
-                response.payload.push(schema::bitswap::Block {
+                schema::bitswap::Block {
                     prefix,
                     data: block,
-                });
-            }
-            ResponseType::Presence { cid, presence } => {
-                response.block_presences.push(schema::bitswap::BlockPresence {
-                    cid: cid.to_bytes(),
-                    r#type: presence as i32,
-                });
+                }
+            })
+            .collect(),
+        ..Default::default()
+    };
+
+    let count = message.payload.len();
+
+    (count > 0).then(|| (message.encode_to_vec().into(), count))
+}
+
+/// Extract a batch of blocks of no more than `max_batch_size` bytes from `blocks`.
+/// Returns `None` if no more blocks are left.
+fn extract_next_batch<'a>(
+    blocks: &'a mut VecDeque<(Cid, Vec<u8>)>,
+    max_batch_size: usize,
+) -> Option<Drain<'a, (Cid, Vec<u8>)>> {
+    // Get rid of oversized blocks so that being unable to queue them does not stall processing.
+    loop {
+        if let Some(block) = blocks.front() {
+            if block.1.len() > max_batch_size {
+                tracing::warn!(
+                    target: LOG_TARGET,
+                    cid = block.0.to_string(),
+                    size = block.1.len(),
+                    max_batch_size,
+                    "outgoing Bitswap block exceeded max batch size",
+                );
+                blocks.pop_front();
+            } else {
+                break;
             }
+        } else {
+            return None;
         }
     }
 
-    let message = response.encode_to_vec().into();
-    match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
-        Err(_) => Err(Error::Timeout),
-        Ok(Err(e)) => Err(Error::SubstreamError(e)),
-        Ok(Ok(())) => Ok(()),
+    // Determine how many blocks we can batch. Note that we can always batch at least one
+    // block due to the check above.
+    let mut total_size = 0;
+    let mut block_count = 0;
+
+    for b in blocks.iter() {
+        let next_block_size = b.1.len();
+        if total_size + next_block_size > max_batch_size {
+            break;
+        }
+        total_size += next_block_size;
+        block_count += 1;
+    }
+
+    Some(blocks.drain(..block_count))
+}
+
+#[cfg(test)]
+mod tests {
+    use cid::multihash::Multihash;
+
+    use super::*;
+
+    fn cid(block: &[u8]) -> Cid {
+        let codec = 0x55;
+        let multihash = Code::Sha2_256.digest(block);
+        let multihash =
+            Multihash::wrap(multihash.code(), multihash.digest()).expect("to be valid multihash");
+
+        Cid::new_v1(codec, multihash)
+    }
+
+    #[test]
+    fn extract_next_batch_fits_max_size() {
+        let max_size = 100;
+
+        let block1 = vec![0x01; 10];
+        let block2 = vec![0x02; 10];
+        let block3 = vec![0x03; 10];
+
+        let blocks = vec![
+            (cid(&block1), block1),
+            (cid(&block2), block2),
+            (cid(&block3), block3),
+        ];
+        let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>();
+
+        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
+        assert_eq!(batch.collect::<Vec<_>>(), blocks);
+
+        assert!(extract_next_batch(&mut blocks_deque, max_size).is_none());
+    }
+
+    #[test]
+    fn extract_next_batch_chunking_exact() {
+        let max_size = 20;
+
+        let block1 = vec![0x01; 10];
+        let block2 = vec![0x02; 10];
+        let block3 = vec![0x03; 10];
+
+        let blocks = vec![
+            (cid(&block1), block1.clone()),
+            (cid(&block2), block2.clone()),
+            (cid(&block3), block3.clone()),
+        ];
+        let chunk1 = vec![
+            (cid(&block1), block1.clone()),
+            (cid(&block2), block2.clone()),
+        ];
+        let chunk2 = vec![(cid(&block3), block3.clone())];
+        let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>();
+
+        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
+        assert_eq!(batch.collect::<Vec<_>>(), chunk1);
+
+        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
+        assert_eq!(batch.collect::<Vec<_>>(), chunk2);
+
+        assert!(extract_next_batch(&mut blocks_deque, max_size).is_none());
+    }
+
+    #[test]
+    fn extract_next_batch_chunking_less_than() {
+        let max_size = 20;
+
+        let block1 = vec![0x01; 10];
+        let block2 = vec![0x02; 9];
+        let block3 = vec![0x03; 10];
+
+        let blocks = vec![
+            (cid(&block1), block1.clone()),
+            (cid(&block2), block2.clone()),
+            (cid(&block3), block3.clone()),
+        ];
+        let chunk1 = vec![
+            (cid(&block1), block1.clone()),
+            (cid(&block2), block2.clone()),
+        ];
+        let chunk2 = vec![(cid(&block3), block3.clone())];
+        let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>();
+
+        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
+        assert_eq!(batch.collect::<Vec<_>>(), chunk1);
+
+        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
+        assert_eq!(batch.collect::<Vec<_>>(), chunk2);
+
+        assert!(extract_next_batch(&mut blocks_deque, max_size).is_none());
+    }
+
+    #[test]
+    fn extract_next_batch_oversized_blocks_discarded() {
+        let max_size = 20;
+
+        let block1 = vec![0x01; 10];
+        let block2 = vec![0x02; 101];
+        let block3 = vec![0x03; 10];
+
+        let blocks = vec![
+            (cid(&block1), block1.clone()),
+            (cid(&block2), block2.clone()),
+            (cid(&block3), block3.clone()),
+        ];
+        let chunk1 = vec![(cid(&block1), block1.clone())];
+        let chunk2 = vec![(cid(&block3), block3.clone())];
+        let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>();
+
+        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
+        assert_eq!(batch.collect::<Vec<_>>(), chunk1);
+
+        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
+        assert_eq!(batch.collect::<Vec<_>>(), chunk2);
+
+        assert!(extract_next_batch(&mut blocks_deque, max_size).is_none());
     }
 }