Skip to content

Commit b076d4d

Browse files
committed
fix: handling consistent cancellation across stream and locks
1 parent b1c07bc commit b076d4d

File tree

4 files changed

+47
-18
lines changed

4 files changed

+47
-18
lines changed

packages/server/src/handlers/BaseHandler.ts

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import EventEmitter from 'node:events'
22
import stream from 'node:stream/promises'
3-
import {addAbortSignal, PassThrough} from 'node:stream'
3+
import {PassThrough, Readable} from 'node:stream'
44
import type http from 'node:http'
55

66
import type {ServerOptions} from '../types'
@@ -119,7 +119,7 @@ export class BaseHandler extends EventEmitter {
119119
) {
120120
const locker = await this.getLocker(req)
121121

122-
const lock = locker.newLock(id)
122+
const lock = locker.newLock(id, context.signal)
123123

124124
await lock.lock(() => {
125125
context.cancel()
@@ -129,7 +129,7 @@ export class BaseHandler extends EventEmitter {
129129
}
130130

131131
protected writeToStore(
132-
req: http.IncomingMessage,
132+
data: Readable,
133133
upload: Upload,
134134
maxFileSize: number,
135135
context: CancellationContext
@@ -145,10 +145,19 @@ export class BaseHandler extends EventEmitter {
145145
// Create a PassThrough stream as a proxy to manage the request stream.
146146
// This allows for aborting the write process without affecting the incoming request stream.
147147
const proxy = new PassThrough()
148-
addAbortSignal(context.signal, proxy)
148+
149+
// gracefully terminate the proxy stream when the request is aborted
150+
const onAbort = () => {
151+
data.unpipe(proxy)
152+
153+
if (!proxy.closed) {
154+
proxy.end()
155+
}
156+
}
157+
context.signal.addEventListener('abort', onAbort, { once: true })
149158

150159
proxy.on('error', (err) => {
151-
req.unpipe(proxy)
160+
data.unpipe(proxy)
152161
reject(err.name === 'AbortError' ? ERRORS.ABORTED : err)
153162
})
154163

@@ -166,23 +175,18 @@ export class BaseHandler extends EventEmitter {
166175
postReceive(tempOffset)
167176
})
168177

169-
req.on('error', () => {
170-
if (!proxy.closed) {
171-
// we end the stream gracefully here so that we can upload the remaining bytes to the store
172-
// as an incompletePart
173-
proxy.end()
174-
}
175-
})
176-
177178
// Pipe the request stream through the proxy. We use the proxy instead of the request stream directly
178179
// to ensure that errors in the pipeline do not cause the request stream to be destroyed,
179180
// which would result in a socket hangup error for the client.
180181
stream
181-
.pipeline(req.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
182+
.pipeline(data.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
182183
return this.store.write(stream as StreamLimiter, upload.id, upload.offset)
183184
})
184185
.then(resolve)
185186
.catch(reject)
187+
.finally(() => {
188+
context.signal.removeEventListener('abort', onAbort)
189+
})
186190
})
187191
}
188192

packages/server/src/lockers/MemoryLocker.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,14 @@ class MemoryLock implements Lock {
4949
private timeout: number = 1000 * 30
5050
) {}
5151

52-
async lock(requestRelease: RequestRelease): Promise<void> {
52+
async lock(requestRelease: RequestRelease, signal: AbortSignal): Promise<void> {
5353
const abortController = new AbortController()
54+
55+
const abortSignal = AbortSignal.any([signal, abortController.signal])
56+
5457
const lock = await Promise.race([
55-
this.waitTimeout(abortController.signal),
56-
this.acquireLock(this.id, requestRelease, abortController.signal),
58+
this.waitTimeout(abortSignal),
59+
this.acquireLock(this.id, requestRelease, abortSignal),
5760
])
5861

5962
abortController.abort()
@@ -104,6 +107,12 @@ class MemoryLock implements Lock {
104107
this.locker.locks.delete(this.id)
105108
}
106109

110+
protected waitAbortSignal(signal: AbortSignal) {
111+
return new Promise((resolve, reject) => {
112+
113+
})
114+
}
115+
107116
protected waitTimeout(signal: AbortSignal) {
108117
return new Promise<boolean>((resolve) => {
109118
const timeout = setTimeout(() => {

packages/server/src/server.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ export class Server extends EventEmitter {
151151
): Promise<http.ServerResponse | stream.Writable | void> {
152152
const context = this.createContext(req)
153153

154+
// Once the request is closed we abort the context to clean up underline resources
155+
req.on('close', () => {
156+
context.abort()
157+
})
158+
154159
log(`[TusServer] handle: ${req.method} ${req.url}`)
155160
// Allow overriding the HTTP method. The reason for this is
156161
// that some libraries/environments to not support PATCH and
@@ -289,6 +294,17 @@ export class Server extends EventEmitter {
289294

290295
res.writeHead(status, headers)
291296
res.write(body)
297+
298+
// Abort the context once the response is sent.
299+
// Useful for clean-up when the server uses keep-alive
300+
if (!isAborted) {
301+
res.on('finish', () => {
302+
if (!req.closed) {
303+
context.abort()
304+
}
305+
})
306+
}
307+
292308
return res.end()
293309
}
294310

packages/utils/src/models/Locker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,6 @@ export interface Locker {
2626
*
2727
*/
2828
export interface Lock {
29-
lock(cancelReq: RequestRelease): Promise<void>
29+
lock(cancelReq: RequestRelease, signal: AbortSignal): Promise<void>
3030
unlock(): Promise<void>
3131
}

0 commit comments

Comments
 (0)