Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions libp2p/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,15 @@ async def close(self) -> None:

"""

@property
@abstractmethod
def is_established(self) -> bool:
"""
Report whether the connection is fully established and ready for streams.

Each concrete connection defines what "established" means for its own
transport or muxer; this abstract declaration only fixes the contract
that the result is a plain boolean readable as a property.

:return: True if the connection is established, otherwise False.
"""

@property
@abstractmethod
def is_closed(self) -> bool:
Expand Down
11 changes: 11 additions & 0 deletions libp2p/network/swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -1550,10 +1550,21 @@ async def add_conn(
logger.debug("Swarm::add_conn | starting muxed connection")
self.manager.run_task(muxed_conn.start)
await muxed_conn.event_started.wait()
logger.debug(
f"Swarm::add_conn | event_started received for peer {muxed_conn.peer_id}"
)
# Verify the connection is fully established before proceeding.
# QUIC connections that are not yet established wait on their connected event.
# Other muxers (like Yamux/Mplex) only have the is_established property
# checked; a warning is logged if it is still False after event_started.
if isinstance(muxed_conn, QUICConnection):
if not muxed_conn.is_established:
await muxed_conn._connected_event.wait()
elif not muxed_conn.is_established:
logger.warning(
f"Swarm::add_conn | muxer event_started set but "
f"is_established=False for peer {muxed_conn.peer_id}"
)
logger.debug("Swarm::add_conn | starting swarm connection")
self.manager.run_task(swarm_conn.start)
await swarm_conn.event_started.wait()
Expand Down
19 changes: 19 additions & 0 deletions libp2p/stream_muxer/mplex/mplex.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class Mplex(IMuxedConn):
event_closed: trio.Event
event_started: trio.Event
on_close: Callable[[], Awaitable[Any]] | None
_established: bool

def __init__(
self,
Expand Down Expand Up @@ -109,6 +110,23 @@ def __init__(
self.event_closed = trio.Event()
self.event_started = trio.Event()
self.on_close = on_close
self._established = False

@property
def is_established(self) -> bool:
    """
    Report whether this Mplex connection is ready to carry streams.

    The result is True only when all three conditions hold:

    - the internal ``_established`` flag has been set,
    - ``event_started`` has been set, and
    - ``event_shutting_down`` has not been set.

    :return: True if the connection is established, otherwise False.
    """
    # Checks run in the same order as a short-circuiting ``and`` chain.
    if not self._established:
        return False
    if not self.event_started.is_set():
        return False
    return not self.event_shutting_down.is_set()

async def start(self) -> None:
await self.handle_incoming()
Expand Down Expand Up @@ -221,6 +239,7 @@ async def handle_incoming(self) -> None:
Read a message off of the secured connection and add it to the
corresponding message buffer.
"""
self._established = True
self.event_started.set()
while True:
try:
Expand Down
40 changes: 39 additions & 1 deletion libp2p/stream_muxer/yamux/yamux.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,23 @@ def __init__(
self.stream_buffers: dict[int, bytearray] = {}
self.stream_events: dict[int, trio.Event] = {}
self._nursery: Nursery | None = None
self._established: bool = False

@property
def is_established(self) -> bool:
    """
    Report whether this Yamux connection is ready to carry streams.

    True requires the internal ``_established`` flag and ``event_started``
    to both be set while ``event_shutting_down`` is still unset; any other
    combination yields False.

    :return: True if the connection is established, otherwise False.
    """
    started = self._established and self.event_started.is_set()
    if not started:
        return False
    # A started connection stops counting as established once shutdown begins.
    return not self.event_shutting_down.is_set()

async def start(self) -> None:
logger.debug(f"Starting Yamux for {self.peer_id}")
Expand All @@ -463,8 +480,12 @@ async def start(self) -> None:
logger.debug(
f"Yamux.start() starting handle_incoming task for {self.peer_id}"
)
nursery.start_soon(self.handle_incoming)
# Use nursery.start() to ensure handle_incoming has started
# before we set event_started. This prevents race conditions
# where streams are opened before the muxer is ready.
await nursery.start(self._handle_incoming_with_ready_signal)
logger.debug(f"Yamux.start() setting event_started for {self.peer_id}")
self._established = True
self.event_started.set()
logger.debug(
f"Yamux.start() exiting for {self.peer_id}, closing new stream channel"
Expand Down Expand Up @@ -691,6 +712,23 @@ async def read_stream(self, stream_id: int, n: int = -1) -> bytes:
# This line should never be reached, but satisfies the type checker
raise MuxedStreamEOF("Unexpected end of read_stream")

async def _handle_incoming_with_ready_signal(
    self, task_status: Any = trio.TASK_STATUS_IGNORED
) -> None:
    """
    Run ``handle_incoming`` after reporting task start-up to the spawner.

    Meant to be launched with ``nursery.start()``: ``task_status.started()``
    is called first, and only afterwards is ``handle_incoming`` awaited.
    The start-up signal is therefore sent before ``handle_incoming`` has
    executed its first statement.

    :param task_status: trio task-status object used to report start-up.
        Defaults to ``trio.TASK_STATUS_IGNORED`` so the coroutine can also
        be awaited directly.
    """
    peer = self.peer_id
    logger.debug(
        f"Yamux _handle_incoming_with_ready_signal() starting for peer {peer}"
    )
    # Report start-up before entering the frame-processing loop.
    task_status.started()
    await self.handle_incoming()

async def handle_incoming(self) -> None:
logger.debug(f"Yamux handle_incoming() started for peer {self.peer_id}")
while not self.event_shutting_down.is_set():
Expand Down
1 change: 1 addition & 0 deletions newsfragments/1128.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a DHT regression in v0.5.0 where streams to bootstrap peers failed to open because of a race condition in stream muxer initialization.
Loading