Skip to content

Commit b302baa

Browse files
committed
Respect one substrea per peer limit
1 parent d85b0af commit b302baa

File tree

2 files changed

+165
-113
lines changed

2 files changed

+165
-113
lines changed

src/protocol/libp2p/bitswap/handle.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ pub enum BitswapEvent {
5656
}
5757

5858
/// Response type for received bitswap request.
59-
#[derive(Debug)]
59+
#[derive(Debug, Clone)]
6060
#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
6161
pub enum ResponseType {
6262
/// Block.

src/protocol/libp2p/bitswap/mod.rs

Lines changed: 164 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ use tokio_stream::{StreamExt, StreamMap};
3939
pub use config::Config;
4040
pub use handle::{BitswapCommand, BitswapEvent, BitswapHandle, ResponseType};
4141
pub use schema::bitswap::{wantlist::WantType, BlockPresenceType};
42-
use std::{collections::HashMap, time::Duration};
42+
use std::{
43+
collections::{hash_map::Entry, HashMap, HashSet},
44+
time::Duration,
45+
};
4346

4447
mod config;
4548
mod handle;
@@ -134,14 +137,17 @@ pub(crate) struct Bitswap {
134137
/// RX channel for receiving commands from `BitswapHandle`.
135138
cmd_rx: Receiver<BitswapCommand>,
136139

137-
/// Pending outbound substreams.
138-
pending_outbound: HashMap<SubstreamId, SubstreamAction>,
140+
/// Pending outbound actions.
141+
pending_outbound: HashMap<PeerId, Vec<SubstreamAction>>,
139142

140143
/// Inbound substreams.
141144
inbound: StreamMap<PeerId, Substream>,
142145

146+
/// Outbound substreams.
147+
outbound: HashMap<PeerId, Substream>,
148+
143149
/// Peers waiting for dial.
144-
pending_dials: HashMap<PeerId, Vec<SubstreamAction>>,
150+
pending_dials: HashSet<PeerId>,
145151
}
146152

147153
impl Bitswap {
@@ -153,7 +159,8 @@ impl Bitswap {
153159
event_tx: config.event_tx,
154160
pending_outbound: HashMap::new(),
155161
inbound: StreamMap::new(),
156-
pending_dials: HashMap::new(),
162+
outbound: HashMap::new(),
163+
pending_dials: HashSet::new(),
157164
}
158165
}
159166

@@ -294,136 +301,114 @@ impl Bitswap {
294301
substream_id: SubstreamId,
295302
mut substream: Substream,
296303
) {
297-
let Some(action) = self.pending_outbound.remove(&substream_id) else {
304+
let Some(actions) = self.pending_outbound.remove(&peer) else {
298305
tracing::warn!(target: LOG_TARGET, ?peer, ?substream_id, "pending outbound entry doesn't exist");
299306
return;
300307
};
301308

302-
match action {
303-
SubstreamAction::SendRequest(cids) => {
304-
let request = schema::bitswap::Message {
305-
wantlist: Some(schema::bitswap::Wantlist {
306-
entries: cids
307-
.into_iter()
308-
.map(|(cid, want_type)| schema::bitswap::wantlist::Entry {
309-
block: cid.to_bytes(),
310-
priority: 1,
311-
cancel: false,
312-
want_type: want_type as i32,
313-
send_dont_have: false,
314-
})
315-
.collect(),
316-
full: false,
317-
}),
318-
..Default::default()
319-
};
320-
321-
let message = request.encode_to_vec().into();
322-
let _ = tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await;
323-
324-
// Keep the substream open to receive the response.
325-
self.inbound.insert(peer, substream);
326-
}
327-
SubstreamAction::SendResponse(entries) => {
328-
let mut response = schema::bitswap::Message {
329-
// `wantlist` field must always be present. This is what the official Kubo IPFS
330-
// implementation does.
331-
wantlist: Some(Default::default()),
332-
..Default::default()
333-
};
334-
335-
for entry in entries {
336-
match entry {
337-
ResponseType::Block { cid, block } => {
338-
let prefix = Prefix {
339-
version: cid.version(),
340-
codec: cid.codec(),
341-
multihash_type: cid.hash().code(),
342-
multihash_len: cid.hash().size(),
343-
}
344-
.to_bytes();
345-
346-
response.payload.push(schema::bitswap::Block {
347-
prefix,
348-
data: block,
349-
});
350-
}
351-
ResponseType::Presence { cid, presence } => {
352-
response.block_presences.push(schema::bitswap::BlockPresence {
353-
cid: cid.to_bytes(),
354-
r#type: presence as i32,
355-
});
356-
}
309+
for action in actions {
310+
match action {
311+
SubstreamAction::SendRequest(cids) => {
312+
if send_request(&mut substream, cids).await.is_err() {
313+
// Drop the substream and all actions in case of sending error.
314+
tracing::debug!(target: LOG_TARGET, ?peer, "bitswap request failed");
315+
return;
316+
}
317+
}
318+
SubstreamAction::SendResponse(entries) => {
319+
if send_response(&mut substream, entries).await.is_err() {
320+
// Drop the substream and all actions in case of sending error.
321+
tracing::debug!(target: LOG_TARGET, ?peer, "bitswap response failed");
322+
return;
357323
}
358324
}
359-
360-
let message = response.encode_to_vec().into();
361-
let _ = tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await;
362-
363-
substream.close().await;
364325
}
365326
}
327+
328+
self.outbound.insert(peer, substream);
366329
}
367330

368331
/// Handle connection established event.
369332
fn on_connection_established(&mut self, peer: PeerId) {
370-
// If we have pending actions for this peer, open substreams.
371-
if let Some(actions) = self.pending_dials.remove(&peer) {
372-
for action in actions {
373-
match self.service.open_substream(peer) {
374-
Ok(substream_id) => {
375-
self.pending_outbound.insert(substream_id, action);
376-
}
377-
Err(error) => {
378-
tracing::debug!(
379-
target: LOG_TARGET,
380-
?peer,
381-
?error,
382-
"failed to open substream after connection established",
383-
);
384-
}
333+
// If we have pending actions for this peer, open a substream.
334+
if self.pending_dials.remove(&peer) {
335+
if let Err(error) = self.service.open_substream(peer) {
336+
tracing::debug!(
337+
target: LOG_TARGET,
338+
?peer,
339+
?error,
340+
"failed to open substream after connection established",
341+
);
342+
// Drop all pending actions; they are not going to be handled anyway, and we need
343+
// the entry to be empty to properly open subsequent substreams.
344+
self.pending_outbound.remove(&peer);
345+
}
346+
}
347+
}
348+
349+
/// Open substream or dial a peer.
350+
fn open_substream_or_dial(&mut self, peer: PeerId) {
351+
if let Err(error) = self.service.open_substream(peer) {
352+
tracing::trace!(
353+
target: LOG_TARGET,
354+
?peer,
355+
?error,
356+
"failed to open substream, dialing peer",
357+
);
358+
359+
// Failed to open substream, try to dial the peer.
360+
match self.service.dial(&peer) {
361+
Ok(()) => {
362+
// Store the peer to open a substream once it is connected.
363+
self.pending_dials.insert(peer);
364+
}
365+
Err(error) => {
366+
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to dial peer");
385367
}
386368
}
387369
}
388370
}
389371

390372
/// Handle bitswap request.
391-
fn on_bitswap_request(&mut self, peer: PeerId, cids: Vec<(Cid, WantType)>) {
392-
match self.service.open_substream(peer) {
393-
Ok(substream_id) => {
394-
self.pending_outbound.insert(substream_id, SubstreamAction::SendRequest(cids));
395-
}
396-
Err(error) => {
397-
tracing::trace!(target: LOG_TARGET, ?peer, ?error, "failed to open substream, dialing peer");
398-
399-
// Failed to open substream, try to dial the peer.
400-
match self.service.dial(&peer) {
401-
Ok(()) => {
402-
// Store the action to be performed when the peer is connected.
403-
self.pending_dials
404-
.entry(peer)
405-
.or_default()
406-
.push(SubstreamAction::SendRequest(cids));
407-
}
408-
Err(error) => {
409-
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to dial peer");
410-
}
411-
}
373+
async fn on_bitswap_request(&mut self, peer: PeerId, cids: Vec<(Cid, WantType)>) {
374+
// Try to send request over existing substream first.
375+
if let Entry::Occupied(mut entry) = self.outbound.entry(peer) {
376+
if send_request(entry.get_mut(), cids.clone()).await.is_ok() {
377+
return;
378+
} else {
379+
entry.remove();
412380
}
413381
}
382+
383+
// Store pending actions for later and open substream if not requested already.
384+
let pending_actions = self.pending_outbound.entry(peer).or_default();
385+
let no_pending_substream = pending_actions.is_empty();
386+
pending_actions.push(SubstreamAction::SendRequest(cids));
387+
388+
if no_pending_substream {
389+
self.open_substream_or_dial(peer);
390+
}
414391
}
415392

416393
/// Handle bitswap response.
417-
fn on_bitswap_response(&mut self, peer: PeerId, responses: Vec<ResponseType>) {
418-
match self.service.open_substream(peer) {
419-
Err(error) => {
420-
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to open substream to peer")
421-
}
422-
Ok(substream_id) => {
423-
self.pending_outbound
424-
.insert(substream_id, SubstreamAction::SendResponse(responses));
394+
async fn on_bitswap_response(&mut self, peer: PeerId, responses: Vec<ResponseType>) {
395+
// Try to send response over existing substream first.
396+
if let Entry::Occupied(mut entry) = self.outbound.entry(peer) {
397+
if send_response(entry.get_mut(), responses.clone()).await.is_ok() {
398+
return;
399+
} else {
400+
entry.remove();
425401
}
426402
}
403+
404+
// Store pending actions for later and open substream if not requested already.
405+
let pending_actions = self.pending_outbound.entry(peer).or_default();
406+
let no_pending_substream = pending_actions.is_empty();
407+
pending_actions.push(SubstreamAction::SendResponse(responses));
408+
409+
if no_pending_substream {
410+
self.open_substream_or_dial(peer);
411+
}
427412
}
428413

429414
/// Start [`Bitswap`] event loop.
@@ -451,10 +436,10 @@ impl Bitswap {
451436
},
452437
command = self.cmd_rx.recv() => match command {
453438
Some(BitswapCommand::SendRequest { peer, cids }) => {
454-
self.on_bitswap_request(peer, cids);
439+
self.on_bitswap_request(peer, cids).await;
455440
}
456441
Some(BitswapCommand::SendResponse { peer, responses }) => {
457-
self.on_bitswap_response(peer, responses);
442+
self.on_bitswap_response(peer, responses).await;
458443
}
459444
None => return,
460445
},
@@ -484,3 +469,70 @@ impl Bitswap {
484469
}
485470
}
486471
}
472+
473+
async fn send_request(substream: &mut Substream, cids: Vec<(Cid, WantType)>) -> Result<(), ()> {
474+
let request = schema::bitswap::Message {
475+
wantlist: Some(schema::bitswap::Wantlist {
476+
entries: cids
477+
.into_iter()
478+
.map(|(cid, want_type)| schema::bitswap::wantlist::Entry {
479+
block: cid.to_bytes(),
480+
priority: 1,
481+
cancel: false,
482+
want_type: want_type as i32,
483+
send_dont_have: false,
484+
})
485+
.collect(),
486+
full: false,
487+
}),
488+
..Default::default()
489+
};
490+
491+
let message = request.encode_to_vec().into();
492+
if let Ok(Ok(())) = tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
493+
Ok(())
494+
} else {
495+
Err(())
496+
}
497+
}
498+
499+
async fn send_response(substream: &mut Substream, entries: Vec<ResponseType>) -> Result<(), ()> {
500+
let mut response = schema::bitswap::Message {
501+
// `wantlist` field must always be present. This is what the official Kubo
502+
// IPFS implementation does.
503+
wantlist: Some(Default::default()),
504+
..Default::default()
505+
};
506+
507+
for entry in entries {
508+
match entry {
509+
ResponseType::Block { cid, block } => {
510+
let prefix = Prefix {
511+
version: cid.version(),
512+
codec: cid.codec(),
513+
multihash_type: cid.hash().code(),
514+
multihash_len: cid.hash().size(),
515+
}
516+
.to_bytes();
517+
518+
response.payload.push(schema::bitswap::Block {
519+
prefix,
520+
data: block,
521+
});
522+
}
523+
ResponseType::Presence { cid, presence } => {
524+
response.block_presences.push(schema::bitswap::BlockPresence {
525+
cid: cid.to_bytes(),
526+
r#type: presence as i32,
527+
});
528+
}
529+
}
530+
}
531+
532+
let message = response.encode_to_vec().into();
533+
if let Ok(Ok(())) = tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
534+
Ok(())
535+
} else {
536+
Err(())
537+
}
538+
}

0 commit comments

Comments
 (0)