diff --git a/examples/pingpong.nim b/examples/pingpong.nim index bfdb67c..6caf000 100644 --- a/examples/pingpong.nim +++ b/examples/pingpong.nim @@ -21,10 +21,6 @@ proc main() {.async.} = var cfg_saro = DefaultConfig() var cfg_raya = DefaultConfig() - # Cross pollinate Peers - No Waku discovery is used in this example - cfg_saro.staticPeers.add(cfg_raya.getMultiAddr()) - cfg_raya.staticPeers.add(cfg_saro.getMultiAddr()) - let sKey = loadPrivateKeyFromBytes(@[45u8, 216, 160, 24, 19, 207, 193, 214, 98, 92, 153, 145, 222, 247, 101, 99, 96, 131, 149, 185, 33, 187, 229, 251, 100, 158, 20, 131, 111, 97, 181, 210]).get() let rKey = loadPrivateKeyFromBytes(@[43u8, 12, 160, 51, 212, 90, 199, 160, 154, 164, 129, 229, 147, 69, 151, 17, 239, 51, 190, 33, 86, 164, 50, 105, 39, 250, 182, 116, 138, 132, 114, 234]).get() @@ -73,7 +69,7 @@ proc main() {.async.} = await saro.start() await raya.start() - await sleepAsync(5.seconds) + await sleepAsync(10.seconds) # Perform OOB Introduction: Raya -> Saro let raya_bundle = raya.createIntroBundle() @@ -81,8 +77,8 @@ proc main() {.async.} = await sleepAsync(20.seconds) # Run for some time - saro.stop() - raya.stop() + await saro.stop() + await raya.stop() when isMainModule: diff --git a/examples/tui/tui.nim b/examples/tui/tui.nim index f49c608..a3b4716 100644 --- a/examples/tui/tui.nim +++ b/examples/tui/tui.nim @@ -106,10 +106,6 @@ proc getSelectedConvo(app: ChatApp): ptr ConvoInfo = proc createChatClient(name: string): Future[Client] {.async.} = var cfg = await getCfg(name) - for key, val in fetchRegistrations(): - if key != name: - cfg.waku.staticPeers.add(val) - result = newClient(cfg.waku, cfg.ident) @@ -124,7 +120,7 @@ proc sendMessage(app: ChatApp, convoInfo: ptr ConvoInfo, msg: string) {.async.} var msgId = "" if convoInfo.convo != nil: - msgId = await convoInfo.convo.sendMessage(app.client.ds, initTextFrame(msg).toContentFrame()) + msgId = await convoInfo.convo.sendMessage(initTextFrame(msg).toContentFrame()) convoInfo[].addMessage(msgId, "You", app.inputBuffer) @@ -490,7 +486,7 @@ proc appLoop(app: ChatApp, panes: seq[Pane]) : Future[void] {.async.} = illwillInit(fullscreen = false) # Clear buffer while true: - await sleepAsync(5.milliseconds) + await sleepAsync(chronos.milliseconds(5)) app.tb.clear() drawStatusBar(app, panes[0], fgBlack, getIdColor(app.client.getId())) @@ -527,7 +523,7 @@ proc appLoop(app: ChatApp, panes: seq[Pane]) : Future[void] {.async.} = proc peerWatch(app: ChatApp): Future[void] {.async.} = while true: - await sleepAsync(1.seconds) + await sleepAsync(chronos.seconds(1)) app.peerCount = app.client.ds.getConnectedPeerCount() diff --git a/src/chat/client.nim b/src/chat/client.nim index ee52046..3d5b374 100644 --- a/src/chat/client.nim +++ b/src/chat/client.nim @@ -229,14 +229,16 @@ proc newPrivateConversation*(client: Client, ################################################# # Payload Handling +# Receives a incoming payload, decodes it, and processes it. ################################################# proc parseMessage(client: Client, msg: ChatPayload) {.raises: [ValueError, SerializationError].} = - ## Receives a incoming payload, decodes it, and processes it. - - let envelope = decode(msg.bytes, WapEnvelopeV1).valueOr: - raise newException(ValueError, "Failed to decode WapEnvelopeV1: " & error) + let envelopeRes = decode(msg.bytes, WapEnvelopeV1) + if envelopeRes.isErr: + debug "Failed to decode WapEnvelopeV1", client = client.getId(), err = envelopeRes.error + return + let envelope = envelopeRes.get() let convo = block: let opt = client.getConversationFromHint(envelope.conversationHint).valueOr: @@ -266,6 +268,11 @@ proc messageQueueConsumer(client: Client) {.async.} = while client.isRunning: let message = await client.inboundQueue.queue.get() + let topicRes = inbox.parseTopic(message.contentTopic).or(private_v1.parseTopic(message.contentTopic)) + if topicRes.isErr: + debug "Invalid content topic", client = client.getId(), err = topicRes.error, contentTopic = message.contentTopic + continue + notice "Inbound Message Received", client = client.getId(), contentTopic = message.contentTopic, len = message.bytes.len() try: @@ -291,7 +298,8 @@ proc start*(client: Client) {.async.} = notice "Client start complete", client = client.getId() -proc stop*(client: Client) = +proc stop*(client: Client) {.async.} = ## Stop the client. + await client.ds.stop() client.isRunning = false notice "Client stopped", client = client.getId() diff --git a/src/chat/conversations/private_v1.nim b/src/chat/conversations/private_v1.nim index 5b31f4a..99a0236 100644 --- a/src/chat/conversations/private_v1.nim +++ b/src/chat/conversations/private_v1.nim @@ -42,6 +42,9 @@ type discriminator: string doubleratchet: naxolotl.Doubleratchet +const + TopicPrefixPrivateV1 = "/convo/private/" + proc getTopic*(self: PrivateV1): string = ## Returns the topic for the PrivateV1 conversation. return self.topic @@ -63,7 +66,18 @@ proc getConvoId*(self: PrivateV1): string = proc derive_topic(participants: seq[PublicKey], discriminator: string): string = ## Derives a topic from the participants' public keys. - return "/convo/private/" & getConvoIdRaw(participants, discriminator) + return TopicPrefixPrivateV1 & getConvoIdRaw(participants, discriminator) + +## Parses the topic to extract the conversation ID. +proc parseTopic*(topic: string): Result[string, ChatError] = + if not topic.startsWith(TopicPrefixPrivateV1): + return err(ChatError(code: errTopic, context: "Invalid topic prefix")) + + let id = topic.split('/')[^1] + if id == "": + return err(ChatError(code: errTopic, context: "Empty conversation ID")) + + return ok(id) proc calcMsgId(self: PrivateV1, msgBytes: seq[byte]): string = let s = fmt"{self.getConvoId()}|{msgBytes}" diff --git a/src/chat/delivery/waku_client.nim b/src/chat/delivery/waku_client.nim index 1a04d2c..2e871df 100644 --- a/src/chat/delivery/waku_client.nim +++ b/src/chat/delivery/waku_client.nim @@ -15,6 +15,7 @@ import waku_node, waku_enr, discovery/waku_discv5, + discovery/waku_dnsdisc, factory/builder, waku_filter_v2/client, ] @@ -71,8 +72,8 @@ type proc DefaultConfig*(): WakuConfig = let nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[] - let clusterId = 19'u16 - let shardId = 0'u16 + let clusterId = 16'u16 + let shardId = 32'u16 var port: uint16 = 50000'u16 + uint16(rand(200)) result = WakuConfig(nodeKey: nodeKey, port: port, clusterId: clusterId, @@ -161,6 +162,17 @@ proc start*(client: WakuClient) {.async.} = client.node.peerManager.start() + let dnsDiscoveryUrl = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@boot.staging.status.nodes.status.im" + let nameServer = parseIpAddress("1.1.1.1") + let discoveredPeers = await retrieveDynamicBootstrapNodes(dnsDiscoveryUrl, @[nameServer]) + if discoveredPeers.isOk: + info "Connecting to discovered peers" + let remotePeers = discoveredPeers.get() + info "Discovered and connecting to peers", peerCount = remotePeers.len + asyncSpawn client.node.connectToNodes(remotePeers) + else: + warn "Failed to find peers via DNS discovery", error = discoveredPeers.error + let subscription: SubscriptionEvent = (kind: PubsubSub, topic: client.cfg.pubsubTopic) @@ -194,3 +206,6 @@ proc getConnectedPeerCount*(client: WakuClient): int = if peerInfo.connectedness == Connected: inc count return count + +proc stop*(client: WakuClient) {.async.} = + await client.node.stop() diff --git a/src/chat/errors.nim b/src/chat/errors.nim index f4751b0..c74b5b2 100644 --- a/src/chat/errors.nim +++ b/src/chat/errors.nim @@ -8,6 +8,7 @@ type ErrorCode* = enum errTypeError errWrapped + errTopic proc `$`*(x: ChatError): string = fmt"ChatError(code={$x.code}, context: {x.context})" diff --git a/src/chat/inbox.nim b/src/chat/inbox.nim index 14beed2..6b8f606 100644 --- a/src/chat/inbox.nim +++ b/src/chat/inbox.nim @@ -1,3 +1,5 @@ +import std/[strutils] + import chronicles, chronos, @@ -10,6 +12,7 @@ import conversation_store, crypto, delivery/waku_client, + errors, proto_types, types @@ -21,6 +24,8 @@ type pubkey: PublicKey inbox_addr: string +const + TopicPrefixInbox = "/inbox/" proc `$`*(conv: Inbox): string = fmt"Inbox: addr->{conv.inbox_addr}" @@ -56,7 +61,17 @@ proc conversation_id_for*(pubkey: PublicKey): string = # TODO derive this from instance of Inbox proc topic_inbox*(client_addr: string): string = - return "/inbox/" & client_addr + return TopicPrefixInbox & client_addr + +proc parseTopic*(topic: string): Result[string, ChatError] = + if not topic.startsWith(TopicPrefixInbox): + return err(ChatError(code: errTopic, context: "Invalid inbox topic prefix")) + + let id = topic.split('/')[^1] + if id == "": + return err(ChatError(code: errTopic, context: "Empty inbox id")) + + return ok(id) method id*(convo: Inbox): string = return conversation_id_for(convo.pubkey)