diff --git a/nim-peer/src/nim_peer.nim b/nim-peer/src/nim_peer.nim index 34e16c3e..624390be 100644 --- a/nim-peer/src/nim_peer.nim +++ b/nim-peer/src/nim_peer.nim @@ -1,9 +1,13 @@ {.push raises: [Exception].} import tables, deques, strutils, os, streams +import std/sets import libp2p, chronos, cligen, chronicles -from libp2p/protocols/pubsub/rpc/message import Message +import libp2p/protocols/kademlia +import libp2p/protocols/pubsub/rpc/message as pubsub_message +import libp2p/nameresolving/dnsresolver +import libp2p/nameresolving/nameresolver from illwave as iw import nil, `[]`, `[]=`, `==`, width, height from terminal import nil @@ -17,6 +21,9 @@ const PeerIdFile: string = "local.peerid" MaxKeyLen: int = 4096 ListenPort: int = 9093 + DiscoveryInterval = 10.seconds + KadBootstrapPeerAddrs = + ["/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"] proc cleanup() {.noconv: (raises: []).} = try: @@ -76,9 +83,80 @@ proc loadOrCreateKey(rng: var HmacDrbgContext): PrivateKey = echo "Could not create new key" quit(1) +proc roomToKadKey(room: string): Opt[Key] {.raises: [].} = + var roomBytes = newSeq[byte](room.len) + for i, ch in room: + roomBytes[i] = byte(ord(ch)) + let digest = MultiHash.digest("sha2-256", roomBytes).valueOr: + error "Could not derive Kad-DHT key for room", room = room, description = error + return Opt.none(Key) + Opt.some(digest.toKey()) + +proc kadBootstrapNodes( + resolver: DnsResolver +): Future[seq[(PeerId, seq[MultiAddress])]] {.async.} = + const PeerIdTag = "/p2p/" + + for addr in KadBootstrapPeerAddrs: + let tagPos = addr.rfind(PeerIdTag) + if tagPos < 0: + error "Missing /p2p/ segment", address = addr + continue + + let peerIdStr = addr[tagPos + PeerIdTag.len .. ^1] + let peerId = PeerId.init(peerIdStr).valueOr: + error "Invalid peer id", peerId = peerIdStr + continue + + let baseAddr = addr[0 ..< tagPos] + + let maddr = MultiAddress.init(baseAddr).valueOr: + error "Invalid multiaddr", address = baseAddr + continue + + result.add((peerId, @[maddr])) + +proc discoverPeersWithKad( + switch: Switch, kad: KadDHT, room: string +) {.async: (raises: []).} = + let roomKey = roomToKadKey(room).valueOr: + error "Unable to convert room to kad key" + return + + try: + while true: + # announce ourselves as a provider for this room and query for other providers + await kad.addProvider(roomKey) + var providers: HashSet[Provider] + try: + providers = await kad.getProviders(roomKey) + except LPStreamError as exc: + debug "Kad provider lookup stream error", description = exc.msg + await sleepAsync(DiscoveryInterval) + continue + + for provider in providers.items: + let peerId = PeerId.init(provider.id).valueOr: + continue + if peerId == switch.peerInfo.peerId or switch.isConnected(peerId): + continue + if provider.addrs.len == 0: + continue + + try: + await switch.connect(peerId, provider.addrs) + info "Kad Connected to peer via Kad-DHT", peerId = $peerId + except DialFailedError as exc: + debug "Failed to connect to discovered peer", + peerId = $peerId, description = exc.msg + + await sleepAsync(DiscoveryInterval) + except CancelledError: + discard + proc start( addrs: Opt[MultiAddress], headless: bool, room: string, port: int -) {.async: (raises: [CancelledError]).} = +) {.async.} = # Handle Ctrl+C setControlCHook(cleanup) @@ -89,24 +167,26 @@ proc start( type WriterStr = LogOutputStr # Early (bootstrap) writer: mirror logs to stdout so nothing is dropped - defaultChroniclesStream.output.writer = - proc (lvl: LogLevel, rec: WriterStr) {.closure, gcsafe, raises: [].} = - let s = cast[string](rec) - try: - for line in s.splitLines(): - stdout.writeLine(line) - stdout.flushFile() - except IOError: - discard - + defaultChroniclesStream.output.writer = proc( + lvl: LogLevel, rec: WriterStr + ) {.closure, gcsafe, raises: [].} = + let s = cast[string](rec) + try: + for line in s.splitLines(): + stdout.writeLine(line) + stdout.flushFile() + except IOError: + discard var rng = newRng() + let nameResolver = DnsResolver.new(@[initTAddress("1.1.1.1:53")]) let switch = try: SwitchBuilder .new() .withRng(rng) + .withNameResolver(nameResolver) .withTcpTransport() .withAddresses(@[MultiAddress.init("/ip4/0.0.0.0/tcp/" & $port).tryGet()]) .withYamux() @@ -131,8 +211,10 @@ proc start( except InitializationError as exc: echo "Could not initialize gossipsub: " & $exc.msg quit(1) + let kad = KadDHT.new(switch, bootstrapNodes = await kadBootstrapNodes(nameResolver)) try: + switch.mount(kad) switch.mount(gossip) switch.mount(fileExchange) await switch.start() @@ -153,14 +235,13 @@ proc start( except Exception as exc: error "Connection error", description = exc.msg - # wait so that gossipsub can form mesh - await sleepAsync(3.seconds) + asyncSpawn discoverPeersWithKad(switch, kad, room) # topic handlers # chat and file handlers actually need to be validators instead of regular handlers # validators allow us to get information about which peer sent a message let onChatMsg = proc( - topic: string, msg: Message + topic: string, msg: pubsub_message.Message ): Future[ValidationResult] {.async, gcsafe.} = let strMsg = cast[string](msg.data) await recvQ.put(shortPeerId(msg.fromPeer) & ": " & strMsg) @@ -173,10 +254,10 @@ proc start( # when a new file is announced, download it let onNewFile = proc( - topic: string, msg: Message + topic: string, msg: pubsub_message.Message ): Future[ValidationResult] {.async, gcsafe.} = let fileId = sanitizeFileId(cast[string](msg.data)) - # this will only work if we're connected to `fromPeer` (since we don't have kad-dht) + # File transfer still requires a direct stream to the announcing peer. let conn = await switch.dial(msg.fromPeer, FileExchangeCodec) let filePath = getTempDir() / fileId let fileContents = await fileExchange.requestFile(conn, fileId) @@ -201,8 +282,8 @@ proc start( gossip.addValidator(room, onChatMsg) # receive files offerings - gossip.subscribe(ChatFileTopic, nil) - gossip.addValidator(ChatFileTopic, onNewFile) + gossip.subscribe(FileChatTopic, nil) + gossip.addValidator(FileChatTopic, onNewFile) # receive newly connected peers through gossipsub gossip.subscribe(PeerDiscoveryTopic, onNewPeer) @@ -222,7 +303,10 @@ proc start( switch.addPeerEventHandler(onPeerLeft, PeerEventKind.Left) # add already connected peers - for peerId in switch.peerStore[AddressBook].book.keys: + for peerId in switch.connectedPeers(Direction.Out): + await peerQ.put((peerId, PeerEventKind.Joined)) + + for peerId in switch.connectedPeers(Direction.In): await peerQ.put((peerId, PeerEventKind.Joined)) if headless: @@ -245,9 +329,9 @@ proc cli(connect = "", room = ChatTopic, port = ListenPort, headless = false) = if connect.len > 0: addrs = Opt.some(MultiAddress.init(connect).get()) try: - waitFor start(addrs, headless, room, port) - except CancelledError: - echo "Operation cancelled" + waitFor noCancel(start(addrs, headless, room, port)) + except CatchableError as exc: + echo "Operation failed: " & exc.msg when isMainModule: dispatch cli, diff --git a/nim-peer/src/ui/root.nim b/nim-peer/src/ui/root.nim index 2a21ed68..604a15c5 100644 --- a/nim-peer/src/ui/root.nim +++ b/nim-peer/src/ui/root.nim @@ -130,7 +130,7 @@ proc runUI*( copyFile(path, getTempDir().joinPath(fileId)) # publish /tmp/{filename} try: - discard await gossip.publish(ChatFileTopic, cast[seq[byte]](@(fileId))) + discard await gossip.publish(FileChatTopic, cast[seq[byte]](@(fileId))) systemPanel.push("Offering file " & fileId) except Exception as exc: systemPanel.push("Unable to offer file: " & exc.msg) diff --git a/nim-peer/src/utils.nim b/nim-peer/src/utils.nim index 938c94f3..bf73485e 100644 --- a/nim-peer/src/utils.nim +++ b/nim-peer/src/utils.nim @@ -4,8 +4,8 @@ import libp2p const ChatTopic*: string = "universal-connectivity" - ChatFileTopic*: string = "universal-connectivity-file" - PeerDiscoveryTopic*: string = "universal-connectivity-browser-peer-discovery" + FileChatTopic*: string = ChatTopic & "-file" + PeerDiscoveryTopic*: string = ChatTopic & "-browser-peer-discovery" const SanitizationRules = [ ({'\0' .. '\31'}, ' '), # Control chars -> space