Skip to content

Commit 645b543

Browse files
committed
fix(store): fix validation to allow store query by hash
1 parent f85a476 commit 645b543

File tree

7 files changed

+141
-31
lines changed

7 files changed

+141
-31
lines changed

package-lock.json

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core/src/lib/store/rpc.ts

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export class StoreQueryRequest {
1414
public static create(params: QueryRequestParams): StoreQueryRequest {
1515
const request = new StoreQueryRequest({
1616
...params,
17+
contentTopics: params.contentTopics || [],
1718
requestId: uuid(),
1819
timeStart: params.timeStart
1920
? BigInt(params.timeStart.getTime() * ONE_MILLION)
@@ -27,20 +28,28 @@ export class StoreQueryRequest {
2728
: undefined
2829
});
2930

30-
// Validate request parameters based on RFC
31-
if (
32-
(params.pubsubTopic && !params.contentTopics) ||
33-
(!params.pubsubTopic && params.contentTopics)
34-
) {
35-
throw new Error(
36-
"Both pubsubTopic and contentTopics must be set or unset"
37-
);
31+
// Validate that when not using messageHashes, pubsubTopic and contentTopics are consistent
32+
if (!params.messageHashes || params.messageHashes.length === 0) {
33+
// If we're doing a content-based query (not message hash), both pubsubTopic and contentTopics
34+
// should be set or unset together (as per the RFC)
35+
if (
36+
(params.pubsubTopic && !params.contentTopics) ||
37+
(!params.pubsubTopic &&
38+
params.contentTopics &&
39+
params.contentTopics.length > 0)
40+
) {
41+
throw new Error(
42+
"Both pubsubTopic and contentTopics must be set or unset for content-based queries"
43+
);
44+
}
3845
}
3946

47+
// If messageHashes is provided, no content filters should be included
4048
if (
4149
params.messageHashes &&
50+
params.messageHashes.length > 0 &&
4251
(params.pubsubTopic ||
43-
params.contentTopics ||
52+
(params.contentTopics && params.contentTopics.length > 0) ||
4453
params.timeStart ||
4554
params.timeEnd)
4655
) {

packages/core/src/lib/store/store.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
4141
peerId: PeerId
4242
): AsyncGenerator<Promise<T | undefined>[]> {
4343
if (
44+
queryOpts.contentTopics &&
4445
queryOpts.contentTopics.toString() !==
45-
Array.from(decoders.keys()).toString()
46+
Array.from(decoders.keys()).toString()
4647
) {
4748
throw new Error(
4849
"Internal error, the decoders should match the query's content topics"

packages/sdk/src/store/store.ts

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,40 @@ export class Store implements IStore {
5858
decoders: IDecoder<T>[],
5959
options?: Partial<QueryRequestParams>
6060
): AsyncGenerator<Promise<T | undefined>[]> {
61-
const { pubsubTopic, contentTopics, decodersAsMap } =
62-
this.validateDecodersAndPubsubTopic(decoders);
63-
64-
const queryOpts = {
65-
pubsubTopic,
66-
contentTopics,
67-
includeData: true,
68-
paginationForward: true,
69-
...options
70-
};
61+
let queryOpts: QueryRequestParams;
62+
let decodersAsMap: Map<string, IDecoder<T>> = new Map();
63+
if (options?.messageHashes) {
64+
// For message hash queries, we need a dummy pubsubTopic to satisfy the type
65+
// This value will not be used for filtering since messageHashes take precedence
66+
const dummyPubsubTopic =
67+
options.pubsubTopic ||
68+
this.protocol.pubsubTopics[0] ||
69+
"/waku/2/default-waku/proto";
70+
71+
queryOpts = {
72+
...options,
73+
pubsubTopic: dummyPubsubTopic,
74+
contentTopics: [],
75+
includeData: true,
76+
paginationForward: true
77+
};
78+
} else {
79+
const {
80+
pubsubTopic,
81+
contentTopics,
82+
decodersAsMap: _decodersAsMap
83+
} = this.validateDecodersAndPubsubTopic(decoders);
84+
85+
decodersAsMap = _decodersAsMap;
86+
87+
queryOpts = {
88+
pubsubTopic,
89+
contentTopics,
90+
includeData: true,
91+
paginationForward: true,
92+
...options
93+
};
94+
}
7195

7296
const peer = await this.getPeerToUse();
7397

packages/tests/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
"@waku/core": "*",
5656
"@waku/enr": "*",
5757
"@waku/interfaces": "*",
58+
"@waku/message-hash": "^0.1.17",
5859
"@waku/utils": "*",
5960
"app-root-path": "^3.1.0",
6061
"chai-as-promised": "^7.1.1",
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import { DecodedMessage } from "@waku/core";
2+
import type { LightNode } from "@waku/interfaces";
3+
import { messageHash } from "@waku/message-hash";
4+
import { assert } from "chai";
5+
6+
import {
7+
afterEachCustom,
8+
beforeEachCustom,
9+
ServiceNode,
10+
tearDownNodes
11+
} from "../../src/index.js";
12+
13+
import {
14+
runStoreNodes,
15+
sendMessages,
16+
TestDecoder,
17+
TestShardInfo,
18+
totalMsgs
19+
} from "./utils.js";
20+
21+
describe("Waku Store, message hash query", function () {
22+
this.timeout(15000);
23+
let waku: LightNode;
24+
let nwaku: ServiceNode;
25+
26+
beforeEachCustom(this, async () => {
27+
[nwaku, waku] = await runStoreNodes(this.ctx, TestShardInfo);
28+
});
29+
30+
afterEachCustom(this, async () => {
31+
await tearDownNodes(nwaku, [waku]);
32+
});
33+
34+
it("can query messages by message hash", async function () {
35+
// Send messages first
36+
await sendMessages(
37+
nwaku,
38+
totalMsgs,
39+
TestDecoder.contentTopic,
40+
TestDecoder.pubsubTopic
41+
);
42+
43+
// Generate message hashes for the test
44+
const messageHashes: Uint8Array[] = [];
45+
46+
// Create message hashes for all numbers from 0 to totalMsgs-1, matching the payload pattern in sendMessages
47+
for (let i = 0; i < totalMsgs; i++) {
48+
// Using type assertion to handle type mismatch
49+
messageHashes.push(
50+
messageHash(TestDecoder.pubsubTopic, {
51+
payload: new Uint8Array([i]) as any,
52+
contentTopic: TestDecoder.contentTopic,
53+
version: undefined,
54+
timestamp: undefined,
55+
meta: undefined,
56+
rateLimitProof: undefined,
57+
ephemeral: undefined
58+
})
59+
);
60+
}
61+
62+
// Query messages by hash
63+
const messages: DecodedMessage[] = [];
64+
for await (const page of waku.store.queryGenerator([], {
65+
messageHashes
66+
})) {
67+
for await (const msg of page) {
68+
messages.push(msg as DecodedMessage);
69+
}
70+
}
71+
assert.equal(messages.length, messageHashes.length);
72+
});
73+
});

packages/tests/tests/store/utils.ts

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { expect } from "chai";
1717
import { Context } from "mocha";
1818

1919
import { delay, NOISE_KEY_1, runNodes, ServiceNode } from "../../src/index.js";
20+
import { MessageRpcQuery } from "../../src/types.js";
2021

2122
export const log = new Logger("test:store");
2223

@@ -49,20 +50,20 @@ export async function sendMessages(
4950
instance: ServiceNode,
5051
numMessages: number,
5152
contentTopic: string,
52-
pubsubTopic: string
53-
): Promise<void> {
53+
pubsubTopic: string,
54+
timestamp: boolean = false
55+
): Promise<MessageRpcQuery[]> {
56+
const messages: MessageRpcQuery[] = new Array<MessageRpcQuery>(numMessages);
5457
for (let i = 0; i < numMessages; i++) {
55-
expect(
56-
await instance.sendMessage(
57-
ServiceNode.toMessageRpcQuery({
58-
payload: new Uint8Array([i]),
59-
contentTopic: contentTopic
60-
}),
61-
pubsubTopic
62-
)
63-
).to.eq(true);
58+
messages[i] = ServiceNode.toMessageRpcQuery({
59+
payload: new Uint8Array([i]),
60+
contentTopic: contentTopic,
61+
timestamp: timestamp ? new Date() : undefined
62+
});
63+
expect(await instance.sendMessage(messages[i], pubsubTopic)).to.eq(true);
6464
await delay(1); // to ensure each timestamp is unique.
6565
}
66+
return messages;
6667
}
6768

6869
export async function sendMessagesAutosharding(

0 commit comments

Comments
 (0)