diff --git a/packages/block-brokers/.aegir.js b/packages/block-brokers/.aegir.js index 150d748ed..59033d126 100644 --- a/packages/block-brokers/.aegir.js +++ b/packages/block-brokers/.aegir.js @@ -47,26 +47,6 @@ const options = { res.end(Uint8Array.from([104, 101, 108, 108, 111])) }) - goodGateway.all('/ipfs/bafkreig7p6kzwgg4hp3n7wpnnn3kkjmpzxds5rmwhphyueilbzabvyexvq', async (req, res) => { - // if 'delay' header is set, delay the response - const delay = req.headers['delay'] - if (delay) { - await new Promise((resolve) => setTimeout(resolve, delay)) - res.writeHead(200, { - 'content-type': 'application/octet-stream', - 'content-length': 4 - }) - res.end(Uint8Array.from([0, 1, 2, 0])) - } else { - // no reason to use this in tests without the delay configured. - res.writeHead(400, { - 'content-type': 'text/plain', - 'content-length': 13 - }) - res.end('No delay set') - } - }) - goodGateway.all('/ipfs/*', (req, res) => { // succeeds with empty block for any other CID res.writeHead(200) @@ -109,21 +89,6 @@ const options = { // fails validation res.end(Uint8Array.from([0, 1, 2, 1])) }) - badGateway.all('/ipfs/bafkreig7p6kzwgg4hp3n7wpnnn3kkjmpzxds5rmwhphyueilbzabvyexvq', (req, res) => { - // wait for 70 seconds to simulate a slow response. Do not query this cid unless you plan to abort the request. - // typical test timeout is 60 seconds, so if you reach 60 seconds, there is a bug in handling your abort signal. - // if you don't reach whatever timeout you set (e.g. `AbortSignal.timeout(1000)`), there is likely a bug in your test. - const timeoutId = setTimeout(() => { - res.writeHead(200, { - 'content-type': 'application/octet-stream', - 'content-length': 4 - }) - res.end(Uint8Array.from([0, 1, 2, 1])) - }, 70000) - req.on('close', () => { - clearTimeout(timeoutId) - }) - }) badGateway.all('/ipfs/*', (req, res) => { // fails diff --git a/packages/block-brokers/package.json b/packages/block-brokers/package.json index fc51c06d6..6d2e33d69 100644 --- a/packages/block-brokers/package.json +++ b/packages/block-brokers/package.json @@ -77,6 +77,7 @@ "aegir": "^45.1.1", "cors": "^2.8.5", "polka": "^0.5.2", + "race-signal": "^1.1.3", "sinon": "^19.0.2", "sinon-ts": "^2.0.0", "uint8arrays": "^5.1.0" diff --git a/packages/block-brokers/test/trustless-gateway-sessions.spec.ts b/packages/block-brokers/test/trustless-gateway-sessions.spec.ts index ebb921bd6..df723d547 100644 --- a/packages/block-brokers/test/trustless-gateway-sessions.spec.ts +++ b/packages/block-brokers/test/trustless-gateway-sessions.spec.ts @@ -7,9 +7,11 @@ import { multiaddr } from '@multiformats/multiaddr' import { uriToMultiaddr } from '@multiformats/uri-to-multiaddr' import { expect } from 'aegir/chai' import { CID } from 'multiformats/cid' +import { raceSignal } from 'race-signal' import Sinon from 'sinon' import { type StubbedInstance, stubInterface } from 'sinon-ts' import { createTrustlessGatewaySession } from '../src/trustless-gateway/session.js' +import type { TrustlessGateway } from '../src/trustless-gateway/trustless-gateway.js' import type { Routing } from '@helia/interface' import type { ComponentLogger } from '@libp2p/interface' @@ -117,7 +119,13 @@ describe('trustless-gateway sessions', () => { allowLocal: true }) - const queryProviderSpy = Sinon.spy(session, 'queryProvider') + const queryProviderStub = Sinon.stub(session, 'queryProvider') + + queryProviderStub.callsFake(async (_cid, _provider, options) => { + return raceSignal(new Promise(resolve => { + // never resolve + }), options.signal) + }) components.routing.findProviders.returns(async function * () { yield { @@ -128,9 +136,9 @@ describe('trustless-gateway sessions', () => { } }()) - await expect(session.retrieve(cid, { signal: AbortSignal.timeout(500) })).to.eventually.be.rejected() + await expect(session.retrieve(cid, { signal: AbortSignal.timeout(50) })).to.eventually.be.rejected() .with.property('name', 'AbortError') - expect(queryProviderSpy.callCount).to.equal(1) + expect(queryProviderStub.callCount).to.equal(1) }) it('should not abort the session when the signal is aborted if the block is found', async () => { @@ -138,34 +146,58 @@ describe('trustless-gateway sessions', () => { const block = Uint8Array.from([0, 1, 2, 0]) const session = createTrustlessGatewaySession(components, { allowInsecure: true, - allowLocal: true, - transformRequestInit: (requestInit) => { - requestInit.headers = { - // The difference here on my machine is 25ms.. if the difference between finding the block and the signal being aborted is less than 22ms, then the test will fail. - delay: '478' - } - return requestInit - } + allowLocal: true }) - - const queryProviderSpy = Sinon.spy(session, 'queryProvider') + const providers = [{ + id: peerIdFromPrivateKey(await generateKeyPair('Ed25519')), + multiaddrs: [ + uriToMultiaddr(process.env.BAD_TRUSTLESS_GATEWAY ?? '') + ] + }, + { + id: peerIdFromPrivateKey(await generateKeyPair('Ed25519')), + multiaddrs: [ + uriToMultiaddr(process.env.TRUSTLESS_GATEWAY ?? '') + ] + }] components.routing.findProviders.returns(async function * () { - yield { - id: peerIdFromPrivateKey(await generateKeyPair('Ed25519')), - multiaddrs: [ - uriToMultiaddr(process.env.BAD_TRUSTLESS_GATEWAY ?? '') - ] - } - yield { - id: peerIdFromPrivateKey(await generateKeyPair('Ed25519')), - multiaddrs: [ - uriToMultiaddr(process.env.TRUSTLESS_GATEWAY ?? '') - ] - } + yield providers[1] + yield providers[0] }()) - await expect(session.retrieve(cid, { signal: AbortSignal.timeout(500) })).to.eventually.deep.equal(block) - expect(queryProviderSpy.callCount).to.equal(2) + const controller = new AbortController() + const queryProviderStub = Sinon.stub(session, 'queryProvider') + + // a promise that will resolve at the exact moment we want both events to occur + const triggerMoment = new Promise(resolve => setTimeout(resolve, 50)) + + queryProviderStub.withArgs( + cid, + Sinon.match((provider: TrustlessGateway) => provider.url.toString().includes(process.env.BAD_TRUSTLESS_GATEWAY ?? '')) + ).callsFake(async (_cid, _provider, options) => { + const racedPromise = triggerMoment + .then(async () => new Promise(resolve => setTimeout(resolve, 0))) + .then(() => { return new Uint8Array([0, 1, 2, 3]) }) + return raceSignal(racedPromise, options.signal) + }) + + queryProviderStub.withArgs( + cid, + Sinon.match((provider: TrustlessGateway) => provider.url.toString().includes(process.env.TRUSTLESS_GATEWAY ?? '')) + ).callsFake(async () => { + return triggerMoment.then(() => block) + }) + + // abort the signal + void triggerMoment.then(async () => { + // slight delay to ensure this aborts after the block returning provider + await new Promise(resolve => setTimeout(resolve, 0)) + controller.abort() + }) + + await expect(session.retrieve(cid, { signal: controller.signal })) + .to.eventually.deep.equal(block) + expect(queryProviderStub.callCount).to.equal(2) }) }) diff --git a/packages/utils/src/abstract-session.ts b/packages/utils/src/abstract-session.ts index c5a252306..54c9b8ce2 100644 --- a/packages/utils/src/abstract-session.ts +++ b/packages/utils/src/abstract-session.ts @@ -99,12 +99,17 @@ export abstract class AbstractSession { - if (foundBlock || options.signal?.aborted === true) { + if (foundBlock) { this.log.trace('session idle, found block') // we either found the block or the user gave up return } + if (options.signal?.aborted === true) { + this.log.trace('session idle, signal aborted') + return + } + // find more session peers and retry Promise.resolve() .then(async () => {