bitswap: Split block responses into batches under 2 MiB #516
@@ -31,6 +31,7 @@ use crate::{ | |
| PeerId, | ||
| }; | ||
|
|
||
| use bytes::Bytes; | ||
| use cid::{Cid, Version}; | ||
| use prost::Message; | ||
| use tokio::sync::mpsc::{Receiver, Sender}; | ||
|
|
@@ -40,7 +41,7 @@ pub use config::Config; | |
| pub use handle::{BitswapCommand, BitswapEvent, BitswapHandle, ResponseType}; | ||
| pub use schema::bitswap::{wantlist::WantType, BlockPresenceType}; | ||
| use std::{ | ||
| collections::{hash_map::Entry, HashMap, HashSet}, | ||
| collections::{hash_map::Entry, vec_deque::Drain, HashMap, HashSet, VecDeque}, | ||
| time::Duration, | ||
| }; | ||
|
|
||
|
|
@@ -535,16 +536,102 @@ async fn send_request(substream: &mut Substream, cids: Vec<(Cid, WantType)>) -> | |
| } | ||
|
|
||
| async fn send_response(substream: &mut Substream, entries: Vec<ResponseType>) -> Result<(), Error> { | ||
| let mut response = schema::bitswap::Message { | ||
| // `wantlist` field must always be present. This is what the official Kubo | ||
| // IPFS implementation does. | ||
| // Send presences in a separate message to not deal with it when batching blocks below. | ||
| if let Some((message, cid_count)) = | ||
| presences_message(entries.iter().filter_map(|entry| match entry { | ||
| ResponseType::Presence { cid, presence } => Some((*cid, *presence)), | ||
| ResponseType::Block { .. } => None, | ||
| })) | ||
| { | ||
| if message.len() <= config::MAX_MESSAGE_SIZE { | ||
| tracing::trace!( | ||
| target: LOG_TARGET, | ||
| cid_count, | ||
| "sending Bitswap presence message", | ||
| ); | ||
| match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await { | ||
| Err(_) => return Err(Error::Timeout), | ||
| Ok(Err(e)) => return Err(Error::SubstreamError(e)), | ||
| Ok(Ok(())) => {} | ||
| } | ||
| } else { | ||
| // This should never happen in practice, but log a warning if the presence message | ||
| // exceeded [`config::MAX_MESSAGE_SIZE`]. | ||
| tracing::warn!( | ||
| target: LOG_TARGET, | ||
| size = message.len(), | ||
| max_size = config::MAX_MESSAGE_SIZE, | ||
| "outgoing Bitswap presence message exceeded max size", | ||
| ); | ||
|
Comment on lines +546 to +565

Collaborator: nit: this is similar to the block below, maybe we can group them to avoid duplication?

Collaborator (Author): I can't think of a good way of de-duplicating this; the data and the log messages are different.

Contributor: Well, I agree with Alex, this could maybe be de-duplicated. In both cases you are building … Basically, we could reuse …
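One possible shape for the de-duplication discussed in this thread, as a minimal sketch only: the helper name `send_framed_checked` and its `count`/`label` parameters are hypothetical, while `Substream`, `Error`, `config::MAX_MESSAGE_SIZE`, `LOG_TARGET` and `WRITE_TIMEOUT` are the items already used in this file.

```rust
// Sketch only: `send_framed_checked` is a hypothetical helper, not part of this PR.
// Both the presence path and the blocks path could call it with their own log label.
async fn send_framed_checked(
    substream: &mut Substream,
    message: Bytes,
    count: usize,
    label: &'static str,
) -> Result<(), Error> {
    if message.len() > config::MAX_MESSAGE_SIZE {
        // Should never happen in practice; warn and skip this message instead of sending it.
        tracing::warn!(
            target: LOG_TARGET,
            size = message.len(),
            max_size = config::MAX_MESSAGE_SIZE,
            label,
            "outgoing Bitswap message exceeded max size",
        );
        return Ok(());
    }

    tracing::trace!(target: LOG_TARGET, count, label, "sending Bitswap message");

    match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
        Err(_) => Err(Error::Timeout),
        Ok(Err(e)) => Err(Error::SubstreamError(e)),
        Ok(Ok(())) => Ok(()),
    }
}
```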
||
| } | ||
| } | ||
|
|
||
| // Send blocks in batches of up to [`config::MAX_BATCH_SIZE`] bytes. | ||
| let mut blocks = entries | ||
| .into_iter() | ||
| .filter_map(|entry| match entry { | ||
| ResponseType::Block { cid, block } => Some((cid, block)), | ||
| ResponseType::Presence { .. } => None, | ||
| }) | ||
| .collect::<VecDeque<_>>(); | ||
|
|
||
| while let Some(batch) = extract_next_batch(&mut blocks, config::MAX_BATCH_SIZE) { | ||
| if let Some((message, block_count)) = blocks_message(batch) { | ||
| if message.len() <= config::MAX_MESSAGE_SIZE { | ||
|
Contributor: Basically, this `if` does not make sense if we ensure that MAX_BATCH_SIZE < MAX_MESSAGE_SIZE.

Collaborator (Author): It is highly unlikely in practice, but due to protobuf overhead the message size can be > …
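To make the protobuf-overhead point concrete, here is a standalone back-of-the-envelope sketch; the field layout (single-byte tags, LEB128 length prefixes, a `Block { prefix, data }` nested in the outer message's `payload`, an empty `wantlist`) is assumed from the Bitswap protobuf schema, and the exact byte counts are illustrative rather than taken from the crate.

```rust
// Illustration only: rough protobuf framing cost for one 2 MiB block.

/// Length in bytes of the unsigned LEB128 (protobuf varint) encoding of `n`.
fn varint_len(mut n: u64) -> usize {
    let mut len = 1;
    while n >= 0x80 {
        n >>= 7;
        len += 1;
    }
    len
}

fn main() {
    let block_size: u64 = 2 * 1024 * 1024; // one maximum-size block
    let prefix_len: u64 = 4; // typical CIDv1 prefix: version, codec, hash code, hash length

    // Inner Block message: `prefix` and `data` fields (tag + length varint + payload each).
    let block_msg = 1 + varint_len(prefix_len) + prefix_len as usize
        + 1 + varint_len(block_size) + block_size as usize;

    // Outer message: empty `wantlist` (tag + zero length) plus the Block wrapped in `payload`.
    let encoded = (1 + 1) + 1 + varint_len(block_msg as u64) + block_msg;

    println!("raw block bytes:       {block_size}");
    println!("encoded message bytes: {encoded}"); // a couple dozen bytes over 2 MiB
}
```

So a batch that fills the batch limit exactly with raw block bytes encodes to slightly more than that limit, which is why the message-size check is still useful.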
||
| tracing::trace!( | ||
| target: LOG_TARGET, | ||
| block_count, | ||
| "sending Bitswap blocks message", | ||
| ); | ||
| match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await { | ||
| Err(_) => return Err(Error::Timeout), | ||
|
Collaborator: dq: This will drop any remaining blocks because we consider the connection unhealthy? However, under heavy load it might be expected that the protocol handle will not be able to keep up with messages. Maybe we can propagate this to higher levels, or is the timeout error sufficient to retry later on?

Collaborator (Author): This is a good question I don't have the right answer to. Indeed, the entire response will be dropped. I am inclined to think that if the protocol handle is not able to keep up with messages, we shouldn't retry sending. Instead it will be up to the Bitswap client code to handle the timeout and repeat the query, maybe querying another peer. It would be good to investigate how Kubo handles this and whether it will work automatically.
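For completeness, a purely hypothetical sketch of the "let the client retry elsewhere" option mentioned above; `fetch_block`, the peer list and the timeout value are invented for illustration and do not describe this crate or Kubo.

```rust
// Hypothetical client-side policy (not part of this PR): on timeout, do not retry the
// same peer; move on to the next candidate instead.
use std::time::Duration;

async fn fetch_block(peer: &str, cid: &str) -> Result<Vec<u8>, ()> {
    // Placeholder: a real client would send a WANT entry and await the block here.
    let _ = (peer, cid);
    Err(())
}

async fn fetch_from_any(peers: &[&str], cid: &str) -> Option<Vec<u8>> {
    for peer in peers {
        match tokio::time::timeout(Duration::from_secs(10), fetch_block(peer, cid)).await {
            Ok(Ok(block)) => return Some(block),
            // A timeout or error is not retried on the same (possibly overloaded) peer.
            _ => continue,
        }
    }
    None
}
```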
||
| Ok(Err(e)) => return Err(Error::SubstreamError(e)), | ||
| Ok(Ok(())) => {} | ||
| } | ||
| } else { | ||
| // This should never happen in practice, but log a warning if the blocks message | ||
| // exceeded [`config::MAX_MESSAGE_SIZE`]. | ||
| tracing::warn!( | ||
| target: LOG_TARGET, | ||
| size = message.len(), | ||
|
||
| max_size = config::MAX_MESSAGE_SIZE, | ||
| "outgoing Bitswap blocks message exceeded max size", | ||
| ); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| fn presences_message( | ||
| presences: impl IntoIterator<Item = (Cid, BlockPresenceType)>, | ||
| ) -> Option<(Bytes, usize)> { | ||
| let message = schema::bitswap::Message { | ||
| // Set wantlist to not cause null pointer dereference in older versions of Kubo. | ||
| wantlist: Some(Default::default()), | ||
| block_presences: presences | ||
| .into_iter() | ||
| .map(|(cid, presence)| schema::bitswap::BlockPresence { | ||
| cid: cid.to_bytes(), | ||
| r#type: presence as i32, | ||
| }) | ||
| .collect(), | ||
| ..Default::default() | ||
| }; | ||
|
|
||
| for entry in entries { | ||
| match entry { | ||
| ResponseType::Block { cid, block } => { | ||
| let count = message.block_presences.len(); | ||
|
|
||
| (count > 0).then(|| (message.encode_to_vec().into(), count)) | ||
| } | ||
|
|
||
| fn blocks_message(blocks: impl IntoIterator<Item = (Cid, Vec<u8>)>) -> Option<(Bytes, usize)> { | ||
| let message = schema::bitswap::Message { | ||
| // Set wantlist to not cause null pointer dereference in older versions of Kubo. | ||
| wantlist: Some(Default::default()), | ||
| payload: blocks | ||
| .into_iter() | ||
| .map(|(cid, block)| { | ||
| let prefix = Prefix { | ||
| version: cid.version(), | ||
| codec: cid.codec(), | ||
|
|
@@ -553,24 +640,179 @@ async fn send_response(substream: &mut Substream, entries: Vec<ResponseType>) -> | |
| } | ||
| .to_bytes(); | ||
|
|
||
| response.payload.push(schema::bitswap::Block { | ||
| schema::bitswap::Block { | ||
| prefix, | ||
| data: block, | ||
| }); | ||
| } | ||
| ResponseType::Presence { cid, presence } => { | ||
| response.block_presences.push(schema::bitswap::BlockPresence { | ||
| cid: cid.to_bytes(), | ||
| r#type: presence as i32, | ||
| }); | ||
| } | ||
| }) | ||
| .collect(), | ||
| ..Default::default() | ||
| }; | ||
|
|
||
| let count = message.payload.len(); | ||
|
|
||
| (count > 0).then(|| (message.encode_to_vec().into(), count)) | ||
|
Contributor: How does Bitswap work here, shouldn't we also send an empty Message? Doesn't that cause a disconnect or something?

Collaborator (Author): If we were passed an empty iterator (e.g., no block presences at all), it doesn't make sense to send an empty message.
||
| } | ||
|
|
||
| /// Extract a batch of blocks of no more than `max_size` from `blocks`. | ||
| /// Returns `None` if no more blocks are left. | ||
| fn extract_next_batch<'a>( | ||
| blocks: &'a mut VecDeque<(Cid, Vec<u8>)>, | ||
| max_size: usize, | ||
|
||
| ) -> Option<Drain<'a, (Cid, Vec<u8>)>> { | ||
| // Get rid of oversized blocks to not stall the processing by not being able to queue them. | ||
| loop { | ||
| if let Some(block) = blocks.front() { | ||
| if block.1.len() > max_size { | ||
| tracing::warn!( | ||
| target: LOG_TARGET, | ||
| size = block.1.len(), | ||
| max_size, | ||
|
||
| "outgoing Bitswap block exceeded max batch size", | ||
| ); | ||
| blocks.pop_front(); | ||
| } else { | ||
| break; | ||
| } | ||
| } else { | ||
| return None; | ||
| } | ||
| } | ||
|
|
||
| let message = response.encode_to_vec().into(); | ||
| match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await { | ||
| Err(_) => Err(Error::Timeout), | ||
| Ok(Err(e)) => Err(Error::SubstreamError(e)), | ||
| Ok(Ok(())) => Ok(()), | ||
| // Determine how many blocks we can batch. Note that we can always batch at least one | ||
| // block due to check above. | ||
| let mut total_size = 0; | ||
| let mut block_count = 0; | ||
|
|
||
| for b in blocks.iter() { | ||
| let next_block_size = b.1.len(); | ||
|
Contributor: Does this batching also count with …?

Collaborator (Author): No, it's only about the total block size. As per the spec, 2 MiB blocks must go through, so the actual message size will be higher.
||
| if total_size + next_block_size > max_size { | ||
| break; | ||
| } | ||
| total_size += next_block_size; | ||
| block_count += 1; | ||
| } | ||
|
|
||
| Some(blocks.drain(..block_count)) | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use cid::multihash::Multihash; | ||
|
|
||
| use super::*; | ||
|
|
||
| fn cid(block: &[u8]) -> Cid { | ||
| let codec = 0x55; | ||
| let multihash = Code::Sha2_256.digest(block); | ||
| let multihash = | ||
| Multihash::wrap(multihash.code(), multihash.digest()).expect("to be valid multihash"); | ||
|
|
||
| Cid::new_v1(codec, multihash) | ||
| } | ||
|
|
||
| #[test] | ||
| fn extract_next_batch_fits_max_size() { | ||
| let max_size = 100; | ||
|
|
||
| let block1 = vec![0x01; 10]; | ||
| let block2 = vec![0x02; 10]; | ||
| let block3 = vec![0x03; 10]; | ||
|
|
||
| let blocks = vec![ | ||
| (cid(&block1), block1), | ||
| (cid(&block2), block2), | ||
| (cid(&block3), block3), | ||
| ]; | ||
| let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>(); | ||
|
|
||
| let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap(); | ||
| assert_eq!(batch.collect::<Vec<_>>(), blocks); | ||
|
|
||
| assert!(extract_next_batch(&mut blocks_deque, max_size).is_none()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn extract_next_batch_chunking_exact() { | ||
| let max_size = 20; | ||
|
|
||
| let block1 = vec![0x01; 10]; | ||
| let block2 = vec![0x02; 10]; | ||
| let block3 = vec![0x03; 10]; | ||
|
|
||
| let blocks = vec![ | ||
| (cid(&block1), block1.clone()), | ||
| (cid(&block2), block2.clone()), | ||
| (cid(&block3), block3.clone()), | ||
| ]; | ||
| let chunk1 = vec![ | ||
| (cid(&block1), block1.clone()), | ||
| (cid(&block2), block2.clone()), | ||
| ]; | ||
| let chunk2 = vec![(cid(&block3), block3.clone())]; | ||
| let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>(); | ||
|
|
||
| let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap(); | ||
| assert_eq!(batch.collect::<Vec<_>>(), chunk1); | ||
|
|
||
| let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap(); | ||
| assert_eq!(batch.collect::<Vec<_>>(), chunk2); | ||
|
|
||
| assert!(extract_next_batch(&mut blocks_deque, max_size).is_none()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn extract_next_batch_chunking_less_than() { | ||
| let max_size = 20; | ||
|
|
||
| let block1 = vec![0x01; 10]; | ||
| let block2 = vec![0x02; 9]; | ||
| let block3 = vec![0x03; 10]; | ||
|
|
||
| let blocks = vec![ | ||
| (cid(&block1), block1.clone()), | ||
| (cid(&block2), block2.clone()), | ||
| (cid(&block3), block3.clone()), | ||
| ]; | ||
| let chunk1 = vec![ | ||
| (cid(&block1), block1.clone()), | ||
| (cid(&block2), block2.clone()), | ||
| ]; | ||
| let chunk2 = vec![(cid(&block3), block3.clone())]; | ||
| let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>(); | ||
|
|
||
| let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap(); | ||
| assert_eq!(batch.collect::<Vec<_>>(), chunk1); | ||
|
|
||
| let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap(); | ||
| assert_eq!(batch.collect::<Vec<_>>(), chunk2); | ||
|
|
||
| assert!(extract_next_batch(&mut blocks_deque, max_size).is_none()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn extract_next_batch_oversized_blocks_discarded() { | ||
| let max_size = 20; | ||
|
|
||
| let block1 = vec![0x01; 10]; | ||
| let block2 = vec![0x02; 101]; | ||
| let block3 = vec![0x03; 10]; | ||
|
|
||
| let blocks = vec![ | ||
| (cid(&block1), block1.clone()), | ||
| (cid(&block2), block2.clone()), | ||
| (cid(&block3), block3.clone()), | ||
| ]; | ||
| let chunk1 = vec![(cid(&block1), block1.clone())]; | ||
| let chunk2 = vec![(cid(&block3), block3.clone())]; | ||
| let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>(); | ||
|
|
||
| let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap(); | ||
| assert_eq!(batch.collect::<Vec<_>>(), chunk1); | ||
|
|
||
| let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap(); | ||
| assert_eq!(batch.collect::<Vec<_>>(), chunk2); | ||
|
|
||
| assert!(extract_next_batch(&mut blocks_deque, max_size).is_none()); | ||
| } | ||
| } | ||
@rosarp probably this constant was the reason why 2 MiB didn't work :)

In the past it was exactly 2 MiB, but due to prefix and protobuf overhead the message didn't fit.
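A minimal sketch of the sizing relationship being discussed, assuming the batch limit (raw block bytes) is kept below the message limit to leave room for CID prefixes and protobuf framing; the constant values below are illustrative only, the real ones live in the crate's `config` module and may differ.

```rust
// Illustrative values only: the real MAX_MESSAGE_SIZE / MAX_BATCH_SIZE live in the
// crate's `config` module. The point is that the raw-block batch limit stays below the
// encoded-message limit so prefixes, field tags, length varints and the empty `wantlist`
// field still fit.
const MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024 + 16 * 1024; // cap on one encoded message
const MAX_BATCH_SIZE: usize = 2 * 1024 * 1024; // raw block bytes packed into one batch

fn main() {
    let headroom = MAX_MESSAGE_SIZE - MAX_BATCH_SIZE;
    assert!(headroom > 0);
    println!("framing headroom: {headroom} bytes");
}
```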