Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 107 additions & 23 deletions nim-peer/src/nim_peer.nim
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
{.push raises: [Exception].}

import tables, deques, strutils, os, streams
import std/sets

import libp2p, chronos, cligen, chronicles
from libp2p/protocols/pubsub/rpc/message import Message
import libp2p/protocols/kademlia
import libp2p/protocols/pubsub/rpc/message as pubsub_message
import libp2p/nameresolving/dnsresolver
import libp2p/nameresolving/nameresolver

from illwave as iw import nil, `[]`, `[]=`, `==`, width, height
from terminal import nil
Expand All @@ -17,6 +21,9 @@ const
PeerIdFile: string = "local.peerid"
MaxKeyLen: int = 4096
ListenPort: int = 9093
DiscoveryInterval = 10.seconds
KadBootstrapPeerAddrs =
["/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"]

proc cleanup() {.noconv: (raises: []).} =
try:
Expand Down Expand Up @@ -76,9 +83,80 @@ proc loadOrCreateKey(rng: var HmacDrbgContext): PrivateKey =
echo "Could not create new key"
quit(1)

proc roomToKadKey(room: string): Opt[Key] {.raises: [].} =
var roomBytes = newSeq[byte](room.len)
for i, ch in room:
roomBytes[i] = byte(ord(ch))
let digest = MultiHash.digest("sha2-256", roomBytes).valueOr:
error "Could not derive Kad-DHT key for room", room = room, description = error
return Opt.none(Key)
Opt.some(digest.toKey())

proc kadBootstrapNodes(
resolver: DnsResolver
): Future[seq[(PeerId, seq[MultiAddress])]] {.async.} =
const PeerIdTag = "/p2p/"

for addr in KadBootstrapPeerAddrs:
let tagPos = addr.rfind(PeerIdTag)
if tagPos < 0:
error "Missing /p2p/ segment", address = addr
continue

let peerIdStr = addr[tagPos + PeerIdTag.len .. ^1]
let peerId = PeerId.init(peerIdStr).valueOr:
error "Invalid peer id", peerId = peerIdStr
continue

let baseAddr = addr[0 ..< tagPos]

let maddr = MultiAddress.init(baseAddr).valueOr:
error "Invalid multiaddr", address = baseAddr
continue

result.add((peerId, @[maddr]))

proc discoverPeersWithKad(
switch: Switch, kad: KadDHT, room: string
) {.async: (raises: []).} =
let roomKey = roomToKadKey(room).valueOr:
error "Unable to convert room to kad key"
return

try:
while true:
# announce ourselves as a provider for this room and query for other providers
await kad.addProvider(roomKey)
var providers: HashSet[Provider]
try:
providers = await kad.getProviders(roomKey)
except LPStreamError as exc:
debug "Kad provider lookup stream error", description = exc.msg
await sleepAsync(DiscoveryInterval)
continue

for provider in providers.items:
let peerId = PeerId.init(provider.id).valueOr:
continue
if peerId == switch.peerInfo.peerId or switch.isConnected(peerId):
continue
if provider.addrs.len == 0:
continue

try:
await switch.connect(peerId, provider.addrs)
info "Kad Connected to peer via Kad-DHT", peerId = $peerId
except DialFailedError as exc:
debug "Failed to connect to discovered peer",
peerId = $peerId, description = exc.msg

await sleepAsync(DiscoveryInterval)
except CancelledError:
discard

proc start(
addrs: Opt[MultiAddress], headless: bool, room: string, port: int
) {.async: (raises: [CancelledError]).} =
) {.async.} =
# Handle Ctrl+C
setControlCHook(cleanup)

