Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions libp2p/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2962,6 +2962,18 @@ class ITransport(ABC):

Provides methods for dialing peers and creating listeners on a transport.

Optional capability attributes (see libp2p.capabilities):

- provides_secure: bool — if True, dial() returns connections that are
already secure; the upgrader should not add a security layer.

- provides_muxed: bool — if True, dial() returns connections that are
already multiplexed (implement IMuxedConn); the upgrader should not
add a muxer layer.

Default is False when absent. Transports like TCP do not set these;
transports like QUIC set both to True.

"""

@abstractmethod
Expand Down
54 changes: 54 additions & 0 deletions libp2p/capabilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""
Transport and connection capabilities for capability-based dispatch.

Transports and connections may declare optional capability attributes so that
the core (swarm, host) can treat them uniformly without isinstance checks:

- provides_secure: bool — this transport/connection already provides security
(e.g. QUIC with built-in TLS). When True, the upgrader should not add a
security layer.
- provides_muxed: bool — this transport/connection already provides multiplexing
(e.g. QUIC with native streams). When True, the upgrader should not add a
muxer layer, and the "raw" connection is already an IMuxedConn.

Default is False for both when the attribute is absent, so existing transports
(TCP, WebSocket, etc.) are unchanged.
"""

from typing import Any


def transport_provides_secure_and_muxed(transport: Any) -> bool:
"""
Return True if the transport produces connections that are already
secure and muxed (e.g. QUIC). Used to skip security/muxer upgrade.
"""
return bool(
getattr(transport, "provides_secure", False)
and getattr(transport, "provides_muxed", False)
)


def connection_provides_muxed(muxed_conn: Any) -> bool:
"""
Return True if the muxed connection was provided directly by the transport
(already muxed), e.g. QUIC. Used for host/swarm logic that differs for
native-muxed vs upgraded connections.
"""
return bool(getattr(muxed_conn, "provides_muxed", False))


