Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/transport/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
types::ConnectionId,
};

use futures::Stream;
use futures::{future::BoxFuture, Stream};
use multiaddr::Multiaddr;

use std::{
Expand Down Expand Up @@ -71,8 +71,8 @@ impl Transport for DummyTransport {
Ok(())
}

fn accept(&mut self, _: ConnectionId) -> crate::Result<()> {
Ok(())
fn accept(&mut self, _: ConnectionId) -> crate::Result<BoxFuture<'static, crate::Result<()>>> {
Ok(Box::pin(async { Ok(()) }))
}

fn accept_pending(&mut self, _connection_id: ConnectionId) -> crate::Result<()> {
Expand Down
59 changes: 51 additions & 8 deletions src/transport/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{
};

use address::{scores, AddressStore};
use futures::{Stream, StreamExt};
use futures::{future::BoxFuture, Stream, StreamExt};
use indexmap::IndexMap;
use multiaddr::{Multiaddr, Protocol};
use multihash::Multihash;
Expand Down Expand Up @@ -252,6 +252,11 @@ pub struct TransportManager {

/// Opening connections errors.
opening_errors: HashMap<ConnectionId, Vec<(Multiaddr, DialError)>>,

/// Pending accept future with associated connection information.
/// When a connection is accepted, we must wait for the accept future to complete
/// (which notifies all protocols) before emitting the ConnectionEstablished event.
pending_accept: Option<(PeerId, Endpoint, BoxFuture<'static, crate::Result<()>>)>,
}

/// Builder for [`crate::transport::manager::TransportManager`].
Expand Down Expand Up @@ -365,6 +370,7 @@ impl TransportManagerBuilder {
pending_connections: HashMap::new(),
connection_limits: limits::ConnectionLimits::new(self.connection_limits_config),
opening_errors: HashMap::new(),
pending_accept: None,
}
}
}
Expand Down Expand Up @@ -1090,6 +1096,32 @@ impl TransportManager {
/// Poll next event from [`crate::transport::manager::TransportManager`].
pub async fn next(&mut self) -> Option<TransportEvent> {
loop {
// First, check if we have a pending accept future to poll
if let Some((peer, endpoint, mut future)) = self.pending_accept.take() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we "stop" here everything when one connection is waiting for its accept to finish?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this is not ok, especially under load. The PR is entirely vibed as PoC for @tdimitrov 🙏

I'll need to rewrite this with some FuturesUnordered and see ask Tsvetomir for another testing round 🙏

match future.as_mut().await {
Ok(()) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
?endpoint,
"connection accepted and protocols notified",
);

return Some(TransportEvent::ConnectionEstablished { peer, endpoint });
}
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
?peer,
?endpoint,
?error,
"failed to notify protocols about connection",
);
// If notification failed, we don't emit the ConnectionEstablished event
}
}
}

