From a700352767f9e599cb615216cdd45c99325445a3 Mon Sep 17 00:00:00 2001 From: Gancho Radkov Date: Fri, 31 Oct 2025 11:53:36 +0200 Subject: [PATCH 1/2] chore: waits for result on request, if not received in n seconds, it resubscribes and waits again --- packages/sign-client/src/constants/engine.ts | 4 + .../sign-client/src/controllers/engine.ts | 92 +++++++++++++++++-- .../sign-client/test/canary/canary.spec.ts | 22 ++++- 3 files changed, 106 insertions(+), 12 deletions(-) diff --git a/packages/sign-client/src/constants/engine.ts b/packages/sign-client/src/constants/engine.ts index 33530287b0..e03e6edfa0 100644 --- a/packages/sign-client/src/constants/engine.ts +++ b/packages/sign-client/src/constants/engine.ts @@ -3,6 +3,10 @@ import { EngineTypes } from "@walletconnect/types"; export const ENGINE_CONTEXT = "engine"; +export const MAX_RESUBSCRIBE_ATTEMPTS_ON_PENDING_RESULTS = 3; + +export const RESUBSCRIBE_INTERVAL = 30; + export const ENGINE_RPC_OPTS: EngineTypes.RpcOptsMap = { wc_sessionPropose: { req: { diff --git a/packages/sign-client/src/controllers/engine.ts b/packages/sign-client/src/controllers/engine.ts index de2e879a71..524098276e 100644 --- a/packages/sign-client/src/controllers/engine.ts +++ b/packages/sign-client/src/controllers/engine.ts @@ -26,7 +26,7 @@ import { ErrorResponse, getBigIntRpcId, } from "@walletconnect/jsonrpc-utils"; -import { FIVE_MINUTES, ONE_SECOND, toMiliseconds } from "@walletconnect/time"; +import { FIVE_MINUTES, fromMiliseconds, ONE_SECOND, toMiliseconds } from "@walletconnect/time"; import { EnginePrivate, EngineTypes, @@ -122,7 +122,10 @@ import { ENGINE_QUEUE_STATES, AUTH_PUBLIC_KEY_NAME, TVF_METHODS, + RESUBSCRIBE_INTERVAL, + MAX_RESUBSCRIBE_ATTEMPTS_ON_PENDING_RESULTS, } from "../constants/index.js"; +import { HEARTBEAT_EVENTS } from "@walletconnect/heartbeat"; export class Engine extends IEngine { public name = ENGINE_CONTEXT; @@ -172,6 +175,18 @@ export class Engine extends IEngine { } > = new Map(); + // most flows depend on sending a request and receiving a result from the peer sign-client sdk + // this map is used to store the messages that are waiting for a result + // the key is the clientRpcId of the request, the value is the topic of the request + private requestWaitingForResult: Map< + number, + { + topic: string; + waitingSince: number; + resubscribeAttempts?: number; + } + > = new Map(); + constructor(client: IEngine["client"]) { super(client); } @@ -179,6 +194,7 @@ export class Engine extends IEngine { public init: IEngine["init"] = async () => { if (!this.initialized) { await this.cleanup(); + this.registerHeartbeatEvents(); this.registerRelayerEvents(); this.registerExpirerEvents(); this.registerPairingEvents(); @@ -365,7 +381,7 @@ export class Engine extends IEngine { }); await this.setProposal(proposal.id, proposal); - + this.requestWaitingForResult.set(proposal.id, { topic, waitingSince: Date.now() }); await this.sendProposeSession({ proposal, publishOpts: { @@ -378,6 +394,7 @@ export class Engine extends IEngine { }, }).catch((error) => { this.deleteProposal(proposal.id); + this.requestWaitingForResult.delete(proposal.id); throw error; }); @@ -741,6 +758,8 @@ export class Engine extends IEngine { chainId, }; + this.requestWaitingForResult.set(clientRpcId, { topic, waitingSince: Date.now() }); + return await Promise.all([ new Promise(async (resolve) => { await this.sendRequest({ @@ -752,7 +771,10 @@ export class Engine extends IEngine { expiry, throwOnFailedPublish: true, tvf: this.getTVFParams(clientRpcId, protocolRequestParams), - }).catch((error) => reject(error)); + }).catch((error) => { + this.requestWaitingForResult.delete(clientRpcId); + reject(error); + }); this.client.events.emit("session_request_sent", { topic, request, @@ -847,13 +869,20 @@ export class Engine extends IEngine { else resolve(); }); await Promise.all([ - this.sendRequest({ - topic, - method: "wc_sessionPing", - params: {}, - throwOnFailedPublish: true, - clientRpcId, - relayRpcId, + new Promise(async (resolve) => { + this.requestWaitingForResult.set(clientRpcId, { topic, waitingSince: Date.now() }); + await this.sendRequest({ + topic, + method: "wc_sessionPing", + params: {}, + throwOnFailedPublish: true, + clientRpcId, + relayRpcId, + }).catch((error) => { + this.requestWaitingForResult.delete(clientRpcId); + reject(error); + }); + resolve(); }), done(), ]); @@ -1884,6 +1913,47 @@ export class Engine extends IEngine { await this.client.core.relayer.confirmOnlineStateOrThrow(); } + // ---------- Heartbeat Event ----------------------------------- // + + /** + * This function is used to register the heartbeat events for the engine + * It checks if any requests are still waiting for a result and resubscribes to the topic if needed + * It also logs a warning if the request is still waiting for a result after the maximum number of attempts + * It also logs a debug message if the request is resubscribed to the topic + * It also logs a warning if the request is still waiting for a result after the maximum number of attempts + */ + private registerHeartbeatEvents() { + this.client.core.heartbeat.on(HEARTBEAT_EVENTS.pulse, async () => { + for (const [clientRpcId, request] of this.requestWaitingForResult.entries()) { + const attempts = (request.resubscribeAttempts || 0) + 1; + if (fromMiliseconds(Date.now() - request.waitingSince) > attempts * RESUBSCRIBE_INTERVAL) { + if (attempts <= MAX_RESUBSCRIBE_ATTEMPTS_ON_PENDING_RESULTS) { + this.client.logger.debug( + `Resubscribing to topic ${request.topic} for request ${clientRpcId}, attempt ${attempts}`, + ); + try { + this.requestWaitingForResult.set(clientRpcId, { + ...request, + resubscribeAttempts: attempts, + }); + await this.client.core.relayer.subscribe(request.topic, { + internal: { throwOnFailedPublish: true }, + }); + } catch (error) { + this.client.logger.warn( + `Failed to resubscribe to topic ${request.topic} for request ${clientRpcId}, attempt ${attempts}`, + ); + } + } else { + this.client.logger.warn( + `Request ${clientRpcId} still hasn't received a result after ${attempts * RESUBSCRIBE_INTERVAL}s, giving up`, + ); + } + } + } + }); + } + // ---------- Relay Events Router ----------------------------------- // private registerRelayerEvents() { @@ -2020,6 +2090,8 @@ export class Engine extends IEngine { const record = await this.client.core.history.get(topic, payload.id); const resMethod = record.request.method as JsonRpcTypes.WcMethod; + this.requestWaitingForResult.delete(payload.id); + switch (resMethod) { case "wc_sessionPropose": return this.onSessionProposeResponse(topic, payload, transportType); diff --git a/packages/sign-client/test/canary/canary.spec.ts b/packages/sign-client/test/canary/canary.spec.ts index 1ee6da9dec..828b42424b 100644 --- a/packages/sign-client/test/canary/canary.spec.ts +++ b/packages/sign-client/test/canary/canary.spec.ts @@ -16,13 +16,14 @@ import { describe, it, expect, afterEach } from "vitest"; import { SignClient } from "../../src"; import { formatJsonRpcResult } from "@walletconnect/jsonrpc-utils"; import { RELAYER_EVENTS } from "@walletconnect/core"; +import { ISignClient } from "@walletconnect/types"; const environment = process.env.ENVIRONMENT || "dev"; const region = process.env.REGION || "unknown"; const logger = process.env.LOGGER || "error"; -const log = (log: string) => { +const log = (log: string, level = "log") => { // eslint-disable-next-line no-console - console.log(log); + console[level](log); }; describe("Canary", () => { @@ -221,6 +222,23 @@ describe("Canary", () => { await publishToStatusPage(latencyMs); } + // @ts-expect-error- private property + const clientARequestsWaitingForResult = A.engine.requestWaitingForResult?.values() || []; + // @ts-expect-error- private property + const clientBRequestsWaitingForResult = B.engine.requestWaitingForResult?.values() || []; + if (clientARequestsWaitingForResult.length > 0) { + log( + `Client A (${await A.core.crypto.getClientId()}) has ${clientARequestsWaitingForResult.length} requests waiting for result, ${JSON.stringify(clientARequestsWaitingForResult)}`, + "error", + ); + } + if (clientBRequestsWaitingForResult.length > 0) { + log( + `Client B (${await B.core.crypto.getClientId()}) has ${clientBRequestsWaitingForResult.length} requests waiting for result, ${JSON.stringify(clientBRequestsWaitingForResult)}`, + "error", + ); + } + const clientDisconnect = new Promise((resolve, reject) => { try { clients.B.on("session_delete", (event: any) => { From d8ac7366107ed8da22964a66d58122ac9c54f3a4 Mon Sep 17 00:00:00 2001 From: Gancho Radkov Date: Fri, 31 Oct 2025 12:02:28 +0200 Subject: [PATCH 2/2] fix: array from --- packages/sign-client/test/canary/canary.spec.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/sign-client/test/canary/canary.spec.ts b/packages/sign-client/test/canary/canary.spec.ts index 828b42424b..8e8c554a9a 100644 --- a/packages/sign-client/test/canary/canary.spec.ts +++ b/packages/sign-client/test/canary/canary.spec.ts @@ -222,10 +222,14 @@ describe("Canary", () => { await publishToStatusPage(latencyMs); } - // @ts-expect-error- private property - const clientARequestsWaitingForResult = A.engine.requestWaitingForResult?.values() || []; - // @ts-expect-error- private property - const clientBRequestsWaitingForResult = B.engine.requestWaitingForResult?.values() || []; + const clientARequestsWaitingForResult = Array.from( + // @ts-expect-error- private property + A.engine.requestWaitingForResult?.values() || [], + ); + const clientBRequestsWaitingForResult = Array.from( + // @ts-expect-error- private property + B.engine.requestWaitingForResult?.values() || [], + ); if (clientARequestsWaitingForResult.length > 0) { log( `Client A (${await A.core.crypto.getClientId()}) has ${clientARequestsWaitingForResult.length} requests waiting for result, ${JSON.stringify(clientARequestsWaitingForResult)}`,