diff --git a/app/core/BackgroundBridge/BackgroundBridge.js b/app/core/BackgroundBridge/BackgroundBridge.js index 37cfb114843..f3b926ede01 100644 --- a/app/core/BackgroundBridge/BackgroundBridge.js +++ b/app/core/BackgroundBridge/BackgroundBridge.js @@ -99,6 +99,7 @@ import { isRelaySupported } from '../../util/transactions/transaction-relay'; import { selectSmartTransactionsEnabled } from '../../selectors/smartTransactionsController'; import { AccountTreeController } from '@metamask/account-tree-controller'; import { createTrustSignalsMiddleware } from '../RPCMethods/TrustSignalsMiddleware'; +import createDupeReqFilterStream from './createDupeReqFilterStream'; const legacyNetworkId = () => { const { networksMetadata, selectedNetworkClientId } = @@ -551,7 +552,9 @@ export class BackgroundBridge extends EventEmitter { // setup connection const providerStream = createEngineStream({ engine: this.engine }); - pump(outStream, providerStream, outStream, (err) => { + const filterStream = createDupeReqFilterStream(); + + pump(outStream, filterStream, providerStream, outStream, (err) => { // handle any middleware cleanup this.engine.destroy(); if (err) Logger.log('Error with provider stream conn', err); @@ -582,7 +585,9 @@ export class BackgroundBridge extends EventEmitter { this.notifyTronAccountChangedForCurrentAccount(); ///: END:ONLY_INCLUDE_IF - pump(outStream, providerStream, outStream, (err) => { + const filterStream = createDupeReqFilterStream(); + + pump(outStream, filterStream, providerStream, outStream, (err) => { // handle any middleware cleanup this.multichainEngine.destroy(); if (err) Logger.log('Error with provider stream conn', err); diff --git a/app/core/BackgroundBridge/createDupeReqFilterStream.test.ts b/app/core/BackgroundBridge/createDupeReqFilterStream.test.ts new file mode 100644 index 00000000000..a6c40c0a16b --- /dev/null +++ b/app/core/BackgroundBridge/createDupeReqFilterStream.test.ts @@ -0,0 +1,324 @@ +import { Transform } from 'readable-stream'; + +import type { JsonRpcNotification, JsonRpcRequest } from '@metamask/utils'; +import createDupeReqFilterStream, { + THREE_MINUTES, +} from './createDupeReqFilterStream'; + +function createTestStream(output: JsonRpcRequest[] = [], S = Transform) { + const transformStream = createDupeReqFilterStream(); + const testOutStream = new S({ + transform: (chunk: JsonRpcRequest, _, cb) => { + output.push(chunk); + cb(); + }, + objectMode: true, + }); + + transformStream.pipe(testOutStream); + + return transformStream; +} + +function runStreamTest( + requests: (JsonRpcRequest | JsonRpcNotification)[] = [], + advanceTimersTime = 10, + S = Transform, +) { + return new Promise((resolve, reject) => { + const output: JsonRpcRequest[] = []; + const testStream = createTestStream(output, S); + + testStream + .on('finish', () => resolve(output)) + .on('error', (err) => reject(err)); + + requests.forEach((request) => testStream.write(request)); + testStream.end(); + + jest.advanceTimersByTime(advanceTimersTime); + }); +} + +describe('createDupeReqFilterStream', () => { + beforeEach(() => { + jest.useFakeTimers({ now: 10 }); + }); + + it('lets through requests with ids being seen for the first time', async () => { + const requests = [ + { id: 1, method: 'foo' }, + { id: 2, method: 'bar' }, + ].map((request) => ({ ...request, jsonrpc: '2.0' as const })); + + const expectedOutput = [ + { id: 1, method: 'foo' }, + { id: 2, method: 'bar' }, + ].map((output) => ({ ...output, jsonrpc: '2.0' })); + + const output = await runStreamTest(requests); + expect(output).toEqual(expectedOutput); + }); + + it('does not let through the request if the id has been seen before', async () => { + const requests = [ + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, // duplicate + ].map((request) => ({ ...request, jsonrpc: '2.0' as const })); + + const expectedOutput = [{ id: 1, method: 'foo' }].map((output) => ({ + ...output, + jsonrpc: '2.0', + })); + + const output = await runStreamTest(requests); + expect(output).toEqual(expectedOutput); + }); + + it("lets through requests if they don't have an id", async () => { + const requests = [{ method: 'notify1' }, { method: 'notify2' }].map( + (request) => ({ ...request, jsonrpc: '2.0' as const }), + ); + + const expectedOutput = [{ method: 'notify1' }, { method: 'notify2' }].map( + (output) => ({ ...output, jsonrpc: '2.0' }), + ); + + const output = await runStreamTest(requests); + expect(output).toEqual(expectedOutput); + }); + + it('handles a mix of request types', async () => { + const requests = [ + { id: 1, method: 'foo' }, + { method: 'notify1' }, + { id: 1, method: 'foo' }, + { id: 2, method: 'bar' }, + { method: 'notify2' }, + { id: 2, method: 'bar' }, + { id: 3, method: 'baz' }, + ].map((request) => ({ ...request, jsonrpc: '2.0' as const })); + + const expectedOutput = [ + { id: 1, method: 'foo' }, + { method: 'notify1' }, + { id: 2, method: 'bar' }, + { method: 'notify2' }, + { id: 3, method: 'baz' }, + ].map((output) => ({ ...output, jsonrpc: '2.0' })); + + const output = await runStreamTest(requests); + expect(output).toEqual(expectedOutput); + }); + + it('expires single id after three minutes', () => { + const output: JsonRpcRequest[] = []; + const testStream = createTestStream(output); + + const requests1 = [ + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + ]; + const expectedOutputBeforeExpiryTime = [{ id: 1, method: 'foo' }]; + + requests1.forEach((request) => testStream.write(request)); + expect(output).toEqual(expectedOutputBeforeExpiryTime); + + const requests2 = [ + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + ]; + const expectedOutputAfterExpiryTime = [ + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + ]; + + jest.advanceTimersByTime(THREE_MINUTES); + + requests2.forEach((request) => testStream.write(request)); + expect(output).toEqual(expectedOutputAfterExpiryTime); + }); + + it('does not expire single id after less than three', () => { + const output: JsonRpcRequest[] = []; + const testStream = createTestStream(output); + + const requests1 = [ + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + ]; + const expectedOutputBeforeTimeElapses = [{ id: 1, method: 'foo' }]; + + requests1.forEach((request) => testStream.write(request)); + expect(output).toEqual(expectedOutputBeforeTimeElapses); + + const requests2 = [ + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + ]; + const expectedOutputAfterTimeElapses = expectedOutputBeforeTimeElapses; + + jest.advanceTimersByTime(THREE_MINUTES - 1); + + requests2.forEach((request) => testStream.write(request)); + expect(output).toEqual(expectedOutputAfterTimeElapses); + }); + + it('expires multiple ids after three minutes', () => { + const output: JsonRpcRequest[] = []; + const testStream = createTestStream(output); + + const requests1 = [ + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + { id: 2, method: 'bar' }, + { id: 2, method: 'bar' }, + { id: 3, method: 'baz' }, + { id: 3, method: 'baz' }, + ]; + const expectedOutputBeforeExpiryTime = [ + { id: 1, method: 'foo' }, + { id: 2, method: 'bar' }, + { id: 3, method: 'baz' }, + ]; + + requests1.forEach((request) => testStream.write(request)); + expect(output).toEqual(expectedOutputBeforeExpiryTime); + + const requests2 = [ + { id: 3, method: 'baz' }, + { id: 3, method: 'baz' }, + { id: 2, method: 'bar' }, + { id: 2, method: 'bar' }, + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + ]; + const expectedOutputAfterExpiryTime = [ + { id: 1, method: 'foo' }, + { id: 2, method: 'bar' }, + { id: 3, method: 'baz' }, + { id: 3, method: 'baz' }, + { id: 2, method: 'bar' }, + { id: 1, method: 'foo' }, + ]; + + jest.advanceTimersByTime(THREE_MINUTES); + + requests2.forEach((request) => testStream.write(request)); + expect(output).toEqual(expectedOutputAfterExpiryTime); + }); + + it('expires single id in three minute intervals', () => { + const output: JsonRpcRequest[] = []; + const testStream = createTestStream(output); + + const requests1 = [ + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + ]; + const expectedOutputBeforeExpiryTime = [{ id: 1, method: 'foo' }]; + + requests1.forEach((request) => testStream.write(request)); + expect(output).toEqual(expectedOutputBeforeExpiryTime); + + const requests2 = [ + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + ]; + const expectedOutputAfterFirstExpiryTime = [ + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + ]; + + jest.advanceTimersByTime(THREE_MINUTES); + + requests2.forEach((request) => testStream.write(request)); + expect(output).toEqual(expectedOutputAfterFirstExpiryTime); + + const requests3 = [ + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + ]; + const expectedOutputAfterSecondExpiryTime = [ + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + { id: 1, method: 'foo' }, + ]; + + jest.advanceTimersByTime(THREE_MINUTES); + + requests3.forEach((request) => testStream.write(request)); + expect(output).toEqual(expectedOutputAfterSecondExpiryTime); + }); + + it('expires somes ids at intervals while not expiring others', () => { + const output: JsonRpcRequest[] = []; + const testStream = createTestStream(output); + + const requests1 = [ + { id: 1, method: 'foo' }, + { id: 2, method: 'bar' }, + ]; + const expectedOutputBeforeExpiryTime = [ + { id: 1, method: 'foo' }, + { id: 2, method: 'bar' }, + ]; + + requests1.forEach((request) => testStream.write(request)); + expect(output).toEqual(expectedOutputBeforeExpiryTime); + + const requests2 = [{ id: 3, method: 'baz' }]; + const expectedOutputAfterFirstExpiryTime = [ + { id: 1, method: 'foo' }, + { id: 2, method: 'bar' }, + { id: 3, method: 'baz' }, + ]; + + jest.advanceTimersByTime(THREE_MINUTES - 1); + + requests2.forEach((request) => testStream.write(request)); + expect(output).toEqual(expectedOutputAfterFirstExpiryTime); + + const requests3 = [ + { id: 1, method: 'foo' }, + { id: 2, method: 'bar' }, + { id: 3, method: 'baz' }, + { id: 4, method: 'buzz' }, + ]; + const expectedOutputAfterSecondExpiryTime = [ + { id: 1, method: 'foo' }, + { id: 2, method: 'bar' }, + { id: 3, method: 'baz' }, + { id: 1, method: 'foo' }, + { id: 2, method: 'bar' }, + { id: 4, method: 'buzz' }, + ]; + + jest.advanceTimersByTime(THREE_MINUTES - 1); + + requests3.forEach((request) => testStream.write(request)); + expect(output).toEqual(expectedOutputAfterSecondExpiryTime); + }); + + it('handles running expiry job without seeing any ids', () => { + const output: JsonRpcRequest[] = []; + const testStream = createTestStream(output); + + const requests1 = [{ id: 1, method: 'foo' }]; + const expectedOutputBeforeExpiryTime = [{ id: 1, method: 'foo' }]; + + requests1.forEach((request) => testStream.write(request)); + expect(output).toEqual(expectedOutputBeforeExpiryTime); + + jest.advanceTimersByTime(THREE_MINUTES + 1); + + expect(output).toEqual(expectedOutputBeforeExpiryTime); + }); +}); diff --git a/app/core/BackgroundBridge/createDupeReqFilterStream.ts b/app/core/BackgroundBridge/createDupeReqFilterStream.ts new file mode 100644 index 00000000000..4af236f60a4 --- /dev/null +++ b/app/core/BackgroundBridge/createDupeReqFilterStream.ts @@ -0,0 +1,79 @@ +import { Transform } from 'readable-stream'; +import { Duration, inMilliseconds, type JsonRpcRequest } from '@metamask/utils'; + +export const THREE_MINUTES = inMilliseconds(3, Duration.Minute); + +/** + * Creates a set abstraction whose values expire after three minutes. + * + * @returns The expiry set. + */ +const makeExpirySet = () => { + const map: Map = new Map(); + + const timerId = setInterval(() => { + const cutoffTime = Date.now() - THREE_MINUTES; + + for (const [id, timestamp] of map.entries()) { + if (timestamp <= cutoffTime) { + map.delete(id); + } else { + break; + } + } + }, THREE_MINUTES); + + return { + /** + * Attempts to add a value to the set. + * + * @param value - The value to add. + * @returns `true` if the value was added, and `false` if it already existed. + */ + add(value: string | number | null) { + if (!map.has(value)) { + map.set(value, Date.now()); + return true; + } + return false; + }, + + /** + * Clear the map and tear down the underlying timer. + */ + destroy() { + map.clear(); + clearInterval(timerId); + }, + }; +}; + +/** + * Returns a transform stream that filters out requests whose ids we've already seen. + * Ignores JSON-RPC notifications, i.e. requests with an `undefined` id. + * + * @returns The stream object. + */ +export default function createDupeReqFilterStream() { + const seenRequestIds = makeExpirySet(); + return new Transform({ + transform(chunk: JsonRpcRequest, _, cb) { + // JSON-RPC notifications have no ids; our only recourse is to let them through. + const hasNoId = chunk.id === undefined; + const requestNotYetSeen = seenRequestIds.add(chunk.id); + + if (hasNoId || requestNotYetSeen) { + cb(null, chunk); + } else { + // eslint-disable-next-line no-console + console.debug(`RPC request with id "${chunk.id}" already seen.`); + cb(); + } + }, + destroy(error, cb) { + seenRequestIds.destroy(); + cb(error); + }, + objectMode: true, + }); +} diff --git a/app/core/Engine/controllers/snaps/execution-service-init.test.ts b/app/core/Engine/controllers/snaps/execution-service-init.test.ts index 70d970c6748..2f4c505cb53 100644 --- a/app/core/Engine/controllers/snaps/execution-service-init.test.ts +++ b/app/core/Engine/controllers/snaps/execution-service-init.test.ts @@ -66,6 +66,7 @@ describe('ExecutionServiceInit', () => { setupProviderConnection, })); + // @ts-expect-error The stream type doesn't match because of a version mismatch. setupSnapProvider(snapId, connectionStream); expect(setupProviderConnection).toHaveBeenCalled(); diff --git a/app/core/Engine/controllers/snaps/execution-service-init.ts b/app/core/Engine/controllers/snaps/execution-service-init.ts index 23bc3b12254..7f084b56d88 100644 --- a/app/core/Engine/controllers/snaps/execution-service-init.ts +++ b/app/core/Engine/controllers/snaps/execution-service-init.ts @@ -65,6 +65,7 @@ export const executionServiceInit: ControllerInitFunction< return { controller: new WebViewExecutionService({ messenger: controllerMessenger, + // @ts-expect-error The stream type doesn't match because of a version mismatch. setupSnapProvider, createWebView, removeWebView, diff --git a/package.json b/package.json index ab758cd80f1..79da48ae793 100644 --- a/package.json +++ b/package.json @@ -338,6 +338,7 @@ "@tradle/react-native-http": "2.0.1", "@types/he": "^1.2.3", "@types/react-test-renderer": "^18.0.0", + "@types/readable-stream": "^2.3.9", "@viem/anvil": "^0.0.10", "@walletconnect/client": "^1.8.0", "@walletconnect/core": "^2.19.2", diff --git a/yarn.lock b/yarn.lock index 649d3f65a15..351958f30e6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -18218,6 +18218,16 @@ __metadata: languageName: node linkType: hard +"@types/readable-stream@npm:^2.3.9": + version: 2.3.15 + resolution: "@types/readable-stream@npm:2.3.15" + dependencies: + "@types/node": "npm:*" + safe-buffer: "npm:~5.1.1" + checksum: 10/49b51e56f9cc401cb31c72973a7565ef4208d7e2465a789843104ec0fcbe609727b0b5bf4682fbec773c7f7bd14858e5dba739fd85e14d8a85e41185d65984d3 + languageName: node + linkType: hard + "@types/readdir-glob@npm:*": version: 1.1.1 resolution: "@types/readdir-glob@npm:1.1.1" @@ -34246,6 +34256,7 @@ __metadata: "@types/react-native-vector-icons": "npm:^6.4.13" "@types/react-native-video": "npm:^5.0.14" "@types/react-test-renderer": "npm:^18.0.0" + "@types/readable-stream": "npm:^2.3.9" "@types/redux-mock-store": "npm:^1.0.3" "@types/semver": "npm:^7" "@types/serve-handler": "npm:^6.1.4"