def connection_needs_establishment_wait(muxed_conn: Any) -> bool:
"""
Return True if the muxed connection has an establishment phase and
_connected_event to wait on (e.g. QUIC). If True, caller should
await muxed_conn._connected_event.wait() when not yet established.
"""
event = getattr(muxed_conn, "_connected_event", None)
if event is None:
return False
established = getattr(muxed_conn, "is_established", True)
# is_established may be a property
if callable(established):
established = established()
return not established
7 changes: 4 additions & 3 deletions libp2p/host/basic_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
IPeerStore,
IRawConnection,
)
from libp2p.capabilities import connection_provides_muxed
from libp2p.crypto.keys import (
PrivateKey,
PublicKey,
Expand Down Expand Up @@ -92,7 +93,6 @@
from libp2p.tools.async_service import (
background_trio_service,
)
from libp2p.transport.quic.connection import QUICConnection
import libp2p.utils.paths
from libp2p.utils.varint import (
read_length_prefixed_protobuf,
Expand Down Expand Up @@ -749,7 +749,7 @@ async def new_stream(
# Get registry stats if QUIC connection
# Try to get stats from server listener (for server-side connections)
# or from client transport's listeners (if available)
if connection_type == "QUICConnection" and hasattr(
if connection_provides_muxed(muxed_conn) and hasattr(
muxed_conn, "_transport"
):
transport = getattr(muxed_conn, "_transport", None)
Expand Down Expand Up @@ -1050,7 +1050,8 @@ def _get_first_connection(self, peer_id: ID) -> INetConn | None:
return None

def _is_quic_muxer(self, muxed_conn: IMuxedConn | None) -> bool:
return isinstance(muxed_conn, QUICConnection)
"""True if connection is natively muxed (e.g. QUIC), not upgraded."""
return connection_provides_muxed(muxed_conn) if muxed_conn else False

def _should_identify_peer(self, peer_id: ID) -> bool:
connection = self._get_first_connection(peer_id)
Expand Down
53 changes: 30 additions & 23 deletions libp2p/network/swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
ISecureConn,
ITransport,
)
from libp2p.capabilities import (
connection_needs_establishment_wait,
transport_provides_secure_and_muxed,
)
from libp2p.custom_types import (
StreamHandlerFn,
)
Expand Down Expand Up @@ -58,8 +62,6 @@
SecurityUpgradeFailure,
)
from libp2p.transport.quic.config import QUICTransportConfig
from libp2p.transport.quic.connection import QUICConnection
from libp2p.transport.quic.transport import QUICTransport
from libp2p.transport.upgrader import (
TransportUpgrader,
)
Expand Down Expand Up @@ -200,13 +202,10 @@ async def run(self) -> None:

# Set background nursery BEFORE setting the event
# This ensures transports have the nursery when they check
if isinstance(self.transport, QUICTransport):
self.transport.set_background_nursery(nursery)
self.transport.set_swarm(self)
elif hasattr(self.transport, "set_background_nursery"):
# WebSocket transport also needs background nursery
# for connection management
if hasattr(self.transport, "set_background_nursery"):
self.transport.set_background_nursery(nursery) # type: ignore[attr-defined]
if hasattr(self.transport, "set_swarm"):
self.transport.set_swarm(self) # type: ignore[attr-defined]

# Set event after background nursery is configured
# This ensures transports have the nursery when they check the event
Expand Down Expand Up @@ -647,11 +646,12 @@ async def _dial_addr_single_attempt(self, addr: Multiaddr, peer_id: ID) -> INetC
pass
raise SwarmException(f"Unexpected error dialing peer {peer_id}") from e

if isinstance(self.transport, QUICTransport) and isinstance(
if transport_provides_secure_and_muxed(self.transport) and isinstance(
raw_conn, IMuxedConn
):
logger.info(
"Skipping upgrade for QUIC, QUIC connections are already multiplexed"
"Connection is secured and muxed by transport; "
"skipping security and muxer upgrade"
)
try:
swarm_conn = await self.add_conn(raw_conn, direction="outbound")
Expand Down Expand Up @@ -886,7 +886,10 @@ async def new_stream(self, peer_id: ID) -> INetStream:
f"Failed to get a valid connection for peer {peer_id}"
)

if isinstance(self.transport, QUICTransport) and connection is not None:
if (
transport_provides_secure_and_muxed(self.transport)
and connection is not None
):
conn = cast("SwarmConn", connection)
try:
stream = await conn.new_stream()
Expand Down Expand Up @@ -1115,14 +1118,17 @@ async def conn_handler(
pass
return

# No need to upgrade QUIC Connection
if isinstance(self.transport, QUICTransport):
# No need to upgrade: transport provides secure+muxed connection
if transport_provides_secure_and_muxed(self.transport) and isinstance(
read_write_closer, IMuxedConn
):
try:
quic_conn = cast(QUICConnection, read_write_closer)
await self.add_conn(quic_conn, direction="inbound")
peer_id = quic_conn.peer_id
logger.debug(
f"successfully opened quic connection to peer {peer_id}"
await self.add_conn(read_write_closer, direction="inbound")
peer_id = read_write_closer.peer_id
logger.info(
"Inbound connection is secured and muxed; "
"skipping upgrade for peer %s",
peer_id,
)
# NOTE: This is a intentional barrier to prevent from the
# handler exiting and closing the connection.
Expand Down Expand Up @@ -1461,7 +1467,7 @@ async def add_conn(
raise SwarmException(
"Connection denied by resource manager: resource limit exceeded"
)
# QUICConnection provides a hook to set scope and ensure cleanup
# Some muxed connections provide a hook to set scope and ensure cleanup
if hasattr(muxed_conn, "set_resource_scope"):
# Type ignore: we've checked the attribute exists
muxed_conn.set_resource_scope(conn_scope) # type: ignore
Expand Down Expand Up @@ -1511,10 +1517,11 @@ async def add_conn(
logger.debug("Swarm::add_conn | starting muxed connection")
self.manager.run_task(muxed_conn.start)
await muxed_conn.event_started.wait()
# For QUIC connections, also verify connection is established
if isinstance(muxed_conn, QUICConnection):
if not muxed_conn.is_established:
await muxed_conn._connected_event.wait()
# For connections that have an establishment phase, wait until ready
if connection_needs_establishment_wait(muxed_conn):
event = getattr(muxed_conn, "_connected_event", None)
if event is not None:
await event.wait()
logger.debug("Swarm::add_conn | starting swarm connection")
self.manager.run_task(swarm_conn.start)
await swarm_conn.event_started.wait()
Expand Down
8 changes: 8 additions & 0 deletions libp2p/transport/quic/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ class QUICConnection(IRawConnection, IMuxedConn):
QUIC natively provides stream multiplexing, so this connection acts as both
a raw connection (for transport layer) and muxed connection (for upper layers).

Capability flags (see libp2p.capabilities): provides_secure and provides_muxed
are True so the core treats this as a top-of-stack connection without
adding security/muxer layers.

Features:
- Native QUIC stream multiplexing
- Integrated libp2p TLS security with peer identity verification
Expand All @@ -63,6 +67,10 @@ class QUICConnection(IRawConnection, IMuxedConn):
- COMPLETE connection ID management (fixes the original issue)
"""

# Capability flags for capability-based dispatch (libp2p.capabilities)
provides_secure: bool = True
provides_muxed: bool = True

def __init__(
self,
quic_connection: QuicConnection,
Expand Down
6 changes: 4 additions & 2 deletions libp2p/transport/quic/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -1215,8 +1215,10 @@ async def listen(self, maddr: Multiaddr, nursery: trio.Nursery) -> bool:
self._socket = await self._create_socket(host, port)
self._nursery = active_nursery

# Get the actual bound address
bound_host, bound_port = self._socket.getsockname()
# Get the actual bound address (IPv4: 2-tuple, IPv6: 4-tuple)
sockname = self._socket.getsockname()
bound_host = sockname[0]
bound_port = sockname[1]
quic_version = multiaddr_to_quic_version(maddr)
bound_maddr = create_quic_multiaddr(bound_host, bound_port, quic_version)
self._bound_addresses = [bound_maddr]
Expand Down
7 changes: 6 additions & 1 deletion libp2p/transport/quic/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,14 @@

class QUICTransport(ITransport):
"""
QUIC Stream implementation following libp2p IMuxedStream interface.
QUIC transport. Produces connections that are already secure (TLS) and
muxed (native streams), so the upgrader skips security and muxer layers.
"""

# Capability flags: this transport produces secure+muxed connections
provides_secure: bool = True
provides_muxed: bool = True

def __init__(
self,
private_key: PrivateKey,
Expand Down
1 change: 1 addition & 0 deletions newsfragments/395.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Transport and connection behavior is now driven by capability flags instead of concrete types. Transports can declare ``provides_secure`` and ``provides_muxed``; the swarm skips security and muxer upgrade when both are true (e.g. QUIC). Added ``libp2p.capabilities`` helpers, INFO logs when upgrade is skipped, QUIC listener fix for IPv6 ``getsockname()``, and transport tests for TCP/QUIC capability flags.
5 changes: 5 additions & 0 deletions tests/core/transport/quic/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ def test_transport_initialization(self, transport):
assert not transport._closed
assert len(transport._quic_configs) >= 1

def test_quic_transport_capability_flags(self, transport: QUICTransport):
"""QUIC sets provides_secure and provides_muxed for capability dispatch."""
assert getattr(transport, "provides_secure", False) is True
assert getattr(transport, "provides_muxed", False) is True

def test_quic_transport_forwards_enable_autotls_to_security_factory(
self, private_key
):
Expand Down
72 changes: 72 additions & 0 deletions tests/core/transport/test_capabilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
Tests for transport/connection capability flags and helpers.

Verifies that TCP does not declare provides_secure/provides_muxed, that the
capability helpers behave correctly, and that QUIC transport has the flags
(set in tests/core/transport/quic/test_transport.py).
"""

from libp2p.capabilities import (
connection_provides_muxed,
transport_provides_secure_and_muxed,
)
from libp2p.transport.tcp.tcp import TCP


def test_tcp_transport_has_no_capability_flags():
"""TCP does not declare provides_secure or provides_muxed (default upgrade path)."""
transport = TCP()
assert getattr(transport, "provides_secure", False) is False
assert getattr(transport, "provides_muxed", False) is False


def test_transport_provides_secure_and_muxed_false_for_tcp():
"""transport_provides_secure_and_muxed returns False for TCP."""
transport = TCP()
assert transport_provides_secure_and_muxed(transport) is False


def test_transport_provides_secure_and_muxed_false_for_plain_object():
"""transport_provides_secure_and_muxed returns False for object without flags."""
assert transport_provides_secure_and_muxed(object()) is False


def test_transport_provides_secure_and_muxed_true_when_both_set():
"""transport_provides_secure_and_muxed is True only when both flags are True."""

class WithBoth:
provides_secure = True
provides_muxed = True

assert transport_provides_secure_and_muxed(WithBoth()) is True

class OnlySecure:
provides_secure = True
provides_muxed = False

assert transport_provides_secure_and_muxed(OnlySecure()) is False

class OnlyMuxed:
provides_secure = False
provides_muxed = True

assert transport_provides_secure_and_muxed(OnlyMuxed()) is False


def test_connection_provides_muxed_false_for_none():
"""connection_provides_muxed returns False for None."""
assert connection_provides_muxed(None) is False


def test_connection_provides_muxed_false_without_flag():
"""connection_provides_muxed returns False when provides_muxed is not set."""
assert connection_provides_muxed(object()) is False


def test_connection_provides_muxed_true_when_set():
"""connection_provides_muxed returns True when provides_muxed is True."""

class Conn:
provides_muxed = True

assert connection_provides_muxed(Conn()) is True
Loading