
Commit 0d2d1bd

bitswap: Split block responses into batches under 2 MiB (#516)
Split blocks in a Bitswap response into batches under 2 MiB so the maximum substream message size is respected and we don't lose the blocks. Closes #514.

Co-authored-by: Branislav Kontur <[email protected]>
1 parent e8d93ef commit 0d2d1bd
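
The change boils down to greedy, size-bounded chunking of the queued blocks: keep adding blocks to the current batch while the running total stays under the 2 MiB limit, and discard any single block that could never fit. The stand-alone sketch below illustrates only that idea; it is not the commit's code (the real extract_next_batch in mod.rs works on (Cid, Vec<u8>) pairs, returns a Drain iterator, logs dropped blocks, and the encoded message is further bounded by a 4 MiB transport limit), and the names next_batch and MAX_BATCH are invented for the example.

use std::collections::VecDeque;

// Illustrative limit matching the 2 MiB batch cap described above.
const MAX_BATCH: usize = 2 * 1024 * 1024;

/// Pop blocks off the front of `blocks` until adding the next one would exceed
/// `max_batch_size`. Individually oversized blocks are discarded so they cannot
/// stall the queue. Returns `None` once the queue is empty.
fn next_batch(blocks: &mut VecDeque<Vec<u8>>, max_batch_size: usize) -> Option<Vec<Vec<u8>>> {
    // Drop blocks that could never fit into any batch.
    while blocks.front().map_or(false, |b| b.len() > max_batch_size) {
        blocks.pop_front();
    }
    blocks.front()?;

    let mut batch = Vec::new();
    let mut total = 0;
    while let Some(block) = blocks.front() {
        if total + block.len() > max_batch_size {
            break;
        }
        total += block.len();
        batch.push(blocks.pop_front().expect("front checked above"));
    }
    Some(batch)
}

fn main() {
    // Two 1.5 MiB blocks cannot share a 2 MiB batch, and the 3 MiB block can never fit.
    let mut queue: VecDeque<Vec<u8>> =
        [vec![0u8; 1536 * 1024], vec![1u8; 1536 * 1024], vec![2u8; 3 * 1024 * 1024]]
            .into_iter()
            .collect();

    while let Some(batch) = next_batch(&mut queue, MAX_BATCH) {
        println!("sending batch of {} block(s)", batch.len());
    }
    // Prints two batches of one block each; the oversized block is silently dropped here.
}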

2 files changed: +271 −24 lines


src/protocol/libp2p/bitswap/config.rs

Lines changed: 8 additions & 4 deletions

@@ -30,9 +30,13 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
 /// IPFS Bitswap protocol name as a string.
 pub const PROTOCOL_NAME: &str = "/ipfs/bitswap/1.2.0";
 
-/// Maximum Size for `/ipfs/bitswap/1.2.0` substream payload. Note this is bigger than 2 MiB max
-/// block size to account for protobuf message overhead.
-const MAX_PAYLOAD_SIZE: usize = 2_100_000;
+/// Maximum size for `/ipfs/bitswap/1.2.0` substream message. Includes enough room for protobuf
+/// overhead. Enforced on the transport level.
+pub const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024;
+
+/// Maximum batch size of all blocks in a single Bitswap message combined. Enforced on the
+/// application protocol level.
+pub const MAX_BATCH_SIZE: usize = 2 * 1024 * 1024;
 
 /// Bitswap configuration.
 #[derive(Debug)]
@@ -61,7 +65,7 @@ impl Config {
                 cmd_rx,
                 event_tx,
                 protocol: ProtocolName::from(PROTOCOL_NAME),
-                codec: ProtocolCodec::UnsignedVarint(Some(MAX_PAYLOAD_SIZE)),
+                codec: ProtocolCodec::UnsignedVarint(Some(MAX_MESSAGE_SIZE)),
             },
             BitswapHandle::new(event_rx, cmd_tx),
         )
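
A note on how the two new constants relate: MAX_BATCH_SIZE caps the raw block bytes in one batch, while MAX_MESSAGE_SIZE bounds the whole encoded substream message, leaving roughly 2 MiB of headroom for CID prefixes and protobuf framing. A compile-time sanity check of that headroom could look like the sketch below; the overhead figure is an assumption made for the example, not something defined by this commit.

// Hypothetical sanity check; the constant names mirror config.rs, the overhead bound is assumed.
const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024;
const MAX_BATCH_SIZE: usize = 2 * 1024 * 1024;
const ASSUMED_ENCODING_OVERHEAD: usize = 1024 * 1024; // illustrative per-message upper bound

// Fails to compile if the batch limit plus assumed overhead no longer fits in one message.
const _: () = assert!(MAX_BATCH_SIZE + ASSUMED_ENCODING_OVERHEAD <= MAX_MESSAGE_SIZE);

fn main() {
    println!(
        "batch limit {} B fits within message limit {} B",
        MAX_BATCH_SIZE, MAX_MESSAGE_SIZE
    );
}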

src/protocol/libp2p/bitswap/mod.rs

Lines changed: 263 additions & 20 deletions

@@ -31,6 +31,7 @@ use crate::{
     PeerId,
 };
 
+use bytes::Bytes;
 use cid::{Cid, Version};
 use prost::Message;
 use tokio::sync::mpsc::{Receiver, Sender};
@@ -40,7 +41,7 @@ pub use config::Config;
 pub use handle::{BitswapCommand, BitswapEvent, BitswapHandle, ResponseType};
 pub use schema::bitswap::{wantlist::WantType, BlockPresenceType};
 use std::{
-    collections::{hash_map::Entry, HashMap, HashSet},
+    collections::{hash_map::Entry, vec_deque::Drain, HashMap, HashSet, VecDeque},
     time::Duration,
 };
 
@@ -535,16 +536,102 @@ async fn send_request(substream: &mut Substream, cids: Vec<(Cid, WantType)>) ->
 }
 
 async fn send_response(substream: &mut Substream, entries: Vec<ResponseType>) -> Result<(), Error> {
-    let mut response = schema::bitswap::Message {
-        // `wantlist` field must always be present. This is what the official Kubo
-        // IPFS implementation does.
+    // Send presences in a separate message to not deal with it when batching blocks below.
+    if let Some((message, cid_count)) =
+        presences_message(entries.iter().filter_map(|entry| match entry {
+            ResponseType::Presence { cid, presence } => Some((*cid, *presence)),
+            ResponseType::Block { .. } => None,
+        }))
+    {
+        if message.len() <= config::MAX_MESSAGE_SIZE {
+            tracing::trace!(
+                target: LOG_TARGET,
+                cid_count,
+                "sending Bitswap presence message",
+            );
+            match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
+                Err(_) => return Err(Error::Timeout),
+                Ok(Err(e)) => return Err(Error::SubstreamError(e)),
+                Ok(Ok(())) => {}
+            }
+        } else {
+            // This should never happen in practice, but log a warning if the presence message
+            // exceeded [`config::MAX_MESSAGE_SIZE`].
+            tracing::warn!(
+                target: LOG_TARGET,
+                size = message.len(),
+                max_size = config::MAX_MESSAGE_SIZE,
+                "outgoing Bitswap presence message exceeded max size",
+            );
+        }
+    }
+
+    // Send blocks in batches of up to [`config::MAX_BATCH_SIZE`] bytes.
+    let mut blocks = entries
+        .into_iter()
+        .filter_map(|entry| match entry {
+            ResponseType::Block { cid, block } => Some((cid, block)),
+            ResponseType::Presence { .. } => None,
+        })
+        .collect::<VecDeque<_>>();
+
+    while let Some(batch) = extract_next_batch(&mut blocks, config::MAX_BATCH_SIZE) {
+        if let Some((message, block_count)) = blocks_message(batch) {
+            if message.len() <= config::MAX_MESSAGE_SIZE {
+                tracing::trace!(
+                    target: LOG_TARGET,
+                    block_count,
+                    "sending Bitswap blocks message",
+                );
+                match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
+                    Err(_) => return Err(Error::Timeout),
+                    Ok(Err(e)) => return Err(Error::SubstreamError(e)),
+                    Ok(Ok(())) => {}
+                }
+            } else {
+                // This should never happen in practice, but log a warning if the blocks message
+                // exceeded [`config::MAX_MESSAGE_SIZE`].
+                tracing::warn!(
+                    target: LOG_TARGET,
+                    size = message.len(),
+                    max_size = config::MAX_MESSAGE_SIZE,
+                    "outgoing Bitswap blocks message exceeded max size",
+                );
+            }
+        }
+    }
+
+    Ok(())
+}
+
+fn presences_message(
+    presences: impl IntoIterator<Item = (Cid, BlockPresenceType)>,
+) -> Option<(Bytes, usize)> {
+    let message = schema::bitswap::Message {
+        // Set wantlist to not cause null pointer dereference in older versions of Kubo.
         wantlist: Some(Default::default()),
+        block_presences: presences
+            .into_iter()
+            .map(|(cid, presence)| schema::bitswap::BlockPresence {
+                cid: cid.to_bytes(),
+                r#type: presence as i32,
+            })
+            .collect(),
         ..Default::default()
     };
 
-    for entry in entries {
-        match entry {
-            ResponseType::Block { cid, block } => {
+    let count = message.block_presences.len();
+
+    (count > 0).then(|| (message.encode_to_vec().into(), count))
+}
+
+fn blocks_message(blocks: impl IntoIterator<Item = (Cid, Vec<u8>)>) -> Option<(Bytes, usize)> {
+    let message = schema::bitswap::Message {
+        // Set wantlist to not cause null pointer dereference in older versions of Kubo.
+        wantlist: Some(Default::default()),
+        payload: blocks
+            .into_iter()
+            .map(|(cid, block)| {
                 let prefix = Prefix {
                     version: cid.version(),
                     codec: cid.codec(),
@@ -553,24 +640,180 @@ async fn send_response(substream: &mut Substream, entries: Vec<ResponseType>) ->
                 }
                 .to_bytes();
 
-                response.payload.push(schema::bitswap::Block {
+                schema::bitswap::Block {
                     prefix,
                     data: block,
-                });
-            }
-            ResponseType::Presence { cid, presence } => {
-                response.block_presences.push(schema::bitswap::BlockPresence {
-                    cid: cid.to_bytes(),
-                    r#type: presence as i32,
-                });
+                }
+            })
+            .collect(),
+        ..Default::default()
+    };
+
+    let count = message.payload.len();
+
+    (count > 0).then(|| (message.encode_to_vec().into(), count))
+}
+
+/// Extract a batch of blocks of no more than `max_size` from `blocks`.
+/// Returns `None` if no more blocks are left.
+fn extract_next_batch<'a>(
+    blocks: &'a mut VecDeque<(Cid, Vec<u8>)>,
+    max_batch_size: usize,
+) -> Option<Drain<'a, (Cid, Vec<u8>)>> {
+    // Get rid of oversized blocks to not stall the processing by not being able to queue them.
+    loop {
+        if let Some(block) = blocks.front() {
+            if block.1.len() > max_batch_size {
+                tracing::warn!(
+                    target: LOG_TARGET,
+                    cid = block.0.to_string(),
+                    size = block.1.len(),
+                    max_batch_size,
+                    "outgoing Bitswap block exceeded max batch size",
+                );
+                blocks.pop_front();
+            } else {
+                break;
             }
+        } else {
+            return None;
         }
     }
 
-    let message = response.encode_to_vec().into();
-    match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
-        Err(_) => Err(Error::Timeout),
-        Ok(Err(e)) => Err(Error::SubstreamError(e)),
-        Ok(Ok(())) => Ok(()),
+    // Determine how many blocks we can batch. Note that we can always batch at least one
+    // block due to check above.
+    let mut total_size = 0;
+    let mut block_count = 0;
+
+    for b in blocks.iter() {
+        let next_block_size = b.1.len();
+        if total_size + next_block_size > max_batch_size {
+            break;
+        }
+        total_size += next_block_size;
+        block_count += 1;
+    }
+
+    Some(blocks.drain(..block_count))
+}
+
+#[cfg(test)]
+mod tests {
+    use cid::multihash::Multihash;
+
+    use super::*;
+
+    fn cid(block: &[u8]) -> Cid {
+        let codec = 0x55;
+        let multihash = Code::Sha2_256.digest(block);
+        let multihash =
+            Multihash::wrap(multihash.code(), multihash.digest()).expect("to be valid multihash");
+
+        Cid::new_v1(codec, multihash)
+    }
+
+    #[test]
+    fn extract_next_batch_fits_max_size() {
+        let max_size = 100;
+
+        let block1 = vec![0x01; 10];
+        let block2 = vec![0x02; 10];
+        let block3 = vec![0x03; 10];
+
+        let blocks = vec![
+            (cid(&block1), block1),
+            (cid(&block2), block2),
+            (cid(&block3), block3),
+        ];
+        let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>();
+
+        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
+        assert_eq!(batch.collect::<Vec<_>>(), blocks);
+
+        assert!(extract_next_batch(&mut blocks_deque, max_size).is_none());
+    }
+
+    #[test]
+    fn extract_next_batch_chunking_exact() {
+        let max_size = 20;
+
+        let block1 = vec![0x01; 10];
+        let block2 = vec![0x02; 10];
+        let block3 = vec![0x03; 10];
+
+        let blocks = vec![
+            (cid(&block1), block1.clone()),
+            (cid(&block2), block2.clone()),
+            (cid(&block3), block3.clone()),
+        ];
+        let chunk1 = vec![
+            (cid(&block1), block1.clone()),
+            (cid(&block2), block2.clone()),
+        ];
+        let chunk2 = vec![(cid(&block3), block3.clone())];
+        let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>();
+
+        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
+        assert_eq!(batch.collect::<Vec<_>>(), chunk1);
+
+        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
+        assert_eq!(batch.collect::<Vec<_>>(), chunk2);
+
+        assert!(extract_next_batch(&mut blocks_deque, max_size).is_none());
+    }
+
+    #[test]
+    fn extract_next_batch_chunking_less_than() {
+        let max_size = 20;
+
+        let block1 = vec![0x01; 10];
+        let block2 = vec![0x02; 9];
+        let block3 = vec![0x03; 10];
+
+        let blocks = vec![
+            (cid(&block1), block1.clone()),
+            (cid(&block2), block2.clone()),
+            (cid(&block3), block3.clone()),
+        ];
+        let chunk1 = vec![
+            (cid(&block1), block1.clone()),
+            (cid(&block2), block2.clone()),
+        ];
+        let chunk2 = vec![(cid(&block3), block3.clone())];
+        let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>();
+
+        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
+        assert_eq!(batch.collect::<Vec<_>>(), chunk1);
+
+        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
+        assert_eq!(batch.collect::<Vec<_>>(), chunk2);
+
+        assert!(extract_next_batch(&mut blocks_deque, max_size).is_none());
+    }
+
+    #[test]
+    fn extract_next_batch_oversized_blocks_discarded() {
+        let max_size = 20;
+
+        let block1 = vec![0x01; 10];
+        let block2 = vec![0x02; 101];
+        let block3 = vec![0x03; 10];
+
+        let blocks = vec![
+            (cid(&block1), block1.clone()),
+            (cid(&block2), block2.clone()),
+            (cid(&block3), block3.clone()),
+        ];
+        let chunk1 = vec![(cid(&block1), block1.clone())];
+        let chunk2 = vec![(cid(&block3), block3.clone())];
+        let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>();
+
+        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
+        assert_eq!(batch.collect::<Vec<_>>(), chunk1);
+
+        let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
+        assert_eq!(batch.collect::<Vec<_>>(), chunk2);
+
+        assert!(extract_next_batch(&mut blocks_deque, max_size).is_none());
+    }
+}
