From 5494c345899ce7470323e43134d7189f62b93381 Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Thu, 31 Oct 2024 10:29:32 +0100 Subject: [PATCH 1/2] Add some changes that enable mixnet adapter. There's also loads of debug prints and whatnot that will get cleaned up later. --- libp2p/dialer.nim | 38 ++++++++++++++++++++++++---- libp2p/multistream.nim | 4 +++ libp2p/protocols/ping.nim | 9 ++++++- libp2p/protocols/secure/secure.nim | 2 ++ libp2p/stream/connection.nim | 13 ++++------ libp2p/stream/lpstream.nim | 18 +++++++------ libp2p/transports/tcptransport.nim | 9 ++++++- libp2p/transports/transport.nim | 8 ++++-- libp2p/upgrademngrs/muxedupgrade.nim | 2 ++ libp2p/upgrademngrs/upgrade.nim | 4 ++- 10 files changed, 81 insertions(+), 26 deletions(-) diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 81a7c83fd0..6003f125f6 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -54,13 +54,29 @@ proc dialAndUpgrade( address: MultiAddress, dir = Direction.Out, ): Future[Muxer] {.async.} = + echo "\n\n> Dialer::dialAndUpgrade" for transport in self.transports: # for each transport if transport.handles(address): # check if it can dial it trace "Dialing address", address, peerId = peerId.get(default(PeerId)), hostname let dialed = try: libp2p_total_dial_attempts.inc() - await transport.dial(hostname, address, peerId) + echo "> Dialer::dialAndUpgrade::0" + # echo hostname + # echo address + # echo peerId + echo transport.log() + echo type(transport) + echo type(hostname) + echo type(address) + echo type(peerId) + let dialFut = transport.dial(hostname, address, peerId) + echo "> Dialer::dialAndUpgrade::after-dial" + let myD = await dialFut + echo "< Dialer::dialAndUpgrade::after-dial" + echo myD.shortLog() + echo "<<<< Dialer::dialAndUpgrade::after-dial" + myD except CancelledError as exc: trace "Dialing canceled", description = exc.msg, peerId = peerId.get(default(PeerId)) @@ -70,7 +86,8 @@ proc dialAndUpgrade( description = exc.msg, peerId = peerId.get(default(PeerId)) libp2p_failed_dials.inc() return nil # Try the next address - + echo "> Dialer::dialAndUpgrade::1" + echo dialed.shortLog() libp2p_successful_dials.inc() let mux = @@ -80,6 +97,11 @@ proc dialAndUpgrade( # The if below is more general and might handle other use cases in the future. if dialed.dir != dir: dialed.dir = dir + echo "> Dialer::dialAndUpgrade::2" + echo transport.log() + echo dialed.shortLog() + # TODO: dialed should be MixnetConnectionAdapter + echo "< Dialer::dialAndUpgrade::2" await transport.upgrade(dialed, peerId) except CancelledError as exc: await dialed.close() @@ -97,7 +119,7 @@ proc dialAndUpgrade( # Try other address return nil - + echo "> Dialer::dialAndUpgrade::3" doAssert not isNil(mux), "connection died after upgrade " & $dialed.dir debug "Dial successful", peerId = mux.connection.peerId return mux @@ -170,6 +192,7 @@ proc internalConnect( reuseConnection = true, dir = Direction.Out, ): Future[Muxer] {.async.} = + echo "> Dialer::internalConnect" if Opt.some(self.localPeerId) == peerId: raise newException(CatchableError, "can't dial self!") @@ -184,16 +207,17 @@ proc internalConnect( return mux let slot = self.connManager.getOutgoingSlot(forceDial) + echo "> Dialer::internalConnect::0" let muxed = try: await self.dialAndUpgrade(peerId, addrs, dir) except CatchableError as exc: slot.release() raise exc + echo "> Dialer::internalConnect::1" slot.trackMuxer(muxed) if isNil(muxed): # None of the addresses connected raise newException(DialFailedError, "Unable to establish outgoing link") - try: self.connManager.storeMuxer(muxed) await self.peerStore.identify(muxed) @@ -302,7 +326,7 @@ method dial*( ## create a protocol stream and establish ## a connection if one doesn't exist already ## - + echo "> Dialer::dial" var conn: Muxer stream: Connection @@ -315,14 +339,18 @@ method dial*( await conn.close() try: + echo "> Dialer::0" trace "Dialing (new)", peerId, protos conn = await self.internalConnect(Opt.some(peerId), addrs, forceDial) + echo "> Dialer::1" trace "Opening stream", conn stream = await self.connManager.getStream(conn) + echo "> Dialer::2" if isNil(stream): raise newException(DialFailedError, "Couldn't get muxed stream") + echo "< Dialer::dial" return await self.negotiateStream(stream, protos) except CancelledError as exc: trace "Dial canceled", conn diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index 3465404ce3..663153ca4c 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -52,7 +52,9 @@ proc select*( ): Future[string] {.async: (raises: [CancelledError, LPStreamError, MultiStreamError]).} = trace "initiating handshake", conn, codec = Codec ## select a remote protocol + echo "> MultiStreamSelect::select" await conn.writeLp(Codec & "\n") # write handshake + echo "> MultiStreamSelect::select - 0" if proto.len() > 0: trace "selecting proto", conn, proto = proto[0] await conn.writeLp((proto[0] & "\n")) # select proto @@ -67,6 +69,7 @@ proc select*( trace "multistream handshake success", conn if proto.len() == 0: # no protocols, must be a handshake call + echo "< MultiStreamSelect::select - Handshake" return Codec else: s = string.fromBytes(await conn.readLp(MsgSize)) # read the first proto @@ -75,6 +78,7 @@ proc select*( if s == proto[0]: trace "successfully selected ", conn, proto = proto[0] conn.protocol = proto[0] + echo "< MultiStreamSelect::select - ", proto[0] return proto[0] elif proto.len > 1: # Try to negotiate alternatives diff --git a/libp2p/protocols/ping.nim b/libp2p/protocols/ping.nim index ff424129e4..c57fc12d14 100644 --- a/libp2p/protocols/ping.nim +++ b/libp2p/protocols/ping.nim @@ -52,6 +52,7 @@ proc new*( method init*(p: Ping) = proc handle(conn: Connection, proto: string) {.async.} = + echo "######### Before Ping #########" try: trace "handling ping", conn var buf: array[PingSize, byte] @@ -64,13 +65,15 @@ method init*(p: Ping) = raise exc except CatchableError as exc: trace "exception in ping handler", description = exc.msg, conn + echo "######### After Ping #########" p.handler = handle p.codec = PingCodec proc ping*(p: Ping, conn: Connection): Future[Duration] {.async, public.} = ## Sends ping to `conn`, returns the delay - + echo "######### Pinging #########" + echo conn.shortLog() trace "initiating ping", conn var @@ -82,9 +85,12 @@ proc ping*(p: Ping, conn: Connection): Future[Duration] {.async, public.} = let startTime = Moment.now() trace "sending ping", conn + echo "# Before Write. Is conn closed? ", conn.isClosed, conn.isEof await conn.write(@randomBuf) + echo "# After Write. Is conn closed? ", conn.isClosed, conn.isEof await conn.readExactly(addr resultBuf[0], PingSize) + echo "# After Read. Is conn closed? ", conn.isClosed let responseDur = Moment.now() - startTime @@ -95,4 +101,5 @@ proc ping*(p: Ping, conn: Connection): Future[Duration] {.async, public.} = raise newException(WrongPingAckError, "Incorrect ping data from peer!") trace "valid ping response", conn + echo "######### Pinged #########" return responseDur diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 5fc2e1b194..29458494bd 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -148,6 +148,7 @@ method init*(s: Secure) = try: # We don't need the result but we # definitely need to await the handshake + echo "Secure::handle" discard await s.handleConn(conn, false, Opt.none(PeerId)) trace "connection secured", conn except CancelledError as exc: @@ -165,6 +166,7 @@ method secure*( ): Future[Connection] {. async: (raises: [CancelledError, LPStreamError], raw: true), base .} = + echo "> Secure::secure" s.handleConn(conn, conn.dir == Direction.Out, peerId) method readOnce*( diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index e076175364..125972f2c8 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -40,14 +40,11 @@ type proc timeoutMonitor(s: Connection) {.async: (raises: []).} -func shortLog*(conn: Connection): string = - try: - if conn == nil: - "Connection(nil)" - else: - &"{shortLog(conn.peerId)}:{conn.oid}" - except ValueError as exc: - raiseAssert(exc.msg) +method shortLog*(conn: Connection): string {.raises: [].} = + if conn == nil: + "Connection(nil)" + else: + &"{shortLog(conn.peerId)}:{conn.oid}:{conn.protocol}" chronicles.formatIt(Connection): shortLog(it) diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index bfa842e417..c3832c8270 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -112,7 +112,7 @@ method initStream*(s: LPStream) {.base.} = trackCounter(s.objName) trace "Stream created", s, objName = s.objName, dir = $s.dir -proc join*( +method join*( s: LPStream ): Future[void] {.async: (raises: [CancelledError], raw: true), public.} = ## Wait for the stream to be closed @@ -134,9 +134,10 @@ method readOnce*( ## available raiseAssert("Not implemented!") -proc readExactly*( +method readExactly*( s: LPStream, pbytes: pointer, nbytes: int ): Future[void] {.async: (raises: [CancelledError, LPStreamError]), public.} = + # echo "readExactly. Is conn closed? ", s.isClosed ## Waits for `nbytes` to be available, then read ## them and return them if s.atEof: @@ -159,9 +160,9 @@ proc readExactly*( if read == 0: doAssert s.atEof() - trace "couldn't read all bytes, stream EOF", s, nbytes, read # Re-readOnce to raise a more specific error than EOF # Raise EOF if it doesn't raise anything(shouldn't happen) + # echo "readExactly3. Is conn closed? ", s.isClosed discard await s.readOnce(addr pbuffer[read], nbytes - read) warn "Read twice while at EOF" raise newLPStreamEOFError() @@ -170,7 +171,7 @@ proc readExactly*( trace "couldn't read all bytes, incomplete data", s, nbytes, read raise newLPStreamIncompleteError() -proc readLine*( +method readLine*( s: LPStream, limit = 0, sep = "\r\n" ): Future[string] {.async: (raises: [CancelledError, LPStreamError]), public.} = ## Reads up to `limit` bytes are read, or a `sep` is found @@ -198,7 +199,7 @@ proc readLine*( if len(result) == lim: break -proc readVarint*( +method readVarint*( conn: LPStream ): Future[uint64] {.async: (raises: [CancelledError, LPStreamError]), public.} = var buffer: array[10, byte] @@ -217,7 +218,7 @@ proc readVarint*( if true: # can't end with a raise apparently raise (ref InvalidVarintError)(msg: "Cannot parse varint") -proc readLp*( +method readLp*( s: LPStream, maxSize: int ): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]), public.} = ## read length prefixed msg, with the length encoded as a varint @@ -243,7 +244,7 @@ method write*( # Write `msg` to stream, waiting for the write to be finished raiseAssert("Not implemented!") -proc writeLp*( +method writeLp*( s: LPStream, msg: openArray[byte] ): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} = ## Write `msg` with a varint-encoded length prefix @@ -253,7 +254,7 @@ proc writeLp*( buf[vbytes.len ..< buf.len] = msg s.write(buf) -proc writeLp*( +method writeLp*( s: LPStream, msg: string ): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} = writeLp(s, msg.toOpenArrayByte(0, msg.high)) @@ -305,6 +306,7 @@ proc closeWithEOF*(s: LPStream): Future[void] {.async: (raises: []), public.} = ## trace "Closing with EOF", s + echo "> Closing with EOF: ", s.shortLog() if s.closedWithEOF: trace "Already closed" return diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index f4312cc366..b2a9137fbf 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -226,6 +226,7 @@ method accept*(self: TcpTransport): Future[Connection] = proc impl( self: TcpTransport ): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} = + echo "> TcpTransport::accept" proc cancelAcceptFuts() = for fut in self.acceptFuts: if not fut.completed(): @@ -239,19 +240,23 @@ method accept*(self: TcpTransport): Future[Connection] = elif self.acceptFuts.len == 0: # Holds futures representing ongoing accept calls on multiple servers. self.acceptFuts = self.servers.mapIt(it.accept()) - + echo "> TcpTransport::accept - 0" let finished = try: # Waits for any one of these futures to complete, indicating that a new connection has been accepted on one of the servers. + echo "###############################################################" await one(self.acceptFuts) except ValueError: + echo "> TcpTransport::accept - 02" raiseAssert "Accept futures should not be empty" except CancelledError as exc: + echo "> TcpTransport::accept - 03" cancelAcceptFuts() raise exc index = self.acceptFuts.find(finished) + echo "> TcpTransport::accept - 1" # A new connection has been accepted. The corresponding server should immediately start accepting another connection. # Thus we replace the completed future with a new one by calling accept on the same server again. self.acceptFuts[index] = self.servers[index].accept() @@ -274,6 +279,7 @@ method accept*(self: TcpTransport): Future[Connection] = cancelAcceptFuts() raise exc + echo "> TcpTransport::accept - 2" if not self.running: # Stopped while waiting await transp.closeWait() raise newTransportClosedError() @@ -289,6 +295,7 @@ method accept*(self: TcpTransport): Future[Connection] = let observedAddr = MultiAddress.init(remote).expect("Can initialize from remote address") + echo "- TcpTransport::accept" self.connHandler(transp, Opt.some(observedAddr), Direction.In) impl(self) diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index 94c605eb72..1ede801a18 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -36,6 +36,9 @@ type upgrader*: Upgrade networkReachability*: NetworkReachability +method log*(self: Transport): string {.base, gcsafe.} = + "" + proc newTransportClosedError*(parent: ref Exception = nil): ref TransportError = newException(TransportClosedError, "Transport closed, no more connections!", parent) @@ -69,10 +72,10 @@ method dial*( ): Future[Connection] {.base, gcsafe.} = ## dial a peer ## - + echo "Transport::dial" doAssert(false, "Not implemented!") -proc dial*( +method dial*( self: Transport, address: MultiAddress, peerId: Opt[PeerId] = Opt.none(PeerId) ): Future[Connection] {.gcsafe.} = self.dial("", address) @@ -83,6 +86,7 @@ method upgrade*( ## base upgrade method that the transport uses to perform ## transport specific upgrades ## + echo "> Transport::upgrade" self.upgrader.upgrade(conn, peerId) method handles*(self: Transport, address: MultiAddress): bool {.base, gcsafe.} = diff --git a/libp2p/upgrademngrs/muxedupgrade.nim b/libp2p/upgrademngrs/muxedupgrade.nim index a096791cc5..f47330a5c3 100644 --- a/libp2p/upgrademngrs/muxedupgrade.nim +++ b/libp2p/upgrademngrs/muxedupgrade.nim @@ -68,6 +68,8 @@ method upgrade*( ): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} = trace "Upgrading connection", conn, direction = conn.dir + echo "> MuxedUpgrade::upgrade" + echo "-----" let sconn = await self.secure(conn, peerId) # secure the connection if sconn == nil: raise (ref UpgradeFailedError)(msg: "unable to secure connection, stopping upgrade") diff --git a/libp2p/upgrademngrs/upgrade.nim b/libp2p/upgrademngrs/upgrade.nim index bc5b10de1e..8dc39a3369 100644 --- a/libp2p/upgrademngrs/upgrade.nim +++ b/libp2p/upgrademngrs/upgrade.nim @@ -47,9 +47,10 @@ method upgrade*( ): Future[Muxer] {.async: (raises: [CancelledError, LPError], raw: true), base.} = raiseAssert("Not implemented!") -proc secure*( +method secure*( self: Upgrade, conn: Connection, peerId: Opt[PeerId] ): Future[Connection] {.async: (raises: [CancelledError, LPError]).} = + echo "> Upgrade::secure" if self.secureManagers.len <= 0: raise (ref UpgradeFailedError)(msg: "No secure managers registered!") @@ -68,4 +69,5 @@ proc secure*( # let's avoid duplicating checks but detect if it fails to do it properly doAssert(secureProtocol.len > 0) + echo "> Upgrade::secure - 0" await secureProtocol[0].secure(conn, peerId) From 3fbfb8e19c9b54d46e5d3e3f26364ef84f905e43 Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Thu, 31 Oct 2024 12:18:57 +0100 Subject: [PATCH 2/2] Update quic dependency to fix an error that impacts mixnet --- libp2p.nimble | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p.nimble b/libp2p.nimble index 1aff488916..d4e630158f 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -11,7 +11,7 @@ requires "nim >= 1.6.0", "nimcrypto >= 0.6.0 & < 0.7.0", "dnsclient >= 0.3.0 & < 0.4.0", "bearssl >= 0.2.5", "chronicles >= 0.10.2", "chronos >= 4.0.3", "metrics", "secp256k1", "stew#head", "websock", "unittest2", - "https://github.com/status-im/nim-quic.git#ddcb31ffb74b5460ab37fd13547eca90594248bc" + "https://github.com/status-im/nim-quic.git#0e4677b3e8cafdaaaba52de59164a8e64ed3906e" let nimc = getEnv("NIMC", "nim") # Which nim compiler to use let lang = getEnv("NIMLANG", "c") # Which backend (c/cpp/js)