Skip to content

Commit b80f2ac

Browse files
authored
Merge branch 'master' into lexnv/fix-mdns
2 parents 3c767c9 + e9a009f commit b80f2ac

File tree

2 files changed

+16
-7
lines changed

2 files changed

+16
-7
lines changed

src/protocol/libp2p/identify.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ use crate::{
2828
substream::Substream,
2929
transport::Endpoint,
3030
types::{protocol::ProtocolName, SubstreamId},
31+
utils::futures_stream::FuturesStream,
3132
PeerId, DEFAULT_CHANNEL_SIZE,
3233
};
3334

34-
use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
35+
use futures::{future::BoxFuture, Stream, StreamExt};
3536
use multiaddr::Multiaddr;
3637
use prost::Message;
3738
use tokio::sync::mpsc::{channel, Sender};
@@ -181,10 +182,10 @@ pub(crate) struct Identify {
181182
protocols: Vec<String>,
182183

183184
/// Pending outbound substreams.
184-
pending_outbound: FuturesUnordered<BoxFuture<'static, crate::Result<IdentifyResponse>>>,
185+
pending_outbound: FuturesStream<BoxFuture<'static, crate::Result<IdentifyResponse>>>,
185186

186187
/// Pending inbound substreams.
187-
pending_inbound: FuturesUnordered<BoxFuture<'static, ()>>,
188+
pending_inbound: FuturesStream<BoxFuture<'static, ()>>,
188189
}
189190

190191
impl Identify {
@@ -197,8 +198,8 @@ impl Identify {
197198
public: config.public.expect("public key to be supplied"),
198199
protocol_version: config.protocol_version,
199200
user_agent: config.user_agent.unwrap_or(DEFAULT_AGENT.to_string()),
200-
pending_inbound: FuturesUnordered::new(),
201-
pending_outbound: FuturesUnordered::new(),
201+
pending_inbound: FuturesStream::new(),
202+
pending_outbound: FuturesStream::new(),
202203
protocols: config.protocols.iter().map(|protocol| protocol.to_string()).collect(),
203204
}
204205
}
@@ -356,7 +357,10 @@ impl Identify {
356357
loop {
357358
tokio::select! {
358359
event = self.service.next() => match event {
359-
None => return,
360+
None => {
361+
tracing::warn!(target: LOG_TARGET, "transport service stream ended, terminating identify event loop");
362+
return
363+
},
360364
Some(TransportEvent::ConnectionEstablished { peer, endpoint }) => {
361365
let _ = self.on_connection_established(peer, endpoint);
362366
}
@@ -390,7 +394,7 @@ impl Identify {
390394
.await;
391395
}
392396
Some(Err(error)) => tracing::debug!(target: LOG_TARGET, ?error, "failed to read ipfs identify response"),
393-
None => return,
397+
None => {}
394398
}
395399
}
396400
}

src/utils/futures_stream.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ impl<F> FuturesStream<F> {
5050
self.futures.len()
5151
}
5252

53+
/// Check if the stream is empty.
54+
pub fn is_empty(&self) -> bool {
55+
self.futures.is_empty()
56+
}
57+
5358
/// Push a future for processing.
5459
pub fn push(&mut self, future: F) {
5560
self.futures.push(future);

0 commit comments

Comments
 (0)