diff --git a/README-I13N-POC.md b/README-I13N-POC.md new file mode 100644 index 0000000000..8a6a169e46 --- /dev/null +++ b/README-I13N-POC.md @@ -0,0 +1,543 @@ +This document describes the testing for the service incentivization PoC for Waku Lightpush. + +# Background + +Waku provides a suite of light protocols that allow edge nodes to access network services without operating as full Relay nodes. Specifically, the Lightpush protocol enables an edge node (client) to request a service node to publish a message to the Waku network on its behalf. To do this, the service node must possess an RLN membership. In essence, the Lightpush client is asking the service node to expend a portion of its limited resources. The objective of this PoC is to demonstrate an incentivized setup between a Lightpush edge node and a service node. + +# Functionality Overview + +This proof-of-concept introduces two additional modules: eligibility and reputation. + +## Eligibility Module + +The eligibility module allows a service node to determine whether an incoming Lightpush request is _eligible_ for fulfillment. A request is considered eligible if it includes a _proof of payment_ -- a transaction hash (txid) that corresponds to a transaction on Linea Sepolia. + +The PoC operates under the following assumptions: +- The edge node obtains, off-band, the on-chain address of the service node (i.e., the payment destination) and the expected payment amount; +- Payments are made in native ETH tokens (not contract-based tokens like ERC-20); +- Each request is individually paid for with a unique transaction. + +A Lightpush request is deemed _eligible_ if and only if: +- A proof of payment (txid) is attached to the request; +- The txid corresponds to a confirmed transaction on Linea Sepolia; +- The transaction transfers exactly the expected amount to the correct address; +- The transaction has not been used in any previous successful requests. + +## Reputation Module + +The reputation module enables edge nodes to avoid service nodes that deliver poor service. + +Reputation can take on one of three values: good, bad, or neutral. Initially, all peers are considered to have neutral reputation from the edge node's perspective. If an edge node sends an eligible request that is not fulfilled, the respective service node is marked with a "bad" reputation. Peers with bad reputation are excluded from future requests. If a request is successfully fulfilled, the edge node updates the service node's reputation to "good". + +Not all error responses affect the service node's reputation. If a request is rejected due to a missing or invalid proof of payment, the service node's reputation remains unchanged. Reputation is downgraded only in the event of a server-side error. + +Reputation functionality only applies to peers selected from the peer store (i.e., those connected via `--staticnode`). An edge node can choose a peer for Lightpush requests in one of two ways: select from the peer store, or use the peer assigned to the Lightpush service slot. If an edge node explicitly connects to a peer via `--lightpushnode`, that peer occupies the Lightpush service slot. Only one peer can occupy this slot at any given time. When a peer is in the service slot, all Lightpush requests are directed to it. In the testing scenarios below, we intentionally avoid using `--lightpushnode` to ensure we can evaluate the reputation-based peer selection logic. + +# Prerequisites + +The testing setup (described below) involves Edge Nodes and Service Nodes. An Edge Node sends messages via Lightpush using a Service Node. If the request is eligible, the Service Node uses its RLN membership to publish the Edge Node's message. + +> [!warning] +> As of 2025-07-28, registering new RLN memberships is temporarily unavailable. You can only reproduce the testing scenario if you already have an RLN membership registered with the contract `0xB9cd878C90E49F797B4431fBF4fb333108CB90e6` on Linea Sepolia. This guide will be updated when the issue is resolved. + +There are two tokens involved (both on Linea Sepolia): +- **TST (Testing Stable Token)**: a custom ERC-20 token on Linea Sepolia, required to register an RLN membership; +- **Linea Sepolia ETH**: native tokens used by the edge node to pay the service node. + +The payment and service relationships are illustrated in the following diagram: + +```mermaid +graph LR + A["Edge Node"] -- "3\. Pay in ETH" --> B["Service Node"] + B -- "1\. Deposit TST" --> C[RLN contract] + C -- "2\. RLN membership" --> B + B -- "4\. RLN-as-a-service" --> A +``` + +You have two options: +1. Reproduce the testing scenario as-is using existing confirmed proof-of-payment transactions; +2. Send your own transactions. + +Use the flowchart and table below to determine the prerequisites based on your testing scenario. + +| Goal | Required Components | Optional / Conditional Steps | +| ---------------------------------------------------------------------- | ------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------ | +| **Reproduce the scenario with existing transactions** | • Linea Sepolia RPC endpoint
• RLN membership | If you **don’t** have RLN membership:
• Get Linea Sepolia ETH
• Mint TST tokens
• Register RLN membership | +| **Reproduce the scenario with your own proof-of-payment transactions** | All of the above + Linea Sepolia ETH (for sending txs) | Get Linea Sepolia ETH:
• From faucet **or**
• By bridging from Ethereum Sepolia | + +```mermaid +graph TD + +A[Start] --> B[Get a Linea Sepolia RPC endpoint] +B --> C{Have RLN membership on **Linea Sepolia**?} + +C -- Yes --> D[Ready to test with existing transactions] +C -- No --> E[Get Linea Sepolia ETH from Faucet or Bridge] +E --> F[Register RLN membership] +F --> D + +D --> H{Want to send own transactions?} +H -- No --> I[Done] +H -- Yes --> J[Ensure you have Linea Sepolia ETH] +``` + +The next sub-sections provide more detailed instructions for each prerequisite. + +## Get a Linea Sepolia RPC Endpoint + +Refer to the [official list of node providers](https://docs.linea.build/get-started/tooling/node-providers) on the Linea website. + +A Linea Sepolia RPC endpoint is needed for two main purposes: +- to create an RLN membership and generate proofs (as before); +- to check eligibility proofs (functionality introduced in this PoC). + +For extensibility, the PoC uses separate configuration parameters for each of these purposes. You may use the same or different RPC endpoints for each. + +## Get Linea Sepolia ETH + +You can obtain Linea Sepolia ETH in several ways: +1. Request ETH directly from a faucet (see [list of faucets](https://docs.linea.build/get-started/how-to/get-testnet-eth)); +2. Bridge Ethereum Sepolia ETH to Linea Sepolia ETH (see [native bridge](https://linea.build/hub/bridge/native-bridge) — ensure "Show Test Networks" is enabled in the settings); +3. Ask a friend or colleague for Linea Sepolia ETH (or Ethereum Sepolia ETH, which you can then bridge as described above). + +## Register RLN Membership + +To publish a message, a valid RLN membership is required. The recommended approach is to use the `register_rln.sh` script from [`nwaku-compose`](https://github.com/waku-org/nwaku-compose). This script mints TST tokens (required for RLN deposit) and registers an RLN membership in a single step. If using `register_rln.sh`, there is no need to separately mint TST tokens. + +We recommend the following directory structure (you will need both `nwaku` and `nwaku-compose` repositories): + +``` +- nwaku-poc-testing + - nwaku-compose + - nwaku +``` + +> [!note] +> You will clone the `nwaku-compose` repository in addition to the `nwaku` repository. This setup uses `nwaku-compose` only for its RLN registration script. We **do not** run `nwaku` via `docker compose`, which is `nwaku-compose`’s primary function. Instead, after registering the RLN membership, `nwaku` is run directly from a source-built binary. + +Clone the `nwaku-compose` repository: + +``` +git clone git@github.com:waku-org/nwaku-compose.git +cd nwaku-compose +``` + +Copy the environment file template and open it for editing: + +``` +cp .env.example .env +nano .env +``` + +> [!note] +> You may use any text editor in place of `nano`. + +Edit the `.env` file with the following required parameters: + +| Parameter | Comment | +| ------------------------------ | -------------------------------------------------------------------------------------- | +| `RLN_RELAY_ETH_CLIENT_ADDRESS` | The Linea Sepolia RPC URL endpoint (no quotes). | +| `ETH_TESTNET_ACCOUNT` | The Linea Sepolia account for which the RLN membership will be registered (no quotes). | +| `ETH_TESTNET_KEY` | The private key for `ETH_TESTNET_ACCOUNT`, without the `0x` prefix (no quotes). | +| `RLN_RELAY_CRED_PASSWORD` | A password to protect your RLN membership (in double quotes). | + +> [!note] +> `ETH_TESTNET_KEY` must be the private key corresponding to `ETH_TESTNET_ACCOUNT`. + +> [!warning] +> Be careful not to expose private keys that secure real value (including on other networks). See [how to export your private key](https://support.metamask.io/configure/accounts/how-to-export-an-accounts-private-key/) from Metamask. + +Run the script that registers an RLN membership and stores the keys in the keystore: + +``` +./register_rln.sh +``` + +If successful, you will see output similar to the following: + +``` +INF 2025-07-25 10:11:32.243+00:00 credentials persisted topics="rln_keystore_generator" tid=1 file=rln_keystore_generator.nim:119 path=/keystore/keystore.json +``` + +Change the ownership of the keystore so it can be accessed later from the `nwaku` directory: + +``` +sudo chown -R $USER:$USER keystore +``` + +This keystore will be used in the upcoming testing steps. + +> [!note] +> From this point forward, `nwaku-compose` is no longer needed. All subsequent steps assume you are using the `wakunode2` binary built from source. + +Return to the outer directory: + +``` +cd ../ +``` + +## Build `nwaku` from Source + +To use the PoC, you must build `wakunode2` from source using the corresponding feature branch. + +Clone the repository and check out the [`feat/service-incentivization-poc`](https://github.com/waku-org/nwaku/tree/feat/service-incentivization-poc) feature branch: + +``` +git clone git@github.com:waku-org/nwaku.git +cd nwaku +git checkout feat/service-incentivization-poc +``` + +Build `wakunode2` from source (refer also to the [official build instructions](https://docs.waku.org/guides/nwaku/build-source)): + +``` +make update +make wakunode2 +``` + +> [!note] +> To speed up the build process, you can pass the `-j` parameter to use multiple CPU cores in parallel. For example, `make -j20 wakunode2` will use 20 cores. + +Verify that the binary was built successfully: + +``` +./build/wakunode2 --version +``` + +Expected output (values may vary — we only check that the binary exists and runs): + +``` +version / git commit hash: v0.35.1-167-g248757 +[Summary] 0 tests run (0.00s): 0 OK, 0 FAILED, 0 SKIPPED +``` + +# Experimental Setup Overview + +This section describes a local setup involving four `nwaku` nodes used to test the PoC. Each node runs on the same machine but using different ports. When issuing REST API commands, ensure you use the correct port corresponding to the targeted node. + +Each node is defined by a set of parameters, either as CLI arguments or via a TOML configuration file. The config files for all four nodes are located in the `./i13n-poc-configs/toml` directory. CLI arguments always override configuration parameters from the TOML files. + +The experimental setup includes the following nodes: +- **Alice** — an edge node that wants to publish messages without being connected to Relay. +- **Bob** — a service node that fulfills Alice's Lightpush request. +- **Charlie** — an alternative service node that fails to fulfill Alice's request. +- **Dave** — a Relay-connected node that Bob uses to relay Alice's message to the network. + +```mermaid +graph LR + Alice -- Lightpush --> Bob + Bob <-- Relay --> Dave + Alice -- Lightpush --> Charlie + Dave <-- Relay --> W((The Waku Network)) +``` + +For reproducibility, all nodes are launched with static (pre-generated) keys defined in their config files. These static keys are used in example commands and determine the node IDs. For details on configuring node keys, refer to the [key configuration guide](https://github.com/waku-org/nwaku/blob/master/docs/operators/how-to/configure-key.md). + +> [!note] +> In this testing setup, Bob and Charlie share on-chain credentials and the same RLN membership (i.e., the same keystore). + +> [!note] +> Nodes do not persist eligibility or reputation data between restarts. + +# Testing Scenario + +## Set environment variables + +Make a file called `envvars.env` in your project root (or home directory): + +``` +nano ./i13n-poc-configs/envvars.env +``` + +In the environment file, set the necessary environment variables (`RLN_RELAY_CRED_PASSWORD` is your RLN password). If you use another RPC provider, replace `ELIGIBILITY_ETH_CLIENT_ADDRESS` or `RLN_RELAY_ETH_CLIENT_ADDRESS` accordingly (provide your API key if necessary): +``` +export ELIGIBILITY_ETH_CLIENT_ADDRESS="https://rpc.sepolia.linea.build/" +export RLN_RELAY_ETH_CLIENT_ADDRESS="https://rpc.sepolia.linea.build/" +export RLN_RELAY_CRED_PATH="../nwaku-compose/keystore/keystore.json" +export RLN_RELAY_CRED_PASSWORD=RLN_RELAY_CRED_PASSWORD +``` + +> [!warning] +> If you have moved the keystore from `nwaku-compose` directory, change `RLN_RELAY_CRED_PATH` accordingly. + +## Launch nodes + +Make node-launching scripts executable: + +``` +chmod +x ./i13n-poc-configs/*.sh +``` + +Launch nodes in different terminal windows (**in this order** — important for proper connection establishment): + +``` +./i13n-poc-configs/run_charlie.sh +``` + +``` +./i13n-poc-configs/run_alice.sh +``` + +``` +./i13n-poc-configs/run_dave.sh +``` + +``` +./i13n-poc-configs/run_bob.sh +``` + +## Run the testing scenario + +To communicate with Waku nodes, use the REST API interface (see [REST API reference](https://waku-org.github.io/waku-rest-api/)). + +### Alice is only connected to Charlie + +Initially, Alice is connected only to Charlie. At this stage, we test negative scenarios where Alice's requests cannot be fulfilled. Alice will be connected to Bob later in the scenario. + +
+Alice sends ineligible requests, Charlie denies +Alice sends a series of ineligible requests—either without proof of payment or with invalid proof of payment. + +1. Charlie is selected as the service node, as it is the only peer with neutral reputation that Alice is aware of. +2. All ineligible requests are rejected. Alice receives error messages, and Charlie's reputation remains unchanged. + +> [!note] +> In all experiments, we explicitly use the pubsub topic `waku/2/rs/1/0`, corresponding to shard `0` on The Waku Network. The encoded form `%2Fwaku%2F2%2Frs%2F1%2F0` represents that topic. + +REST API request from Alice without proof of payment: + +``` +curl -X POST "http://127.0.0.1:8646/lightpush/v3/message" -H "accept: application/json" -H "Content-Type: application/json" -d '{ "pubsubTopic": "/waku/2/rs/1/0", "message": { "payload": "SGVsbG8gV29ybGQ=", "contentTopic": "/i13n-poc/1/chat/proto" } }' +``` + +Expected response: + +``` +{"statusDesc":"Eligibility proof is required"} +``` + +REST API request from Alice with a non-existent transaction as proof of payment: + +``` +curl -X POST "http://127.0.0.1:8646/lightpush/v3/message" -H "accept: application/json" -H "Content-Type: application/json" -d '{ "pubsubTopic": "/waku/2/rs/1/0", "message": { "payload": "SGVsbG8gV29ybGQ=", "contentTopic": "/i13n-poc/1/chat/proto" }, "eligibilityProof": "0x0000000000000000000000000000000000000000000000000000000000000000" }' +``` + +Expected response: + +``` +{"statusDesc":"Eligibility check failed: Failed to fetch tx or tx receipt"} +``` + +REST API request from Alice with a transaction using an incorrect amount (higher than expected): + +``` +curl -X POST "http://127.0.0.1:8646/lightpush/v3/message" -H "accept: application/json" -H "Content-Type: application/json" -d '{ "pubsubTopic": "/waku/2/rs/1/0", "message": { "payload": "SGVsbG8gV29ybGQ=", "contentTopic": "/i13n-poc/1/chat/proto" }, "eligibilityProof": "0x0a502f0a367f99b50e520afeb3843ee9e0f73fd0f01d671829c0c476d86859df" }' +``` + +Expected response: + +``` +{"statusDesc":"Eligibility check failed: Wrong tx value: got 2000000000, expected 1000000000"} +``` + +> [!note] +> The amount must match the expected value exactly and is counted in wei. Overpayment is also rejected in the current PoC implementation. + +REST API request from Alice with a transaction using an incorrect amount (lower than expected): + +``` +curl -X POST "http://127.0.0.1:8646/lightpush/v3/message" -H "accept: application/json" -H "Content-Type: application/json" -d '{ "pubsubTopic": "/waku/2/rs/1/0", "message": { "payload": "SGVsbG8gV29ybGQ=", "contentTopic": "/i13n-poc/1/chat/proto" }, "eligibilityProof": "0xa3c5da96b234518ae544c3449344cf4216587f400a529a836ce6131a82228363" }' +``` + +Expected response: + +``` +{"statusDesc":"Eligibility check failed: Wrong tx value: got 900000000, expected 1000000000"} +``` + +All failed responses described above must not impact Charlie's reputation from Alice's perspective. This can be verified by checking Charlie's reputation: + +```bash +curl -s -X GET "http://127.0.0.1:8646/admin/v1/peer/16Uiu2HAkyxHKziUQghTarGhBSFn8GcVapDgkJjMFTUVCCfEuyzSd" -H "accept: application/json" | jq | grep reputation +``` + +Expected response should show `"reputation": "Neutral"` indicating that Charlie's reputation remains unchanged. + +
+ +
+Alice sends an eligible request, Charlie fails to fulfill it +Alice now sends an eligible request. + +1. Charlie is again selected as the service node. +2. Charlie fails to fulfill the request because it is isolated. +3. Alice receives an error message and assigns Charlie a "bad" reputation. + +REST API request from Alice with a valid proof of payment: + +``` +curl -X POST "http://127.0.0.1:8646/lightpush/v3/message" -H "accept: application/json" -H "Content-Type: application/json" -d '{ "pubsubTopic": "/waku/2/rs/1/0", "message": { "payload": "SGVsbG8gV29ybGQ=", "contentTopic": "/i13n-poc/1/chat/proto" }, "eligibilityProof": "0x67932980dd5e66be76d4d096f3e176b2f1590cef3aa9981decb8f59a5c7e60e3" }' +``` + +Expected response: + +``` +{"statusDesc":"No peers for topic, skipping publish"} +``` + +Alice assigns Charlie a "bad" reputation due to a valid request not being served. This can be verified by checking Charlie's reputation: + +```bash +curl -s -X GET "http://127.0.0.1:8646/admin/v1/peer/16Uiu2HAkyxHKziUQghTarGhBSFn8GcVapDgkJjMFTUVCCfEuyzSd" -H "accept: application/json" | jq | grep reputation +``` + +Expected response should show `"reputation": "Bad"` indicating that Charlie has been assigned bad reputation. + +
+ +### Alice is connected to Bob and Charlie + +Next, Alice is additionally connected to Bob. + +
+Connect Alice to Bob + +Connect Alice to Bob using the REST API, without restarting: + +``` +curl -X POST "http://127.0.0.1:8646/admin/v1/peers" -H "accept: text/plain" -H "content-type: application/json" -d '["/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAmVHRbXuE4MUZbZ4xXF5CnVT5ntNGS3z7ER1fX1aLjxE95"]' +``` + +Verify that Alice is now connected to Bob: + +``` +curl -s -X GET "http://127.0.0.1:8646/admin/v1/peers/connected" | jq . | grep multiaddr +``` + +Expected response (showing both Bob’s and Charlie’s multiaddrs; `EXTERNAL_IP` is used in place of actual IP): + +``` + "multiaddr": "/ip4/EXTERNAL_IP/tcp/60000/p2p/16Uiu2HAmVHRbXuE4MUZbZ4xXF5CnVT5ntNGS3z7ER1fX1aLjxE95", + "multiaddr": "/ip4/EXTERNAL_IP/tcp/60003/p2p/16Uiu2HAkyxHKziUQghTarGhBSFn8GcVapDgkJjMFTUVCCfEuyzSd", +``` + +
+ +
+Alice sends an eligible request, Bob fulfills it +Alice sends an eligible request. + +1. Bob is selected as the service peer. Although Alice is aware of both Bob and Charlie, Charlie is excluded due to his bad reputation. +2. Bob successfully serves the request and returns a success message. +3. Alice assigns Bob a "good" reputation. + +``` +curl -X POST "http://127.0.0.1:8646/lightpush/v3/message" -H "accept: application/json" -H "Content-Type: application/json" -d '{ "pubsubTopic": "/waku/2/rs/1/0", "message": { "payload": "SGVsbG8gV29ybGQ=", "contentTopic": "/i13n-poc/1/chat/proto" }, "eligibilityProof": "0x67932980dd5e66be76d4d096f3e176b2f1590cef3aa9981decb8f59a5c7e60e3" }' +``` + +Expected response (indicates successful message relay): + +``` +{"relayPeerCount":1} +``` + +> [!note] +> If the response is `no suitable peers and no discovery method`, it is likely that Bob already has a bad reputation with Alice due to earlier failure. + +> [!note] +> Successful fulfillment only requires that Bob relays the message to one peer (Dave, in this scenario). Additional connections to The Waku Network are not required. + +Bob is selected as the service peer because Charlie has bad reputation and is excluded from consideration. Upon successful message delivery, Alice assigns Bob a "good" reputation. This can be verified by checking Bob's reputation: + +```bash +curl -s -X GET "http://127.0.0.1:8646/admin/v1/peer/16Uiu2HAmVHRbXuE4MUZbZ4xXF5CnVT5ntNGS3z7ER1fX1aLjxE95" -H "accept: application/json" | jq | grep reputation +``` + +Expected response should show `"reputation": "Good"` indicating that Bob has been assigned good reputation for successfully fulfilling the request. + +To verify that Alice's message reached Dave, query for the latest messages on shard `0`: + +``` +curl -s -X GET "http://127.0.0.1:8647/relay/v1/messages/%2Fwaku%2F2%2Frs%2F1%2F0" +``` + +Expected response (truncated example): + +``` +[{"payload":"SGVsbG8gV29ybGQ=","contentTopic":"/i13n-poc/1/chat/proto","version":0,"timestamp":1752158544577207808,"ephemeral":false, .... +``` +
+ +
+Alice attempts to double-spend, Bob denies +Alice attempts a double-spend by sending an ineligible request using a reused transaction hash (same as earlier). + +1. Bob is again selected as service peer. +2. Bob rejects the request and returns an appropriate error message. +3. Alice does not change Bob’s reputation. + +``` +curl -X POST "http://127.0.0.1:8646/lightpush/v3/message" -H "accept: application/json" -H "Content-Type: application/json" -d '{ "pubsubTopic": "/waku/2/rs/1/0", "message": { "payload": "SGVsbG8gV29ybGQ=", "contentTopic": "/i13n-poc/1/chat/proto" }, "eligibilityProof": "0x67932980dd5e66be76d4d096f3e176b2f1590cef3aa9981decb8f59a5c7e60e3" }' +``` + +Expected response: + +``` +{"statusDesc":"Eligibility check failed: TxHash 0x67932980dd5e66be76d4d096f3e176b2f1590cef3aa9981decb8f59a5c7e60e3 was already checked (double-spend attempt)"} +``` +
+ +End of testing scenario. + +# Appendix + +
+Eligibility parameters and txids + +Transactions have been confirmed on Linea Sepolia for testing purposes. + +Transaction IDs with correct amount (should succeed if the service node is connected to at least one other node): + +``` +0x67932980dd5e66be76d4d096f3e176b2f1590cef3aa9981decb8f59a5c7e60e3 +0x7dff359c2eda52945f278341d056049510110030ac9545448762b70490eb6260 +0x3c93f0e5f18667dce2dd99253152253a05bc42ff48140c21107c5d6a891d1a29 +0xb5b7230a2eacfb70238843feb26ace80f01500376eb7b976f4757b0f1429e5d0 +0x4bdfdc1019a6e8a0d098e59592f076d50b54d7a7e18f86a0f758eb8c6e9e96b7 +``` + +Transaction IDs to the expected address with wrong amount (must fail regardless of the service node's connection status and return the appropriate error): + +``` +0x0a502f0a367f99b50e520afeb3843ee9e0f73fd0f01d671829c0c476d86859df +0x0a502f0a367f99b50e520afeb3843ee9e0f73fd0f01d671829c0c476d86859df +``` + +Transaction ID to the wrong address with the correct amount (must fail): +``` +0x8a7548b4552dea4e6ef1a3d7b13a0ab9759b5be0ce3f6599d28d04c3aaa1fa1e +``` + +Transaction ID that doesn't correspond to a confirmed transaction (must fail): +``` +0x0000000000000000000000000000000000000000000000000000000000000000 +``` +
+ +
+Node keys and node IDs + +The following table contains, for the reference, node (private) keys and node IDs of all nodes of the testing setup. + +> [!warning] +> This table may be outdated. Config files take precedence. + +| Name | Protocols enabled | Node key | Node ID | Ports shift | TCP port | REST API port | +| ------- | ------------------------- | ------------------------------------------------------------------ | ------------------------------------------------------- | ----------- | -------- | ------------- | +| Alice | Lightpush (client) | `17950ef7510db19197ec0e3d34b41c0ed60bb7a0a619aa504eb6689c85ca9925` | `16Uiu2HAkwxC5Mcsh2DyZBq8CiKqnDkLUHWTuXCJas3TMPmRkynWz` | 1 | 60001 | 8646 | +| Bob | Relay, Lightpush (server) | `2bd3bbef1afa198fc614a254367de5ae285d799d7b1ba6d9d8543ba41038bbed` | `16Uiu2HAmVHRbXuE4MUZbZ4xXF5CnVT5ntNGS3z7ER1fX1aLjxE95` | 0 | 60000 | 8645 | +| Charlie | Relay | `fbfa8c3e38e7594500e9718b8c800e2d1a3ef5bc65ce041adf788d276035230f` | `16Uiu2HAkyxHKziUQghTarGhBSFn8GcVapDgkJjMFTUVCCfEuyzSd` | 3 | 60003 | 8648 | +| Dave | Relay | `166aee32c415fe796378ca0336671f4ec1fa26648857a86a237e509aaaeb1980` | `16Uiu2HAmSCUwvwDnXm7PyVbtKiQ5xzXb36wNw8YbGQxcBuxWTuU8` | 2 | 60002 | 8647 | + +
+ diff --git a/README.md b/README.md index ce352d6f57..b4671d9ca5 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,9 @@ The nwaku repository implements Waku, and provides tools related to it. - Examples of Waku usage. - Various tests of above. -For more details see the [source code](waku/README.md) +For more details see the [source code](waku/README.md). + +For information on Service Incentivization (i13n) PoC, see [README-I13N-POC.md](README-I13N-POC.md). ## How to Build & Run ( Linux, MacOS & WSL ) diff --git a/i13n-poc-configs/envvars.env b/i13n-poc-configs/envvars.env new file mode 100644 index 0000000000..60c0fef771 --- /dev/null +++ b/i13n-poc-configs/envvars.env @@ -0,0 +1,4 @@ +export ELIGIBILITY_ETH_CLIENT_ADDRESS="https://rpc.sepolia.linea.build/" +export RLN_RELAY_ETH_CLIENT_ADDRESS="https://rpc.sepolia.linea.build/" +export RLN_RELAY_CRED_PATH="../nwaku-compose/keystore/keystore.json" +export RLN_RELAY_CRED_PASSWORD="" diff --git a/i13n-poc-configs/run_alice.sh b/i13n-poc-configs/run_alice.sh new file mode 100755 index 0000000000..3a192e907a --- /dev/null +++ b/i13n-poc-configs/run_alice.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +# Load env vars +source ./i13n-poc-configs/envvars.env + +# Run nwaku +./build/wakunode2 \ + --config-file=./i13n-poc-configs/toml/alice.toml diff --git a/i13n-poc-configs/run_bob.sh b/i13n-poc-configs/run_bob.sh new file mode 100755 index 0000000000..f5ff4dcc89 --- /dev/null +++ b/i13n-poc-configs/run_bob.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# Load env vars +source ./i13n-poc-configs/envvars.env + +# Run nwaku +./build/wakunode2 \ + --config-file=./i13n-poc-configs/toml/bob.toml \ + --rln-relay-eth-client-address=$RLN_RELAY_ETH_CLIENT_ADDRESS \ + --rln-relay-cred-path=$RLN_RELAY_CRED_PATH \ + --rln-relay-cred-password=$RLN_RELAY_CRED_PASSWORD \ + --eligibility-eth-client-address=$ELIGIBILITY_ETH_CLIENT_ADDRESS diff --git a/i13n-poc-configs/run_charlie.sh b/i13n-poc-configs/run_charlie.sh new file mode 100755 index 0000000000..4e356e04cd --- /dev/null +++ b/i13n-poc-configs/run_charlie.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# Load env vars +source ./i13n-poc-configs/envvars.env + +# Run nwaku +./build/wakunode2 \ + --config-file=./i13n-poc-configs/toml/charlie.toml \ + --rln-relay-eth-client-address=$RLN_RELAY_ETH_CLIENT_ADDRESS \ + --rln-relay-cred-path=$RLN_RELAY_CRED_PATH \ + --rln-relay-cred-password=$RLN_RELAY_CRED_PASSWORD \ + --eligibility-eth-client-address=$ELIGIBILITY_ETH_CLIENT_ADDRESS diff --git a/i13n-poc-configs/run_dave.sh b/i13n-poc-configs/run_dave.sh new file mode 100755 index 0000000000..0654b81ef7 --- /dev/null +++ b/i13n-poc-configs/run_dave.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +# Load env vars +source ./i13n-poc-configs/envvars.env + +# Run nwaku +./build/wakunode2 \ + --config-file=./i13n-poc-configs/toml/dave.toml \ + --rln-relay-eth-client-address=$RLN_RELAY_ETH_CLIENT_ADDRESS diff --git a/i13n-poc-configs/toml/alice.toml b/i13n-poc-configs/toml/alice.toml new file mode 100644 index 0000000000..58b1f99c1f --- /dev/null +++ b/i13n-poc-configs/toml/alice.toml @@ -0,0 +1,41 @@ +# Incentivization PoC: Alice + +# Alice is a client node that wants to publish messages +# without being connected to Relay. + +# Disable Lightpush +lightpush = false + +# Use a pre-generated static node key for reproducibility +nodekey = "17950ef7510db19197ec0e3d34b41c0ed60bb7a0a619aa504eb6689c85ca9925" + +# Connect to Charlie only on launch +# (Alice will connect to Bob via REST API later) +staticnode = [ "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAkyxHKziUQghTarGhBSFn8GcVapDgkJjMFTUVCCfEuyzSd" ] + +# Connect to The Waku Network (TWN) +# (must come before we override relay and discovery to false) +preset = "twn" + +# Disable Relay and RLN-Relay +relay = false +rln-relay = false + +# Disable discovery methods to ensure node isolation +discv5-discovery = false +dns-discovery = false + +# Reputation-related parameters +reputation-enabled = true + +# Shift ports to avoid conflicts in local setups +ports-shift = 1 + +# Enable REST API including admin functions +rest = true +rest-admin = true +rest-address = "127.0.0.1" +rest-allow-origin = [ "127.0.0.1:*" ] + +# Log level +log-level = "DEBUG" diff --git a/i13n-poc-configs/toml/bob.toml b/i13n-poc-configs/toml/bob.toml new file mode 100644 index 0000000000..8ad0559375 --- /dev/null +++ b/i13n-poc-configs/toml/bob.toml @@ -0,0 +1,43 @@ +# Incentivization PoC: Bob + +# Bob is a service node with Lightpush and Relay enabled +# that successfully fulfils Alice's eligible requests. +# Bob also checks for double-spend attempts - +# when Alice submits duplicate txid as proof of payment. + +# Enable Lightpush +lightpush = true + +# Use a pre-generated static node key for reproducibility +nodekey = "2bd3bbef1afa198fc614a254367de5ae285d799d7b1ba6d9d8543ba41038bbed" + +# Connect to Dave +staticnode = [ "/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmSCUwvwDnXm7PyVbtKiQ5xzXb36wNw8YbGQxcBuxWTuU8" ] + +# Enable Relay (required for eligibility module) +relay = true + +# Connect to The Waku Network (TWN) +preset = "twn" + +# Optional: Bob only connecting to Dave is OK for testing scenario +discv5-discovery = false +dns-discovery = false +nat = "upnp" + +# Eligibility-related parameters +eligibility-enabled = true +eligibility-receiver-address = "0x6298cc1831B6E5eDEDA5cC73bc75a040108358Bb" +eligibility-payment-amount-wei = 1000000000 + +# RLN-related parameters +rln-relay-tree-path = "~/.waku/rln-tree-db" + +# Enable REST API including admin functions +rest = true +rest-admin = true +rest-address = "127.0.0.1" +rest-allow-origin = [ "127.0.0.1:*" ] + +# Log level +log-level = "DEBUG" diff --git a/i13n-poc-configs/toml/charlie.toml b/i13n-poc-configs/toml/charlie.toml new file mode 100644 index 0000000000..f997c3091a --- /dev/null +++ b/i13n-poc-configs/toml/charlie.toml @@ -0,0 +1,43 @@ +# Incentivization PoC: Charlie + +# Charlie is an isolated node with Relay and Lightpush enabled. +# Charlie fails to fulfill Alice's request due to lack of peers. +# However, Charlie implements eligibility-related checks, +# and rejects Alice's ineligible requests before trying to fulfil them. + +# Enable Lightpush +lightpush = true + +# Use a pre-generated static node key for reproducibility +nodekey = "fbfa8c3e38e7594500e9718b8c800e2d1a3ef5bc65ce041adf788d276035230f" + +# Enable Relay (required for eligibility module) +relay = true + +# Connect to The Waku Network (TWN) +preset = "twn" + +# Disable discovery methods to ensure node isolation +discv5-discovery = false +dns-discovery = false +nat = "upnp" + +# Eligibility-related parameters +eligibility-enabled = true +eligibility-receiver-address = "0x6298cc1831B6E5eDEDA5cC73bc75a040108358Bb" +eligibility-payment-amount-wei = 1000000000 + +# Shift ports to avoid conflicts in local setups +ports-shift = 3 + +# RLN-related parameters +rln-relay-tree-path = "~/.waku/rln-tree-db-3" + +# Enable REST API including admin functions +rest = true +rest-admin = true +rest-address = "127.0.0.1" +rest-allow-origin = [ "127.0.0.1:*" ] + +# Log level +log-level = "DEBUG" diff --git a/i13n-poc-configs/toml/dave.toml b/i13n-poc-configs/toml/dave.toml new file mode 100644 index 0000000000..168b534fc8 --- /dev/null +++ b/i13n-poc-configs/toml/dave.toml @@ -0,0 +1,32 @@ +# Incentivization PoC: Dave + +# Dave is a Relay node that Bob uses to publishing Alice's message. +# Dave is unaware of any elibibiligy- or reputation-related features. + +# Enable Lightpush +lightpush = true + +# Use a pre-generated static node key for reproducibility +nodekey = "166aee32c415fe796378ca0336671f4ec1fa26648857a86a237e509aaaeb1980" + +# Enable Relay (required for eligibility module) +relay = true + +# Connect to The Waku Network (TWN) +preset = "twn" +nat = "upnp" + +# Shift ports to avoid conflicts in local setups +ports-shift = 2 + +# RLN-related parameters +rln-relay-tree-path = "~/.waku/rln-tree-db-2" + +# Enable REST API including admin functions +rest = true +rest-admin = true +rest-address = "127.0.0.1" +rest-allow-origin = [ "127.0.0.1:*" ] + +# Log level +log-level = "DEBUG" diff --git a/tests/incentivization/test_poc_eligibility.nim b/tests/incentivization/test_poc_eligibility.nim index be90188982..be4c2f0e87 100644 --- a/tests/incentivization/test_poc_eligibility.nim +++ b/tests/incentivization/test_poc_eligibility.nim @@ -130,7 +130,8 @@ suite "Waku Incentivization PoC Eligibility Proofs": var manager {.threadvar.}: EligibilityManager asyncSetup: - manager = await EligibilityManager.init(EthClient) + # Setup manager with expected receiver and amount + manager = await EligibilityManager.init(EthClient, Address.fromHex(receiverExpected.toHex()), TxValueExpectedWei) ( txHashWrongReceiverRightAmount, txHashRightReceiverWrongAmount, @@ -147,7 +148,7 @@ suite "Waku Incentivization PoC Eligibility Proofs": let eligibilityProof = EligibilityProof(proofOfPayment: some(@(TxHashNonExisting.bytes()))) let isEligible = await manager.isEligibleTxId( - eligibilityProof, receiverExpected, TxValueExpectedWei + eligibilityProof ) check: isEligible.isErr() @@ -158,7 +159,7 @@ suite "Waku Incentivization PoC Eligibility Proofs": let eligibilityProof = EligibilityProof(proofOfPayment: some(@(txHashContractCreation.bytes()))) let isEligible = await manager.isEligibleTxId( - eligibilityProof, receiverExpected, TxValueExpectedWei + eligibilityProof ) check: isEligible.isErr() @@ -170,7 +171,7 @@ suite "Waku Incentivization PoC Eligibility Proofs": let eligibilityProof = EligibilityProof(proofOfPayment: some(@(txHashContractCall.bytes()))) let isEligible = await manager.isEligibleTxId( - eligibilityProof, receiverExpected, TxValueExpectedWei + eligibilityProof ) check: isEligible.isErr() @@ -181,7 +182,7 @@ suite "Waku Incentivization PoC Eligibility Proofs": let eligibilityProof = EligibilityProof(proofOfPayment: some(@(txHashRightReceiverRightAmount.bytes()))) let isEligible = await manager.isEligibleTxId( - eligibilityProof, receiverExpected, TxValueExpectedWei + eligibilityProof ) assert isEligible.isOk(), isEligible.error @@ -193,11 +194,11 @@ suite "Waku Incentivization PoC Eligibility Proofs": EligibilityProof(proofOfPayment: some(@(txHashRightReceiverRightAmount.bytes()))) let isEligibleOnce = await manager.isEligibleTxId( - eligibilityProof, receiverExpected, TxValueExpectedWei + eligibilityProof ) let isEligibleTwice = await manager.isEligibleTxId( - eligibilityProof, receiverExpected, TxValueExpectedWei + eligibilityProof ) assert isEligibleOnce.isOk() diff --git a/tests/incentivization/test_poc_reputation.nim b/tests/incentivization/test_poc_reputation.nim index 0547b9744f..08e0b5ca08 100644 --- a/tests/incentivization/test_poc_reputation.nim +++ b/tests/incentivization/test_poc_reputation.nim @@ -1,43 +1,111 @@ -import std/options, testutils/unittests, chronos, web3 +import + std/options, + testutils/unittests, + chronos, + web3, + stew/byteutils, + stint, + strutils, + tests/testlib/testasync, + libp2p/[peerid, crypto/crypto] -import waku/incentivization/reputation_manager, waku/waku_lightpush_legacy/rpc +import + waku/[node/peer_manager, waku_core], + waku/incentivization/[rpc, reputation_manager], + waku/waku_lightpush/[rpc, common] suite "Waku Incentivization PoC Reputation": var manager {.threadvar.}: ReputationManager + var peerId1 {.threadvar.}: PeerId setup: manager = ReputationManager.init() + peerId1 = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() test "incentivization PoC: reputation: reputation table is empty after initialization": check manager.reputationOf.len == 0 test "incentivization PoC: reputation: set and get reputation": - manager.setReputation("peer1", some(true)) # Encodes GoodRep - check manager.getReputation("peer1") == some(true) + manager.setReputation(peerId1, some(true)) # Encodes GoodRep + check manager.getReputation(peerId1) == some(true) - test "incentivization PoC: reputation: evaluate PushResponse valid": - let validLightpushResponse = - PushResponse(isSuccess: true, info: some("Everything is OK")) + test "incentivization PoC: reputation: evaluate LightPushResponse valid": + let validLightLightPushResponse = + LightPushResponse(requestId: "", statusCode: LightPushSuccessCode.SUCCESS) # We expect evaluateResponse to return GoodResponse if isSuccess is true - check evaluateResponse(validLightpushResponse) == GoodResponse + check evaluateResponse(validLightLightPushResponse) == GoodResponse - test "incentivization PoC: reputation: evaluate PushResponse invalid": - let invalidLightpushResponse = PushResponse(isSuccess: false, info: none(string)) - check evaluateResponse(invalidLightpushResponse) == BadResponse + test "incentivization PoC: reputation: evaluate LightPushResponse invalid": + let invalidLightLightPushResponse = LightPushResponse( + requestId: "", statusCode: LightPushErrorCode.SERVICE_NOT_AVAILABLE + ) + check evaluateResponse(invalidLightLightPushResponse) == BadResponse + + test "incentivization PoC: reputation: evaluate LightPushResponse neutral - payment required": + let neutralLightPushResponse = LightPushResponse( + requestId: "", statusCode: LightPushErrorCode.PAYMENT_REQUIRED + ) + check evaluateResponse(neutralLightPushResponse) == NeutralResponse + + test "incentivization PoC: reputation: evaluate LightPushResponse bad - no peers": + let badLightPushResponse = LightPushResponse( + requestId: "", statusCode: LightPushErrorCode.NO_PEERS_TO_RELAY + ) + check evaluateResponse(badLightPushResponse) == BadResponse test "incentivization PoC: reputation: updateReputationFromResponse valid": - let peerId = "peerWithValidResponse" - let validResp = PushResponse(isSuccess: true, info: some("All good")) - manager.updateReputationFromResponse(peerId, validResp) - check manager.getReputation(peerId) == some(true) + let validResp = + LightPushResponse(requestId: "", statusCode: LightPushSuccessCode.SUCCESS) + manager.updateReputationFromResponse(peerId1, validResp) + check manager.getReputation(peerId1) == some(true) test "incentivization PoC: reputation: updateReputationFromResponse invalid": - let peerId = "peerWithInvalidResponse" - let invalidResp = PushResponse(isSuccess: false, info: none(string)) - manager.updateReputationFromResponse(peerId, invalidResp) - check manager.getReputation(peerId) == some(false) + let invalidResp = LightPushResponse( + requestId: "", statusCode: LightPushErrorCode.SERVICE_NOT_AVAILABLE + ) + manager.updateReputationFromResponse(peerId1, invalidResp) + check manager.getReputation(peerId1) == some(false) + + test "incentivization PoC: reputation: updateReputationFromResponse neutral - no change": + # First set a good reputation + manager.setReputation(peerId1, some(true)) + check manager.getReputation(peerId1) == some(true) + + # Send a neutral response (payment required) + let neutralResp = LightPushResponse( + requestId: "", statusCode: LightPushErrorCode.PAYMENT_REQUIRED + ) + manager.updateReputationFromResponse(peerId1, neutralResp) + + # Reputation should remain unchanged + check manager.getReputation(peerId1) == some(true) + + test "incentivization PoC: reputation: updateReputationFromResponse neutral - no change from bad": + # First set a bad reputation + manager.setReputation(peerId1, some(false)) + check manager.getReputation(peerId1) == some(false) + + # Send a neutral response (payment required) + let neutralResp = LightPushResponse( + requestId: "", statusCode: LightPushErrorCode.PAYMENT_REQUIRED + ) + manager.updateReputationFromResponse(peerId1, neutralResp) + + # Reputation should remain unchanged + check manager.getReputation(peerId1) == some(false) + + test "incentivization PoC: reputation: updateReputationFromResponse neutral - no change from none": + # Start with no reputation set + check manager.getReputation(peerId1) == none(bool) + + # Send a neutral response (payment required) + let neutralResp = LightPushResponse( + requestId: "", statusCode: LightPushErrorCode.PAYMENT_REQUIRED + ) + manager.updateReputationFromResponse(peerId1, neutralResp) + + # Reputation should remain none + check manager.getReputation(peerId1) == none(bool) test "incentivization PoC: reputation: default is None": - let unknownPeerId = "unknown_peer" - # The peer is not in the table yet - check manager.getReputation(unknownPeerId) == none(bool) + check manager.getReputation(peerId1) == none(bool) diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 889e397ccd..56ea17fc10 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -28,6 +28,7 @@ import waku_store/common, waku_peer_exchange, waku_metadata, + incentivization/reputation_manager, ], ./testlib/common, ./testlib/testutils, @@ -917,39 +918,57 @@ procSuite "Peer Manager": # Add a peer[0] to the peerstore pm.switch.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs pm.switch.peerStore[ProtoBook][peers[0].peerId] = - @[WakuRelayCodec, WakuStoreCodec, WakuFilterSubscribeCodec] + @[WakuRelayCodec, WakuStoreCodec, WakuFilterSubscribeCodec, WakuLightPushCodec] # When no service peers, we get one from the peerstore - let selectedPeer1 = pm.selectPeer(WakuStoreCodec) + let selectedPeerWakuStore = pm.selectPeer(WakuStoreCodec) check: - selectedPeer1.isSome() == true - selectedPeer1.get().peerId == peers[0].peerId + selectedPeerWakuStore.isSome() == true + selectedPeerWakuStore.get().peerId == peers[0].peerId # Same for other protocol - let selectedPeer2 = pm.selectPeer(WakuFilterSubscribeCodec) + let selectedPeerWakuFilter = pm.selectPeer(WakuFilterSubscribeCodec) check: - selectedPeer2.isSome() == true - selectedPeer2.get().peerId == peers[0].peerId + selectedPeerWakuFilter.isSome() == true + selectedPeerWakuFilter.get().peerId == peers[0].peerId # And return none if we dont have any peer for that protocol - let selectedPeer3 = pm.selectPeer(WakuLegacyLightPushCodec) + let selectedPeerWakuLegacyLightpush = pm.selectPeer(WakuLegacyLightPushCodec) check: - selectedPeer3.isSome() == false - - # Now we add service peers for different protocols peer[1..3] - pm.addServicePeer(peers[1], WakuStoreCodec) - pm.addServicePeer(peers[2], WakuLegacyLightPushCodec) + selectedPeerWakuLegacyLightpush.isSome() == false + # Reputation: if no reputation is set (neutral-rep), return a peer + let selectedPeerWakuLightpush = pm.selectPeer(WakuLightPushCodec) + check: + selectedPeerWakuLightpush.isSome() == true + + # Reputation: avoid negative-reputation peers + if pm.reputationManager.isSome(): + var rm = pm.reputationManager.get() + # assign negative reputation to the peer + rm.setReputation(selectedPeerWakuLightpush.get().peerId, some(false)) + # the peer is not selected because of negative reputation + check: + pm.selectPeer(WakuLightPushCodec).isNone() + # revert reputation to neutral + rm.setReputation(selectedPeerWakuLightpush.get().peerId, none(bool)) + # the peer is selected again + check: + pm.selectPeer(WakuLightPushCodec).isSome() + + # Now we add service peers for different protocols # We no longer get one from the peerstore. Slots are being used instead. - let selectedPeer4 = pm.selectPeer(WakuStoreCodec) + pm.addServicePeer(peers[1], WakuStoreCodec) + let selectedPeerWakuStoreSlotted = pm.selectPeer(WakuStoreCodec) check: - selectedPeer4.isSome() == true - selectedPeer4.get().peerId == peers[1].peerId + selectedPeerWakuStoreSlotted.isSome() == true + selectedPeerWakuStoreSlotted.get().peerId == peers[1].peerId - let selectedPeer5 = pm.selectPeer(WakuLegacyLightPushCodec) + pm.addServicePeer(peers[2], WakuLegacyLightPushCodec) + let selectedPeerWakuLegacyLightpushSlotted = pm.selectPeer(WakuLegacyLightPushCodec) check: - selectedPeer5.isSome() == true - selectedPeer5.get().peerId == peers[2].peerId + selectedPeerWakuLegacyLightpushSlotted.isSome() == true + selectedPeerWakuLegacyLightpushSlotted.get().peerId == peers[2].peerId test "peer manager cant have more max connections than peerstore size": # Peerstore size can't be smaller than max connections diff --git a/tests/waku_lightpush/lightpush_utils.nim b/tests/waku_lightpush/lightpush_utils.nim index 7bd44a311e..160e5d060f 100644 --- a/tests/waku_lightpush/lightpush_utils.nim +++ b/tests/waku_lightpush/lightpush_utils.nim @@ -9,12 +9,14 @@ import waku/waku_lightpush, waku/waku_lightpush/[client, common], waku/common/rate_limit/setting, - ../testlib/[common, wakucore] + ../testlib/[common, wakucore], + waku/incentivization/reputation_manager proc newTestWakuLightpushNode*( switch: Switch, handler: PushMessageHandler, rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](), + eligibilityEnabled: bool = false, ): Future[WakuLightPush] {.async.} = let peerManager = PeerManager.new(switch) @@ -28,6 +30,8 @@ proc newTestWakuLightpushNode*( return proto -proc newTestWakuLightpushClient*(switch: Switch): WakuLightPushClient = - let peerManager = PeerManager.new(switch) +proc newTestWakuLightpushClient*( + switch: Switch, reputationEnabled: bool = false +): WakuLightPushClient = + let peerManager = PeerManager.new(switch, reputationEnabled = reputationEnabled) WakuLightPushClient.new(peerManager, rng) diff --git a/tests/waku_lightpush/test_client.nim b/tests/waku_lightpush/test_client.nim index af22ffa5d5..10098109a2 100644 --- a/tests/waku_lightpush/test_client.nim +++ b/tests/waku_lightpush/test_client.nim @@ -22,19 +22,30 @@ import suite "Waku Lightpush Client": var handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)] + handlerFutureFailsLightpush {.threadvar.}: Future[(PubsubTopic, WakuMessage)] + handler {.threadvar.}: PushMessageHandler + handlerFailsLightpush {.threadvar.}: PushMessageHandler serverSwitch {.threadvar.}: Switch + serverSwitchFailsLightpush {.threadvar.}: Switch clientSwitch {.threadvar.}: Switch + server {.threadvar.}: WakuLightPush + serverFailsLightpush {.threadvar.}: WakuLightPush client {.threadvar.}: WakuLightPushClient serverRemotePeerInfo {.threadvar.}: RemotePeerInfo + serverRemotePeerInfoFailsLightpush {.threadvar.}: RemotePeerInfo + clientPeerId {.threadvar.}: PeerId pubsubTopic {.threadvar.}: PubsubTopic contentTopic {.threadvar.}: ContentTopic message {.threadvar.}: WakuMessage + # Use reputation manager (inside the peer manager) for Lightpush Client test instanse + const reputationEnabled = true + asyncSetup: handlerFuture = newPushHandlerFuture() handler = proc( @@ -49,21 +60,41 @@ suite "Waku Lightpush Client": # return that we published the message to 1 peer. return ok(1) + # A Lightpush server that fails + handlerFutureFailsLightpush = newPushHandlerFuture() + handlerFailsLightpush = proc( + peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[WakuLightPushResult] {.async.} = + handlerFutureFailsLightpush.complete((pubsubTopic, message)) + return lighpushErrorResult( + SERVICE_NOT_AVAILABLE, "a bad test server failed to push a lightpush message" + ) + serverSwitch = newTestSwitch() + serverSwitchFailsLightpush = newTestSwitch() clientSwitch = newTestSwitch() + server = await newTestWakuLightpushNode(serverSwitch, handler) - client = newTestWakuLightpushClient(clientSwitch) + serverFailsLightpush = + await newTestWakuLightpushNode(serverSwitchFailsLightpush, handlerFailsLightpush) + client = newTestWakuLightpushClient(clientSwitch, reputationEnabled) - await allFutures(serverSwitch.start(), clientSwitch.start()) + await allFutures( + serverSwitch.start(), serverSwitchFailsLightpush.start(), clientSwitch.start() + ) serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + serverRemotePeerInfoFailsLightpush = + serverSwitchFailsLightpush.peerInfo.toRemotePeerInfo() clientPeerId = clientSwitch.peerInfo.peerId pubsubTopic = DefaultPubsubTopic contentTopic = DefaultContentTopic message = fakeWakuMessage() asyncTeardown: - await allFutures(clientSwitch.stop(), serverSwitch.stop()) + await allFutures( + clientSwitch.stop(), serverSwitch.stop(), serverSwitchFailsLightpush.stop() + ) suite "Verification of PushRequest Payload": asyncTest "Valid Payload Types": @@ -370,4 +401,25 @@ suite "Waku Lightpush Client": # Then the response is negative check not publishResponse.isOk() - check publishResponse.error.code == LightPushErrorCode.NO_PEERS_TO_RELAY + check publishResponse.error.code == LightpushStatusCode.NO_PEERS_TO_RELAY + + asyncTest "Positive Publish To Any": + # add a peer that supports the Lightpush protocol to the client's PeerManager + client.peerManager.addPeer(serverRemotePeerInfo) # supports Lightpush + + # When sending a valid PushRequest using publishToAny + let publishResponse = await client.publishToAny(pubsubTopic, message) + + # Then the response is positive + check publishResponse.isOk() + + asyncTest "Negative Publish To Any": + # add a peer that does not support the Lightpush protocol to the client's PeerManager + client.peerManager.addPeer(serverRemotePeerInfoFailsLightpush) + # does not support Lightpush + + # When sending a PushRequest using publishToAny to the only peer that doesn't support Lightpush + let publishResponse = await client.publishToAny(pubsubTopic, message) + + # Then the response is negative + check not publishResponse.isOk() diff --git a/tests/waku_lightpush/test_server_i13n.nim b/tests/waku_lightpush/test_server_i13n.nim new file mode 100644 index 0000000000..7bd40894ea --- /dev/null +++ b/tests/waku_lightpush/test_server_i13n.nim @@ -0,0 +1,89 @@ +{.used.} + +import + std/[options, strscans], + testutils/unittests, + chronicles, + chronos, + libp2p/crypto/crypto + +import + waku/[ + node/peer_manager, + waku_core, + waku_lightpush, + waku_lightpush/client, + waku_lightpush/protocol_metrics, + ], + ../testlib/[assertions, wakucore, testasync, futures, testutils], + ./lightpush_utils, + ../resources/[pubsub_topics, content_topics, payloads] + +suite "Lightpush Server Incentivization Test": + var + serverSwitch {.threadvar.}: Switch + clientSwitch {.threadvar.}: Switch + server {.threadvar.}: WakuLightPush + client {.threadvar.}: WakuLightPushClient + serverPeerId {.threadvar.}: RemotePeerInfo + handlerFuture {.threadvar.}: Future[(string, WakuMessage)] + tokenPeriod {.threadvar.}: Duration + waitInBetweenFor {.threadvar.}: Duration + firstWaitExtend {.threadvar.}: Duration + + asyncSetup: + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + handlerFuture = newFuture[(string, WakuMessage)]() + let handler: PushMessageHandler = proc( + peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[WakuLightPushResult] {.async.} = + handlerFuture.complete((pubsubTopic, message)) + return lightpushSuccessResult(1) + + tokenPeriod = 500.millis + server = await newTestWakuLightpushNode( + serverSwitch, handler, some((3, tokenPeriod)), eligibilityEnabled = true + ) + client = newTestWakuLightpushClient(clientSwitch) + serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo() + + waitInBetweenFor = 20.millis + firstWaitExtend = 300.millis + + asyncTeardown: + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "message with valid eligibility proof is published": + let sendMsgProc = proc(): Future[void] {.async.} = + let message = fakeWakuMessage() + + handlerFuture = newFuture[(string, WakuMessage)]() + let requestRes = + await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId) + + check await handlerFuture.withTimeout(50.millis) + + check: + requestRes.isOk() + handlerFuture.finished() + + let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read() + + check: + handledMessagePubsubTopic == DefaultPubsubTopic + handledMessage == message + + for runCnt in 0 ..< 3: + let startTime = Moment.now() + for testCnt in 0 ..< 3: + await sendMsgProc() + await sleepAsync(waitInBetweenFor) + + let endTime = Moment.now() + let elapsed = (endTime - startTime) + await sleepAsync(tokenPeriod - elapsed + firstWaitExtend) + firstWaitExtend = 100.millis diff --git a/waku/factory/builder.nim b/waku/factory/builder.nim index 772cfbffdf..df84b0ba51 100644 --- a/waku/factory/builder.nim +++ b/waku/factory/builder.nim @@ -46,6 +46,12 @@ type # Rate limit configs for non-relay req-resp protocols rateLimitSettings: Option[ProtocolRateLimitSettings] + # Eligibility enabled + eligibilityEnabled: bool + + # Reputation enabled + reputationEnabled: bool + WakuNodeBuilderResult* = Result[void, string] ## Init @@ -115,6 +121,8 @@ proc withPeerManagerConfig*( maxConnections: int, relayServiceRatio: string, shardAware = false, + eligibilityEnabled = false, + reputationEnabled = false, ) = let (relayRatio, serviceRatio) = parseRelayServiceRatio(relayServiceRatio).get() var relayPeers = int(ceil(float(maxConnections) * relayRatio)) @@ -123,6 +131,8 @@ proc withPeerManagerConfig*( builder.maxServicePeers = servicePeers builder.maxRelayPeers = relayPeers builder.shardAware = shardAware + builder.eligibilityEnabled = eligibilityEnabled + builder.reputationEnabled = reputationEnabled proc withColocationLimit*(builder: var WakuNodeBuilder, colocationLimit: int) = builder.colocationLimit = colocationLimit @@ -209,6 +219,8 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = maxServicePeers = some(builder.maxServicePeers), colocationLimit = builder.colocationLimit, shardedPeerManagement = builder.shardAware, + eligibilityEnabled = builder.eligibilityEnabled, + reputationEnabled = builder.reputationEnabled ) var node: WakuNode diff --git a/waku/factory/conf_builder/eligibility_conf_builder.nim b/waku/factory/conf_builder/eligibility_conf_builder.nim new file mode 100644 index 0000000000..a552606ee0 --- /dev/null +++ b/waku/factory/conf_builder/eligibility_conf_builder.nim @@ -0,0 +1,61 @@ +import chronicles, std/options, results +import ../waku_conf +import eth/common as eth_common + +logScope: + topics = "waku conf builder eligibility" + +type EligibilityConfBuilder* = object + enabled*: Option[bool] + receiverAddress*: Option[string] + paymentAmountWei*: Option[uint32] + ethClientUrls*: Option[seq[string]] + +proc init*(T: type EligibilityConfBuilder): EligibilityConfBuilder = + EligibilityConfBuilder() + +proc withEnabled*(b: var EligibilityConfBuilder, enabled: bool) = + b.enabled = some(enabled) + +proc withReceiverAddress*(b: var EligibilityConfBuilder, receiverAddress: string) = + b.receiverAddress = some(receiverAddress) + +proc withPaymentAmountWei*(b: var EligibilityConfBuilder, amount: uint32) = + b.paymentAmountWei = some(amount) + +proc withEthClientUrls*(b: var EligibilityConfBuilder, urls: seq[string]) = + b.ethClientUrls = some(urls) + +proc build*(b: EligibilityConfBuilder): Result[Option[EligibilityConf], string] = + if not b.enabled.get(false): + debug "eligibility: EligibilityConf not enabled" + return ok(none(EligibilityConf)) + + # Validation + if b.receiverAddress.isNone() or b.paymentAmountWei.isNone(): + debug "eligibility: EligibilityConf validation failed - missing address or amount" + return err("Eligibility: receiver address and payment amount must be specified") + + # FIXME: add validation check that receiver address is validly formed + + if b.paymentAmountWei.get() == 0: + debug "eligibility: EligibilityConf validation failed - payment amount is zero" + return err("Eligibility: payment amount must be above zero") + + # FIXME: how to reuse Eth RPC URL from RLN (?) config? + let urls = b.ethClientUrls.get(@[]) + if urls.len == 0: + debug "eligibility: EligibilityConf validation failed - no eth rpc urls" + return err("Eligibility: eligibility-eth-client-address is not specified") + + debug "eligibility: EligibilityConf created" + return ok( + some( + EligibilityConf( + enabled: true, + receiverAddress: b.receiverAddress.get(), + paymentAmountWei: b.paymentAmountWei.get(), + ethClientUrls: urls, + ) + ) + ) diff --git a/waku/factory/conf_builder/reputation_conf_builder.nim b/waku/factory/conf_builder/reputation_conf_builder.nim new file mode 100644 index 0000000000..f01a6a0833 --- /dev/null +++ b/waku/factory/conf_builder/reputation_conf_builder.nim @@ -0,0 +1,28 @@ +import chronicles, std/options, results +import ../waku_conf + +logScope: + topics = "waku conf builder reputation" + +type ReputationConfBuilder* = object + enabled*: Option[bool] + +proc init*(T: type ReputationConfBuilder): ReputationConfBuilder = + ReputationConfBuilder() + +proc withEnabled*(b: var ReputationConfBuilder, enabled: bool) = + b.enabled = some(enabled) + +proc build*(b: ReputationConfBuilder): Result[Option[ReputationConf], string] = + if not b.enabled.get(false): + debug "reputation: ReputationConf not enabled" + return ok(none(ReputationConf)) + + debug "reputation: ReputationConf created" + return ok( + some( + ReputationConf( + enabled: true + ) + ) + ) \ No newline at end of file diff --git a/waku/factory/conf_builder/waku_conf_builder.nim b/waku/factory/conf_builder/waku_conf_builder.nim index 32631e1d79..619ba1441c 100644 --- a/waku/factory/conf_builder/waku_conf_builder.nim +++ b/waku/factory/conf_builder/waku_conf_builder.nim @@ -23,8 +23,10 @@ import ./discv5_conf_builder, ./web_socket_conf_builder, ./metrics_server_conf_builder, + ./rln_relay_conf_builder, ./rate_limit_conf_builder, - ./rln_relay_conf_builder + ./eligibility_conf_builder, + ./reputation_conf_builder logScope: topics = "waku conf builder" @@ -75,6 +77,8 @@ type WakuConfBuilder* = object rlnRelayConf*: RlnRelayConfBuilder storeServiceConf*: StoreServiceConfBuilder webSocketConf*: WebSocketConfBuilder + eligibilityConf*: EligibilityConfBuilder + reputationConf*: ReputationConfBuilder rateLimitConf*: RateLimitConfBuilder # End conf builders relay: Option[bool] @@ -134,6 +138,8 @@ proc init*(T: type WakuConfBuilder): WakuConfBuilder = rlnRelayConf: RlnRelayConfBuilder.init(), storeServiceConf: StoreServiceConfBuilder.init(), webSocketConf: WebSocketConfBuilder.init(), + eligibilityConf: EligibilityConfBuilder.init(), + reputationConf: ReputationConfBuilder.init(), rateLimitConf: RateLimitConfBuilder.init(), ) @@ -488,6 +494,12 @@ proc build*( let webSocketConf = builder.webSocketConf.build().valueOr: return err("WebSocket Conf building failed: " & $error) + let eligibilityConf = builder.eligibilityConf.build().valueOr: + return err("Eligibility Conf building failed: " & $error) + + let reputationConf = builder.reputationConf.build().valueOr: + return err("Reputation Conf building failed: " & $error) + let rateLimit = builder.rateLimitConf.build().valueOr: return err("Rate limits Conf building failed: " & $error) @@ -606,6 +618,9 @@ proc build*( metricsServerConf: metricsServerConf, restServerConf: restServerConf, dnsDiscoveryConf: dnsDiscoveryConf, + eligibilityConf: eligibilityConf, + reputationConf: reputationConf, + rateLimit: rateLimit, # end confs nodeKey: nodeKey, clusterId: clusterId, @@ -644,7 +659,6 @@ proc build*( colocationLimit: colocationLimit, maxRelayPeers: builder.maxRelayPeers, relayServiceRatio: builder.relayServiceRatio.get("60:40"), - rateLimit: rateLimit, circuitRelayClient: builder.circuitRelayClient.get(false), staticNodes: builder.staticNodes, relayShardedPeerManagement: relayShardedPeerManagement, diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 43b37b01a6..70fd1ebae6 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -29,7 +29,9 @@ import ../node/peer_manager, ../waku_core/topics/pubsub_topic, ../../tools/rln_keystore_generator/rln_keystore_generator, - ../../tools/rln_db_inspector/rln_db_inspector + ../../tools/rln_db_inspector/rln_db_inspector, + ./conf_builder/eligibility_conf_builder, + ./conf_builder/reputation_conf_builder include ../waku_core/message/default_values @@ -262,8 +264,8 @@ type WakuNodeConf* = object isRelayClient* {. desc: """Set the node as a relay-client. -Set it to true for nodes that run behind a NAT or firewall and -hence would have reachability issues.""", + Set it to true for nodes that run behind a NAT or firewall and + hence would have reachability issues.""", defaultValue: false, name: "relay-client" .}: bool @@ -457,11 +459,46 @@ hence would have reachability issues.""", name: "lightpushnode" .}: string + ## Reputation config + reputationEnabled* {. + desc: "Enable client-side reputation for light protocols: true|false", + defaultValue: false, + name: "reputation-enabled" + .}: bool + + ## Eligibility config + ## Is eligibility check enabled or not + eligibilityEnabled* {. + desc: "Enable server-side eligibility (proof-of-payment) check for light protocols: true|false", + defaultValue: false, + name: "eligibility-enabled" + .}: bool + + ## The expected blockchain address receiving payments for eligibility + eligibilityReceiverAddress* {. + desc: "Blockchain address receiving payment for eligibility from light protocol clients", + defaultValue: "", + name: "eligibility-receiver-address" + .}: string + + ## The expected amount for eligibility payments + eligibilityPaymentAmountWei* {. + desc: "The expected payment amount from light protocol clients", + defaultValue: 0, + name: "eligibility-payment-amount-wei" + .}: uint32 + + eligibilityEthRpcUrl* {. + desc: "HTTP address of an Ethereum client for eligibility checks. Argument may be repeated.", + defaultValue: @[EthRpcUrl("http://localhost:8540/")], + name: "eligibility-eth-client-address" + .}: seq[EthRpcUrl] + ## Reliability config reliabilityEnabled* {. desc: """Adds an extra effort in the delivery/reception of messages by leveraging store-v3 requests. -with the drawback of consuming some more bandwidth.""", + with the drawback of consuming some more bandwidth.""", defaultValue: false, name: "reliability" .}: bool @@ -1018,4 +1055,16 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.rateLimitConf.withRateLimits(n.rateLimits) + # Setup eligibility configuration + b.eligibilityConf.withEnabled(n.eligibilityEnabled) + if n.eligibilityReceiverAddress != "": + b.eligibilityConf.withReceiverAddress(n.eligibilityReceiverAddress) + if n.eligibilityPaymentAmountWei != 0: + b.eligibilityConf.withPaymentAmountWei(n.eligibilityPaymentAmountWei) + if n.eligibilityEthRpcUrl.len > 0: + b.eligibilityConf.withEthClientUrls(n.eligibilityEthRpcUrl.mapIt(string(it))) + + # Setup reputation configuration + b.reputationConf.withEnabled(n.reputationEnabled) + return b.build() diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index f49d1f11a2..eda385cc71 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -37,7 +37,8 @@ import ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, ../waku_lightpush_legacy/common, ../common/rate_limit/setting, - ../common/databases/dburl + ../common/databases/dburl, + ../incentivization/eligibility_manager ## Peer persistence @@ -102,26 +103,39 @@ proc initNode( ) builder.withColocationLimit(conf.colocationLimit) - if conf.maxRelayPeers.isSome(): - let - maxRelayPeers = conf.maxRelayPeers.get() - maxConnections = conf.maxConnections - # Calculate the ratio as percentages - relayRatio = (maxRelayPeers.float / maxConnections.float) * 100 - serviceRatio = 100 - relayRatio - - builder.withPeerManagerConfig( - maxConnections = conf.maxConnections, - relayServiceRatio = $relayRatio & ":" & $serviceRatio, - shardAware = conf.relayShardedPeerManagement, - ) - error "maxRelayPeers is deprecated. It is recommended to use relayServiceRatio instead. If relayServiceRatio is not set, it will be automatically calculated based on maxConnections and maxRelayPeers." - else: - builder.withPeerManagerConfig( - maxConnections = conf.maxConnections, - relayServiceRatio = conf.relayServiceRatio, - shardAware = conf.relayShardedPeerManagement, - ) + let eligibilityEnabled = + if conf.eligibilityConf.isSome(): + conf.eligibilityConf.get().enabled + else: + false + + let reputationEnabled = + if conf.reputationConf.isSome(): + conf.reputationConf.get().enabled + else: + false + + let relayServiceRatio = + if conf.maxRelayPeers.isSome(): + let + maxRelayPeers = conf.maxRelayPeers.get() + maxConnections = conf.maxConnections + # Calculate the ratio as percentages + relayRatio = (maxRelayPeers.float / maxConnections.float) * 100 + serviceRatio = 100 - relayRatio + + error "maxRelayPeers is deprecated. It is recommended to use relayServiceRatio instead. If relayServiceRatio is not set, it will be automatically calculated based on maxConnections and maxRelayPeers." + $relayRatio & ":" & $serviceRatio + else: + conf.relayServiceRatio + + builder.withPeerManagerConfig( + maxConnections = conf.maxConnections, + relayServiceRatio = relayServiceRatio, + shardAware = conf.relayShardedPeerManagement, + eligibilityEnabled = eligibilityEnabled, + reputationEnabled = reputationEnabled, + ) builder.withRateLimit(conf.rateLimit) builder.withCircuitRelay(relay) @@ -421,6 +435,27 @@ proc setupProtocols( return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error) + # initialize eligibility manager if eligibility is enabled + if conf.eligibilityConf.isSome() and conf.eligibilityConf.get().enabled: + # Check that RLN Relay and Lightpush are mounted + if node.wakuRlnRelay.isNil or node.wakuLightPush.isNil: + return err("Eligibility manager requires both RLN Relay and Lightpush protocols to be mounted") + let eligibilityConf = conf.eligibilityConf.get() + let ethUrl = eligibilityConf.ethClientUrls[0] + let expectedToAddress = Address.fromHex(eligibilityConf.receiverAddress) + let expectedValueWei = eligibilityConf.paymentAmountWei.u256 + try: + debug "initializing eligibilityManager..." + let manager = await EligibilityManager.init( + ethUrl, + expectedToAddress, + expectedValueWei + ) + node.peerManager.eligibilityManager = some(manager) + debug "i13n: eligibilityManager initialized with values:", ethUrl=ethUrl, expectedToAddress=expectedToAddress, expectedValueWei=expectedValueWei + except CatchableError: + return err("failed to initialize eligibility manager: " & getCurrentExceptionMsg()) + return ok() ## Start node diff --git a/waku/factory/waku_conf.nim b/waku/factory/waku_conf.nim index 4a05049063..c56e3184c7 100644 --- a/waku/factory/waku_conf.nim +++ b/waku/factory/waku_conf.nim @@ -67,6 +67,15 @@ type EndpointConf* = object # TODO: make enum extMultiAddrs*: seq[MultiAddress] extMultiAddrsOnly*: bool +type EligibilityConf* = object + enabled*: bool + receiverAddress*: string + paymentAmountWei*: uint32 + ethClientUrls*: seq[string] + +type ReputationConf* = object + enabled*: bool + ## `WakuConf` is a valid configuration for a Waku node ## All information needed by a waku node should be contained ## In this object. A convenient `validate` method enables doing @@ -100,6 +109,8 @@ type WakuConf* {.requiresInit.} = ref object restServerConf*: Option[RestServerConf] metricsServerConf*: Option[MetricsServerConf] webSocketConf*: Option[WebSocketConf] + eligibilityConf*: Option[EligibilityConf] + reputationConf*: Option[ReputationConf] portsShift*: uint16 dnsAddrsNameServers*: seq[IpAddress] @@ -138,35 +149,39 @@ type WakuConf* {.requiresInit.} = ref object p2pReliability*: bool -proc logConf*(conf: WakuConf) = +proc logConf*(wakuConf: WakuConf) = info "Configuration: Enabled protocols", - relay = conf.relay, - rlnRelay = conf.rlnRelayConf.isSome(), - store = conf.storeServiceConf.isSome(), - filter = conf.filterServiceConf.isSome(), - lightPush = conf.lightPush, - peerExchange = conf.peerExchange + relay = wakuConf.relay, + rlnRelay = wakuConf.rlnRelayConf.isSome(), + store = wakuConf.storeServiceConf.isSome(), + filter = wakuConf.filterServiceConf.isSome(), + lightPush = wakuConf.lightPush, + peerExchange = wakuConf.peerExchange - info "Configuration. Network", cluster = conf.clusterId + info "Configuration. Network", cluster = wakuConf.clusterId - for shard in conf.subscribeShards: + for shard in wakuConf.subscribeShards: info "Configuration. Active Relay Shards", shard = shard - if conf.discv5Conf.isSome(): - for i in conf.discv5Conf.get().bootstrapNodes: + if wakuConf.discv5Conf.isSome(): + for i in wakuConf.discv5Conf.get().bootstrapNodes: info "Configuration. Bootstrap nodes", node = i.string - if conf.rlnRelayConf.isSome(): - var rlnRelayConf = conf.rlnRelayConf.get() + if wakuConf.rlnRelayConf.isSome(): + var rlnRelayConf = wakuConf.rlnRelayConf.get() if rlnRelayConf.dynamic: info "Configuration. Validation", mechanism = "onchain rln", contract = rlnRelayConf.ethContractAddress.string, - maxMessageSize = conf.maxMessageSizeBytes, + maxMessageSize = wakuConf.maxMessageSizeBytes, rlnEpochSizeSec = rlnRelayConf.epochSizeSec, rlnRelayUserMessageLimit = rlnRelayConf.userMessageLimit, ethClientUrls = rlnRelayConf.ethClientUrls + if wakuConf.eligibilityConf.isSome(): + let ec = wakuConf.eligibilityConf.get() + debug "eligibility: EligibilityConf created", enabled = ec.enabled, receiverAddress = $ec.receiverAddress, paymentAmountWei = ec.paymentAmountWei, ethClientUrls = ec.ethClientUrls + proc validateNodeKey(wakuConf: WakuConf): Result[void, string] = wakuConf.nodeKey.getPublicKey().isOkOr: return err("nodekey param is invalid") @@ -228,4 +243,17 @@ proc validate*(wakuConf: WakuConf): Result[void, string] = ?wakuConf.validateNodeKey() ?wakuConf.shardingConf.validateShards(wakuConf.subscribeShards) ?wakuConf.validateNoEmptyStrings() + + if wakuConf.eligibilityConf.isSome(): + let ec = wakuConf.eligibilityConf.get() + debug "eligibility: EligibilityConf validation start" + if ec.enabled: + if not wakuConf.rlnRelayConf.isSome(): + debug "eligibility: EligibilityConf validation failed - RLN relay not enabled" + return err("eligibility: RLN relay must be enabled if eligibility is enabled") + if not wakuConf.lightPush: + debug "eligibility: EligibilityConf validation failed - Lightpush not enabled" + return err("eligibility: Lightpush must be enabled if eligibility is enabled") + debug "eligibility: EligibilityConf validation successful" + return ok() diff --git a/waku/incentivization/common.nim b/waku/incentivization/common.nim index 071b4c18f2..533836e426 100644 --- a/waku/incentivization/common.nim +++ b/waku/incentivization/common.nim @@ -2,8 +2,12 @@ import std/options import waku/incentivization/rpc +type EligibilityStatusCode* = enum + SUCCESS = uint32(200) + PAYMENT_REQUIRED = uint32(402) + proc init*(T: type EligibilityStatus, isEligible: bool): T = if isEligible: - EligibilityStatus(statusCode: uint32(200), statusDesc: some("OK")) + EligibilityStatus(statusCode: uint32(EligibilityStatusCode.SUCCESS), statusDesc: some("OK")) else: - EligibilityStatus(statusCode: uint32(402), statusDesc: some("Payment Required")) + EligibilityStatus(statusCode: uint32(EligibilityStatusCode.PAYMENT_REQUIRED), statusDesc: some("Payment Required")) diff --git a/waku/incentivization/eligibility_manager.nim b/waku/incentivization/eligibility_manager.nim index b10b293e18..c1f660cbb6 100644 --- a/waku/incentivization/eligibility_manager.nim +++ b/waku/incentivization/eligibility_manager.nim @@ -8,13 +8,19 @@ const TxReceiptQueryTimeout = 3.seconds type EligibilityManager* = ref object # FIXME: make web3 private? web3*: Web3 seenTxIds*: HashSet[TxHash] + expectedToAddress*: Address + expectedValueWei*: UInt256 -# Initialize the eligibilityManager with a web3 instance +# Initialize the eligibilityManager with a web3 instance and expected params proc init*( - T: type EligibilityManager, ethClient: string + T: type EligibilityManager, ethClient: string, expectedToAddress: Address, expectedValueWei: UInt256 ): Future[EligibilityManager] {.async.} = - return - EligibilityManager(web3: await newWeb3(ethClient), seenTxIds: initHashSet[TxHash]()) + return EligibilityManager( + web3: await newWeb3(ethClient), + seenTxIds: initHashSet[TxHash](), + expectedToAddress: expectedToAddress, + expectedValueWei: expectedValueWei + ) # TODO: handle error if web3 instance is not established # Clean up the web3 instance @@ -49,9 +55,7 @@ proc getTxAndTxReceipt( proc isEligibleTxId*( eligibilityManager: EligibilityManager, - eligibilityProof: EligibilityProof, - expectedToAddress: Address, - expectedValueWei: UInt256, + eligibilityProof: EligibilityProof ): Future[Result[void, string]] {.async.} = ## We consider a tx eligible, ## in the context of service incentivization PoC, @@ -64,7 +68,6 @@ proc isEligibleTxId*( let txHash = TxHash.fromHex(byteutils.toHex(eligibilityProof.proofOfPayment.get())) # check that it is not a double-spend let txHashWasSeen = (txHash in eligibilityManager.seenTxIds) - eligibilityManager.seenTxIds.incl(txHash) if txHashWasSeen: return err("TxHash " & $txHash & " was already checked (double-spend attempt)") try: @@ -89,10 +92,15 @@ proc isEligibleTxId*( return err("A contract call tx is not eligible") # check that the to address is "as expected" let toAddress = toAddressOption.get() - if toAddress != expectedToAddress: + if toAddress != eligibilityManager.expectedToAddress: return err("Wrong destination address: " & $toAddress) # check that the amount is "as expected" let txValueWei = tx.value - if txValueWei != expectedValueWei: - return err("Wrong tx value: got " & $txValueWei & ", expected " & $expectedValueWei) + if txValueWei != eligibilityManager.expectedValueWei: + return err("Wrong tx value: got " & $txValueWei & ", expected " & $eligibilityManager.expectedValueWei) return ok() + +proc markTxIdSeen*(eligibilityManager: EligibilityManager, txIdBytes: seq[uint8]) = + ## Converts bytes to hex, then to TxHash, and adds to seenTxIds set + let txHash = TxHash.fromHex(byteutils.toHex(txIdBytes)) + eligibilityManager.seenTxIds.incl(txHash) diff --git a/waku/incentivization/reputation_manager.nim b/waku/incentivization/reputation_manager.nim index 3177c0fdf8..7d0ceca097 100644 --- a/waku/incentivization/reputation_manager.nim +++ b/waku/incentivization/reputation_manager.nim @@ -1,11 +1,22 @@ -import tables, std/options -import ../waku_lightpush_legacy/rpc +import tables, std/options, chronicles +import ../waku_lightpush/[rpc, common] +import libp2p/peerid -type - PeerId = string +const BadLightPushErrorCodes* = [ + LightPushErrorCode.INTERNAL_SERVER_ERROR, + LightPushErrorCode.SERVICE_NOT_AVAILABLE, + LightPushErrorCode.OUT_OF_RLN_PROOF, + LightPushErrorCode.NO_PEERS_TO_RELAY +] +# Note: if Bob's RLN proof is based on an outdated root, he will return INVALID_MESSAGE (code 420). +# This is arguably incorrect, as the issue is on Bob's side, not Alice's. +# See issue: https://github.com/waku-org/nwaku/issues/3531 +# We do not include INVALID_MESSAGE in BadLightPushErrorCodes, as it is a client-side error. +type ResponseQuality* = enum BadResponse + NeutralResponse GoodResponse # Encode reputation indicator as Option[bool]: @@ -24,25 +35,51 @@ proc setReputation*( manager.reputationOf[peer] = repValue proc getReputation*(manager: ReputationManager, peer: PeerId): Option[bool] = - if peer in manager.reputationOf: - result = manager.reputationOf[peer] - else: + try: + if peer in manager.reputationOf: + result = manager.reputationOf[peer] + else: + result = none(bool) + except KeyError: result = none(bool) -# Evaluate the quality of a PushResponse by checking its isSuccess field -proc evaluateResponse*(response: PushResponse): ResponseQuality = - if response.isSuccess: +### Lightpush-specific functionality ### + +# Evaluate the quality of a LightPushResponse by checking its status code +proc evaluateResponse*(response: LightPushResponse): ResponseQuality = + if response.isSuccess(): return GoodResponse - else: + elif response.statusCode in BadLightPushErrorCodes: return BadResponse + else: + return NeutralResponse -# Update reputation of the peer based on the quality of the response +# Update reputation of the peer based on LightPushResponse quality proc updateReputationFromResponse*( - manager: var ReputationManager, peer: PeerId, response: PushResponse + manager: var ReputationManager, peer: PeerId, response: LightPushResponse ) = let respQuality = evaluateResponse(response) case respQuality of BadResponse: + debug "Assign bad reputation for peer", peer = peer manager.setReputation(peer, some(false)) # false => BadRep of GoodResponse: + debug "Assign good reputation for peer", peer = peer manager.setReputation(peer, some(true)) # true => GoodRep + of NeutralResponse: + debug "Neutral response - reputation unchanged for peer", peer = peer + # Don't change reputation for neutral responses + +### Reputation conversion utilities ### + +proc convertReputationToString*(reputation: Option[bool]): Option[string] = + ## Converts reputation from Option[bool] to Option[string] + ## some(true) -> some("Good") + ## some(false) -> some("Bad") + ## none(bool) -> some("Neutral") + if reputation.isNone(): + return some("Neutral") + elif reputation.get(): + return some("Good") + else: + return some("Bad") diff --git a/waku/incentivization/rpc.nim b/waku/incentivization/rpc.nim index 5223f5b5b3..cfcbe24555 100644 --- a/waku/incentivization/rpc.nim +++ b/waku/incentivization/rpc.nim @@ -1,4 +1,9 @@ -import std/options +import + std/[options, strutils], + stew/byteutils, + json_serialization, + json_serialization/std/options, + ../waku_api/rest/serdes # Implementing the RFC: # https://github.com/vacp2p/rfc/tree/master/content/docs/rfcs/73 @@ -10,3 +15,26 @@ type EligibilityStatus* = object statusCode*: uint32 statusDesc*: Option[string] + +proc writeValue*( + writer: var JsonWriter[RestJson], value: EligibilityProof +) {.raises: [IOError].} = + if value.proofOfPayment.isSome(): + writer.writeValue("0x" & value.proofOfPayment.get().toHex()) + else: + writer.writeValue("") + +proc readValue*( + reader: var JsonReader[RestJson], value: var EligibilityProof +) {.raises: [SerializationError, IOError].} = + let hexStr = reader.readValue(string) + if hexStr.len > 0: + let startIndex = if hexStr.len > 2 and hexStr[0..1] == "0x": 2 else: 0 + try: + let bytes = hexToSeqByte(hexStr[startIndex..^1]) + value = EligibilityProof(proofOfPayment: some(bytes)) + except ValueError as e: + # Either handle the error or re-raise it + raise newException(SerializationError, "Invalid hex string: " & e.msg) + else: + value = EligibilityProof(proofOfPayment: none(seq[byte])) \ No newline at end of file diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 7deff05934..d37e7370f9 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -22,7 +22,8 @@ import ../../waku_metadata, ../health_monitor/online_monitor, ./peer_store/peer_storage, - ./waku_peer_store + ./waku_peer_store, + ../../incentivization/[reputation_manager, eligibility_manager] export waku_peer_store, peer_storage, peers @@ -96,6 +97,11 @@ type PeerManager* = ref object of RootObj started: bool shardedPeerManagement: bool # temp feature flag onConnectionChange*: ConnectionChangeHandler + # clients of light protocols (like Lightpush) may track servers' reputation + reputationManager*: Option[ReputationManager] + # servers of light protocols (like Lightpush) may track client requests' eligibility + eligibilityManager*: Option[EligibilityManager] + dnsNameServers*: seq[IpAddress] online: bool ## state managed by online_monitor module #~~~~~~~~~~~~~~~~~~~# @@ -231,15 +237,42 @@ proc selectPeer*( # For other protocols, we select the peer that is slotted for the given protocol pm.serviceSlots.withValue(proto, serviceSlot): + # FIXME: if there is just one peer, reputation doesn't affect choice? trace "Got peer from service slots", peerId = serviceSlot[].peerId, multi = serviceSlot[].addrs[0], protocol = proto return some(serviceSlot[]) # If not slotted, we select a random peer for the given protocol if peers.len > 0: + # if reputation is enabled, filter out bad-reputation peers + debug "Before filtering - total peers:", numPeers = peers.len + var preSelectedPeers = + if pm.reputationManager.isSome(): + debug "Reputation enabled: consider only non-negative reputation peers" + peers.filterIt: + let rep = + try: + pm.reputationManager.get().getReputation(it.peerId) + except KeyError: + none(bool) + rep == none(bool) or rep == some(true) + else: + peers + + debug "Pre-selected peers from peerstore: ", numPeers = preSelectedPeers.len + if preSelectedPeers.len == 0: + debug "No suitable service peers with good enough reputation!" + return none(RemotePeerInfo) + + let selectedPeer = sample(preSelectedPeers) + + if pm.reputationManager.isSome(): + debug "Selected peer has reputation", reputation = pm.reputationManager.get().getReputation(selectedPeer.peerId) + trace "Got peer from peerstore", - peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto - return some(peers[0]) + peerId = selectedPeer.peerId, multi = selectedPeer.addrs[0], protocol = proto + return some(selectedPeer) + trace "No peer found for protocol", protocol = proto return none(RemotePeerInfo) @@ -1029,6 +1062,9 @@ proc new*( maxFailedAttempts = MaxFailedAttempts, colocationLimit = DefaultColocationLimit, shardedPeerManagement = false, + reputationEnabled = false, + eligibilityEnabled = false, + dnsNameServers = newSeq[IpAddress](), ): PeerManager {.gcsafe.} = let capacity = switch.peerStore.capacity let maxConnections = switch.connManager.inSema.size @@ -1110,4 +1146,16 @@ proc new*( else: trace "no peer storage found" + pm.reputationManager = + if reputationEnabled: + some(ReputationManager.new()) + else: + none(ReputationManager) + + pm.eligibilityManager = + if eligibilityEnabled: + some(EligibilityManager.new()) + else: + none(EligibilityManager) + return pm diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index c9d5578943..1b3bc9666f 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -48,7 +48,8 @@ import ../waku_rln_relay, ./net_config, ./peer_manager, - ../common/rate_limit/setting + ../common/rate_limit/setting, + ../incentivization/[eligibility_manager, rpc] declarePublicCounter waku_node_messages, "number of messages received", ["type"] declarePublicHistogram waku_histogram_message_size, @@ -1101,7 +1102,7 @@ proc legacyLightpushPublish*( except CatchableError: return err(getCurrentExceptionMsg()) -# TODO: Move to application module (e.g., wakunode2.nim) +# TODO: Move to application module (e.g, wakunode2.nim) proc legacyLightpushPublish*( node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage ): Future[legacy_lightpush_protocol.WakuLightPushResult[string]] {. @@ -1163,16 +1164,19 @@ proc lightpushPublishHandler( node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage, + eligibilityProof: Option[EligibilityProof] = none(EligibilityProof), peer: RemotePeerInfo | PeerInfo, ): Future[lightpush_protocol.WakuLightPushResult] {.async.} = + let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() if not node.wakuLightpushClient.isNil(): notice "publishing message with lightpush", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, target_peer_id = peer.peerId, - msg_hash = msgHash - return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer) + msg_hash = msgHash, + eligibilityProof = eligibilityProof + return await node.wakuLightpushClient.publish(some(pubsubTopic), message, eligibilityProof, peer) if not node.wakuLightPush.isNil(): notice "publishing message with self hosted lightpush", @@ -1187,6 +1191,7 @@ proc lightpushPublish*( node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage, + eligibilityProof: Option[EligibilityProof] = none(EligibilityProof), peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo), ): Future[lightpush_protocol.WakuLightPushResult] {.async.} = if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil(): @@ -1224,7 +1229,7 @@ proc lightpushPublish*( error "lightpush publish error", error = msg return lighpushErrorResult(LightPushErrorCode.INTERNAL_SERVER_ERROR, msg) - return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer) + return await lightpushPublishHandler(node, pubsubForPublish, message, eligibilityProof, toPeer) ## Waku RLN Relay proc mountRlnRelay*( @@ -1322,7 +1327,7 @@ proc startPeerExchangeLoop*(node: WakuNode) = return node.wakuPeerExchange.pxLoopHandle = node.peerExchangeLoop() -# TODO: Move to application module (e.g., wakunode2.nim) +# TODO: Move to application module (e.g, wakunode2.nim) proc setPeerExchangePeer*( node: WakuNode, peer: RemotePeerInfo | MultiAddress | string ) = diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index 04cc310103..00fd047b4c 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -21,6 +21,7 @@ import waku_node, node/peer_manager, waku_enr/sharding, + incentivization/reputation_manager, ], ../responses, ../serdes, @@ -60,20 +61,26 @@ type PeerProtocolTuple = connected: Connectedness, agent: string, origin: PeerOrigin, + reputation: Option[bool], ] + + proc tuplesToWakuPeers(peers: var WakuPeers, peersTup: seq[PeerProtocolTuple]) = for peer in peersTup: peers.add( peer.multiaddr, peer.protocol, peer.shards, peer.connected, peer.agent, - peer.origin, + peer.origin, convertReputationToString(peer.reputation), ) proc populateAdminPeerInfo( peers: var WakuPeers, node: WakuNode, codec: Option[string] = none[string]() ) = + let pm = node.peerManager if codec.isNone(): - peers = node.peerManager.switch.peerStore.peers().mapIt(WakuPeer.init(it)) + peers = node.peerManager.switch.peerStore.peers().mapIt( + WakuPeer.init(it, if pm.reputationManager.isSome(): pm.reputationManager.get().getReputation(it.peerId) else: none(bool)) + ) else: let peersTuples = node.peerManager.switch.peerStore.peers(codec.get()).mapIt( ( @@ -83,6 +90,7 @@ proc populateAdminPeerInfo( connected: it.connectedness, agent: it.agent, origin: it.origin, + reputation: if pm.reputationManager.isSome(): pm.reputationManager.get().getReputation(it.peerId) else: none(bool), ) ) tuplesToWakuPeers(peers, peersTuples) @@ -112,7 +120,9 @@ proc getRelayPeers(node: WakuNode): PeersOfShards = relayPeers.add( PeersOfShard( shard: relayShard.shardId, - peers: toSeq(pubsubPeers).mapIt(WakuPeer.init(it, node.peerManager)), + peers: toSeq(pubsubPeers).mapIt( + WakuPeer.init(it, node.peerManager, if node.peerManager.reputationManager.isSome(): node.peerManager.reputationManager.get().getReputation(it.peerId) else: none(bool)) + ), ) ) return relayPeers @@ -129,7 +139,9 @@ proc getMeshPeers(node: WakuNode): PeersOfShards = meshPeers.add( PeersOfShard( shard: relayShard.shardId, - peers: toSeq(peers).mapIt(WakuPeer.init(it, node.peerManager)), + peers: toSeq(peers).mapIt( + WakuPeer.init(it, node.peerManager, if node.peerManager.reputationManager.isSome(): node.peerManager.reputationManager.get().getReputation(it.peerId) else: none(bool)) + ), ) ) return meshPeers @@ -157,7 +169,8 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = if node.peerManager.switch.peerStore.peerExists(peerIdVal): let peerInfo = node.peerManager.switch.peerStore.getPeer(peerIdVal) - let peer = WakuPeer.init(peerInfo) + let reputation = if node.peerManager.reputationManager.isSome(): node.peerManager.reputationManager.get().getReputation(peerIdVal) else: none(bool) + let peer = WakuPeer.init(peerInfo, reputation) let resp = RestApiResponse.jsonResponse(peer, status = Http200).valueOr: error "An error occurred while building the json response: ", error = error return RestApiResponse.internalServerError( @@ -258,7 +271,9 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = let pubsubPeers = node.wakuRelay.getConnectedPubSubPeers(topic).get(initHashSet[PubSubPeer](0)) let relayPeer = PeersOfShard( - shard: shard, peers: toSeq(pubsubPeers).mapIt(WakuPeer.init(it, node.peerManager)) + shard: shard, peers: toSeq(pubsubPeers).mapIt( + WakuPeer.init(it, node.peerManager, if node.peerManager.reputationManager.isSome(): node.peerManager.reputationManager.get().getReputation(it.peerId) else: none(bool)) + ) ) let resp = RestApiResponse.jsonResponse(relayPeer, status = Http200).valueOr: @@ -307,7 +322,9 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = let peers = node.wakuRelay.getPubSubPeersInMesh(topic).get(initHashSet[PubSubPeer](0)) let relayPeer = PeersOfShard( - shard: shard, peers: toSeq(peers).mapIt(WakuPeer.init(it, node.peerManager)) + shard: shard, peers: toSeq(peers).mapIt( + WakuPeer.init(it, node.peerManager, if node.peerManager.reputationManager.isSome(): node.peerManager.reputationManager.get().getReputation(it.peerId) else: none(bool)) + ) ) let resp = RestApiResponse.jsonResponse(relayPeer, status = Http200).valueOr: diff --git a/waku/waku_api/rest/admin/types.nim b/waku/waku_api/rest/admin/types.nim index 483acf8b87..e33c002fe3 100644 --- a/waku/waku_api/rest/admin/types.nim +++ b/waku/waku_api/rest/admin/types.nim @@ -7,7 +7,7 @@ import json_serialization/lexer, results, libp2p/protocols/pubsub/pubsubpeer -import waku/[waku_core, node/peer_manager], ../serdes +import waku/[waku_core, node/peer_manager, incentivization/reputation_manager], ../serdes #### Types type WakuPeer* = object @@ -18,6 +18,7 @@ type WakuPeer* = object agent*: string origin*: PeerOrigin score*: Option[float64] + reputation*: Option[string] # "Good", "Bad", or "Neutral" type WakuPeers* = seq[WakuPeer] @@ -50,6 +51,7 @@ proc writeValue*( writer.writeField("agent", value.agent) writer.writeField("origin", value.origin) writer.writeField("score", value.score) + writer.writeField("reputation", value.reputation) writer.endRecord() proc writeValue*( @@ -104,6 +106,7 @@ proc readValue*( agent: Option[string] origin: Option[PeerOrigin] score: Option[float64] + reputation: Option[string] for fieldName in readObjectFields(reader): case fieldName @@ -135,6 +138,10 @@ proc readValue*( if score.isSome(): reader.raiseUnexpectedField("Multiple `score` fields found", "WakuPeer") score = some(reader.readValue(float64)) + of "reputation": + if reputation.isSome(): + reader.raiseUnexpectedField("Multiple `reputation` fields found", "WakuPeer") + reputation = some(reader.readValue(string)) else: unrecognizedFieldWarning(value) @@ -164,6 +171,7 @@ proc readValue*( agent: agent.get(), origin: origin.get(), score: score, + reputation: reputation, ) proc readValue*( @@ -276,7 +284,9 @@ proc readValue*( func `==`*(a, b: WakuPeer): bool {.inline.} = return a.multiaddr == b.multiaddr -proc init*(T: type WakuPeer, peerInfo: RemotePeerInfo): WakuPeer = + + +proc init*(T: type WakuPeer, peerInfo: RemotePeerInfo, reputation: Option[bool] = none(bool)): WakuPeer = result = WakuPeer( multiaddr: constructMultiaddrStr(peerInfo), protocols: peerInfo.protocols, @@ -285,9 +295,10 @@ proc init*(T: type WakuPeer, peerInfo: RemotePeerInfo): WakuPeer = agent: peerInfo.agent, origin: peerInfo.origin, score: none(float64), + reputation: convertReputationToString(reputation), ) -proc init*(T: type WakuPeer, pubsubPeer: PubSubPeer, pm: PeerManager): WakuPeer = +proc init*(T: type WakuPeer, pubsubPeer: PubSubPeer, pm: PeerManager, reputation: Option[bool] = none(bool)): WakuPeer = let peerInfo = pm.getPeer(pubsubPeer.peerId) result = WakuPeer( multiaddr: constructMultiaddrStr(peerInfo), @@ -297,6 +308,7 @@ proc init*(T: type WakuPeer, pubsubPeer: PubSubPeer, pm: PeerManager): WakuPeer agent: peerInfo.agent, origin: peerInfo.origin, score: some(pubsubPeer.score), + reputation: convertReputationToString(reputation), ) proc add*( @@ -307,6 +319,7 @@ proc add*( connected: Connectedness, agent: string, origin: PeerOrigin, + reputation: Option[string] = none(string), ) = var peer: WakuPeer = WakuPeer( multiaddr: multiaddr, @@ -315,6 +328,8 @@ proc add*( connected: connected, agent: agent, origin: origin, + score: none(float64), + reputation: reputation, ) let idx = peers.find(peer) diff --git a/waku/waku_api/rest/lightpush/handlers.nim b/waku/waku_api/rest/lightpush/handlers.nim index a724aa1c9f..e0ed9cac62 100644 --- a/waku/waku_api/rest/lightpush/handlers.nim +++ b/waku/waku_api/rest/lightpush/handlers.nim @@ -17,7 +17,8 @@ import ../serdes, ../responses, ../rest_serdes, - ./types + ./types, + waku/incentivization/rpc export types @@ -93,7 +94,9 @@ proc installLightPushRequestHandler*( makeRestResponse(lightpushResultServiceUnavailable(NoPeerNoneFoundError)) toPeer = some(aPeer) - let subFut = node.lightpushPublish(req.pubsubTopic, msg, toPeer) + debug "Handling Lightpush request:", req + + let subFut = node.lightpushPublish(req.pubsubTopic, msg, req.eligibilityProof, toPeer) if not await subFut.withTimeout(FutTimeoutForPushRequestProcessing): error "Failed to request a message push due to timeout!" diff --git a/waku/waku_api/rest/lightpush/types.nim b/waku/waku_api/rest/lightpush/types.nim index 1fb87ab45c..e2efa5879f 100644 --- a/waku/waku_api/rest/lightpush/types.nim +++ b/waku/waku_api/rest/lightpush/types.nim @@ -7,7 +7,7 @@ import json_serialization/std/options, presto/[route, client] -import ../../../waku_core, ../relay/types as relay_types, ../serdes +import ../../../waku_core, ../../../incentivization/rpc, ../relay/types as relay_types, ../serdes export relay_types @@ -17,6 +17,7 @@ type PushRequest* = object pubsubTopic*: Option[PubSubTopic] message*: RelayWakuMessage + eligibilityProof*: Option[EligibilityProof] PushResponse* = object statusDesc*: Option[string] @@ -30,6 +31,8 @@ proc writeValue*( if value.pubsubTopic.isSome(): writer.writeField("pubsubTopic", value.pubsubTopic.get()) writer.writeField("message", value.message) + if value.eligibilityProof.isSome(): + writer.writeField("eligibilityProof", value.eligibilityProof.get()) writer.endRecord() proc readValue*( @@ -38,6 +41,7 @@ proc readValue*( var pubsubTopic = none(PubsubTopic) message = none(RelayWakuMessage) + eligibilityProof = none(EligibilityProof) var keys = initHashSet[string]() for fieldName in readObjectFields(reader): @@ -55,6 +59,8 @@ proc readValue*( pubsubTopic = some(reader.readValue(PubsubTopic)) of "message": message = some(reader.readValue(RelayWakuMessage)) + of "eligibilityProof": + eligibilityProof = some(reader.readValue(EligibilityProof)) else: unrecognizedFieldWarning(value) @@ -68,6 +74,7 @@ proc readValue*( else: some(pubsubTopic.get()), message: message.get(), + eligibilityProof: eligibilityProof, ) proc writeValue*( diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index efb330d91e..19ce0cc253 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -11,7 +11,8 @@ import ./common, ./protocol_metrics, ./rpc, - ./rpc_codec + ./rpc_codec, + ../incentivization/[reputation_manager, eligibility_manager, rpc] logScope: topics = "waku lightpush client" @@ -45,7 +46,9 @@ proc sendPushRequest( try: buffer = await connection.readLp(DefaultMaxRpcSize.int) except LPStreamRemoteClosedError: - error "Failed to read response from peer", error = getCurrentExceptionMsg() + error "Failed to read responose from peer", error = getCurrentExceptionMsg() + if wl.peerManager.reputationManager.isSome: + wl.peerManager.reputationManager.get().setReputation(peer.peerId, some(false)) return lightpushResultInternalError( "Failed to read response from peer: " & getCurrentExceptionMsg() ) @@ -53,38 +56,48 @@ proc sendPushRequest( let response = LightpushResponse.decode(buffer).valueOr: error "failed to decode response" waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure]) + if wl.peerManager.reputationManager.isSome: + wl.peerManager.reputationManager.get().setReputation(peer.peerId, some(false)) return lightpushResultInternalError(decodeRpcFailure) if response.requestId != req.requestId and response.statusCode != LightPushErrorCode.TOO_MANY_REQUESTS: error "response failure, requestId mismatch", requestId = req.requestId, responseRequestId = response.requestId + if wl.peerManager.reputationManager.isSome: + wl.peerManager.reputationManager.get().setReputation(peer.peerId, some(false)) return lightpushResultInternalError("response failure, requestId mismatch") + if wl.peerManager.reputationManager.isSome: + wl.peerManager.reputationManager.get().updateReputationFromResponse( + peer.peerId, response + ) + return toPushResult(response) proc publish*( wl: WakuLightPushClient, pubSubTopic: Option[PubsubTopic] = none(PubsubTopic), wakuMessage: WakuMessage, + eligibilityProof: Option[EligibilityProof] = none(EligibilityProof), peer: PeerId | RemotePeerInfo, ): Future[WakuLightPushResult] {.async, gcsafe.} = var message = wakuMessage if message.timestamp == 0: message.timestamp = getNowInNanosecondTime() - when peer is PeerId: - info "publish", - peerId = shortLog(peer), - msg_hash = computeMessageHash(pubsubTopic.get(""), message).to0xHex - else: - info "publish", - peerId = shortLog(peer.peerId), - msg_hash = computeMessageHash(pubsubTopic.get(""), message).to0xHex + info "publish", + peerId = shortLog( + when peer is PeerId: peer + else: peer.peerId + ), + msg_hash = computeMessageHash(pubSubTopic.get(""), message).to0xHex + # i13n POC: add eligibilityProof to the request let pushRequest = LightpushRequest( - requestId: generateRequestId(wl.rng), pubSubTopic: pubSubTopic, message: message + requestId: generateRequestId(wl.rng), pubSubTopic: pubSubTopic, message: message, eligibilityProof: eligibilityProof ) + debug "Created Lightpush request: ", pushRequest let publishedCount = ?await wl.sendPushRequest(pushRequest, peer) for obs in wl.publishObservers: @@ -93,7 +106,8 @@ proc publish*( return lightpushSuccessResult(publishedCount) proc publishToAny*( - wl: WakuLightPushClient, pubSubTopic: PubsubTopic, wakuMessage: WakuMessage + wl: WakuLightPushClient, pubSubTopic: PubsubTopic, wakuMessage: WakuMessage, + eligibilityproof: Option[EligibilityProof] = none(EligibilityProof) ): Future[WakuLightPushResult] {.async, gcsafe.} = ## This proc is similar to the publish one but in this case ## we don't specify a particular peer and instead we get it from peer manager @@ -103,21 +117,10 @@ proc publishToAny*( message.timestamp = getNowInNanosecondTime() info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex - let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr: # TODO: check if it is matches the situation - shall we distinguish client side missing peers from server side? return lighpushErrorResult( LightPushErrorCode.NO_PEERS_TO_RELAY, "no suitable remote peers" ) - let pushRequest = LightpushRequest( - requestId: generateRequestId(wl.rng), - pubSubTopic: some(pubSubTopic), - message: message, - ) - let publishedCount = ?await wl.sendPushRequest(pushRequest, peer) - - for obs in wl.publishObservers: - obs.onMessagePublished(pubSubTopic, message) - - return lightpushSuccessResult(publishedCount) + return await wl.publish(some(pubSubTopic), message, eligibilityproof, peer) diff --git a/waku/waku_lightpush/common.nim b/waku/waku_lightpush/common.nim index f2687834e2..f810dd904f 100644 --- a/waku/waku_lightpush/common.nim +++ b/waku/waku_lightpush/common.nim @@ -11,6 +11,7 @@ const LightPushSuccessCode* = (SUCCESS: LightPushStatusCode(200)) const LightPushErrorCode* = ( BAD_REQUEST: LightPushStatusCode(400), + PAYMENT_REQUIRED: LightPushStatusCode(402), PAYLOAD_TOO_LARGE: LightPushStatusCode(413), INVALID_MESSAGE: LightPushStatusCode(420), UNSUPPORTED_PUBSUB_TOPIC: LightPushStatusCode(421), diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index 955b1ade53..c1bfbf630f 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -16,7 +16,8 @@ import ./rpc, ./rpc_codec, ./protocol_metrics, - ../common/rate_limit/request_limiter + ../common/rate_limit/request_limiter, + ../incentivization/[eligibility_manager, rpc] logScope: topics = "waku lightpush" @@ -89,6 +90,48 @@ proc handleRequest*( statusDesc: some(desc), ) + # Check eligibility if manager is available + if wl.peerManager.eligibilityManager.isSome(): + debug "eligibilityManager is enabled" + let em = wl.peerManager.eligibilityManager.get() + + try: + debug "checking eligibilityProof..." + + # Check if eligibility proof is provided + if pushRequest.eligibilityProof.isNone(): + let msg = "Eligibility proof is required" + error "lightpush request handling error", error = msg + return LightpushResponse( + requestId: pushRequest.requestId, + statusCode: LightPushErrorCode.PAYMENT_REQUIRED, + statusDesc: some(msg), + ) + + let isEligible = await em.isEligibleTxId(pushRequest.eligibilityProof.get()) + if isEligible.isErr(): + let msg = "Eligibility check failed: " & isEligible.error + error "lightpush request handling error", error = msg + return LightpushResponse( + requestId: pushRequest.requestId, + statusCode: LightPushErrorCode.PAYMENT_REQUIRED, + statusDesc: some(msg), + ) + + debug "Eligibility check passed!" + + except CatchableError: + let msg = "Eligibility check threw exception: " & getCurrentExceptionMsg() + error "lightpush request handling error", error = msg + return LightpushResponse( + requestId: pushRequest.requestId, + statusCode: LightPushErrorCode.PAYMENT_REQUIRED, + statusDesc: some(msg), + ) + else: + # the service node doesn't want to check eligibility + debug "eligibilityManager is disabled - skipping eligibility check" + let relayPeerCount = (await handleRequest(wl, peerId, pushRequest)).valueOr: let desc = error.desc waku_lightpush_v3_errors.inc(labelValues = [$error.code]) @@ -97,6 +140,12 @@ proc handleRequest*( requestId: pushRequest.requestId, statusCode: error.code, statusDesc: desc ) + # Add txid to seen txids in eligibility manager after all checks pass + if wl.peerManager.eligibilityManager.isSome() and pushRequest.eligibilityProof.isSome(): + let em = wl.peerManager.eligibilityManager.get() + let txIdBytes = pushRequest.eligibilityProof.get().proofOfPayment.get() + em.markTxIdSeen(txIdBytes) + return LightPushResponse( requestId: pushRequest.requestId, statusCode: LightPushSuccessCode.SUCCESS, diff --git a/waku/waku_lightpush/rpc.nim b/waku/waku_lightpush/rpc.nim index f19563b991..3e370e25df 100644 --- a/waku/waku_lightpush/rpc.nim +++ b/waku/waku_lightpush/rpc.nim @@ -2,6 +2,7 @@ import std/options import ../waku_core +import ../incentivization/rpc type LightPushStatusCode* = distinct uint32 proc `==`*(a, b: LightPushStatusCode): bool {.borrow.} @@ -12,6 +13,7 @@ type requestId*: string pubSubTopic*: Option[PubsubTopic] message*: WakuMessage + eligibilityProof*: Option[EligibilityProof] LightPushResponse* = object requestId*: string diff --git a/waku/waku_lightpush/rpc_codec.nim b/waku/waku_lightpush/rpc_codec.nim index 0a4f934d6a..05c7fdfd28 100644 --- a/waku/waku_lightpush/rpc_codec.nim +++ b/waku/waku_lightpush/rpc_codec.nim @@ -2,6 +2,7 @@ import std/options import ../common/protobuf, ../waku_core, ./rpc +import ../incentivization/[rpc, rpc_codec] const DefaultMaxRpcSize* = -1 @@ -11,8 +12,11 @@ proc encode*(rpc: LightpushRequest): ProtoBuffer = pb.write3(1, rpc.requestId) pb.write3(20, rpc.pubSubTopic) pb.write3(21, rpc.message.encode()) - pb.finish3() + if rpc.eligibilityProof.isSome(): + pb.write3(22, rpc.eligibilityProof.get().encode()) + + pb.finish3() return pb proc decode*(T: type LightpushRequest, buffer: seq[byte]): ProtobufResult[T] = @@ -37,6 +41,13 @@ proc decode*(T: type LightpushRequest, buffer: seq[byte]): ProtobufResult[T] = else: rpc.message = ?WakuMessage.decode(messageBuf) + var eligibilityProofBytes: seq[byte] + if not ?pb.getField(22, eligibilityProofBytes): + rpc.eligibilityProof = none(EligibilityProof) + else: + let decodedProof = ?EligibilityProof.decode(eligibilityProofBytes) + rpc.eligibilityProof = some(decodedProof) + return ok(rpc) proc encode*(rpc: LightPushResponse): ProtoBuffer =