12 changes: 8 additions & 4 deletions src/protocol/libp2p/bitswap/config.rs
@@ -30,9 +30,13 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
/// IPFS Bitswap protocol name as a string.
pub const PROTOCOL_NAME: &str = "/ipfs/bitswap/1.2.0";

-/// Maximum Size for `/ipfs/bitswap/1.2.0` substream payload. Note this is bigger than 2 MiB max
-/// block size to account for protobuf message overhead.
-const MAX_PAYLOAD_SIZE: usize = 2_100_000;
Contributor:

@rosarp probably this constant was the reason why 2 MiB didn't work :)

Collaborator Author:

In the past it was exactly 2 MiB, but due to prefix and protobuf overhead the message didn't fit.
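For intuition, here is a back-of-envelope sketch (illustrative only, not code from the PR; the 4-byte CID prefix and one-byte protobuf field tags are assumptions) of why an exactly-2-MiB block no longer fits once wrapped in the protobuf message:

fn varint_len(mut n: usize) -> usize {
    // Protobuf varints encode 7 bits per byte.
    let mut len = 1;
    while n >= 0x80 {
        n >>= 7;
        len += 1;
    }
    len
}

fn main() {
    let block_len = 2 * 1024 * 1024; // exactly 2 MiB of raw block data
    let cid_prefix = 4; // assumed: version + codec + multihash type + digest length

    // `Block { prefix, data }` nested inside `Message { payload }`: every
    // length-delimited field costs a one-byte tag plus a varint length.
    let block_field =
        (1 + varint_len(cid_prefix) + cid_prefix) + (1 + varint_len(block_len) + block_len);
    let message = 1 + varint_len(block_field) + block_field;

    assert!(message > 2 * 1024 * 1024); // overhead pushes it past 2 MiB
}

The old MAX_PAYLOAD_SIZE of 2_100_000 left about 2.8 KiB of slack over the 2_097_152 raw bytes for exactly this kind of overhead.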

/// Maximum size for `/ipfs/bitswap/1.2.0` substream message. Includes enough room for protobuf
/// overhead. Enforced on the transport level.
pub const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024;

/// Maximum batch size of all blocks in a single Bitswap message combined. Enforced on the
/// application protocol level.
pub const MAX_BATCH_SIZE: usize = 2 * 1024 * 1024;
Contributor:

I am just thinking: what about adding both to the Config:

pub struct Config {
    /// Protocol name.
    pub(crate) protocol: ProtocolName,
    /// Protocol codec.
    pub(crate) codec: ProtocolCodec,
    /// TX channel for sending events to the user protocol.
    pub(super) event_tx: Sender<BitswapEvent>,
    /// RX channel for receiving commands from the user.
    pub(super) cmd_rx: Receiver<BitswapCommand>,
    
    pub max_message_size: usize,
    pub max_batch_size: usize,
}

And use those constants as the default values?
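A minimal standalone sketch of that idea (the Limits type is hypothetical and not part of the PR), with the constants serving as defaults:

const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024;
const MAX_BATCH_SIZE: usize = 2 * 1024 * 1024;

// Hypothetical knobs: protocol constants as defaults, overridable per instance.
pub struct Limits {
    pub max_message_size: usize,
    pub max_batch_size: usize,
}

impl Default for Limits {
    fn default() -> Self {
        Self {
            max_message_size: MAX_MESSAGE_SIZE,
            max_batch_size: MAX_BATCH_SIZE,
        }
    }
}

fn main() {
    let limits = Limits::default();
    assert_eq!(limits.max_batch_size, 2 * 1024 * 1024);
}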

Collaborator Author:

2 MiB is defined in the protocol spec, so I don't think we should add the possibility to "tune" it.


/// Bitswap configuration.
#[derive(Debug)]
@@ -61,7 +65,7 @@ impl Config {
                cmd_rx,
                event_tx,
                protocol: ProtocolName::from(PROTOCOL_NAME),
-                codec: ProtocolCodec::UnsignedVarint(Some(MAX_PAYLOAD_SIZE)),
                codec: ProtocolCodec::UnsignedVarint(Some(MAX_MESSAGE_SIZE)),
            },
            BitswapHandle::new(event_rx, cmd_tx),
        )
283 changes: 263 additions & 20 deletions src/protocol/libp2p/bitswap/mod.rs
@@ -31,6 +31,7 @@ use crate::{
    PeerId,
};

use bytes::Bytes;
use cid::{Cid, Version};
use prost::Message;
use tokio::sync::mpsc::{Receiver, Sender};
@@ -40,7 +41,7 @@ pub use config::Config;
pub use handle::{BitswapCommand, BitswapEvent, BitswapHandle, ResponseType};
pub use schema::bitswap::{wantlist::WantType, BlockPresenceType};
use std::{
-    collections::{hash_map::Entry, HashMap, HashSet},
    collections::{hash_map::Entry, vec_deque::Drain, HashMap, HashSet, VecDeque},
    time::Duration,
};

@@ -535,16 +536,102 @@ async fn send_request(substream: &mut Substream, cids: Vec<(Cid, WantType)>) ->
}

async fn send_response(substream: &mut Substream, entries: Vec<ResponseType>) -> Result<(), Error> {
-    let mut response = schema::bitswap::Message {
-        // `wantlist` field must always be present. This is what the official Kubo
-        // IPFS implementation does.
    // Send presences in a separate message to not deal with them when batching blocks below.
    if let Some((message, cid_count)) =
        presences_message(entries.iter().filter_map(|entry| match entry {
            ResponseType::Presence { cid, presence } => Some((*cid, *presence)),
            ResponseType::Block { .. } => None,
        }))
    {
        if message.len() <= config::MAX_MESSAGE_SIZE {
            tracing::trace!(
                target: LOG_TARGET,
                cid_count,
                "sending Bitswap presence message",
            );
            match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
                Err(_) => return Err(Error::Timeout),
                Ok(Err(e)) => return Err(Error::SubstreamError(e)),
                Ok(Ok(())) => {}
            }
        } else {
            // This should never happen in practice, but log a warning if the presence message
            // exceeded [`config::MAX_MESSAGE_SIZE`].
            tracing::warn!(
                target: LOG_TARGET,
                size = message.len(),
                max_size = config::MAX_MESSAGE_SIZE,
                "outgoing Bitswap presence message exceeded max size",
            );
Comment on lines +546 to +565

Collaborator:

nit: this is similar to the block below, maybe we can group them to avoid duplication?

Collaborator Author:

I can't think of a good way of de-duplicating this; the data and the log messages are different.

Contributor:

Well, I agree with Alex, this could maybe be de-duplicated. In both cases you are building let message = schema::bitswap::Message, so when building it we could check MAX_MESSAGE_SIZE or MAX_BATCH_SIZE.

Basically, we could reuse extract_next_batch(&mut blocks, config::MAX_BATCH_SIZE) for both - maybe with a closure which would return the length (for a presence ~2 bytes?, for a block its real length).
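A sketch of what that generalization might look like (hypothetical; extract_next_batch_by is not in the PR): the item size comes from a caller-supplied closure, so presences and blocks could share one batching path.

use std::collections::{vec_deque::Drain, VecDeque};

// Hypothetical generalization: batch any queue of items by a caller-supplied
// size function, dropping items that could never fit.
fn extract_next_batch_by<'a, T>(
    items: &'a mut VecDeque<T>,
    max_size: usize,
    size_of: impl Fn(&T) -> usize,
) -> Option<Drain<'a, T>> {
    // Discard items exceeding `max_size` outright so they cannot stall the queue.
    while items.front().map_or(false, |item| size_of(item) > max_size) {
        items.pop_front();
    }
    items.front()?;

    // Take items while the running total stays within `max_size`.
    let mut total = 0;
    let count = items
        .iter()
        .take_while(|&item| {
            total += size_of(item);
            total <= max_size
        })
        .count();

    Some(items.drain(..count))
}

fn main() {
    let mut queue: VecDeque<Vec<u8>> = VecDeque::from(vec![vec![0; 10], vec![0; 15], vec![0; 30]]);
    let batch: Vec<_> = extract_next_batch_by(&mut queue, 20, |b| b.len()).unwrap().collect();
    assert_eq!(batch.len(), 1); // only the 10-byte item fits into the first batch
}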

        }
    }

    // Send blocks in batches of up to [`config::MAX_BATCH_SIZE`] bytes.
    let mut blocks = entries
        .into_iter()
        .filter_map(|entry| match entry {
            ResponseType::Block { cid, block } => Some((cid, block)),
            ResponseType::Presence { .. } => None,
        })
        .collect::<VecDeque<_>>();

    while let Some(batch) = extract_next_batch(&mut blocks, config::MAX_BATCH_SIZE) {
        if let Some((message, block_count)) = blocks_message(batch) {
            if message.len() <= config::MAX_MESSAGE_SIZE {
Contributor:

Basically, this if does not make sense if we ensure that MAX_BATCH_SIZE < MAX_MESSAGE_SIZE.

Collaborator Author:

It is highly unlikely in practice, but due to protobuf overhead the message size can be > MAX_BATCH_SIZE.

                tracing::trace!(
                    target: LOG_TARGET,
                    block_count,
                    "sending Bitswap blocks message",
                );
                match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
                    Err(_) => return Err(Error::Timeout),
Collaborator:

dq: This will drop any remaining blocks because we consider the connection unhealthy?

However, it might be expected under heavy load that the protocol handle will not be able to keep up with messages. Maybe we can propagate this to higher levels, or is the timeout error sufficient to retry later on?

Collaborator Author:

This is a good question I don't have the right answer to. Indeed, the entire response will be dropped. I am inclined to think that if the protocol handle is not able to keep up with messages, we shouldn't retry sending. Instead it will be up to the Bitswap client code to handle the timeout and repeat the query, maybe querying another peer.

It would be good to investigate how Kubo handles this and if this will automatically work.
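Purely as an illustration of that client-side handling (all of these types are stand-ins, not code from the PR or from Kubo): on a timed-out query, move on to the next candidate peer.

use std::collections::VecDeque;

#[derive(Debug, Clone, Copy, PartialEq)]
struct PeerId(u32);

#[derive(Debug)]
enum QueryError {
    Timeout,
}

// Stubbed query: pretend the first peer times out and any other peer succeeds.
fn query_block(peer: PeerId) -> Result<Vec<u8>, QueryError> {
    if peer == PeerId(0) {
        Err(QueryError::Timeout)
    } else {
        Ok(vec![0xab; 32])
    }
}

fn main() {
    let mut candidates = VecDeque::from(vec![PeerId(0), PeerId(1)]);
    let block = loop {
        let Some(peer) = candidates.pop_front() else {
            panic!("all candidate peers failed");
        };
        match query_block(peer) {
            Ok(block) => break block,
            // On timeout, try the next candidate peer instead of retrying this one.
            Err(QueryError::Timeout) => continue,
        }
    };
    assert_eq!(block.len(), 32);
}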

                    Ok(Err(e)) => return Err(Error::SubstreamError(e)),
                    Ok(Ok(())) => {}
                }
            } else {
                // This should never happen in practice, but log a warning if the blocks message
                // exceeded [`config::MAX_MESSAGE_SIZE`].
                tracing::warn!(
                    target: LOG_TARGET,
                    size = message.len(),
                    max_size = config::MAX_MESSAGE_SIZE,
                    "outgoing Bitswap blocks message exceeded max size",
                );
            }
        }
    }

    Ok(())
}

fn presences_message(
    presences: impl IntoIterator<Item = (Cid, BlockPresenceType)>,
) -> Option<(Bytes, usize)> {
    let message = schema::bitswap::Message {
        // Set wantlist to not cause a null pointer dereference in older versions of Kubo.
        wantlist: Some(Default::default()),
        block_presences: presences
            .into_iter()
            .map(|(cid, presence)| schema::bitswap::BlockPresence {
                cid: cid.to_bytes(),
                r#type: presence as i32,
            })
            .collect(),
        ..Default::default()
    };

-    for entry in entries {
-        match entry {
-            ResponseType::Block { cid, block } => {
    let count = message.block_presences.len();

    (count > 0).then(|| (message.encode_to_vec().into(), count))
}

fn blocks_message(blocks: impl IntoIterator<Item = (Cid, Vec<u8>)>) -> Option<(Bytes, usize)> {
    let message = schema::bitswap::Message {
        // Set wantlist to not cause a null pointer dereference in older versions of Kubo.
        wantlist: Some(Default::default()),
        payload: blocks
            .into_iter()
            .map(|(cid, block)| {
                let prefix = Prefix {
                    version: cid.version(),
                    codec: cid.codec(),
@@ -553,24 +640,180 @@ async fn send_response(substream: &mut Substream, entries: Vec<ResponseType>) ->
                }
                .to_bytes();

-                response.payload.push(schema::bitswap::Block {
                schema::bitswap::Block {
                    prefix,
                    data: block,
-                });
-            }
-            ResponseType::Presence { cid, presence } => {
-                response.block_presences.push(schema::bitswap::BlockPresence {
-                    cid: cid.to_bytes(),
-                    r#type: presence as i32,
-                });
-            }
                }
            })
            .collect(),
        ..Default::default()
    };

    let count = message.payload.len();

    (count > 0).then(|| (message.encode_to_vec().into(), count))
Contributor:

How does Bitswap work? Shouldn't we also send an empty Message? Doesn't it cause a disconnect or something?

Collaborator Author:

If we were passed an empty iterator (e.g., no block presences at all), it doesn't make sense to send an empty message.

}

/// Extract a batch of blocks of no more than `max_size` bytes combined from `blocks`.
/// Returns `None` if no more blocks are left.
fn extract_next_batch<'a>(
    blocks: &'a mut VecDeque<(Cid, Vec<u8>)>,
    max_size: usize,
) -> Option<Drain<'a, (Cid, Vec<u8>)>> {
    // Get rid of oversized blocks so they do not stall the processing by never fitting into
    // a batch.
    loop {
        if let Some(block) = blocks.front() {
            if block.1.len() > max_size {
                tracing::warn!(
                    target: LOG_TARGET,
                    cid = block.0.to_string(),
                    size = block.1.len(),
                    max_size,
                    "outgoing Bitswap block exceeded max batch size",
                );
                blocks.pop_front();
            } else {
                break;
            }
        } else {
            return None;
        }
    }

-    let message = response.encode_to_vec().into();
-    match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
-        Err(_) => Err(Error::Timeout),
-        Ok(Err(e)) => Err(Error::SubstreamError(e)),
-        Ok(Ok(())) => Ok(()),
    // Determine how many blocks we can batch. Note that we can always batch at least one
    // block due to the check above.
    let mut total_size = 0;
    let mut block_count = 0;

    for b in blocks.iter() {
        let next_block_size = b.1.len();
Contributor:

Does this batching also account for the prefix and struct bytes overhead (2-3 bytes)?

                schema::bitswap::Block {
                    prefix,
                    data: block,
                });

Collaborator Author:

No, it's only about the total block size. As per the spec, 2 MiB blocks must go through, so the actual message size will be higher.

        if total_size + next_block_size > max_size {
            break;
        }
        total_size += next_block_size;
        block_count += 1;
    }

    Some(blocks.drain(..block_count))
}

#[cfg(test)]
mod tests {
    use cid::multihash::Multihash;

    use super::*;

    fn cid(block: &[u8]) -> Cid {
        let codec = 0x55;
        let multihash = Code::Sha2_256.digest(block);
        let multihash =
            Multihash::wrap(multihash.code(), multihash.digest()).expect("to be valid multihash");

        Cid::new_v1(codec, multihash)
    }

    #[test]
    fn extract_next_batch_fits_max_size() {
        let max_size = 100;

        let block1 = vec![0x01; 10];
        let block2 = vec![0x02; 10];
        let block3 = vec![0x03; 10];

        let blocks = vec![
            (cid(&block1), block1),
            (cid(&block2), block2),
            (cid(&block3), block3),
        ];
        let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>();

        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
        assert_eq!(batch.collect::<Vec<_>>(), blocks);

        assert!(extract_next_batch(&mut blocks_deque, max_size).is_none());
    }

    #[test]
    fn extract_next_batch_chunking_exact() {
        let max_size = 20;

        let block1 = vec![0x01; 10];
        let block2 = vec![0x02; 10];
        let block3 = vec![0x03; 10];

        let blocks = vec![
            (cid(&block1), block1.clone()),
            (cid(&block2), block2.clone()),
            (cid(&block3), block3.clone()),
        ];
        let chunk1 = vec![
            (cid(&block1), block1.clone()),
            (cid(&block2), block2.clone()),
        ];
        let chunk2 = vec![(cid(&block3), block3.clone())];
        let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>();

        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
        assert_eq!(batch.collect::<Vec<_>>(), chunk1);

        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
        assert_eq!(batch.collect::<Vec<_>>(), chunk2);

        assert!(extract_next_batch(&mut blocks_deque, max_size).is_none());
    }

    #[test]
    fn extract_next_batch_chunking_less_than() {
        let max_size = 20;

        let block1 = vec![0x01; 10];
        let block2 = vec![0x02; 9];
        let block3 = vec![0x03; 10];

        let blocks = vec![
            (cid(&block1), block1.clone()),
            (cid(&block2), block2.clone()),
            (cid(&block3), block3.clone()),
        ];
        let chunk1 = vec![
            (cid(&block1), block1.clone()),
            (cid(&block2), block2.clone()),
        ];
        let chunk2 = vec![(cid(&block3), block3.clone())];
        let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>();

        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
        assert_eq!(batch.collect::<Vec<_>>(), chunk1);

        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
        assert_eq!(batch.collect::<Vec<_>>(), chunk2);

        assert!(extract_next_batch(&mut blocks_deque, max_size).is_none());
    }

    #[test]
    fn extract_next_batch_oversized_blocks_discarded() {
        let max_size = 20;

        let block1 = vec![0x01; 10];
        let block2 = vec![0x02; 101];
        let block3 = vec![0x03; 10];

        let blocks = vec![
            (cid(&block1), block1.clone()),
            (cid(&block2), block2.clone()),
            (cid(&block3), block3.clone()),
        ];
        let chunk1 = vec![(cid(&block1), block1.clone())];
        let chunk2 = vec![(cid(&block3), block3.clone())];
        let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>();

        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
        assert_eq!(batch.collect::<Vec<_>>(), chunk1);

        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
        assert_eq!(batch.collect::<Vec<_>>(), chunk2);

        assert!(extract_next_batch(&mut blocks_deque, max_size).is_none());
    }
}