Skip to content

Commit 788f7e6

Browse files
feat: incorporate sds-r into reliable channels (#2701)
* wip * feat: integrate sds-r with message channels * fix: fix implementation guide, remove unrelated claude file * feat: integrate sds-r within reliable channels SDK * fix: fix import, export * fix: fix build errors, simplify parallel operation * fix: sigh. this file has 9 lives * fix: simplify more * fix: disable repair if not part of retrieval strategy * fix: remove dead code, simplify * fix: improve repair loop Co-authored-by: fryorcraken <[email protected]> * chore: make retrievalStrategy mandatory argument * chore: add repair multiplier, safer checks --------- Co-authored-by: fryorcraken <[email protected]> Co-authored-by: fryorcraken <[email protected]>
1 parent e5f51d7 commit 788f7e6

File tree

4 files changed

+110
-48
lines changed

4 files changed

+110
-48
lines changed

packages/sdk/src/reliable_channel/reliable_channel.ts

Lines changed: 90 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ import {
1919
MessageChannelEvent,
2020
MessageChannelEvents,
2121
type MessageChannelOptions,
22+
type ParticipantId,
2223
Message as SdsMessage,
23-
type SenderId,
2424
SyncMessage
2525
} from "@waku/sds";
2626
import { Logger } from "@waku/utils";
@@ -39,9 +39,11 @@ import { ISyncStatusEvents, SyncStatus } from "./sync_status.js";
3939
const log = new Logger("sdk:reliable-channel");
4040

4141
const DEFAULT_SYNC_MIN_INTERVAL_MS = 30 * 1000; // 30 seconds
42+
const SYNC_INTERVAL_REPAIR_MULTIPLIER = 0.3; // Reduce sync interval when repairs pending
4243
const DEFAULT_RETRY_INTERVAL_MS = 30 * 1000; // 30 seconds
4344
const DEFAULT_MAX_RETRY_ATTEMPTS = 10;
4445
const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000;
46+
const DEFAULT_SWEEP_REPAIR_INTERVAL_MS = 10 * 1000; // 10 seconds
4547
const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000;
4648

4749
const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [
@@ -51,6 +53,15 @@ const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [
5153
LightPushError.RLN_PROOF_GENERATION
5254
];
5355

56+
/**
57+
* Strategy for retrieving missing messages.
58+
* - 'both': Use SDS-R peer repair and Store queries in parallel (default)
59+
* - 'sds-r-only': Only use SDS-R peer repair
60+
* - 'store-only': Only use Store queries (legacy behavior)
61+
* - 'none': No automatic retrieval
62+
*/
63+
export type RetrievalStrategy = "both" | "sds-r-only" | "store-only" | "none";
64+
5465
export type ReliableChannelOptions = MessageChannelOptions & {
5566
/**
5667
* The minimum interval between 2 sync messages in the channel.
@@ -81,6 +92,7 @@ export type ReliableChannelOptions = MessageChannelOptions & {
8192

8293
/**
8394
* How often store queries are done to retrieve missing messages.
95+
* Only applies when retrievalStrategy includes Store ('both' or 'store-only').
8496
*
8597
* @default 10,000 (10 seconds)
8698
*/
@@ -114,6 +126,13 @@ export type ReliableChannelOptions = MessageChannelOptions & {
114126
* @default 1000 (1 second)
115127
*/
116128
processTaskMinElapseMs?: number;
129+
130+
/**
131+
* Strategy for retrieving missing messages.
132+
*
133+
* @default 'both'
134+
*/
135+
retrievalStrategy?: RetrievalStrategy;
117136
};
118137

119138
/**
@@ -152,6 +171,7 @@ export class ReliableChannel<
152171
private syncRandomTimeout: RandomTimeout;
153172
private sweepInBufInterval: ReturnType<typeof setInterval> | undefined;
154173
private readonly sweepInBufIntervalMs: number;
174+
private sweepRepairInterval: ReturnType<typeof setInterval> | undefined;
155175
private processTaskTimeout: ReturnType<typeof setTimeout> | undefined;
156176
private readonly retryManager: RetryManager | undefined;
157177
private readonly missingMessageRetriever?: MissingMessageRetriever<T>;
@@ -165,6 +185,7 @@ export class ReliableChannel<
165185
public messageChannel: MessageChannel,
166186
private encoder: IEncoder,
167187
private decoder: IDecoder<T>,
188+
private retrievalStrategy: RetrievalStrategy,
168189
options?: ReliableChannelOptions
169190
) {
170191
super();
@@ -226,7 +247,8 @@ export class ReliableChannel<
226247
this.processTaskMinElapseMs =
227248
options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS;
228249

229-
if (this._retrieve) {
250+
// Only enable Store retrieval based on strategy
251+
if (this._retrieve && this.shouldUseStore()) {
230252
this.missingMessageRetriever = new MissingMessageRetriever(
231253
this.decoder,
232254
options?.retrieveFrequencyMs,
@@ -290,17 +312,26 @@ export class ReliableChannel<
290312
public static async create<T extends IDecodedMessage>(
291313
node: IWaku,
292314
channelId: ChannelId,
293-
senderId: SenderId,
315+
senderId: ParticipantId,
294316
encoder: IEncoder,
295317
decoder: IDecoder<T>,
296318
options?: ReliableChannelOptions
297319
): Promise<ReliableChannel<T>> {
298-
const sdsMessageChannel = new MessageChannel(channelId, senderId, options);
320+
// Enable SDS-R repair only if retrieval strategy uses it
321+
const retrievalStrategy = options?.retrievalStrategy ?? "both";
322+
const enableRepair =
323+
retrievalStrategy === "both" || retrievalStrategy === "sds-r-only";
324+
325+
const sdsMessageChannel = new MessageChannel(channelId, senderId, {
326+
...options,
327+
enableRepair
328+
});
299329
const messageChannel = new ReliableChannel(
300330
node,
301331
sdsMessageChannel,
302332
encoder,
303333
decoder,
334+
retrievalStrategy,
304335
options
305336
);
306337

@@ -455,6 +486,7 @@ export class ReliableChannel<
455486
// missing messages or the status of previous outgoing messages
456487
this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint);
457488

489+
// Remove from Store retriever if message was retrieved
458490
this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId);
459491

460492
if (sdsMessage.content && sdsMessage.content.length > 0) {
@@ -528,6 +560,9 @@ export class ReliableChannel<
528560
this.setupEventListeners();
529561
this.restartSync();
530562
this.startSweepIncomingBufferLoop();
563+
564+
this.startRepairSweepLoop();
565+
531566
if (this._retrieve) {
532567
this.missingMessageRetriever?.start();
533568
this.queryOnConnect?.start();
@@ -544,6 +579,7 @@ export class ReliableChannel<
544579
this.removeAllEventListeners();
545580
this.stopSync();
546581
this.stopSweepIncomingBufferLoop();
582+
this.stopRepairSweepLoop();
547583
this.clearProcessTasks();
548584

549585
if (this.activePendingProcessTask) {
@@ -582,8 +618,55 @@ export class ReliableChannel<
582618
}
583619
}
584620

621+
private startRepairSweepLoop(): void {
622+
if (!this.shouldUseSdsR()) {
623+
return;
624+
}
625+
this.stopRepairSweepLoop();
626+
this.sweepRepairInterval = setInterval(() => {
627+
void this.messageChannel
628+
.sweepRepairIncomingBuffer(async (message) => {
629+
// Rebroadcast the repair message
630+
const wakuMessage = { payload: message.encode() };
631+
const result = await this._send(this.encoder, wakuMessage);
632+
return result.failures.length === 0;
633+
})
634+
.catch((err) => {
635+
log.error("error encountered when sweeping repair buffer", err);
636+
});
637+
}, DEFAULT_SWEEP_REPAIR_INTERVAL_MS);
638+
}
639+
640+
private stopRepairSweepLoop(): void {
641+
if (this.sweepRepairInterval) {
642+
clearInterval(this.sweepRepairInterval);
643+
this.sweepInBufInterval = undefined;
644+
}
645+
}
646+
647+
private shouldUseStore(): boolean {
648+
return (
649+
this.retrievalStrategy === "both" ||
650+
this.retrievalStrategy === "store-only"
651+
);
652+
}
653+
654+
private shouldUseSdsR(): boolean {
655+
return (
656+
this.retrievalStrategy === "both" ||
657+
this.retrievalStrategy === "sds-r-only"
658+
);
659+
}
660+
585661
private restartSync(multiplier: number = 1): void {
586-
this.syncRandomTimeout.restart(multiplier);
662+
// Adaptive sync: use shorter interval when repairs are pending
663+
const hasPendingRepairs =
664+
this.shouldUseSdsR() && this.messageChannel.hasPendingRepairRequests();
665+
const effectiveMultiplier = hasPendingRepairs
666+
? multiplier * SYNC_INTERVAL_REPAIR_MULTIPLIER
667+
: multiplier;
668+
669+
this.syncRandomTimeout.restart(effectiveMultiplier);
587670
}
588671

589672
private stopSync(): void {
@@ -731,6 +814,8 @@ export class ReliableChannel<
731814
);
732815

733816
for (const { messageId, retrievalHint } of event.detail) {
817+
// Store retrieval (for 'both' and 'store-only' strategies)
818+
// SDS-R repair happens automatically via RepairManager for 'both' and 'sds-r-only'
734819
if (retrievalHint && this.missingMessageRetriever) {
735820
this.missingMessageRetriever.addMissingMessage(
736821
messageId,

packages/sds/src/message_channel/events.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@ export enum MessageChannelEvent {
1212
InMessageLost = "sds:in:message-irretrievably-lost",
1313
ErrorTask = "sds:error-task",
1414
// SDS-R Repair Events
15-
RepairRequestQueued = "sds:repair:request-queued",
1615
RepairRequestSent = "sds:repair:request-sent",
1716
RepairRequestReceived = "sds:repair:request-received",
18-
RepairResponseQueued = "sds:repair:response-queued",
1917
RepairResponseSent = "sds:repair:response-sent"
2018
}
2119

@@ -33,10 +31,6 @@ export type MessageChannelEvents = {
3331
[MessageChannelEvent.OutSyncSent]: CustomEvent<Message>;
3432
[MessageChannelEvent.InSyncReceived]: CustomEvent<Message>;
3533
[MessageChannelEvent.ErrorTask]: CustomEvent<unknown>;
36-
[MessageChannelEvent.RepairRequestQueued]: CustomEvent<{
37-
messageId: MessageId;
38-
tReq: number;
39-
}>;
4034
[MessageChannelEvent.RepairRequestSent]: CustomEvent<{
4135
messageIds: MessageId[];
4236
carrierMessageId: MessageId;
@@ -45,10 +39,6 @@ export type MessageChannelEvents = {
4539
messageIds: MessageId[];
4640
fromSenderId?: ParticipantId;
4741
}>;
48-
[MessageChannelEvent.RepairResponseQueued]: CustomEvent<{
49-
messageId: MessageId;
50-
tResp: number;
51-
}>;
5242
[MessageChannelEvent.RepairResponseSent]: CustomEvent<{
5343
messageId: MessageId;
5444
}>;

packages/sds/src/message_channel/message_channel.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,20 +128,22 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
128128

129129
// Only construct RepairManager if repair is enabled (default: true)
130130
if (options.enableRepair ?? true) {
131-
this.repairManager = new RepairManager(
132-
senderId,
133-
options.repairConfig,
134-
(event: string, detail: unknown) => {
135-
this.safeSendEvent(event as MessageChannelEvent, { detail });
136-
}
137-
);
131+
this.repairManager = new RepairManager(senderId, options.repairConfig);
138132
}
139133
}
140134

141135
public static getMessageId(payload: Uint8Array): MessageId {
142136
return bytesToHex(sha256(payload));
143137
}
144138

139+
/**
140+
* Check if there are pending repair requests that need to be sent.
141+
* Useful for adaptive sync intervals - increase frequency when repairs pending.
142+
*/
143+
public hasPendingRepairRequests(currentTime = Date.now()): boolean {
144+
return this.repairManager?.hasRequestsReady(currentTime) ?? false;
145+
}
146+
145147
/**
146148
* Processes all queued tasks sequentially to ensure proper message ordering.
147149
*

packages/sds/src/message_channel/repair/repair.ts

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,6 @@ const log = new Logger("sds:repair:manager");
2020
*/
2121
const PARTICIPANTS_PER_RESPONSE_GROUP = 128;
2222

23-
/**
24-
* Event emitter callback for repair events
25-
*/
26-
export type RepairEventEmitter = (event: string, detail: unknown) => void;
27-
2823
/**
2924
* Configuration for SDS-R repair protocol
3025
*/
@@ -58,16 +53,10 @@ export class RepairManager {
5853
private readonly config: Required<RepairConfig>;
5954
private readonly outgoingBuffer: OutgoingRepairBuffer;
6055
private readonly incomingBuffer: IncomingRepairBuffer;
61-
private readonly eventEmitter?: RepairEventEmitter;
6256

63-
public constructor(
64-
participantId: ParticipantId,
65-
config: RepairConfig = {},
66-
eventEmitter?: RepairEventEmitter
67-
) {
57+
public constructor(participantId: ParticipantId, config: RepairConfig = {}) {
6858
this.participantId = participantId;
6959
this.config = { ...DEFAULT_REPAIR_CONFIG, ...config };
70-
this.eventEmitter = eventEmitter;
7160

7261
this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize);
7362
this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize);
@@ -142,19 +131,13 @@ export class RepairManager {
142131
// Calculate when to request this repair
143132
const tReq = this.calculateTReq(entry.messageId, currentTime);
144133

145-
// Add to outgoing buffer - only log and emit event if actually added
134+
// Add to outgoing buffer - only log if actually added
146135
const wasAdded = this.outgoingBuffer.add(entry, tReq);
147136

148137
if (wasAdded) {
149138
log.info(
150139
`Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}`
151140
);
152-
153-
// Emit event
154-
this.eventEmitter?.("RepairRequestQueued", {
155-
messageId: entry.messageId,
156-
tReq
157-
});
158141
}
159142
}
160143
}
@@ -238,19 +221,13 @@ export class RepairManager {
238221
currentTime
239222
);
240223

241-
// Add to incoming buffer - only log and emit event if actually added
224+
// Add to incoming buffer - only log if actually added
242225
const wasAdded = this.incomingBuffer.add(request, tResp);
243226

244227
if (wasAdded) {
245228
log.info(
246229
`Will respond to repair request for ${request.messageId} at T_resp=${tResp}`
247230
);
248-
249-
// Emit event
250-
this.eventEmitter?.("RepairResponseQueued", {
251-
messageId: request.messageId,
252-
tResp
253-
});
254231
}
255232
}
256233
}
@@ -328,4 +305,12 @@ export class RepairManager {
328305
`Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants`
329306
);
330307
}
308+
309+
/**
310+
* Check if there are repair requests ready to be sent
311+
*/
312+
public hasRequestsReady(currentTime = Date.now()): boolean {
313+
const items = this.outgoingBuffer.getItems();
314+
return items.length > 0 && items[0].tReq <= currentTime;
315+
}
331316
}

0 commit comments

Comments
 (0)