diff --git a/index.js b/index.js index 2514d51..4259be1 100644 --- a/index.js +++ b/index.js @@ -93,7 +93,7 @@ module.exports = fp( if (raw.socket.destroyed) { throw new Errors.SOCKET_CLOSED(reqId) } else { - raw.once( + raw.socket.once( 'close', function () { if (controllers.has(this)) { diff --git a/test/index.test.js b/test/index.test.js index 4cc2983..6581a2d 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -1,4 +1,7 @@ 'use strict' +const { pipeline } = require('node:stream/promises') +const { Readable, PassThrough } = require('node:stream') + const { promisify } = require('util') const tap = require('tap') @@ -86,7 +89,7 @@ tap.test('fastify-racing#decoration', subtest => { }) tap.test('fastify-racing#promise', { only: true }, subtest => { - subtest.plan(4) + subtest.plan(6) subtest.test('Should handle a request aborted', t => { t.plan(3) @@ -124,13 +127,111 @@ tap.test('fastify-racing#promise', { only: true }, subtest => { t.ok(err) } ) - // Allow a full event loop cycle await sleep(5) abtCtlr.abort() }) }) + subtest.test('Should not incorrectly abort request when body stream consumed', t => { + t.plan(4) + + const app = fastify() + app.register(plugin) + app.addContentTypeParser('application/octet-stream', {}, (_req, payload, done) => done(null, payload)) + + t.teardown(() => app.close()) + + app.post('/', async (req, _reply) => { + const signal = req.race() + + // consume stream + await pipeline(req.body, new PassThrough()) + await sleep(5) // Allow a full event loop cycle + t.equal(signal.aborted, false) + + const result = await Promise.race([signal, dummy(signal)]) + t.equal(typeof result, 'string') + + if (result.type === 'aborted') return '' + else return `${result}-world` + }) + + app + .ready() + .then(() => app.listen({ port: 0 })) + .then(async () => { + request( + `http://localhost:${app.server.address().port}`, + { + method: 'POST', + path: '/', + signal: undefined, + body: Readable.from('stream data'), + headers: { + 'Content-Type': 'application/octet-stream' + } + }, + (err, res) => { + t.error(err) + t.equal(res.statusCode, 200) + } + ) + }) + }) + + subtest.test('Should handle a stream request aborted', t => { + t.plan(5) + + const app = fastify() + const abtCtlr = new AbortController() + app.register(plugin) + app.addContentTypeParser('application/octet-stream', {}, (_req, payload, done) => done(null, payload)) + + t.teardown(() => app.close()) + + app.post('/', async (req, _reply) => { + const signal = req.race() + + // consume stream + await pipeline(req.body, new PassThrough()) + await sleep(5) // Allow a full event loop cycle + t.equal(signal.aborted, false) + + const result = await Promise.race([signal, dummy(signal)]) + t.equal(signal.aborted, true) + t.equal(typeof result, 'object') + t.equal(result.type, 'abort') + + if (result.type === 'aborted') return '' + else return `${result}-world` + }) + + app + .ready() + .then(() => app.listen({ port: 0 })) + .then(async () => { + request( + `http://localhost:${app.server.address().port}`, + { + method: 'POST', + path: '/', + signal: abtCtlr.signal, + body: Readable.from('stream data'), + headers: { + 'Content-Type': 'application/octet-stream' + } + }, + err => { + t.ok(err) + } + ) + // Allow multiple event loop cycles + await sleep(20) + abtCtlr.abort() + }) + }) + subtest.test( 'Should be able to handle more than one race check within a request', t => {