@@ -31,6 +31,7 @@ use crate::{
3131 PeerId ,
3232} ;
3333
34+ use bytes:: Bytes ;
3435use cid:: { Cid , Version } ;
3536use prost:: Message ;
3637use tokio:: sync:: mpsc:: { Receiver , Sender } ;
@@ -40,7 +41,7 @@ pub use config::Config;
4041pub use handle:: { BitswapCommand , BitswapEvent , BitswapHandle , ResponseType } ;
4142pub use schema:: bitswap:: { wantlist:: WantType , BlockPresenceType } ;
4243use std:: {
43- collections:: { hash_map:: Entry , HashMap , HashSet } ,
44+ collections:: { hash_map:: Entry , HashMap , HashSet , VecDeque } ,
4445 time:: Duration ,
4546} ;
4647
@@ -535,16 +536,129 @@ async fn send_request(substream: &mut Substream, cids: Vec<(Cid, WantType)>) ->
535536}
536537
537538async fn send_response ( substream : & mut Substream , entries : Vec < ResponseType > ) -> Result < ( ) , Error > {
538- let mut response = schema:: bitswap:: Message {
539- // `wantlist` field must always be present. This is what the official Kubo
540- // IPFS implementation does.
539+ // Send presences in a separate message to not deal with it when batching blocks below.
540+ if let Some ( ( message, cid_count) ) =
541+ presences_message ( entries. iter ( ) . filter_map ( |entry| match entry {
542+ ResponseType :: Presence { cid, presence } => Some ( ( cid. clone ( ) , * presence) ) ,
543+ ResponseType :: Block { .. } => None ,
544+ } ) )
545+ {
546+ if message. len ( ) <= config:: MAX_MESSAGE_SIZE {
547+ tracing:: trace!(
548+ target: LOG_TARGET ,
549+ cid_count,
550+ "sending Bitswap presence message" ,
551+ ) ;
552+ match tokio:: time:: timeout ( WRITE_TIMEOUT , substream. send_framed ( message) ) . await {
553+ Err ( _) => return Err ( Error :: Timeout ) ,
554+ Ok ( Err ( e) ) => return Err ( Error :: SubstreamError ( e) ) ,
555+ Ok ( Ok ( ( ) ) ) => { }
556+ }
557+ } else {
558+ // This should never happen in practice, but log a warning if the presence message
559+ // exceeded [`config::MAX_MESSAGE_SIZE`].
560+ tracing:: warn!(
561+ target: LOG_TARGET ,
562+ size = message. len( ) ,
563+ max_size = config:: MAX_MESSAGE_SIZE ,
564+ "outgoing Bitswap presence message exceeded max size" ,
565+ ) ;
566+ }
567+ }
568+
569+ // Send blocks in batches of up to [`config::MAX_BATCH_SIZE`] bytes.
570+ let mut blocks = entries
571+ . into_iter ( )
572+ . filter_map ( |entry| match entry {
573+ ResponseType :: Block { cid, block } => Some ( ( cid, block) ) ,
574+ ResponseType :: Presence { .. } => None ,
575+ } )
576+ . collect :: < VecDeque < _ > > ( ) ;
577+
578+ while !blocks. is_empty ( ) {
579+ // Get rid of oversized blocks to not stall the processing by not being able to queue them.
580+ if let Some ( block) = blocks. front ( ) {
581+ if block. 1 . len ( ) > config:: MAX_BATCH_SIZE {
582+ tracing:: warn!(
583+ target: LOG_TARGET ,
584+ size = block. 1 . len( ) ,
585+ max_size = config:: MAX_BATCH_SIZE ,
586+ "outgoing Bitswap block exceeded max batch size" ,
587+ ) ;
588+ blocks. pop_front ( ) ;
589+ continue ;
590+ }
591+ }
592+
593+ // Determine how many blocks we can batch.
594+ let mut total_size = 0 ;
595+ let mut block_count = 0 ;
596+
597+ for b in blocks. iter ( ) {
598+ let next_block_size = b. 1 . len ( ) ;
599+ if total_size + next_block_size > config:: MAX_BATCH_SIZE {
600+ break ;
601+ }
602+ total_size += next_block_size;
603+ block_count += 1 ;
604+ }
605+
606+ if let Some ( message) = blocks_message ( blocks. drain ( ..block_count) ) {
607+ if message. len ( ) <= config:: MAX_MESSAGE_SIZE {
608+ tracing:: trace!(
609+ target: LOG_TARGET ,
610+ block_count,
611+ "sending Bitswap blocks message" ,
612+ ) ;
613+ match tokio:: time:: timeout ( WRITE_TIMEOUT , substream. send_framed ( message) ) . await {
614+ Err ( _) => return Err ( Error :: Timeout ) ,
615+ Ok ( Err ( e) ) => return Err ( Error :: SubstreamError ( e) ) ,
616+ Ok ( Ok ( ( ) ) ) => { }
617+ }
618+ } else {
619+ // This should never happen in practice, but log a warning if the blocks message
620+ // exceeded [`config::MAX_MESSAGE_SIZE`].
621+ tracing:: warn!(
622+ target: LOG_TARGET ,
623+ size = message. len( ) ,
624+ max_size = config:: MAX_MESSAGE_SIZE ,
625+ "outgoing Bitswap blocks message exceeded max size" ,
626+ ) ;
627+ }
628+ }
629+ }
630+
631+ Ok ( ( ) )
632+ }
633+
634+ fn presences_message (
635+ presences : impl IntoIterator < Item = ( Cid , BlockPresenceType ) > ,
636+ ) -> Option < ( Bytes , usize ) > {
637+ let message = schema:: bitswap:: Message {
638+ // Set wantlist to not cause null pointer dereference in older versions of Kubo.
541639 wantlist : Some ( Default :: default ( ) ) ,
640+ block_presences : presences
641+ . into_iter ( )
642+ . map ( |( cid, presence) | schema:: bitswap:: BlockPresence {
643+ cid : cid. to_bytes ( ) ,
644+ r#type : presence as i32 ,
645+ } )
646+ . collect ( ) ,
542647 ..Default :: default ( )
543648 } ;
544649
545- for entry in entries {
546- match entry {
547- ResponseType :: Block { cid, block } => {
650+ let count = message. block_presences . len ( ) ;
651+
652+ ( count > 0 ) . then_some ( ( message. encode_to_vec ( ) . into ( ) , count) )
653+ }
654+
655+ fn blocks_message ( blocks : impl IntoIterator < Item = ( Cid , Vec < u8 > ) > ) -> Option < Bytes > {
656+ let message = schema:: bitswap:: Message {
657+ // Set wantlist to not cause null pointer dereference in older versions of Kubo.
658+ wantlist : Some ( Default :: default ( ) ) ,
659+ payload : blocks
660+ . into_iter ( )
661+ . map ( |( cid, block) | {
548662 let prefix = Prefix {
549663 version : cid. version ( ) ,
550664 codec : cid. codec ( ) ,
@@ -553,24 +667,14 @@ async fn send_response(substream: &mut Substream, entries: Vec<ResponseType>) ->
553667 }
554668 . to_bytes ( ) ;
555669
556- response . payload . push ( schema:: bitswap:: Block {
670+ schema:: bitswap:: Block {
557671 prefix,
558672 data : block,
559- } ) ;
560- }
561- ResponseType :: Presence { cid, presence } => {
562- response. block_presences . push ( schema:: bitswap:: BlockPresence {
563- cid : cid. to_bytes ( ) ,
564- r#type : presence as i32 ,
565- } ) ;
566- }
567- }
568- }
673+ }
674+ } )
675+ . collect ( ) ,
676+ ..Default :: default ( )
677+ } ;
569678
570- let message = response. encode_to_vec ( ) . into ( ) ;
571- match tokio:: time:: timeout ( WRITE_TIMEOUT , substream. send_framed ( message) ) . await {
572- Err ( _) => Err ( Error :: Timeout ) ,
573- Ok ( Err ( e) ) => Err ( Error :: SubstreamError ( e) ) ,
574- Ok ( Ok ( ( ) ) ) => Ok ( ( ) ) ,
575- }
679+ ( !message. payload . is_empty ( ) ) . then_some ( message. encode_to_vec ( ) . into ( ) )
576680}
0 commit comments