Skip to content

Commit 593bc45

Browse files
authored
feat: reliable channels search up to 30 days to find message (#2657)
* feat: query on connect stops on predicate * test: query on connect stops at predicate * feat: reliable channels search up to 30 days to find message Queries stop once a valid sync or content message is found in the channel. * fix: protect against decoding exceptions * stop range queries on messages with a causal history
1 parent bbcfc94 commit 593bc45

File tree

5 files changed

+778
-46
lines changed

5 files changed

+778
-46
lines changed

packages/sdk/src/query_on_connect/query_on_connect.spec.ts

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ describe("QueryOnConnect", () => {
9595
it("should create QueryOnConnect instance with all required parameters", () => {
9696
queryOnConnect = new QueryOnConnect(
9797
mockDecoders,
98+
() => false,
9899
mockPeerManagerEventEmitter,
99100
mockWakuEventEmitter,
100101
mockQueryGenerator,
@@ -108,6 +109,7 @@ describe("QueryOnConnect", () => {
108109
it("should create QueryOnConnect instance without options", () => {
109110
queryOnConnect = new QueryOnConnect(
110111
mockDecoders,
112+
() => false,
111113
mockPeerManagerEventEmitter,
112114
mockWakuEventEmitter,
113115
mockQueryGenerator
@@ -120,6 +122,7 @@ describe("QueryOnConnect", () => {
120122
it("should accept empty decoders array", () => {
121123
queryOnConnect = new QueryOnConnect(
122124
[],
125+
() => false,
123126
mockPeerManagerEventEmitter,
124127
mockWakuEventEmitter,
125128
mockQueryGenerator,
@@ -134,6 +137,7 @@ describe("QueryOnConnect", () => {
134137
beforeEach(() => {
135138
queryOnConnect = new QueryOnConnect(
136139
mockDecoders,
140+
() => false,
137141
mockPeerManagerEventEmitter,
138142
mockWakuEventEmitter,
139143
mockQueryGenerator,
@@ -173,6 +177,7 @@ describe("QueryOnConnect", () => {
173177
beforeEach(() => {
174178
queryOnConnect = new QueryOnConnect(
175179
mockDecoders,
180+
() => false,
176181
mockPeerManagerEventEmitter,
177182
mockWakuEventEmitter,
178183
mockQueryGenerator,
@@ -224,6 +229,7 @@ describe("QueryOnConnect", () => {
224229

225230
queryOnConnect = new QueryOnConnect(
226231
mockDecoders,
232+
() => false,
227233
mockPeerManagerEventEmitter,
228234
mockWakuEventEmitter,
229235
mockQueryGenerator,
@@ -276,6 +282,7 @@ describe("QueryOnConnect", () => {
276282

277283
queryOnConnect = new QueryOnConnect(
278284
mockDecoders,
285+
() => false,
279286
mockPeerManagerEventEmitter,
280287
mockWakuEventEmitter,
281288
mockQueryGenerator,
@@ -298,6 +305,7 @@ describe("QueryOnConnect", () => {
298305

299306
queryOnConnect = new QueryOnConnect(
300307
mockDecoders,
308+
() => false,
301309
mockPeerManagerEventEmitter,
302310
mockWakuEventEmitter,
303311
mockQueryGenerator,
@@ -320,6 +328,7 @@ describe("QueryOnConnect", () => {
320328

321329
queryOnConnect = new QueryOnConnect(
322330
mockDecoders,
331+
() => false,
323332
mockPeerManagerEventEmitter,
324333
mockWakuEventEmitter,
325334
mockQueryGenerator,
@@ -391,6 +400,7 @@ describe("QueryOnConnect", () => {
391400

392401
const queryOnConnect = new QueryOnConnect(
393402
mockDecoders,
403+
() => false,
394404
mockPeerManagerEventEmitter,
395405
mockWakuEventEmitter,
396406
mockQueryGenerator,
@@ -418,6 +428,7 @@ describe("QueryOnConnect", () => {
418428

419429
queryOnConnect = new QueryOnConnect(
420430
mockDecoders,
431+
() => false,
421432
mockPeerManagerEventEmitter,
422433
mockWakuEventEmitter,
423434
mockQueryGenerator,
@@ -473,6 +484,7 @@ describe("QueryOnConnect", () => {
473484

474485
queryOnConnect = new QueryOnConnect(
475486
mockDecoders,
487+
() => false,
476488
mockPeerManagerEventEmitter,
477489
mockWakuEventEmitter,
478490
mockQueryGenerator,
@@ -605,6 +617,7 @@ describe("QueryOnConnect", () => {
605617

606618
queryOnConnect = new QueryOnConnect(
607619
mockDecoders,
620+
() => false,
608621
mockPeerManagerEventEmitter,
609622
mockWakuEventEmitter,
610623
mockQueryGenerator,
@@ -750,6 +763,248 @@ describe("QueryOnConnect", () => {
750763
expect(mockQueryGenerator.calledTwice).to.be.true;
751764
});
752765
});
766+
767+
describe("stopIfTrue predicate", () => {
768+
beforeEach(() => {
769+
mockPeerManagerEventEmitter.addEventListener = sinon.stub();
770+
mockWakuEventEmitter.addEventListener = sinon.stub();
771+
});
772+
773+
it("should stop query iteration when stopIfTrue returns true", async () => {
774+
const messages = [
775+
{
776+
hash: new Uint8Array(),
777+
hashStr: "msg1",
778+
version: 1,
779+
timestamp: new Date(),
780+
contentTopic: "/test/1/content",
781+
pubsubTopic: "/waku/2/default-waku/proto",
782+
payload: new Uint8Array([1]),
783+
rateLimitProof: undefined,
784+
ephemeral: false,
785+
meta: undefined
786+
},
787+
{
788+
hash: new Uint8Array(),
789+
hashStr: "stop-hash",
790+
version: 1,
791+
timestamp: new Date(),
792+
contentTopic: "/test/1/content",
793+
pubsubTopic: "/waku/2/default-waku/proto",
794+
payload: new Uint8Array([2]),
795+
rateLimitProof: undefined,
796+
ephemeral: false,
797+
meta: undefined
798+
},
799+
{
800+
hash: new Uint8Array(),
801+
hashStr: "msg3",
802+
version: 1,
803+
timestamp: new Date(),
804+
contentTopic: "/test/1/content",
805+
pubsubTopic: "/waku/2/default-waku/proto",
806+
payload: new Uint8Array([3]),
807+
rateLimitProof: undefined,
808+
ephemeral: false,
809+
meta: undefined
810+
}
811+
];
812+
813+
// Setup generator to yield 3 pages, stop should occur on page 2
814+
const mockAsyncGenerator = async function* (): AsyncGenerator<
815+
Promise<IDecodedMessage | undefined>[]
816+
> {
817+
yield [Promise.resolve(messages[0])];
818+
yield [Promise.resolve(messages[1])];
819+
yield [Promise.resolve(messages[2])];
820+
};
821+
mockQueryGenerator.returns(mockAsyncGenerator());
822+
823+
const stopPredicate = (msg: IDecodedMessage): boolean =>
824+
msg.hashStr === "stop-hash";
825+
826+
queryOnConnect = new QueryOnConnect(
827+
mockDecoders,
828+
stopPredicate,
829+
mockPeerManagerEventEmitter,
830+
mockWakuEventEmitter,
831+
mockQueryGenerator,
832+
options
833+
);
834+
835+
const receivedMessages: IDecodedMessage[] = [];
836+
queryOnConnect.addEventListener(
837+
QueryOnConnectEvent.MessagesRetrieved,
838+
(event: CustomEvent<IDecodedMessage[]>) => {
839+
receivedMessages.push(...event.detail);
840+
}
841+
);
842+
843+
queryOnConnect.start();
844+
await queryOnConnect["maybeQuery"](mockPeerId);
845+
846+
// Should have received messages from first 2 pages only
847+
expect(receivedMessages).to.have.length(2);
848+
expect(receivedMessages[0].hashStr).to.equal("msg1");
849+
expect(receivedMessages[1].hashStr).to.equal("stop-hash");
850+
});
851+
852+
it("should process all pages when stopIfTrue never returns true", async () => {
853+
const messages = [
854+
{
855+
hash: new Uint8Array(),
856+
hashStr: "msg1",
857+
version: 1,
858+
timestamp: new Date(),
859+
contentTopic: "/test/1/content",
860+
pubsubTopic: "/waku/2/default-waku/proto",
861+
payload: new Uint8Array([1]),
862+
rateLimitProof: undefined,
863+
ephemeral: false,
864+
meta: undefined
865+
},
866+
{
867+
hash: new Uint8Array(),
868+
hashStr: "msg2",
869+
version: 1,
870+
timestamp: new Date(),
871+
contentTopic: "/test/1/content",
872+
pubsubTopic: "/waku/2/default-waku/proto",
873+
payload: new Uint8Array([2]),
874+
rateLimitProof: undefined,
875+
ephemeral: false,
876+
meta: undefined
877+
},
878+
{
879+
hash: new Uint8Array(),
880+
hashStr: "msg3",
881+
version: 1,
882+
timestamp: new Date(),
883+
contentTopic: "/test/1/content",
884+
pubsubTopic: "/waku/2/default-waku/proto",
885+
payload: new Uint8Array([3]),
886+
rateLimitProof: undefined,
887+
ephemeral: false,
888+
meta: undefined
889+
}
890+
];
891+
892+
const mockAsyncGenerator = async function* (): AsyncGenerator<
893+
Promise<IDecodedMessage | undefined>[]
894+
> {
895+
yield [Promise.resolve(messages[0])];
896+
yield [Promise.resolve(messages[1])];
897+
yield [Promise.resolve(messages[2])];
898+
};
899+
mockQueryGenerator.returns(mockAsyncGenerator());
900+
901+
const stopPredicate = (): boolean => false;
902+
903+
queryOnConnect = new QueryOnConnect(
904+
mockDecoders,
905+
stopPredicate,
906+
mockPeerManagerEventEmitter,
907+
mockWakuEventEmitter,
908+
mockQueryGenerator,
909+
options
910+
);
911+
912+
const receivedMessages: IDecodedMessage[] = [];
913+
queryOnConnect.addEventListener(
914+
QueryOnConnectEvent.MessagesRetrieved,
915+
(event: CustomEvent<IDecodedMessage[]>) => {
916+
receivedMessages.push(...event.detail);
917+
}
918+
);
919+
920+
queryOnConnect.start();
921+
await queryOnConnect["maybeQuery"](mockPeerId);
922+
923+
// Should have received all 3 messages
924+
expect(receivedMessages).to.have.length(3);
925+
});
926+
927+
it("should stop on first message of a page if stopIfTrue matches", async () => {
928+
const messages = [
929+
{
930+
hash: new Uint8Array(),
931+
hashStr: "stop-hash",
932+
version: 1,
933+
timestamp: new Date(),
934+
contentTopic: "/test/1/content",
935+
pubsubTopic: "/waku/2/default-waku/proto",
936+
payload: new Uint8Array([1]),
937+
rateLimitProof: undefined,
938+
ephemeral: false,
939+
meta: undefined
940+
},
941+
{
942+
hash: new Uint8Array(),
943+
hashStr: "msg2",
944+
version: 1,
945+
timestamp: new Date(),
946+
contentTopic: "/test/1/content",
947+
pubsubTopic: "/waku/2/default-waku/proto",
948+
payload: new Uint8Array([2]),
949+
rateLimitProof: undefined,
950+
ephemeral: false,
951+
meta: undefined
952+
},
953+
{
954+
hash: new Uint8Array(),
955+
hashStr: "msg3",
956+
version: 1,
957+
timestamp: new Date(),
958+
contentTopic: "/test/1/content",
959+
pubsubTopic: "/waku/2/default-waku/proto",
960+
payload: new Uint8Array([3]),
961+
rateLimitProof: undefined,
962+
ephemeral: false,
963+
meta: undefined
964+
}
965+
];
966+
967+
const mockAsyncGenerator = async function* (): AsyncGenerator<
968+
Promise<IDecodedMessage | undefined>[]
969+
> {
970+
yield [
971+
Promise.resolve(messages[0]),
972+
Promise.resolve(messages[1]),
973+
Promise.resolve(messages[2])
974+
];
975+
};
976+
mockQueryGenerator.returns(mockAsyncGenerator());
977+
978+
const stopPredicate = (msg: IDecodedMessage): boolean =>
979+
msg.hashStr === "stop-hash";
980+
981+
queryOnConnect = new QueryOnConnect(
982+
mockDecoders,
983+
stopPredicate,
984+
mockPeerManagerEventEmitter,
985+
mockWakuEventEmitter,
986+
mockQueryGenerator,
987+
options
988+
);
989+
990+
const receivedMessages: IDecodedMessage[] = [];
991+
queryOnConnect.addEventListener(
992+
QueryOnConnectEvent.MessagesRetrieved,
993+
(event: CustomEvent<IDecodedMessage[]>) => {
994+
receivedMessages.push(...event.detail);
995+
}
996+
);
997+
998+
queryOnConnect.start();
999+
await queryOnConnect["maybeQuery"](mockPeerId);
1000+
1001+
// Should have received all 3 messages from the page, even though first matched
1002+
expect(receivedMessages).to.have.length(3);
1003+
expect(receivedMessages[0].hashStr).to.equal("stop-hash");
1004+
expect(receivedMessages[1].hashStr).to.equal("msg2");
1005+
expect(receivedMessages[2].hashStr).to.equal("msg3");
1006+
});
1007+
});
7531008
});
7541009

7551010
describe("calculateTimeRange", () => {

packages/sdk/src/query_on_connect/query_on_connect.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import {
1717
const log = new Logger("sdk:query-on-connect");
1818

1919
export const DEFAULT_FORCE_QUERY_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes
20-
export const MAX_TIME_RANGE_QUERY_MS = 24 * 60 * 60 * 1000; // 24 hours
20+
export const MAX_TIME_RANGE_QUERY_MS = 30 * 24 * 60 * 60 * 1000; // 30 days (queries are split)
2121

2222
export interface QueryOnConnectOptions {
2323
/**
@@ -54,6 +54,7 @@ export class QueryOnConnect<
5454

5555
public constructor(
5656
public decoders: IDecoder<T>[],
57+
public stopIfTrue: (msg: T) => boolean,
5758
private readonly peerManagerEventEmitter: TypedEventEmitter<IPeerManagerEvents>,
5859
private readonly wakuEventEmitter: IWakuEventEmitter,
5960
private readonly _queryGenerator: <T extends IDecodedMessage>(
@@ -125,8 +126,13 @@ export class QueryOnConnect<
125126
const messages = (await Promise.all(page)).filter(
126127
(m) => m !== undefined
127128
);
129+
const stop = messages.some((msg: T) => this.stopIfTrue(msg));
128130
// Bundle the messages to help batch process by sds
129131
this.dispatchMessages(messages);
132+
133+
if (stop) {
134+
break;
135+
}
130136
}
131137

132138
// Didn't throw, so it didn't fail

0 commit comments

Comments
 (0)