diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 8021ac0a91..c6936f5650 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -25,4 +25,8 @@ export { StreamManager } from "./lib/stream_manager/index.js"; export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js"; -export { messageHash, messageHashStr } from "./lib/message_hash/index.js"; +export { + messageHash, + messageHashStr, + deterministicMessageHashing +} from "./lib/message_hash/index.js"; diff --git a/packages/core/src/lib/message_hash/index.ts b/packages/core/src/lib/message_hash/index.ts index e641c301f3..ab889ebe6f 100644 --- a/packages/core/src/lib/message_hash/index.ts +++ b/packages/core/src/lib/message_hash/index.ts @@ -1 +1,5 @@ -export { messageHash, messageHashStr } from "./message_hash.js"; +export { + messageHash, + messageHashStr, + deterministicMessageHashing +} from "./message_hash.js"; diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index caecb73aec..36b6bf67bc 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -105,6 +105,7 @@ export interface IEncoder { export interface IDecoder { contentTopic: string; pubsubTopic: PubsubTopic; + routingInfo: IRoutingInfo; fromWireToProtoObj: (bytes: Uint8Array) => Promise; fromProtoObj: ( pubsubTopic: string, diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 71914755f7..16e3f55f01 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -11,9 +11,10 @@ import type { HealthStatus } from "./health_status.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; import { IDecodedMessage, IDecoder, IEncoder } from "./message.js"; +import { ContentTopic } from "./misc.js"; import type { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; -import type { ShardId } from "./sharding.js"; +import type { IRoutingInfo, ShardId } from "./sharding.js"; import type { IStore } from "./store.js"; export type CreateDecoderParams = { @@ -27,7 +28,8 @@ export type CreateEncoderParams = CreateDecoderParams & { export enum WakuEvent { Connection = "waku:connection", - Health = "waku:health" + Health = "waku:health", + SubscribeError = "subscribe:error" } export interface IWakuEvents { @@ -52,9 +54,28 @@ export interface IWakuEvents { * }); */ [WakuEvent.Health]: CustomEvent; + + /** + * Emitted when there is an irrecoverable error when subscribing. + * + * @example + * ```typescript + * waku.addEventListener(WakuEvent.SubscribeError, (event) => { + * console.log(event.detail); + * }); + */ + [WakuEvent.SubscribeError]: CustomEvent; +} + +export interface IMessageEmitterEvents { + [contentTopic: string]: CustomEvent<{ + payload: Uint8Array; + messageHash: Uint8Array; + }>; } export type IWakuEventEmitter = TypedEventEmitter; +export type IMessageEmitter = TypedEventEmitter; export interface IWaku { libp2p: Libp2p; @@ -78,6 +99,20 @@ export interface IWaku { */ events: IWakuEventEmitter; + /** + * Emits messages on their content topic. Messages may be coming from subscriptions + * or store queries (TODO). The payload is directly emitted + * + * @example + * ```typescript + * waku.messageEmitter.addEventListener("/some/0/content-topic/proto", (event) => { + * const payload: UInt8Array = event.detail + * MyDecoder.decode(payload); + * }); + * ``` + */ + messageEmitter: IMessageEmitter; + /** * Returns a unique identifier for a node on the network. * @@ -221,6 +256,8 @@ export interface IWaku { */ createDecoder(params: CreateDecoderParams): IDecoder; + createRoutingInfo(contentTopic?: string, shardId?: number): IRoutingInfo; + /** * Creates an encoder for Waku messages on a specific content topic. * @@ -251,6 +288,8 @@ export interface IWaku { */ createEncoder(params: CreateEncoderParams): IEncoder; + subscribe(contentTopics: ContentTopic[]): void; + /** * @returns {boolean} `true` if the node was started and `false` otherwise */ diff --git a/packages/message-encryption/src/crypto/utils.ts b/packages/message-encryption/src/crypto/utils.ts index fd57811bd4..15e9ed4b08 100644 --- a/packages/message-encryption/src/crypto/utils.ts +++ b/packages/message-encryption/src/crypto/utils.ts @@ -74,3 +74,32 @@ export async function sign( export function keccak256(input: Uint8Array): Uint8Array { return new Uint8Array(sha3.keccak256.arrayBuffer(input)); } + +/** + * Compare two public keys, can be used to verify that a given signature matches + * expectations. + * + * @param publicKeyA - The first public key to compare + * @param publicKeyB - The second public key to compare + * @returns true if the public keys are the same + */ +export function comparePublicKeys( + publicKeyA: Uint8Array | undefined, + publicKeyB: Uint8Array | undefined +): boolean { + if (!publicKeyA || !publicKeyB) { + return false; + } + + if (publicKeyA.length !== publicKeyB.length) { + return false; + } + + for (let i = 0; i < publicKeyA.length; i++) { + if (publicKeyA[i] !== publicKeyB[i]) { + return false; + } + } + + return true; +} diff --git a/packages/message-encryption/src/index.ts b/packages/message-encryption/src/index.ts index 79b6753833..fdace3c8dc 100644 --- a/packages/message-encryption/src/index.ts +++ b/packages/message-encryption/src/index.ts @@ -1,11 +1,17 @@ import { + comparePublicKeys, generatePrivateKey, generateSymmetricKey, getPublicKey } from "./crypto/index.js"; import { DecodedMessage } from "./decoded_message.js"; -export { generatePrivateKey, generateSymmetricKey, getPublicKey }; +export { + generatePrivateKey, + generateSymmetricKey, + getPublicKey, + comparePublicKeys +}; export type { DecodedMessage }; export * as ecies from "./ecies.js"; diff --git a/packages/message-encryption/src/symmetric.ts b/packages/message-encryption/src/symmetric.ts index 87fcbbb8fc..2f44ef8606 100644 --- a/packages/message-encryption/src/symmetric.ts +++ b/packages/message-encryption/src/symmetric.ts @@ -206,3 +206,105 @@ export function createDecoder( ): Decoder { return new Decoder(contentTopic, routingInfo, symKey); } + +/** + * Result of decrypting a message with AES symmetric encryption. + */ +export interface SymmetricDecryptionResult { + /** The decrypted payload */ + payload: Uint8Array; + /** The signature if the message was signed */ + signature?: Uint8Array; + /** The recovered public key if the message was signed */ + signaturePublicKey?: Uint8Array; +} + +/** + * AES symmetric encryption. + * + * + * Follows [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/) encryption standard. + */ +export class SymmetricEncryption { + /** + * Creates an AES Symmetric encryption instance. + * + * @param symKey - The symmetric key for encryption (32 bytes recommended) + * @param sigPrivKey - Optional private key to sign messages before encryption + */ + public constructor( + private symKey: Uint8Array, + private sigPrivKey?: Uint8Array + ) {} + + /** + * Encrypts a byte array payload. + * + * The encryption process: + * 1. Optionally signs the payload with the private key + * 2. Adds padding to obscure payload size + * 3. Encrypts using AES-256-GCM + * + * @param payload - The data to encrypt + * @returns The encrypted payload + */ + public async encrypt(payload: Uint8Array): Promise { + const preparedPayload = await preCipher(payload, this.sigPrivKey); + return encryptSymmetric(preparedPayload, this.symKey); + } +} + +/** + * AES symmetric decryption. + * + * Follows [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/) encryption standard. + */ +export class SymmetricDecryption { + /** + * Creates an AES Symmetric decryption instance. + * + * @param symKey - The symmetric key for decryption (must match encryption key) + */ + public constructor(private symKey: Uint8Array) {} + + /** + * Decrypts an encrypted byte array payload. + * + * The decryption process: + * 1. Decrypts using AES-256-GCM + * 2. Removes padding + * 3. Verifies and recovers signature if present + * + * @param encryptedPayload - The encrypted data (from [[SymmetricEncryption.encrypt]]) + * @returns Object containing the decrypted payload and signature info, or undefined if decryption fails + */ + public async decrypt( + encryptedPayload: Uint8Array + ): Promise { + try { + const decryptedData = await decryptSymmetric( + encryptedPayload, + this.symKey + ); + + if (!decryptedData) { + return undefined; + } + + const result = postCipher(decryptedData); + + if (!result) { + return undefined; + } + + return { + payload: result.payload, + signature: result.sig?.signature, + signaturePublicKey: result.sig?.publicKey + }; + } catch (error) { + log.error("Failed to decrypt payload", error); + return undefined; + } + } +} diff --git a/packages/sdk/src/reliable_channel/events.ts b/packages/sdk/src/reliable_channel/events.ts index c79c2c0c0f..2382c73103 100644 --- a/packages/sdk/src/reliable_channel/events.ts +++ b/packages/sdk/src/reliable_channel/events.ts @@ -1,4 +1,4 @@ -import { IDecodedMessage, ProtocolError } from "@waku/interfaces"; +import { ProtocolError } from "@waku/interfaces"; import type { HistoryEntry, MessageId } from "@waku/sds"; export const ReliableChannelEvent = { @@ -56,8 +56,7 @@ export interface ReliableChannelEvents { possibleAckCount: number; }>; "message-acknowledged": CustomEvent; - // TODO probably T extends IDecodedMessage? - "message-received": CustomEvent; + "message-received": CustomEvent; "irretrievable-message": CustomEvent; "sending-message-irrecoverable-error": CustomEvent<{ messageId: MessageId; diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts index 49b55aa495..783220f17e 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -1,15 +1,10 @@ import { TypedEventEmitter } from "@libp2p/interface"; -import { messageHash } from "@waku/core"; import { - type Callback, + type ContentTopic, type IDecodedMessage, type IDecoder, - type IEncoder, - type IMessage, - ISendOptions, type IWaku, LightPushError, - LightPushSDKResult, QueryRequestParams } from "@waku/interfaces"; import { @@ -23,14 +18,9 @@ import { SyncMessage } from "@waku/sds"; import { Logger } from "@waku/utils"; - -import { - QueryOnConnect, - QueryOnConnectEvent -} from "../query_on_connect/index.js"; +import { bytesToHex } from "@waku/utils/bytes"; import { ReliableChannelEvent, ReliableChannelEvents } from "./events.js"; -import { MissingMessageRetriever } from "./missing_message_retriever.js"; import { RetryManager } from "./retry_manager.js"; const log = new Logger("sdk:reliable-channel"); @@ -113,6 +103,16 @@ export type ReliableChannelOptions = MessageChannelOptions & { processTaskMinElapseMs?: number; }; +/** + * It is best for SDS (e2e reliability) to happen within the encryption layer. + * Hence, the consumer need to pass encryption and decryption methods for + * outgoing and incoming messages. + */ +export interface IEncryption { + encrypt: (clearPayload: Uint8Array) => Uint8Array | Promise; + decrypt: (encryptedPayload: Uint8Array) => Uint8Array | Promise; +} + /** * An easy-to-use reliable channel that ensures all participants to the channel have eventual message consistency. * @@ -122,19 +122,16 @@ export type ReliableChannelOptions = MessageChannelOptions & { * @emits [[ReliableChannelEvents]] * */ -export class ReliableChannel< - T extends IDecodedMessage -> extends TypedEventEmitter { +export class ReliableChannel extends TypedEventEmitter { + // TODO: this is PoC, we assume that message id is returned, and `undefined` means some error. + // Borrowed from https://github.com/waku-org/js-waku/pull/2583/ for now private readonly _send: ( - encoder: IEncoder, - message: IMessage, - sendOptions?: ISendOptions - ) => Promise; + contentTopic: string, + payload: Uint8Array, + ephemeral?: boolean + ) => Promise; - private readonly _subscribe: ( - decoders: IDecoder | IDecoder[], - callback: Callback - ) => Promise; + private readonly _subscribe: (contentTopics: ContentTopic[]) => void; private readonly _retrieve?: ( decoders: IDecoder[], @@ -147,36 +144,36 @@ export class ReliableChannel< private readonly sweepInBufIntervalMs: number; private processTaskTimeout: ReturnType | undefined; private readonly retryManager: RetryManager | undefined; - private readonly missingMessageRetriever?: MissingMessageRetriever; - private readonly queryOnConnect?: QueryOnConnect; + // private readonly missingMessageRetriever?: MissingMessageRetriever; + // private readonly queryOnConnect?: QueryOnConnect; private readonly processTaskMinElapseMs: number; private _started: boolean; + private encryption: IEncryption; private constructor( public node: IWaku, public messageChannel: MessageChannel, - private encoder: IEncoder, - private decoder: IDecoder, + private contentTopic: ContentTopic, + encryption?: IEncryption, options?: ReliableChannelOptions ) { super(); if (node.lightPush) { - this._send = node.lightPush.send.bind(node.lightPush); + // TODO: this is just a PoC + // this._send = node.lightPush.send.bind(node.lightPush); } else if (node.relay) { - this._send = node.relay.send.bind(node.relay); + // this._send = node.relay.send.bind(node.relay); } else { throw "No protocol available to send messages"; } - if (node.filter) { - this._subscribe = node.filter.subscribe.bind(node.filter); - } else if (node.relay) { - // TODO: Why do relay and filter have different interfaces? - // this._subscribe = node.relay.subscribeWithUnsubscribe; - throw "Not implemented"; - } else { - throw "No protocol available to receive messages"; - } + this._subscribe = node.subscribe.bind(node); + + // If no encryption, just set a pass through without changing the payload to keep the code simpler + this.encryption = encryption ?? { + encrypt: (p: Uint8Array) => p, + decrypt: (p: Uint8Array) => p + }; if (node.store) { this._retrieve = node.store.queryGenerator.bind(node.store); @@ -185,13 +182,14 @@ export class ReliableChannel< peerManagerEvents !== undefined && (options?.queryOnConnect ?? true) ) { - this.queryOnConnect = new QueryOnConnect( - [this.decoder], - this.isChannelMessageWithCausalHistory.bind(this), - peerManagerEvents, - node.events, - this._retrieve.bind(this) - ); + // this.queryOnConnect = new QueryOnConnect( + // [this.decoder], + // this.isChannelMessageWithCausalHistory.bind(this), + // peerManagerEvents, + // node.events, + // this._retrieve.bind(this) + // ); + // TODO: stop using decoder for store } } @@ -215,14 +213,15 @@ export class ReliableChannel< options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS; if (this._retrieve) { - this.missingMessageRetriever = new MissingMessageRetriever( - this.decoder, - options?.retrieveFrequencyMs, - this._retrieve, - async (msg: T) => { - await this.processIncomingMessage(msg); - } - ); + // this.missingMessageRetriever = new MissingMessageRetriever( + // this.decoder, + // options?.retrieveFrequencyMs, + // this._retrieve, + // async (msg: T) => { + // await this.processIncomingMessage(msg.payload); + // } + // ); + // TODO: stop using decoder for store } this._started = false; @@ -261,26 +260,26 @@ export class ReliableChannel< * @param decoder A channel operates within a singular encryption layer, hence the same decoder is needed for all messages * @param options */ - public static async create( + public static async create( node: IWaku, channelId: ChannelId, senderId: SenderId, - encoder: IEncoder, - decoder: IDecoder, + contentTopic: ContentTopic, + encryption?: IEncryption, options?: ReliableChannelOptions - ): Promise> { + ): Promise { const sdsMessageChannel = new MessageChannel(channelId, senderId, options); const messageChannel = new ReliableChannel( node, sdsMessageChannel, - encoder, - decoder, + contentTopic, + encryption, options ); const autoStart = options?.autoStart ?? true; if (autoStart) { - await messageChannel.start(); + messageChannel.start(); } return messageChannel; @@ -317,51 +316,32 @@ export class ReliableChannel< // `payload` wrapped in SDS const sdsPayload = sdsMessage.encode(); - - const wakuMessage = { - payload: sdsPayload - }; + const encPayload = await this.encryption.encrypt(sdsPayload); const messageId = ReliableChannel.getMessageId(messagePayload); - // TODO: should the encoder give me the message hash? - // Encoding now to fail early, used later to get message hash - const protoMessage = await this.encoder.toProtoObj(wakuMessage); - if (!protoMessage) { - this.safeSendEvent("sending-message-irrecoverable-error", { - detail: { - messageId: messageId, - error: "could not encode message" - } - }); - return { success: false }; - } - const retrievalHint = messageHash( - this.encoder.pubsubTopic, - protoMessage - ); - this.safeSendEvent("sending-message", { detail: messageId }); - const sendRes = await this._send(this.encoder, wakuMessage); + const retrievalHint = await this._send(this.contentTopic, encPayload); // If it's a recoverable failure, we will try again to send later // If not, then we should error to the user now - for (const { error } of sendRes.failures) { - if (IRRECOVERABLE_SENDING_ERRORS.includes(error)) { - // Not recoverable, best to return it - log.error("Irrecoverable error, cannot send message: ", error); - this.safeSendEvent("sending-message-irrecoverable-error", { - detail: { - messageId, - error - } - }); - return { success: false, retrievalHint }; - } - } + // for (const { error } of sendRes.failures) { + // if (IRRECOVERABLE_SENDING_ERRORS.includes(error)) { + // // Not recoverable, best to return it + // log.error("Irrecoverable error, cannot send message: ", error); + // this.safeSendEvent("sending-message-irrecoverable-error", { + // detail: { + // messageId, + // error + // } + // }); + // return { success: false, retrievalHint }; + // } + // } + // TODO: if failure, process it return { success: true, @@ -381,26 +361,35 @@ export class ReliableChannel< }); } - private async subscribe(): Promise { + private subscribe(): void { this.assertStarted(); - return this._subscribe(this.decoder, async (message: T) => { - await this.processIncomingMessage(message); + this.node.messageEmitter.addEventListener(this.contentTopic, (event) => { + const { payload, messageHash } = event.detail; + // messageHash is the retrievalHint + void this.processIncomingMessage(payload, messageHash); }); + + this._subscribe([this.contentTopic]); } /** * Don't forget to call `this.messageChannel.sweepIncomingBuffer();` once done. - * @param msg * @private + * @param payload */ - private async processIncomingMessage( - msg: T + private async processIncomingMessage( + payload: Uint8Array, + retrievalHint: Uint8Array ): Promise { - // New message arrives, we need to unwrap it first - const sdsMessage = SdsMessage.decode(msg.payload); + // Decrypt first + // TODO: skip on failure + const decPayload = await this.encryption.decrypt(payload); + + // Unwrap SDS layer + const sdsMessage = SdsMessage.decode(decPayload); if (!sdsMessage) { - log.error("could not SDS decode message", msg); + log.error("could not SDS decode message"); return; } @@ -412,48 +401,33 @@ export class ReliableChannel< return; } - const retrievalHint = msg.hash; - log.info(`processing message ${sdsMessage.messageId}:${msg.hashStr}`); + log.info( + `processing message ${sdsMessage.messageId}:${bytesToHex(retrievalHint)}` + ); // SDS Message decoded, let's pass it to the channel so we can learn about // missing messages or the status of previous outgoing messages this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint); - this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId); + // TODO + // this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId); if (sdsMessage.content && sdsMessage.content.length > 0) { // Now, process the message with callback - - // Overrides msg.payload with unwrapped payload - // TODO: can we do better? - const { payload: _p, ...allButPayload } = msg; - const unwrappedMessage = Object.assign(allButPayload, { - payload: sdsMessage.content, - hash: msg.hash, - hashStr: msg.hashStr, - version: msg.version, - contentTopic: msg.contentTopic, - pubsubTopic: msg.pubsubTopic, - timestamp: msg.timestamp, - rateLimitProof: msg.rateLimitProof, - ephemeral: msg.ephemeral, - meta: msg.meta - }); - this.safeSendEvent("message-received", { - detail: unwrappedMessage as unknown as T + detail: sdsMessage.content }); } this.queueProcessTasks(); } - private async processIncomingMessages( - messages: T[] - ): Promise { - for (const message of messages) { - await this.processIncomingMessage(message); - } - } + // private async processIncomingMessages( + // messages: T[] + // ): Promise { + // for (const message of messages) { + // await this.processIncomingMessage(message.payload); + // } + // } // TODO: For now we only queue process tasks for incoming messages // As this is where there is most volume @@ -472,17 +446,17 @@ export class ReliableChannel< } } - public async start(): Promise { - if (this._started) return true; + public start(): void { + if (this._started) return; this._started = true; this.setupEventListeners(); this.restartSync(); this.startSweepIncomingBufferLoop(); - if (this._retrieve) { - this.missingMessageRetriever?.start(); - this.queryOnConnect?.start(); - } - return this.subscribe(); + // if (this._retrieve) { + // this.missingMessageRetriever?.start(); + // this.queryOnConnect?.start(); + // } + this.subscribe(); } public stop(): void { @@ -490,8 +464,8 @@ export class ReliableChannel< this._started = false; this.stopSync(); this.stopSweepIncomingBufferLoop(); - this.missingMessageRetriever?.stop(); - this.queryOnConnect?.stop(); + // this.missingMessageRetriever?.stop(); + // this.queryOnConnect?.stop(); // TODO unsubscribe // TODO unsetMessageListeners } @@ -665,27 +639,27 @@ export class ReliableChannel< } ); - this.messageChannel.addEventListener( - MessageChannelEvent.InMessageMissing, - (event) => { - for (const { messageId, retrievalHint } of event.detail) { - if (retrievalHint && this.missingMessageRetriever) { - this.missingMessageRetriever.addMissingMessage( - messageId, - retrievalHint - ); - } - } - } - ); - - if (this.queryOnConnect) { - this.queryOnConnect.addEventListener( - QueryOnConnectEvent.MessagesRetrieved, - (event) => { - void this.processIncomingMessages(event.detail); - } - ); - } + // this.messageChannel.addEventListener( + // MessageChannelEvent.InMessageMissing, + // (event) => { + // for (const { messageId, retrievalHint } of event.detail) { + // if (retrievalHint && this.missingMessageRetriever) { + // this.missingMessageRetriever.addMissingMessage( + // messageId, + // retrievalHint + // ); + // } + // } + // } + // ); + + // if (this.queryOnConnect) { + // this.queryOnConnect.addEventListener( + // QueryOnConnectEvent.MessagesRetrieved, + // (event) => { + // void this.processIncomingMessages(event.detail); + // } + // ); + // } } } diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 7336b06df7..412cfa30a5 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -5,8 +5,14 @@ import { TypedEventEmitter } from "@libp2p/interface"; import type { MultiaddrInput } from "@multiformats/multiaddr"; -import { ConnectionManager, createDecoder, createEncoder } from "@waku/core"; -import type { +import { + ConnectionManager, + createDecoder, + createEncoder, + DecodedMessage +} from "@waku/core"; +import { + ContentTopic, CreateDecoderParams, CreateEncoderParams, CreateNodeOptions, @@ -15,20 +21,22 @@ import type { IEncoder, IFilter, ILightPush, + IMessageEmitter, IRelay, IRoutingInfo, IStore, IWaku, IWakuEventEmitter, Libp2p, - NetworkConfig + NetworkConfig, + PubsubTopic } from "@waku/interfaces"; import { DefaultNetworkConfig, HealthStatus, Protocols } from "@waku/interfaces"; -import { createRoutingInfo, Logger } from "@waku/utils"; +import { createRoutingInfo, Logger, pushOrInitMapSet } from "@waku/utils"; import { Filter } from "../filter/index.js"; import { HealthIndicator } from "../health_indicator/index.js"; @@ -54,6 +62,7 @@ export class WakuNode implements IWaku { public lightPush?: ILightPush; public readonly events: IWakuEventEmitter = new TypedEventEmitter(); + public readonly messageEmitter: IMessageEmitter = new TypedEventEmitter(); private readonly networkConfig: NetworkConfig; @@ -134,6 +143,41 @@ export class WakuNode implements IWaku { ); } + public subscribe(contentTopics: ContentTopic[]): void { + // Group decoders by pubsubTopics in case they spread across several shards + const ctToDecoders: Map< + PubsubTopic, + Set> + > = new Map(); + for (const contentTopic of contentTopics) { + const decoder = this.createDecoder({ contentTopic }); + pushOrInitMapSet(ctToDecoders, decoder.pubsubTopic, decoder); + } + + if (this.filter) { + for (const [_, decoders] of ctToDecoders) { + void this.filter + .subscribe( + Array.from(decoders), + this.emitIncomingMessages.bind(this, Array.from(contentTopics)) + ) + .then((_) => { + // TODO: emit irrecoverable errors + }) + .catch((_) => { + // TODO: emit irrecoverable errors + }); + } + + return; + } + + if (this.relay) { + throw "not implemented"; + } + throw "no subscribe protocol available"; + } + public get peerId(): PeerId { return this.libp2p.peerId; } @@ -282,10 +326,26 @@ export class WakuNode implements IWaku { }); } - private createRoutingInfo( + public createRoutingInfo( contentTopic?: string, shardId?: number ): IRoutingInfo { return createRoutingInfo(this.networkConfig, { contentTopic, shardId }); } + + private emitIncomingMessages( + contentTopics: ContentTopic[], + message: { + contentTopic: ContentTopic; + payload: Uint8Array; + } + ): void { + if (contentTopics.includes(message.contentTopic)) { + this.messageEmitter.dispatchEvent( + new CustomEvent(message.contentTopic, { + detail: message.payload + }) + ); + } + } } diff --git a/packages/tests/src/lib/index.ts b/packages/tests/src/lib/index.ts index 85de368b23..742c09c092 100644 --- a/packages/tests/src/lib/index.ts +++ b/packages/tests/src/lib/index.ts @@ -124,14 +124,14 @@ export class ServiceNodesFleet { } class MultipleNodesMessageCollector { - public callback: (msg: IDecodedMessage) => void = () => {}; - protected messageList: Array = []; + public callback: (msg: Partial) => void = () => {}; + protected messageList: Array> = []; public constructor( private messageCollectors: MessageCollector[], private relayNodes?: ServiceNode[], private strictChecking: boolean = false ) { - this.callback = (msg: IDecodedMessage): void => { + this.callback = (msg: Partial): void => { log.info("Got a message"); this.messageList.push(msg); }; @@ -153,7 +153,9 @@ class MultipleNodesMessageCollector { } } - public getMessage(index: number): MessageRpcResponse | IDecodedMessage { + public getMessage( + index: number + ): MessageRpcResponse | Partial { return this.messageList[index]; } diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts index 9483bc8ba6..cdd22bb621 100644 --- a/packages/tests/tests/waku.node.spec.ts +++ b/packages/tests/tests/waku.node.spec.ts @@ -7,10 +7,17 @@ import type { RelayNode } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; -import { generateSymmetricKey } from "@waku/message-encryption"; +import { + comparePublicKeys, + generatePrivateKey, + generateSymmetricKey, + getPublicKey +} from "@waku/message-encryption"; import { createDecoder, - createEncoder + createEncoder, + SymmetricDecryption, + SymmetricDecryptionResult } from "@waku/message-encryption/symmetric"; import { createRelayNode } from "@waku/relay"; import { @@ -29,8 +36,11 @@ import { makeLogFileName, NOISE_KEY_1, NOISE_KEY_2, + runMultipleNodes, ServiceNode, - tearDownNodes + ServiceNodesFleet, + tearDownNodes, + teardownNodesWithRedundancy } from "../src/index.js"; const TestContentTopic = "/test/1/waku/utf8"; @@ -291,3 +301,148 @@ describe("User Agent", function () { ); }); }); + +describe("Waku API", function () { + describe("WakuNode.subscribe (light node)", function () { + this.timeout(100000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + const messageText = "some message"; + const messagePayload = utf8ToBytes(messageText); + + beforeEachCustom(this, async () => { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + TestRoutingInfo, + undefined + ); + }); + + afterEachCustom(this, async () => { + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + it("Subscribe and receive messages on 2 different content topics", async function () { + // Subscribe to the first content topic and send a message. + waku.messageEmitter.addEventListener(TestContentTopic, (event) => { + // TODO: fix the callback type + serviceNodes.messageCollector.callback({ + contentTopic: TestContentTopic, + payload: event.detail + }); + }); + waku.subscribe([TestContentTopic]); + + await waku.lightPush.send(TestEncoder, { payload: messagePayload }); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true, + "Waiting for the first message" + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestRoutingInfo.pubsubTopic + }); + + // Modify subscription to include a new content topic and send a message. + const newMessageText = "Filtering still works!"; + const newContentTopic = "/test/2/waku-filter/default"; + const newRoutingInfo = createRoutingInfo(DefaultTestNetworkConfig, { + contentTopic: newContentTopic + }); + const newEncoder = createPlainEncoder({ + contentTopic: newContentTopic, + routingInfo: newRoutingInfo + }); + // subscribe to second content topic + waku.messageEmitter.addEventListener(newContentTopic, (event) => { + // TODO: fix the callback type + serviceNodes.messageCollector.callback({ + contentTopic: TestContentTopic, + payload: event.detail + }); + }); + waku.subscribe([newContentTopic]); + + await waku.lightPush.send(newEncoder, { + payload: utf8ToBytes(newMessageText) + }); + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true, + "Waiting for the second message" + ); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedContentTopic: newContentTopic, + expectedMessageText: newMessageText, + expectedPubsubTopic: TestRoutingInfo.pubsubTopic + }); + + // Send another message on the initial content topic to verify it still works. + const thirdMessageText = "Filtering still works on first subscription!"; + const thirdMessagePayload = { payload: utf8ToBytes(thirdMessageText) }; + await waku.lightPush.send(TestEncoder, thirdMessagePayload); + expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq( + true, + "Waiting for the third message" + ); + serviceNodes.messageCollector.verifyReceivedMessage(2, { + expectedMessageText: thirdMessageText, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestRoutingInfo.pubsubTopic + }); + }); + + it("Subscribe and receive messages encrypted with AES", async function () { + const symKey = generateSymmetricKey(); + const senderPrivKey = generatePrivateKey(); + // TODO: For now, still using encoder + const newEncoder = createEncoder({ + contentTopic: TestContentTopic, + routingInfo: TestRoutingInfo, + symKey, + sigPrivKey: senderPrivKey + }); + + // Setup payload decryption + const symDecryption = new SymmetricDecryption(symKey); + + // subscribe to second content topic + waku.messageEmitter.addEventListener(TestContentTopic, (event) => { + const encryptedPayload = event.detail; + void symDecryption + .decrypt(encryptedPayload) + .then((decryptionResult: SymmetricDecryptionResult | undefined) => { + if (!decryptionResult) return; + serviceNodes.messageCollector.callback({ + contentTopic: TestContentTopic, + payload: decryptionResult.payload + }); + + // TODO: probably best to adapt the message collector + expect(decryptionResult?.signature).to.not.be.undefined; + expect( + comparePublicKeys( + getPublicKey(senderPrivKey), + decryptionResult?.signaturePublicKey + ) + ); + // usually best to ignore decryption failure + }); + }); + waku.subscribe([TestContentTopic]); + + await waku.lightPush.send(newEncoder, { + payload: utf8ToBytes(messageText) + }); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true, + "Waiting for the message" + ); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedContentTopic: TestContentTopic, + expectedMessageText: messageText, + expectedPubsubTopic: TestRoutingInfo.pubsubTopic + }); + }); + }); +});