tokio::select! {
event = self.event_rx.recv() => {
let Some(event) = event else {
Expand Down Expand Up @@ -1270,16 +1302,27 @@ impl TransportManager {
"accept connection",
);

let _ = self
match self
.transports
.get_mut(&transport)
.expect("transport to exist")
.accept(endpoint.connection_id());

return Some(TransportEvent::ConnectionEstablished {
peer,
endpoint,
});
.accept(endpoint.connection_id())
{
Ok(future) => {
// Store the accept future to be polled in the next iteration
// This ensures protocols are notified before we emit ConnectionEstablished
self.pending_accept = Some((peer, endpoint, future));
}
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
?peer,
?endpoint,
?error,
"failed to accept connection",
);
}
}
}
Ok(ConnectionEstablishedResult::Reject) => {
tracing::trace!(
Expand Down
12 changes: 10 additions & 2 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use crate::{error::DialError, transport::manager::TransportHandle, types::ConnectionId, PeerId};

use futures::Stream;
use futures::{future::BoxFuture, Stream};
use hickory_resolver::TokioResolver;
use multiaddr::Multiaddr;

Expand Down Expand Up @@ -194,7 +194,15 @@ pub(crate) trait Transport: Stream + Unpin + Send {
fn dial(&mut self, connection_id: ConnectionId, address: Multiaddr) -> crate::Result<()>;

/// Accept negotiated connection.
fn accept(&mut self, connection_id: ConnectionId) -> crate::Result<()>;
///
/// Returns a future that completes when the connection has been fully established
/// and all installed protocols have been notified via their event channels.
/// This ensures that by the time the caller receives a ConnectionEstablished event,
/// protocols are ready to handle substream operations.
fn accept(
&mut self,
connection_id: ConnectionId,
) -> crate::Result<BoxFuture<'static, crate::Result<()>>>;

/// Accept pending connection.
fn accept_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()>;
Expand Down
9 changes: 3 additions & 6 deletions src/transport/quic/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,9 @@ impl QuicConnection {
})
}

/// Start event loop for [`QuicConnection`].
pub async fn start(mut self) -> crate::Result<()> {
self.protocol_set
.report_connection_established(self.peer, self.endpoint.clone())
.await?;

/// Start the connection event loop without notifying protocols.
/// This is used when protocols have already been notified during accept().
pub(crate) async fn start(mut self) -> crate::Result<()> {
loop {
tokio::select! {
event = self.connection.accept_bi() => match event {
Expand Down
43 changes: 28 additions & 15 deletions src/transport/quic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,35 +306,48 @@ impl Transport for QuicTransport {
Ok(())
}

fn accept(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
fn accept(
&mut self,
connection_id: ConnectionId,
) -> crate::Result<BoxFuture<'static, crate::Result<()>>> {
let (connection, endpoint) = self
.pending_open
.remove(&connection_id)
.ok_or(Error::ConnectionDoesntExist(connection_id))?;
let bandwidth_sink = self.context.bandwidth_sink.clone();
let protocol_set = self.context.protocol_set(connection_id);
let mut protocol_set = self.context.protocol_set(connection_id);
let substream_open_timeout = self.config.substream_open_timeout;
let executor = self.context.executor.clone();

tracing::trace!(
target: LOG_TARGET,
?connection_id,
"start connection",
);

self.context.executor.run(Box::pin(async move {
let _ = QuicConnection::new(
connection.peer,
endpoint,
connection.connection,
protocol_set,
bandwidth_sink,
substream_open_timeout,
)
.start()
.await;
}));
let peer = connection.peer;
let endpoint_clone = endpoint.clone();

Ok(Box::pin(async move {
// First, notify all protocols about the connection establishment
protocol_set.report_connection_established(peer, endpoint_clone).await?;

// After protocols are notified, spawn the connection event loop
executor.run(Box::pin(async move {
let _ = QuicConnection::new(
peer,
endpoint,
connection.connection,
protocol_set,
bandwidth_sink,
substream_open_timeout,
)
.start()
.await;
}));

Ok(())
Ok(())
}))
}

fn reject(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
Expand Down
7 changes: 2 additions & 5 deletions src/transport/tcp/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,12 +730,9 @@ impl TcpConnection {
}
}

/// Start connection event loop.
/// Start the connection event loop without notifying protocols.
/// This is used when protocols have already been notified during accept().
pub(crate) async fn start(mut self) -> crate::Result<()> {
self.protocol_set
.report_connection_established(self.peer, self.endpoint.clone())
.await?;

loop {
tokio::select! {
substream = self.connection.next() => {
Expand Down
48 changes: 31 additions & 17 deletions src/transport/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,37 +389,51 @@ impl Transport for TcpTransport {
)
}

fn accept(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
fn accept(
&mut self,
connection_id: ConnectionId,
) -> crate::Result<BoxFuture<'static, crate::Result<()>>> {
let context = self
.pending_open
.remove(&connection_id)
.ok_or(Error::ConnectionDoesntExist(connection_id))?;
let protocol_set = self.context.protocol_set(connection_id);
let mut protocol_set = self.context.protocol_set(connection_id);
let bandwidth_sink = self.context.bandwidth_sink.clone();
let next_substream_id = self.context.next_substream_id.clone();
let executor = self.context.executor.clone();

tracing::trace!(
target: LOG_TARGET,
?connection_id,
"start connection",
);

self.context.executor.run(Box::pin(async move {
if let Err(error) =
TcpConnection::new(context, protocol_set, bandwidth_sink, next_substream_id)
.start()
.await
{
tracing::debug!(
target: LOG_TARGET,
?connection_id,
?error,
"connection exited with error",
);
}
}));
let peer = context.peer();
let endpoint = context.endpoint().clone();

Ok(Box::pin(async move {
// First, notify all protocols about the connection establishment
// This ensures that when the accept() future completes, protocols are ready
protocol_set.report_connection_established(peer, endpoint).await?;

// After protocols are notified, spawn the connection event loop
executor.run(Box::pin(async move {
if let Err(error) =
TcpConnection::new(context, protocol_set, bandwidth_sink, next_substream_id)
.start()
.await
{
tracing::debug!(
target: LOG_TARGET,
?connection_id,
?error,
"connection exited with error",
);
}
}));

Ok(())
Ok(())
}))
}

fn reject(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
Expand Down
36 changes: 21 additions & 15 deletions src/transport/webrtc/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub struct WebRtcConnection {
peer: PeerId,

/// Endpoint.
endpoint: Endpoint,
_endpoint: Endpoint,

/// Peer address
peer_address: SocketAddr,
Expand Down Expand Up @@ -220,7 +220,7 @@ impl WebRtcConnection {
peer_address,
local_address,
socket,
endpoint,
_endpoint: endpoint,
dgram_rx,
pending_outbound: HashMap::new(),
channels: HashMap::new(),
Expand Down Expand Up @@ -667,19 +667,25 @@ impl WebRtcConnection {
.await;
}

/// Start running event loop of [`WebRtcConnection`].
pub async fn run(mut self) {
tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
"start webrtc connection event loop",
);

let _ = self
.protocol_set
.report_connection_established(self.peer, self.endpoint.clone())
.await;

// /// Start running event loop of [`WebRtcConnection`].
// pub async fn run(mut self) {
// tracing::trace!(
// target: LOG_TARGET,
// peer = ?self.peer,
// "start webrtc connection event loop",
// );

// let _ = self
// .protocol_set
// .report_connection_established(self.peer, self.endpoint.clone())
// .await;

// self.run_event_loop().await;
// }

Comment on lines +670 to +685
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// /// Start running event loop of [`WebRtcConnection`].
// pub async fn run(mut self) {
// tracing::trace!(
// target: LOG_TARGET,
// peer = ?self.peer,
// "start webrtc connection event loop",
// );
// let _ = self
// .protocol_set
// .report_connection_established(self.peer, self.endpoint.clone())
// .await;
// self.run_event_loop().await;
// }

/// Start the connection event loop without notifying protocols.
/// This is used when protocols have already been notified during accept().
pub async fn run_event_loop(mut self) {
loop {
// poll output until we get a timeout
let timeout = match self.rtc.poll_output().unwrap() {
Expand Down
Loading
Loading