Skip to content

Commit 4184c93

Browse files
committed
fix: handling consistent cancellation across stream and locks
1 parent b1c07bc commit 4184c93

File tree

7 files changed

+182
-36
lines changed

7 files changed

+182
-36
lines changed

packages/server/src/handlers/BaseHandler.ts

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import EventEmitter from 'node:events'
22
import stream from 'node:stream/promises'
3-
import {addAbortSignal, PassThrough} from 'node:stream'
3+
import {PassThrough, Readable} from 'node:stream'
44
import type http from 'node:http'
55

66
import type {ServerOptions} from '../types'
@@ -121,15 +121,15 @@ export class BaseHandler extends EventEmitter {
121121

122122
const lock = locker.newLock(id)
123123

124-
await lock.lock(() => {
124+
await lock.lock(context.signal, () => {
125125
context.cancel()
126126
})
127127

128128
return lock
129129
}
130130

131131
protected writeToStore(
132-
req: http.IncomingMessage,
132+
data: Readable,
133133
upload: Upload,
134134
maxFileSize: number,
135135
context: CancellationContext
@@ -145,16 +145,25 @@ export class BaseHandler extends EventEmitter {
145145
// Create a PassThrough stream as a proxy to manage the request stream.
146146
// This allows for aborting the write process without affecting the incoming request stream.
147147
const proxy = new PassThrough()
148-
addAbortSignal(context.signal, proxy)
148+
149+
// gracefully terminate the proxy stream when the request is aborted
150+
const onAbort = () => {
151+
data.unpipe(proxy)
152+
153+
if (!proxy.closed) {
154+
proxy.end()
155+
}
156+
}
157+
context.signal.addEventListener('abort', onAbort, {once: true})
149158

150159
proxy.on('error', (err) => {
151-
req.unpipe(proxy)
160+
data.unpipe(proxy)
152161
reject(err.name === 'AbortError' ? ERRORS.ABORTED : err)
153162
})
154163

155164
const postReceive = throttle(
156165
(offset: number) => {
157-
this.emit(EVENTS.POST_RECEIVE_V2, req, {...upload, offset})
166+
this.emit(EVENTS.POST_RECEIVE_V2, data, {...upload, offset})
158167
},
159168
this.options.postReceiveInterval,
160169
{leading: false}
@@ -166,23 +175,18 @@ export class BaseHandler extends EventEmitter {
166175
postReceive(tempOffset)
167176
})
168177

169-
req.on('error', () => {
170-
if (!proxy.closed) {
171-
// we end the stream gracefully here so that we can upload the remaining bytes to the store
172-
// as an incompletePart
173-
proxy.end()
174-
}
175-
})
176-
177178
// Pipe the request stream through the proxy. We use the proxy instead of the request stream directly
178179
// to ensure that errors in the pipeline do not cause the request stream to be destroyed,
179180
// which would result in a socket hangup error for the client.
180181
stream
181-
.pipeline(req.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
182+
.pipeline(data.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
182183
return this.store.write(stream as StreamLimiter, upload.id, upload.offset)
183184
})
184185
.then(resolve)
185186
.catch(reject)
187+
.finally(() => {
188+
context.signal.removeEventListener('abort', onAbort)
189+
})
186190
})
187191
}
188192

packages/server/src/lockers/MemoryLocker.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,14 @@ class MemoryLock implements Lock {
4949
private timeout: number = 1000 * 30
5050
) {}
5151

52-
async lock(requestRelease: RequestRelease): Promise<void> {
52+
async lock(stopSignal: AbortSignal, requestRelease: RequestRelease): Promise<void> {
5353
const abortController = new AbortController()
54+
55+
const abortSignal = AbortSignal.any([stopSignal, abortController.signal])
56+
5457
const lock = await Promise.race([
55-
this.waitTimeout(abortController.signal),
56-
this.acquireLock(this.id, requestRelease, abortController.signal),
58+
this.waitTimeout(abortSignal),
59+
this.acquireLock(this.id, requestRelease, abortSignal),
5760
])
5861

5962
abortController.abort()
@@ -68,12 +71,12 @@ class MemoryLock implements Lock {
6871
requestRelease: RequestRelease,
6972
signal: AbortSignal
7073
): Promise<boolean> {
74+
const lock = this.locker.locks.get(id)
75+
7176
if (signal.aborted) {
72-
return false
77+
return typeof lock !== 'undefined'
7378
}
7479

75-
const lock = this.locker.locks.get(id)
76-
7780
if (!lock) {
7881
const lock = {
7982
requestRelease,

packages/server/src/server.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ export class Server extends EventEmitter {
151151
): Promise<http.ServerResponse | stream.Writable | void> {
152152
const context = this.createContext(req)
153153

154+
// Once the request is closed we abort the context to clean up underline resources
155+
req.on('close', () => {
156+
context.abort()
157+
})
158+
154159
log(`[TusServer] handle: ${req.method} ${req.url}`)
155160
// Allow overriding the HTTP method. The reason for this is
156161
// that some libraries/environments to not support PATCH and
@@ -289,6 +294,17 @@ export class Server extends EventEmitter {
289294

290295
res.writeHead(status, headers)
291296
res.write(body)
297+
298+
// Abort the context once the response is sent.
299+
// Useful for clean-up when the server uses keep-alive
300+
if (!isAborted) {
301+
res.on('finish', () => {
302+
if (!req.closed) {
303+
context.abort()
304+
}
305+
})
306+
}
307+
292308
return res.end()
293309
}
294310

packages/server/test/Locker.test.ts

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,20 @@ describe('MemoryLocker', () => {
66
it('will acquire a lock by notifying another to release it', async () => {
77
const locker = new MemoryLocker()
88
const lockId = 'upload-id-1'
9+
const abortController = new AbortController()
910

1011
const cancel = sinon.spy()
1112
const cancel2 = sinon.spy()
1213

1314
const lock1 = locker.newLock(lockId)
1415
const lock2 = locker.newLock(lockId)
1516

16-
await lock1.lock(async () => {
17+
await lock1.lock(abortController.signal, async () => {
1718
await lock1.unlock()
1819
cancel()
1920
})
2021

21-
await lock2.lock(async () => {
22+
await lock2.lock(abortController.signal, async () => {
2223
cancel2()
2324
})
2425

@@ -32,19 +33,21 @@ describe('MemoryLocker', () => {
3233
const locker = new MemoryLocker({
3334
acquireLockTimeout: 500,
3435
})
36+
const abortController = new AbortController()
37+
3538
const lockId = 'upload-id-1'
3639
const lock = locker.newLock(lockId)
3740

3841
const cancel = sinon.spy()
3942

40-
await lock.lock(async () => {
43+
await lock.lock(abortController.signal, async () => {
4144
cancel()
4245
// We note that the function has been called, but do not
4346
// release the lock
4447
})
4548

4649
try {
47-
await lock.lock(async () => {
50+
await lock.lock(abortController.signal, async () => {
4851
throw new Error('panic should not be called')
4952
})
5053
} catch (e) {
@@ -57,18 +60,20 @@ describe('MemoryLocker', () => {
5760
it('request lock and unlock', async () => {
5861
const locker = new MemoryLocker()
5962
const lockId = 'upload-id-1'
63+
const abortController = new AbortController()
64+
6065
const lock = locker.newLock(lockId)
6166
const lock2 = locker.newLock(lockId)
6267

6368
const cancel = sinon.spy()
64-
await lock.lock(() => {
69+
await lock.lock(abortController.signal, () => {
6570
cancel()
6671
setTimeout(async () => {
6772
await lock.unlock()
6873
}, 50)
6974
})
7075

71-
await lock2.lock(() => {
76+
await lock2.lock(abortController.signal, () => {
7277
throw new Error('should not be called')
7378
})
7479

@@ -79,4 +84,38 @@ describe('MemoryLocker', () => {
7984
`request released called more times than expected - ${cancel.callCount}`
8085
)
8186
})
87+
88+
it('will stop trying to acquire the lock if the abort signal is aborted', async () => {
89+
const locker = new MemoryLocker()
90+
const lockId = 'upload-id-1'
91+
const abortController = new AbortController()
92+
93+
const cancel = sinon.spy()
94+
const cancel2 = sinon.spy()
95+
96+
const lock1 = locker.newLock(lockId)
97+
const lock2 = locker.newLock(lockId)
98+
99+
await lock1.lock(abortController.signal, async () => {
100+
// do not unlock when requested
101+
cancel()
102+
})
103+
104+
// Abort signal is aborted after lock2 tries to acquire the lock
105+
setTimeout(() => {
106+
abortController.abort()
107+
}, 100)
108+
109+
try {
110+
await lock2.lock(abortController.signal, async () => {
111+
cancel2()
112+
})
113+
assert(false, 'lock2 should not have been acquired')
114+
} catch (e) {
115+
assert(e === ERRORS.ERR_LOCK_TIMEOUT, `error returned is not correct ${e}`)
116+
}
117+
118+
assert(cancel.callCount > 1, `calls count dont match ${cancel.callCount} !== 1`)
119+
assert(cancel2.callCount === 0, `calls count dont match ${cancel.callCount} !== 1`)
120+
})
82121
})

packages/server/test/PatchHandler.test.ts

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {EventEmitter} from 'node:events'
1212
import {addPipableStreamBody} from './utils'
1313
import {MemoryLocker} from '../src'
1414
import streamP from 'node:stream/promises'
15-
import stream from 'node:stream'
15+
import stream, {Duplex, PassThrough} from 'node:stream'
1616

1717
describe('PatchHandler', () => {
1818
const path = '/test/output'
@@ -245,4 +245,69 @@ describe('PatchHandler', () => {
245245
assert.equal(context.signal.aborted, true)
246246
}
247247
})
248+
249+
it('should gracefully terminate request stream when context is cancelled', async () => {
250+
handler = new PatchHandler(store, {path, locker: new MemoryLocker()})
251+
252+
const bodyStream = new PassThrough() // 20kb buffer
253+
const req = addPipableStreamBody(
254+
httpMocks.createRequest({
255+
method: 'PATCH',
256+
url: `${path}/1234`,
257+
body: bodyStream,
258+
})
259+
)
260+
261+
const abortController = new AbortController()
262+
context = {
263+
cancel: () => abortController.abort(),
264+
abort: () => abortController.abort(),
265+
signal: abortController.signal,
266+
}
267+
268+
const res = httpMocks.createResponse({req})
269+
req.headers = {
270+
'upload-offset': '0',
271+
'content-type': 'application/offset+octet-stream',
272+
}
273+
req.url = `${path}/file`
274+
275+
let accumulatedBuffer: Buffer = Buffer.alloc(0)
276+
277+
store.getUpload.resolves(new Upload({id: '1234', offset: 0}))
278+
store.write.callsFake(async (readable: http.IncomingMessage | stream.Readable) => {
279+
const writeStream = new stream.PassThrough()
280+
const chunks: Buffer[] = []
281+
282+
writeStream.on('data', (chunk) => {
283+
chunks.push(chunk) // Accumulate chunks in the outer buffer
284+
})
285+
286+
await streamP.pipeline(readable, writeStream)
287+
288+
accumulatedBuffer = Buffer.concat([accumulatedBuffer, ...chunks])
289+
290+
return writeStream.readableLength
291+
})
292+
store.declareUploadLength.resolves()
293+
294+
await new Promise((resolve, reject) => {
295+
handler.send(req, res, context).then(resolve).catch(reject)
296+
297+
// sends the first 20kb
298+
bodyStream.write(Buffer.alloc(1024 * 20))
299+
300+
// write 15kb
301+
bodyStream.write(Buffer.alloc(1024 * 15))
302+
303+
// simulate that the request was cancelled
304+
setTimeout(() => {
305+
context.abort()
306+
}, 200)
307+
})
308+
309+
// We expect that all the data was written to the store, 35kb
310+
assert.equal(accumulatedBuffer.byteLength, 35 * 1024)
311+
bodyStream.end()
312+
})
248313
})

packages/server/test/utils.ts

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type httpMocks from 'node-mocks-http'
2-
import stream from 'node:stream'
2+
import stream, {Readable, Transform, TransformCallback} from 'node:stream'
33
import type http from 'node:http'
44

55
export function addPipableStreamBody<
@@ -8,21 +8,40 @@ export function addPipableStreamBody<
88
// Create a Readable stream that simulates the request body
99
const bodyStream = new stream.Duplex({
1010
read() {
11-
this.push(
12-
mockRequest.body instanceof Buffer
13-
? mockRequest.body
14-
: JSON.stringify(mockRequest.body)
15-
)
16-
this.push(null)
11+
// This function is intentionally left empty since the data flow
12+
// is controlled by event listeners registered outside of this method.
1713
},
1814
})
1915

16+
// Handle cases where the body is a Readable stream
17+
if (mockRequest.body instanceof Readable) {
18+
// Pipe the mockRequest.body to the bodyStream
19+
mockRequest.body.on('data', (chunk) => {
20+
bodyStream.push(chunk) // Push the chunk to the bodyStream
21+
})
22+
23+
mockRequest.body.on('end', () => {
24+
bodyStream.push(null) // Signal the end of the stream
25+
})
26+
} else {
27+
// Handle cases where the body is not a stream (e.g., Buffer or plain object)
28+
const bodyBuffer =
29+
mockRequest.body instanceof Buffer
30+
? mockRequest.body
31+
: Buffer.from(JSON.stringify(mockRequest.body))
32+
33+
// Push the bodyBuffer and signal the end of the stream
34+
bodyStream.push(bodyBuffer)
35+
bodyStream.push(null)
36+
}
37+
2038
// Add the pipe method to the mockRequest
2139
// @ts-ignore
2240
mockRequest.pipe = (dest: stream.Writable) => bodyStream.pipe(dest)
2341

2442
// Add the unpipe method to the mockRequest
2543
// @ts-ignore
2644
mockRequest.unpipe = (dest: stream.Writable) => bodyStream.unpipe(dest)
45+
2746
return mockRequest
2847
}

packages/utils/src/models/Locker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,6 @@ export interface Locker {
2626
*
2727
*/
2828
export interface Lock {
29-
lock(cancelReq: RequestRelease): Promise<void>
29+
lock(signal: AbortSignal, cancelReq: RequestRelease): Promise<void>
3030
unlock(): Promise<void>
3131
}

0 commit comments

Comments
 (0)