|
| 1 | +{.push raises: [Defect].} |
| 2 | + |
| 3 | +## Collection of utilities commonly used |
| 4 | +## during the setup phase of a Waku v2 node |
| 5 | + |
| 6 | +import |
| 7 | + std/tables, |
| 8 | + chronos, |
| 9 | + chronicles, |
| 10 | + json_rpc/rpcserver, |
| 11 | + metrics, |
| 12 | + metrics/chronos_httpserver, |
| 13 | + stew/results, |
| 14 | + stew/shims/net, |
| 15 | + ./storage/sqlite, |
| 16 | + ./storage/migration/migration_types, |
| 17 | + ./jsonrpc/[admin_api, |
| 18 | + debug_api, |
| 19 | + filter_api, |
| 20 | + relay_api, |
| 21 | + store_api, |
| 22 | + private_api, |
| 23 | + debug_api], |
| 24 | + ./config, |
| 25 | + ./wakunode2 |
| 26 | + |
| 27 | +logScope: |
| 28 | + topics = "wakunode.setup" |
| 29 | + |
| 30 | +type |
| 31 | + SetupResult*[T] = Result[T, string] |
| 32 | + |
| 33 | +########################## |
| 34 | +# Setup helper functions # |
| 35 | +########################## |
| 36 | + |
| 37 | +proc startRpc*(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) |
| 38 | + {.raises: [Defect, RpcBindError, CatchableError].} = |
| 39 | + # @TODO: API handlers still raise CatchableError |
| 40 | + |
| 41 | + let |
| 42 | + ta = initTAddress(rpcIp, rpcPort) |
| 43 | + rpcServer = newRpcHttpServer([ta]) |
| 44 | + installDebugApiHandlers(node, rpcServer) |
| 45 | + |
| 46 | + # Install enabled API handlers: |
| 47 | + if conf.relay: |
| 48 | + let topicCache = newTable[string, seq[WakuMessage]]() |
| 49 | + installRelayApiHandlers(node, rpcServer, topicCache) |
| 50 | + if conf.rpcPrivate: |
| 51 | + # Private API access allows WakuRelay functionality that |
| 52 | + # is backwards compatible with Waku v1. |
| 53 | + installPrivateApiHandlers(node, rpcServer, node.rng, topicCache) |
| 54 | + |
| 55 | + if conf.filter: |
| 56 | + let messageCache = newTable[ContentTopic, seq[WakuMessage]]() |
| 57 | + installFilterApiHandlers(node, rpcServer, messageCache) |
| 58 | + |
| 59 | + if conf.store: |
| 60 | + installStoreApiHandlers(node, rpcServer) |
| 61 | + |
| 62 | + if conf.rpcAdmin: |
| 63 | + installAdminApiHandlers(node, rpcServer) |
| 64 | + |
| 65 | + rpcServer.start() |
| 66 | + info "RPC Server started", ta |
| 67 | + |
| 68 | +proc startMetricsServer*(serverIp: ValidIpAddress, serverPort: Port) = |
| 69 | + info "Starting metrics HTTP server", serverIp, serverPort |
| 70 | + |
| 71 | + try: |
| 72 | + startMetricsHttpServer($serverIp, serverPort) |
| 73 | + except Exception as e: |
| 74 | + raiseAssert("Exception while starting metrics HTTP server: " & e.msg) |
| 75 | + |
| 76 | + info "Metrics HTTP server started", serverIp, serverPort |
| 77 | + |
| 78 | +proc startMetricsLog*() = |
| 79 | + # https://github.com/nim-lang/Nim/issues/17369 |
| 80 | + var logMetrics: proc(udata: pointer) {.gcsafe, raises: [Defect].} |
| 81 | + logMetrics = proc(udata: pointer) = |
| 82 | + {.gcsafe.}: |
| 83 | + # TODO: libp2p_pubsub_peers is not public, so we need to make this either |
| 84 | + # public in libp2p or do our own peer counting after all. |
| 85 | + var |
| 86 | + totalMessages = 0.float64 |
| 87 | + |
| 88 | + for key in waku_node_messages.metrics.keys(): |
| 89 | + try: |
| 90 | + totalMessages = totalMessages + waku_node_messages.value(key) |
| 91 | + except KeyError: |
| 92 | + discard |
| 93 | + |
| 94 | + info "Node metrics", totalMessages |
| 95 | + discard setTimer(Moment.fromNow(2.seconds), logMetrics) |
| 96 | + discard setTimer(Moment.fromNow(2.seconds), logMetrics) |
| 97 | + |
| 98 | +proc runMigrations*(sqliteDatabase: SqliteDatabase, conf: WakuNodeConf) = |
| 99 | + # Run migration scripts on persistent storage |
| 100 | + |
| 101 | + var migrationPath: string |
| 102 | + if conf.persistPeers and conf.persistMessages: |
| 103 | + migrationPath = migration_types.ALL_STORE_MIGRATION_PATH |
| 104 | + elif conf.persistPeers: |
| 105 | + migrationPath = migration_types.PEER_STORE_MIGRATION_PATH |
| 106 | + elif conf.persistMessages: |
| 107 | + migrationPath = migration_types.MESSAGE_STORE_MIGRATION_PATH |
| 108 | + |
| 109 | + # run migration |
| 110 | + info "running migration ... " |
| 111 | + let migrationResult = sqliteDatabase.migrate(migrationPath) |
| 112 | + if migrationResult.isErr: |
| 113 | + warn "migration failed" |
| 114 | + else: |
| 115 | + info "migration is done" |
0 commit comments