# bitswap: Split block responses into batches under 2 MiB #516
## Conversation
```rust
if message.len() <= config::MAX_MESSAGE_SIZE {
    tracing::trace!(
        target: LOG_TARGET,
        cid_count,
        "sending Bitswap presence message",
    );
    match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
        Err(_) => return Err(Error::Timeout),
        Ok(Err(e)) => return Err(Error::SubstreamError(e)),
        Ok(Ok(())) => {}
    }
} else {
    // This should never happen in practice, but log a warning if the presence message
    // exceeded [`config::MAX_MESSAGE_SIZE`].
    tracing::warn!(
        target: LOG_TARGET,
        size = message.len(),
        max_size = config::MAX_MESSAGE_SIZE,
        "outgoing Bitswap presence message exceeded max size",
    );
```
nit: this is similar to the block below, maybe we can group them to avoid duplication?

I can't think of a good way of de-duplicating this: the data and the log messages are different.

Well, I agree with Alex, this could maybe be de-duplicated. In both cases you are building `let message = schema::bitswap::Message`, so when building it we could check `MAX_MESSAGE_SIZE` or `MAX_BATCH_SIZE`.
Basically, we could reuse `extract_next_batch(&mut blocks, config::MAX_BATCH_SIZE)` for both - maybe with a closure that returns the length (for presence ~2 bytes?, for a block the real length).
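
For illustration, a rough sketch of that idea; `extract_next_batch_by` and its signature are hypothetical names, not the crate's actual API:

```rust
/// Hypothetical generalization of `extract_next_batch`: one batching helper
/// shared by the presence and blocks paths, parameterized by a closure that
/// estimates each entry's contribution to the batch size.
fn extract_next_batch_by<T>(
    entries: &mut Vec<T>,
    max_size: usize,
    estimate_size: impl Fn(&T) -> usize,
) -> Option<Vec<T>> {
    if entries.is_empty() {
        return None;
    }

    let mut batch_size = 0;
    let mut count = 0;
    for entry in entries.iter() {
        let next = estimate_size(entry);
        // Always take at least one entry so an oversized entry still goes out.
        if count > 0 && batch_size + next > max_size {
            break;
        }
        batch_size += next;
        count += 1;
    }

    Some(entries.drain(..count).collect())
}

// Usage: presences contribute a small constant, blocks their real length, e.g.
// let batch = extract_next_batch_by(&mut presences, MAX_MESSAGE_SIZE, |_| 2);
// let batch = extract_next_batch_by(&mut blocks, MAX_BATCH_SIZE, |b| b.1.len());
```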
| "sending Bitswap blocks message", | ||
| ); | ||
| match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await { | ||
| Err(_) => return Err(Error::Timeout), |
dq: Will this drop any remaining blocks because we consider the connection unhealthy?
However, under heavy load it might be expected that the protocol handle will not be able to keep up with messages. Maybe we can propagate this to higher levels, or is the timeout error sufficient to retry later on?

This is a good question I don't have an answer to. Indeed, the entire response will be dropped. I am inclined to think that if the protocol handle is not able to keep up with messages, we shouldn't retry sending. Instead it will be up to the Bitswap client code to handle the timeout and repeat the query, maybe querying another peer.
It would be good to investigate how Kubo handles this and whether this will work automatically.
lexnv left a comment:
Nice one! 👍
```rust
let count = message.payload.len();

(count > 0).then(|| (message.encode_to_vec().into(), count))
```
How does Bitswap work here: shouldn't we also send an empty Message? Doesn't skipping it cause a disconnect or something?

If we were passed an empty iterator (e.g., no block presences at all), it doesn't make sense to send an empty message.
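
A minimal self-contained sketch of that behavior, with local stand-ins for the prost-generated `schema::bitswap` types (field layout assumed from the diff, not copied from the crate):

```rust
use bytes::Bytes;
use prost::Message;

// Stand-in for `schema::bitswap::Block`; field tags are illustrative.
#[derive(Clone, PartialEq, Message)]
struct Block {
    #[prost(bytes = "vec", tag = "1")]
    prefix: Vec<u8>,
    #[prost(bytes = "vec", tag = "2")]
    data: Vec<u8>,
}

// Stand-in for `schema::bitswap::Message` with only the `payload` field.
#[derive(Clone, PartialEq, Message)]
struct BitswapMessage {
    #[prost(message, repeated, tag = "3")]
    payload: Vec<Block>,
}

/// Build a blocks message, returning `None` instead of encoding an empty one.
fn blocks_message(blocks: Vec<(Vec<u8>, Vec<u8>)>) -> Option<(Bytes, usize)> {
    let message = BitswapMessage {
        payload: blocks
            .into_iter()
            .map(|(prefix, data)| Block { prefix, data })
            .collect(),
    };
    let count = message.payload.len();
    // No payload at all (e.g. an empty input iterator): skip sending entirely
    // rather than writing an empty message to the substream.
    (count > 0).then(|| (message.encode_to_vec().into(), count))
}
```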
```rust
while let Some(batch) = extract_next_batch(&mut blocks, config::MAX_BATCH_SIZE) {
    if let Some((message, block_count)) = blocks_message(batch) {
        if message.len() <= config::MAX_MESSAGE_SIZE {
```
Basically, this `if` does not make sense if we ensure that `MAX_BATCH_SIZE < MAX_MESSAGE_SIZE`.

It is highly unlikely in practice, but due to protobuf overhead the message size can be > `MAX_BATCH_SIZE`.
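
To see where that overhead comes from, a small self-contained prost example (the `Block` type is a stand-in, not the crate's generated schema): each field costs a tag byte plus a varint length prefix on top of the raw bytes.

```rust
use prost::Message;

// Stand-in for a bitswap block; field tags are illustrative.
#[derive(Clone, PartialEq, Message)]
struct Block {
    #[prost(bytes = "vec", tag = "1")]
    prefix: Vec<u8>,
    #[prost(bytes = "vec", tag = "2")]
    data: Vec<u8>,
}

fn main() {
    let block = Block {
        prefix: vec![0u8; 4],
        data: vec![0u8; 2 * 1024 * 1024],
    };
    let raw = block.prefix.len() + block.data.len();
    // The encoded form is a few bytes larger than the raw payload, which is
    // why a batch capped at MAX_BATCH_SIZE can still encode to a message
    // slightly above it.
    assert!(block.encoded_len() > raw);
    println!("raw: {raw}, encoded: {}", block.encoded_len());
}
```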
```rust
/// Maximum Size for `/ipfs/bitswap/1.2.0` substream payload. Note this is bigger than 2 MiB max
/// block size to account for protobuf message overhead.
const MAX_PAYLOAD_SIZE: usize = 2_100_000;
```
@rosarp probably this constant was the reason why 2 MiB didn't work :)

In the past it was exactly 2 MiB, but due to prefix and protobuf overhead the message didn't fit. (2 MiB is 2,097,152 bytes, so 2,100,000 leaves roughly 2.8 KiB of headroom for framing.)
```rust
/// Maximum batch size of all blocks in a single Bitswap message combined. Enforced on the
/// application protocol level.
pub const MAX_BATCH_SIZE: usize = 2 * 1024 * 1024;
```
I am just thinking, what about adding both to the `Config`:

```rust
pub struct Config {
    /// Protocol name.
    pub(crate) protocol: ProtocolName,
    /// Protocol codec.
    pub(crate) codec: ProtocolCodec,
    /// TX channel for sending events to the user protocol.
    pub(super) event_tx: Sender<BitswapEvent>,
    /// RX channel for receiving commands from the user.
    pub(super) cmd_rx: Receiver<BitswapCommand>,
    pub max_message_size: usize,
    pub max_batch_size: usize,
}
```

And use those constants as the default values?

2 MiB is defined in the protocol spec, so I don't think we should add the possibility to "tune" it.
```rust
let mut block_count = 0;

for b in blocks.iter() {
    let next_block_size = b.1.len();
```
Does this batching also account for the prefix and struct byte overhead (2-3 bytes)?

```rust
schema::bitswap::Block {
    prefix,
    data: block,
});
```

No, it's only about the total block size. As per the spec, 2 MiB blocks must go through, so the actual message size will be higher.
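
A sketch of the batching being discussed, reconstructed from the diff above (the exact signature is assumed): the cut-off counts only raw block bytes, and at least one block is always taken.

```rust
/// Reconstructed sketch of `extract_next_batch`: greedily take blocks until
/// their combined raw size would exceed `max_batch_size`. Prefix and protobuf
/// framing bytes are deliberately not counted here; the `MAX_MESSAGE_SIZE`
/// guard above covers that overhead.
fn extract_next_batch(
    blocks: &mut Vec<(Vec<u8>, Vec<u8>)>, // (prefix, block data) pairs
    max_batch_size: usize,
) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
    if blocks.is_empty() {
        return None;
    }

    let mut batch_size = 0;
    let mut block_count = 0;
    for b in blocks.iter() {
        let next_block_size = b.1.len();
        // Always take at least one block so a full-size 2 MiB block still
        // fits into its own batch, as the spec requires.
        if block_count > 0 && batch_size + next_block_size > max_batch_size {
            break;
        }
        batch_size += next_block_size;
        block_count += 1;
    }

    Some(blocks.drain(..block_count).collect())
}
```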
bkontur left a comment:
Looks good, let's release and bump master.
Left just a couple of ultra-nits, which could also be ignored; maybe just rename `max_size` -> `max_batch_size` for clarity.
bkontur left a comment:
@dmitry-markin cool, thank you, let's ship it :)

We'll create a release once another bugfix is merged (#518). Hopefully today.
## [0.13.0] - 2026-01-21

This release brings multiple fixes to both the transport and application-level protocols. Specifically, it enhances WebSocket stability by resolving AsyncWrite errors and ensuring that partial writes during the negotiation phase no longer trigger connection failures. At the same time, Bitswap client functionality is introduced, which makes this release semver-breaking.

### Added

- Add Bitswap client ([#501](#501))

### Fixed

- notif/fix: Avoid CPU busy loops on litep2p full shutdown ([#521](#521))
- protocol: Ensure transport manager knows about closed connections ([#515](#515))
- substream: Decrement the bytes counter to avoid excessive flushing ([#511](#511))
- crypto/noise: Improve stability of websockets by fixing AsyncWrite implementation ([#518](#518))
- bitswap: Split block responses into batches under 2 MiB ([#516](#516))
- crypto/noise: Fix connection negotiation logic on partial writes ([#519](#519))
- substream/fix: Fix partial reads for ProtocolCodec::Identity ([#512](#512))
- webrtc: Avoid panics returning error instead ([#509](#509))
- bitswap: e2e test & max payload fix ([#508](#508))
- tcp: Exit connections when events fail to propagate to protocols ([#506](#506))
- webrtc: Avoid future being dropped when channel is full ([#483](#483))

---------

Co-authored-by: Alexandru Vasile <[email protected]>
Split blocks in a Bitswap response into batches under 2 MiB so that the maximum substream message size is respected and blocks aren't lost.
Closes #514.
Follow-ups: