|
| 1 | +{.push raises: [Exception].} |
| 2 | + |
| 3 | +import tables, deques, strutils, os, streams |
| 4 | + |
| 5 | +import libp2p, chronos, cligen, chronicles |
| 6 | +from libp2p/protocols/pubsub/rpc/message import Message |
| 7 | + |
| 8 | +from illwave as iw import nil, `[]`, `[]=`, `==`, width, height |
| 9 | +from terminal import nil |
| 10 | + |
| 11 | +import ./ui/root |
| 12 | +import ./utils |
| 13 | +import ./file_exchange |
| 14 | + |
| 15 | +const |
| 16 | + KeyFile: string = "local.key" |
| 17 | + PeerIdFile: string = "local.peerid" |
| 18 | + MaxKeyLen: int = 4096 |
| 19 | + ListenPort: int = 9093 |
| 20 | + |
| 21 | +proc cleanup() {.noconv: (raises: []).} = |
| 22 | + try: |
| 23 | + iw.deinit() |
| 24 | + except: |
| 25 | + discard |
| 26 | + try: |
| 27 | + terminal.resetAttributes() |
| 28 | + terminal.showCursor() |
| 29 | + # Clear screen and move cursor to top-left |
| 30 | + stdout.write("\e[2J\e[H") # ANSI escape: clear screen & home |
| 31 | + stdout.flushFile() |
| 32 | + quit(130) # SIGINT conventional exit code |
| 33 | + except IOError as exc: |
| 34 | + echo "Unexpected error: " & exc.msg |
| 35 | + quit(1) |
| 36 | + |
| 37 | +proc readKeyFile( |
| 38 | + filename: string |
| 39 | +): PrivateKey {.raises: [OSError, IOError, ResultError[crypto.CryptoError]].} = |
| 40 | + let size = getFileSize(filename) |
| 41 | + |
| 42 | + if size == 0: |
| 43 | + raise newException(OSError, "Empty key file") |
| 44 | + |
| 45 | + var buf: seq[byte] |
| 46 | + buf.setLen(size) |
| 47 | + |
| 48 | + var fs = openFileStream(filename, fmRead) |
| 49 | + defer: |
| 50 | + fs.close() |
| 51 | + |
| 52 | + discard fs.readData(buf[0].addr, size.int) |
| 53 | + PrivateKey.init(buf).tryGet() |
| 54 | + |
| 55 | +proc writeKeyFile( |
| 56 | + filename: string, key: PrivateKey |
| 57 | +) {.raises: [OSError, IOError, ResultError[crypto.CryptoError]].} = |
| 58 | + var fs = openFileStream(filename, fmWrite) |
| 59 | + defer: |
| 60 | + fs.close() |
| 61 | + |
| 62 | + let buf = key.getBytes().tryGet() |
| 63 | + fs.writeData(buf[0].addr, buf.len) |
| 64 | + |
| 65 | +proc loadOrCreateKey(rng: var HmacDrbgContext): PrivateKey = |
| 66 | + if fileExists(KeyFile): |
| 67 | + try: |
| 68 | + return readKeyFile(KeyFile) |
| 69 | + except: |
| 70 | + discard # overwrite file |
| 71 | + try: |
| 72 | + let k = PrivateKey.random(rng).tryGet() |
| 73 | + writeKeyFile(KeyFile, k) |
| 74 | + k |
| 75 | + except: |
| 76 | + echo "Could not create new key" |
| 77 | + quit(1) |
| 78 | + |
| 79 | +proc start( |
| 80 | + addrs: Opt[MultiAddress], headless: bool, room: string, port: int |
| 81 | +) {.async: (raises: [CancelledError]).} = |
| 82 | + # Handle Ctrl+C |
| 83 | + setControlCHook(cleanup) |
| 84 | + |
| 85 | + var rng = newRng() |
| 86 | + |
| 87 | + let switch = |
| 88 | + try: |
| 89 | + SwitchBuilder |
| 90 | + .new() |
| 91 | + .withRng(rng) |
| 92 | + .withTcpTransport() |
| 93 | + .withAddresses(@[MultiAddress.init("/ip4/0.0.0.0/tcp/" & $port).tryGet()]) |
| 94 | + .withYamux() |
| 95 | + .withNoise() |
| 96 | + .withPrivateKey(loadOrCreateKey(rng[])) |
| 97 | + .build() |
| 98 | + except LPError as exc: |
| 99 | + echo "Could not start switch: " & $exc.msg |
| 100 | + quit(1) |
| 101 | + except Exception as exc: |
| 102 | + echo "Could not start switch: " & $exc.msg |
| 103 | + quit(1) |
| 104 | + |
| 105 | + try: |
| 106 | + writeFile(PeerIdFile, $switch.peerInfo.peerId) |
| 107 | + except IOError as exc: |
| 108 | + error "Could not write PeerId to file", description = exc.msg |
| 109 | + |
| 110 | + let (gossip, fileExchange) = |
| 111 | + try: |
| 112 | + (GossipSub.init(switch = switch, triggerSelf = true), FileExchange.new()) |
| 113 | + except InitializationError as exc: |
| 114 | + echo "Could not initialize gossipsub: " & $exc.msg |
| 115 | + quit(1) |
| 116 | + |
| 117 | + try: |
| 118 | + switch.mount(gossip) |
| 119 | + switch.mount(fileExchange) |
| 120 | + await switch.start() |
| 121 | + except LPError as exc: |
| 122 | + echo "Could start switch: " & $exc.msg |
| 123 | + |
| 124 | + info "Started switch", peerId = $switch.peerInfo.peerId |
| 125 | + |
| 126 | + let |
| 127 | + recvQ = newAsyncQueue[string]() |
| 128 | + peerQ = newAsyncQueue[(PeerId, PeerEventKind)]() |
| 129 | + systemQ = newAsyncQueue[string]() |
| 130 | + |
| 131 | + # if --connect was specified, connect to peer |
| 132 | + if addrs.isSome(): |
| 133 | + try: |
| 134 | + discard await switch.connect(addrs.get()) |
| 135 | + except Exception as exc: |
| 136 | + error "Connection error", description = exc.msg |
| 137 | + |
| 138 | + # wait so that gossipsub can form mesh |
| 139 | + await sleepAsync(3.seconds) |
| 140 | + |
| 141 | + # topic handlers |
| 142 | + # chat and file handlers actually need to be validators instead of regular handlers |
| 143 | + # validators allow us to get information about which peer sent a message |
| 144 | + let onChatMsg = proc( |
| 145 | + topic: string, msg: Message |
| 146 | + ): Future[ValidationResult] {.async, gcsafe.} = |
| 147 | + let strMsg = cast[string](msg.data) |
| 148 | + await recvQ.put(shortPeerId(msg.fromPeer) & ": " & strMsg) |
| 149 | + await systemQ.put("Received message") |
| 150 | + await systemQ.put(" Source: " & $msg.fromPeer) |
| 151 | + await systemQ.put(" Topic: " & $topic) |
| 152 | + await systemQ.put(" Seqno: " & $seqnoToUint64(msg.seqno)) |
| 153 | + await systemQ.put(" ") # empty line |
| 154 | + return ValidationResult.Accept |
| 155 | + |
| 156 | + # when a new file is announced, download it |
| 157 | + let onNewFile = proc( |
| 158 | + topic: string, msg: Message |
| 159 | + ): Future[ValidationResult] {.async, gcsafe.} = |
| 160 | + let fileId = sanitizeFileId(cast[string](msg.data)) |
| 161 | + # this will only work if we're connected to `fromPeer` (since we don't have kad-dht) |
| 162 | + let conn = await switch.dial(msg.fromPeer, FileExchangeCodec) |
| 163 | + let filePath = getTempDir() / fileId |
| 164 | + let fileContents = await fileExchange.requestFile(conn, fileId) |
| 165 | + writeFile(filePath, fileContents) |
| 166 | + await conn.close() |
| 167 | + # Save file in /tmp/fileId |
| 168 | + await systemQ.put("Downloaded file to " & filePath) |
| 169 | + await systemQ.put(" ") # empty line |
| 170 | + return ValidationResult.Accept |
| 171 | + |
| 172 | + # when a new peer is announced |
| 173 | + let onNewPeer = proc(topic: string, data: seq[byte]) {.async, gcsafe.} = |
| 174 | + let peerId = PeerId.init(data).valueOr: |
| 175 | + error "Could not parse PeerId from data", data = $data |
| 176 | + return |
| 177 | + await peerQ.put((peerId, PeerEventKind.Joined)) |
| 178 | + |
| 179 | + # register validators and handlers |
| 180 | + |
| 181 | + # receive chat messages |
| 182 | + gossip.subscribe(room, nil) |
| 183 | + gossip.addValidator(room, onChatMsg) |
| 184 | + |
| 185 | + # receive files offerings |
| 186 | + gossip.subscribe(ChatFileTopic, nil) |
| 187 | + gossip.addValidator(ChatFileTopic, onNewFile) |
| 188 | + |
| 189 | + # receive newly connected peers through gossipsub |
| 190 | + gossip.subscribe(PeerDiscoveryTopic, onNewPeer) |
| 191 | + |
| 192 | + let onPeerJoined = proc( |
| 193 | + peer: PeerId, peerEvent: PeerEvent |
| 194 | + ) {.gcsafe, async: (raises: [CancelledError]).} = |
| 195 | + await peerQ.put((peer, PeerEventKind.Joined)) |
| 196 | + |
| 197 | + let onPeerLeft = proc( |
| 198 | + peer: PeerId, peerEvent: PeerEvent |
| 199 | + ) {.gcsafe, async: (raises: [CancelledError]).} = |
| 200 | + await peerQ.put((peer, PeerEventKind.Left)) |
| 201 | + |
| 202 | + # receive newly connected peers through direct connections |
| 203 | + switch.addPeerEventHandler(onPeerJoined, PeerEventKind.Joined) |
| 204 | + switch.addPeerEventHandler(onPeerLeft, PeerEventKind.Left) |
| 205 | + |
| 206 | + # add already connected peers |
| 207 | + for peerId in switch.peerStore[AddressBook].book.keys: |
| 208 | + await peerQ.put((peerId, PeerEventKind.Joined)) |
| 209 | + |
| 210 | + if headless: |
| 211 | + runForever() |
| 212 | + else: |
| 213 | + try: |
| 214 | + await runUI(gossip, room, recvQ, peerQ, systemQ, switch.peerInfo.peerId) |
| 215 | + except Exception as exc: |
| 216 | + error "Unexpected error", description = exc.msg |
| 217 | + finally: |
| 218 | + if switch != nil: |
| 219 | + await switch.stop() |
| 220 | + try: |
| 221 | + cleanup() |
| 222 | + except: |
| 223 | + discard |
| 224 | + |
| 225 | +proc cli(connect = "", room = ChatTopic, port = ListenPort, headless = false) = |
| 226 | + var addrs = Opt.none(MultiAddress) |
| 227 | + if connect.len > 0: |
| 228 | + addrs = Opt.some(MultiAddress.init(connect).get()) |
| 229 | + try: |
| 230 | + waitFor start(addrs, headless, room, port) |
| 231 | + except CancelledError: |
| 232 | + echo "Operation cancelled" |
| 233 | + |
| 234 | +when isMainModule: |
| 235 | + dispatch cli, |
| 236 | + help = { |
| 237 | + "connect": "full multiaddress (with /p2p/ peerId) of the node to connect to", |
| 238 | + "room": "Room name", |
| 239 | + "port": "TCP listen port", |
| 240 | + "headless": "No UI, can only receive messages", |
| 241 | + } |
0 commit comments