Expand All @@ -89,24 +167,26 @@ proc start(
type WriterStr = LogOutputStr

# Early (bootstrap) writer: mirror logs to stdout so nothing is dropped
defaultChroniclesStream.output.writer =
proc (lvl: LogLevel, rec: WriterStr) {.closure, gcsafe, raises: [].} =
let s = cast[string](rec)
try:
for line in s.splitLines():
stdout.writeLine(line)
stdout.flushFile()
except IOError:
discard

defaultChroniclesStream.output.writer = proc(
lvl: LogLevel, rec: WriterStr
) {.closure, gcsafe, raises: [].} =
let s = cast[string](rec)
try:
for line in s.splitLines():
stdout.writeLine(line)
stdout.flushFile()
except IOError:
discard

var rng = newRng()
let nameResolver = DnsResolver.new(@[initTAddress("1.1.1.1:53")])

let switch =
try:
SwitchBuilder
.new()
.withRng(rng)
.withNameResolver(nameResolver)
.withTcpTransport()
.withAddresses(@[MultiAddress.init("/ip4/0.0.0.0/tcp/" & $port).tryGet()])
.withYamux()
Expand All @@ -131,8 +211,10 @@ proc start(
except InitializationError as exc:
echo "Could not initialize gossipsub: " & $exc.msg
quit(1)
let kad = KadDHT.new(switch, bootstrapNodes = await kadBootstrapNodes(nameResolver))

try:
switch.mount(kad)
switch.mount(gossip)
switch.mount(fileExchange)
await switch.start()
Expand All @@ -153,14 +235,13 @@ proc start(
except Exception as exc:
error "Connection error", description = exc.msg

# wait so that gossipsub can form mesh
await sleepAsync(3.seconds)
asyncSpawn discoverPeersWithKad(switch, kad, room)

# topic handlers
# chat and file handlers actually need to be validators instead of regular handlers
# validators allow us to get information about which peer sent a message
let onChatMsg = proc(
topic: string, msg: Message
topic: string, msg: pubsub_message.Message
): Future[ValidationResult] {.async, gcsafe.} =
let strMsg = cast[string](msg.data)
await recvQ.put(shortPeerId(msg.fromPeer) & ": " & strMsg)
Expand All @@ -173,10 +254,10 @@ proc start(

# when a new file is announced, download it
let onNewFile = proc(
topic: string, msg: Message
topic: string, msg: pubsub_message.Message
): Future[ValidationResult] {.async, gcsafe.} =
let fileId = sanitizeFileId(cast[string](msg.data))
# this will only work if we're connected to `fromPeer` (since we don't have kad-dht)
# File transfer still requires a direct stream to the announcing peer.
let conn = await switch.dial(msg.fromPeer, FileExchangeCodec)
let filePath = getTempDir() / fileId
let fileContents = await fileExchange.requestFile(conn, fileId)
Expand All @@ -201,8 +282,8 @@ proc start(
gossip.addValidator(room, onChatMsg)

# receive files offerings
gossip.subscribe(ChatFileTopic, nil)
gossip.addValidator(ChatFileTopic, onNewFile)
gossip.subscribe(FileChatTopic, nil)
gossip.addValidator(FileChatTopic, onNewFile)

# receive newly connected peers through gossipsub
gossip.subscribe(PeerDiscoveryTopic, onNewPeer)
Expand All @@ -222,7 +303,10 @@ proc start(
switch.addPeerEventHandler(onPeerLeft, PeerEventKind.Left)

# add already connected peers
for peerId in switch.peerStore[AddressBook].book.keys:
for peerId in switch.connectedPeers(Direction.Out):
await peerQ.put((peerId, PeerEventKind.Joined))

for peerId in switch.connectedPeers(Direction.In):
await peerQ.put((peerId, PeerEventKind.Joined))

if headless:
Expand All @@ -245,9 +329,9 @@ proc cli(connect = "", room = ChatTopic, port = ListenPort, headless = false) =
if connect.len > 0:
addrs = Opt.some(MultiAddress.init(connect).get())
try:
waitFor start(addrs, headless, room, port)
except CancelledError:
echo "Operation cancelled"
waitFor noCancel(start(addrs, headless, room, port))
except CatchableError as exc:
echo "Operation failed: " & exc.msg

when isMainModule:
dispatch cli,
Expand Down
2 changes: 1 addition & 1 deletion nim-peer/src/ui/root.nim
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ proc runUI*(
copyFile(path, getTempDir().joinPath(fileId))
# publish /tmp/{filename}
try:
discard await gossip.publish(ChatFileTopic, cast[seq[byte]](@(fileId)))
discard await gossip.publish(FileChatTopic, cast[seq[byte]](@(fileId)))
systemPanel.push("Offering file " & fileId)
except Exception as exc:
systemPanel.push("Unable to offer file: " & exc.msg)
Expand Down
4 changes: 2 additions & 2 deletions nim-peer/src/utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import libp2p

const
ChatTopic*: string = "universal-connectivity"
ChatFileTopic*: string = "universal-connectivity-file"
PeerDiscoveryTopic*: string = "universal-connectivity-browser-peer-discovery"
FileChatTopic*: string = ChatTopic & "-file"
PeerDiscoveryTopic*: string = ChatTopic & "-browser-peer-discovery"

const SanitizationRules = [
({'\0' .. '\31'}, ' '), # Control chars -> space
Expand Down