Skip to content

Commit be572c6

Browse files
committed
fix: handling consistent cancellation across stream and locks
1 parent b1c07bc commit be572c6

File tree

5 files changed

+96
-26
lines changed

5 files changed

+96
-26
lines changed

packages/server/src/handlers/BaseHandler.ts

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import EventEmitter from 'node:events'
22
import stream from 'node:stream/promises'
3-
import {addAbortSignal, PassThrough} from 'node:stream'
3+
import {PassThrough, Readable} from 'node:stream'
44
import type http from 'node:http'
55

66
import type {ServerOptions} from '../types'
@@ -119,7 +119,7 @@ export class BaseHandler extends EventEmitter {
119119
) {
120120
const locker = await this.getLocker(req)
121121

122-
const lock = locker.newLock(id)
122+
const lock = locker.newLock(id, context.signal)
123123

124124
await lock.lock(() => {
125125
context.cancel()
@@ -129,7 +129,7 @@ export class BaseHandler extends EventEmitter {
129129
}
130130

131131
protected writeToStore(
132-
req: http.IncomingMessage,
132+
data: Readable,
133133
upload: Upload,
134134
maxFileSize: number,
135135
context: CancellationContext
@@ -145,16 +145,25 @@ export class BaseHandler extends EventEmitter {
145145
// Create a PassThrough stream as a proxy to manage the request stream.
146146
// This allows for aborting the write process without affecting the incoming request stream.
147147
const proxy = new PassThrough()
148-
addAbortSignal(context.signal, proxy)
148+
149+
// gracefully terminate the proxy stream when the request is aborted
150+
const onAbort = () => {
151+
data.unpipe(proxy)
152+
153+
if (!proxy.closed) {
154+
proxy.end()
155+
}
156+
}
157+
context.signal.addEventListener('abort', onAbort, { once: true })
149158

150159
proxy.on('error', (err) => {
151-
req.unpipe(proxy)
160+
data.unpipe(proxy)
152161
reject(err.name === 'AbortError' ? ERRORS.ABORTED : err)
153162
})
154163

155164
const postReceive = throttle(
156165
(offset: number) => {
157-
this.emit(EVENTS.POST_RECEIVE_V2, req, {...upload, offset})
166+
this.emit(EVENTS.POST_RECEIVE_V2, data, {...upload, offset})
158167
},
159168
this.options.postReceiveInterval,
160169
{leading: false}
@@ -166,23 +175,18 @@ export class BaseHandler extends EventEmitter {
166175
postReceive(tempOffset)
167176
})
168177

169-
req.on('error', () => {
170-
if (!proxy.closed) {
171-
// we end the stream gracefully here so that we can upload the remaining bytes to the store
172-
// as an incompletePart
173-
proxy.end()
174-
}
175-
})
176-
177178
// Pipe the request stream through the proxy. We use the proxy instead of the request stream directly
178179
// to ensure that errors in the pipeline do not cause the request stream to be destroyed,
179180
// which would result in a socket hangup error for the client.
180181
stream
181-
.pipeline(req.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
182+
.pipeline(data.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
182183
return this.store.write(stream as StreamLimiter, upload.id, upload.offset)
183184
})
184185
.then(resolve)
185186
.catch(reject)
187+
.finally(() => {
188+
context.signal.removeEventListener('abort', onAbort)
189+
})
186190
})
187191
}
188192

packages/server/src/lockers/MemoryLocker.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,23 +37,27 @@ export class MemoryLocker implements Locker {
3737
this.timeout = options?.acquireLockTimeout ?? 1000 * 30
3838
}
3939

40-
newLock(id: string) {
41-
return new MemoryLock(id, this, this.timeout)
40+
newLock(id: string, signal: AbortSignal) {
41+
return new MemoryLock(id, this, this.timeout, signal)
4242
}
4343
}
4444

4545
class MemoryLock implements Lock {
4646
constructor(
4747
private id: string,
4848
private locker: MemoryLocker,
49-
private timeout: number = 1000 * 30
49+
private timeout: number = 1000 * 30,
50+
private abortSignal: AbortSignal
5051
) {}
5152

5253
async lock(requestRelease: RequestRelease): Promise<void> {
5354
const abortController = new AbortController()
55+
56+
const abortSignal = AbortSignal.any([this.abortSignal, abortController.signal])
57+
5458
const lock = await Promise.race([
55-
this.waitTimeout(abortController.signal),
56-
this.acquireLock(this.id, requestRelease, abortController.signal),
59+
this.waitTimeout(abortSignal),
60+
this.acquireLock(this.id, requestRelease, abortSignal),
5761
])
5862

5963
abortController.abort()
@@ -104,6 +108,12 @@ class MemoryLock implements Lock {
104108
this.locker.locks.delete(this.id)
105109
}
106110

111+
protected waitAbortSignal(signal: AbortSignal) {
112+
return new Promise((resolve, reject) => {
113+
114+
})
115+
}
116+
107117
protected waitTimeout(signal: AbortSignal) {
108118
return new Promise<boolean>((resolve) => {
109119
const timeout = setTimeout(() => {

packages/server/src/server.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ export class Server extends EventEmitter {
151151
): Promise<http.ServerResponse | stream.Writable | void> {
152152
const context = this.createContext(req)
153153

154+
// Once the request is closed we abort the context to clean up underline resources
155+
req.on('close', () => {
156+
context.abort()
157+
})
158+
154159
log(`[TusServer] handle: ${req.method} ${req.url}`)
155160
// Allow overriding the HTTP method. The reason for this is
156161
// that some libraries/environments to not support PATCH and
@@ -289,6 +294,17 @@ export class Server extends EventEmitter {
289294

290295
res.writeHead(status, headers)
291296
res.write(body)
297+
298+
// Abort the context once the response is sent.
299+
// Useful for clean-up when the server uses keep-alive
300+
if (!isAborted) {
301+
res.on('finish', () => {
302+
if (!req.closed) {
303+
context.abort()
304+
}
305+
})
306+
}
307+
292308
return res.end()
293309
}
294310

packages/server/test/Locker.test.ts

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ describe('MemoryLocker', () => {
66
it('will acquire a lock by notifying another to release it', async () => {
77
const locker = new MemoryLocker()
88
const lockId = 'upload-id-1'
9+
const abortController = new AbortController()
910

1011
const cancel = sinon.spy()
1112
const cancel2 = sinon.spy()
1213

13-
const lock1 = locker.newLock(lockId)
14-
const lock2 = locker.newLock(lockId)
14+
const lock1 = locker.newLock(lockId, abortController.signal)
15+
const lock2 = locker.newLock(lockId, abortController.signal)
1516

1617
await lock1.lock(async () => {
1718
await lock1.unlock()
@@ -32,8 +33,10 @@ describe('MemoryLocker', () => {
3233
const locker = new MemoryLocker({
3334
acquireLockTimeout: 500,
3435
})
36+
const abortController = new AbortController()
37+
3538
const lockId = 'upload-id-1'
36-
const lock = locker.newLock(lockId)
39+
const lock = locker.newLock(lockId, abortController.signal)
3740

3841
const cancel = sinon.spy()
3942

@@ -57,8 +60,10 @@ describe('MemoryLocker', () => {
5760
it('request lock and unlock', async () => {
5861
const locker = new MemoryLocker()
5962
const lockId = 'upload-id-1'
60-
const lock = locker.newLock(lockId)
61-
const lock2 = locker.newLock(lockId)
63+
const abortController = new AbortController()
64+
65+
const lock = locker.newLock(lockId, abortController.signal)
66+
const lock2 = locker.newLock(lockId, abortController.signal)
6267

6368
const cancel = sinon.spy()
6469
await lock.lock(() => {
@@ -79,4 +84,39 @@ describe('MemoryLocker', () => {
7984
`request released called more times than expected - ${cancel.callCount}`
8085
)
8186
})
87+
88+
it('will stop trying to acquire the lock if the abort signal is aborted', async () => {
89+
const locker = new MemoryLocker()
90+
const lockId = 'upload-id-1'
91+
const abortController = new AbortController()
92+
93+
const cancel = sinon.spy()
94+
const cancel2 = sinon.spy()
95+
96+
const lock1 = locker.newLock(lockId, abortController.signal)
97+
const lock2 = locker.newLock(lockId, abortController.signal)
98+
99+
await lock1.lock(async () => {
100+
// do not unlock when requested
101+
cancel()
102+
})
103+
104+
// Abort signal is aborted after lock2 tries to acquire the lock
105+
setTimeout(() => {
106+
abortController.abort()
107+
}, 100)
108+
109+
try {
110+
await lock2.lock(async () => {
111+
cancel2()
112+
})
113+
assert(false, 'lock2 should not have been acquired')
114+
} catch (e) {
115+
assert(e === ERRORS.ERR_LOCK_TIMEOUT, `error returned is not correct ${e}`)
116+
}
117+
118+
119+
assert(cancel.callCount > 1, `calls count dont match ${cancel.callCount} !== 1`)
120+
assert(cancel2.callCount === 0, `calls count dont match ${cancel.callCount} !== 1`)
121+
})
82122
})

packages/utils/src/models/Locker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ export type RequestRelease = () => Promise<void> | void
44
* The Locker interface creates a Lock instance for a given resource identifier.
55
*/
66
export interface Locker {
7-
newLock(id: string): Lock
7+
newLock(id: string, signal: AbortSignal): Lock
88
}
99

1010
/**

0 commit comments

Comments
 (0)