Commit a5a4cd3

@s3/store: add backpressure + download incomplete part first (#561)

Authored by fenos and Murderlon
Co-authored-by: Merlijn Vos <[email protected]>
1 parent ee2c2ea, commit a5a4cd3

8 files changed: 251 additions & 103 deletions

packages/s3-store/README.md

Lines changed: 16 additions & 0 deletions
@@ -91,6 +91,22 @@ you need to provide a cache implementation that is shared between all instances
 
 See the exported [KV stores][kvstores] from `@tus/server` for more information.
 
+#### `options.maxConcurrentPartUploads`
+
+This setting determines the maximum number of simultaneous part uploads to an S3 storage service.
+The default value is 60. This default is chosen in conjunction with the typical `partSize` of 8MiB, aiming for an effective transfer rate of 3.84Gbit/s.
+
+**Considerations:**
+The ideal value for `maxConcurrentPartUploads` varies based on your `partSize` and the upload bandwidth to your S3 bucket. A larger `partSize` means less overall upload bandwidth available for other concurrent uploads.
+
+- **Lowering the value**: Reducing `maxConcurrentPartUploads` decreases the number of simultaneous upload requests to S3. This can be beneficial for conserving memory, CPU, and disk I/O resources, especially in environments with limited system resources, a low upload speed, or a large part size.
+
+- **Increasing the value**: A higher value potentially enhances the data transfer rate to the server, but at the cost of increased resource usage (memory, CPU, and disk I/O). This can be advantageous when the goal is to maximize throughput and sufficient system resources are available.
+
+- **Bandwidth considerations**: If your upload bandwidth to S3 is the limiting factor, increasing `maxConcurrentPartUploads` will not lead to higher throughput; it only adds resource consumption without proportional gains in transfer speed.
+
 ## Extensions
 
 The tus protocol supports optional [extensions][]. Below is a table of the supported extensions in `@tus/s3-store`.
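For orientation (not part of this commit), a minimal configuration sketch using the new option; the `s3ClientConfig` shape follows the existing `@tus/s3-store` README, and the bucket, region, and credentials are placeholder values:

```ts
import {S3Store} from '@tus/s3-store'

// Hypothetical values for illustration: tune partSize and
// maxConcurrentPartUploads to your available memory and bandwidth.
const store = new S3Store({
  partSize: 8 * 1024 * 1024, // 8MiB, the default part size
  maxConcurrentPartUploads: 60, // the default; lower it on constrained hosts
  s3ClientConfig: {
    bucket: 'my-tus-bucket',
    region: 'us-east-1',
    credentials: {
      accessKeyId: 'placeholder',
      secretAccessKey: 'placeholder',
    },
  },
})
```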

packages/s3-store/index.ts

Lines changed: 118 additions & 75 deletions
@@ -2,7 +2,6 @@ import os from 'node:os'
 import fs, {promises as fsProm} from 'node:fs'
 import stream, {promises as streamProm} from 'node:stream'
 import type {Readable} from 'node:stream'
-import http from 'node:http'
 
 import AWS, {NoSuchKey, NotFound, S3, S3ClientConfig} from '@aws-sdk/client-s3'
 import debug from 'debug'
@@ -17,6 +16,11 @@ import {
   MemoryKvStore,
 } from '@tus/utils'
 
+import {Semaphore, Permit} from '@shopify/semaphore'
+import MultiStream from 'multistream'
+import crypto from 'node:crypto'
+import path from 'node:path'
+
 const log = debug('tus-node-server:stores:s3store')
 
 type Options = {
@@ -25,6 +29,7 @@ type Options = {
   // but may increase it to not exceed the S3 10K parts limit.
   partSize?: number
   useTags?: boolean
+  maxConcurrentPartUploads?: number
   cache?: KvStore<MetadataValue>
   expirationPeriodInMilliseconds?: number
   // Options to pass to the AWS S3 SDK.
@@ -82,6 +87,7 @@ export class S3Store extends DataStore {
   private preferredPartSize: number
   private expirationPeriodInMilliseconds = 0
   private useTags = true
+  private partUploadSemaphore: Semaphore
   public maxMultipartParts = 10_000 as const
   public minPartSize = 5_242_880 as const // 5MiB
   public maxUploadSize = 5_497_558_138_880 as const // 5TiB
@@ -101,8 +107,9 @@ export class S3Store extends DataStore {
     this.preferredPartSize = partSize || 8 * 1024 * 1024
     this.expirationPeriodInMilliseconds = options.expirationPeriodInMilliseconds ?? 0
     this.useTags = options.useTags ?? true
-    this.client = new S3(restS3ClientConfig)
     this.cache = options.cache ?? new MemoryKvStore<MetadataValue>()
+    this.client = new S3(restS3ClientConfig)
+    this.partUploadSemaphore = new Semaphore(options.maxConcurrentPartUploads ?? 60)
   }
 
   protected shouldUseExpirationTags() {
@@ -233,6 +240,61 @@ export class S3Store extends DataStore {
     return data.ETag as string
   }
 
+  private async downloadIncompletePart(id: string) {
+    const incompletePart = await this.getIncompletePart(id)
+
+    if (!incompletePart) {
+      return
+    }
+    const filePath = await this.uniqueTmpFileName('tus-s3-incomplete-part-')
+
+    try {
+      let incompletePartSize = 0
+
+      const byteCounterTransform = new stream.Transform({
+        transform(chunk, _, callback) {
+          incompletePartSize += chunk.length
+          callback(null, chunk)
+        },
+      })
+
+      // write to temporary file
+      await streamProm.pipeline(
+        incompletePart,
+        byteCounterTransform,
+        fs.createWriteStream(filePath)
+      )
+
+      const createReadStream = (options: {cleanUpOnEnd: boolean}) => {
+        const fileReader = fs.createReadStream(filePath)
+
+        if (options.cleanUpOnEnd) {
+          fileReader.on('end', () => {
+            fs.unlink(filePath, () => {
+              // ignore
+            })
+          })
+
+          fileReader.on('error', (err) => {
+            fileReader.destroy(err)
+            fs.unlink(filePath, () => {
+              // ignore
+            })
+          })
+        }
+
+        return fileReader
+      }
+
+      return {size: incompletePartSize, path: filePath, createReader: createReadStream}
+    } catch (err) {
+      fsProm.rm(filePath).catch(() => {
+        /* ignore */
+      })
+      throw err
+    }
+  }
+
   private async getIncompletePart(id: string): Promise<Readable | undefined> {
     try {
       const data = await this.client.getObject({
@@ -271,102 +333,50 @@
     })
   }
 
-  private async prependIncompletePart(
-    newChunkPath: string,
-    previousIncompletePart: Readable
-  ): Promise<number> {
-    const tempPath = `${newChunkPath}-prepend`
-    try {
-      let incompletePartSize = 0
-
-      const byteCounterTransform = new stream.Transform({
-        transform(chunk, _, callback) {
-          incompletePartSize += chunk.length
-          callback(null, chunk)
-        },
-      })
-
-      // write to temporary file, truncating if needed
-      await streamProm.pipeline(
-        previousIncompletePart,
-        byteCounterTransform,
-        fs.createWriteStream(tempPath)
-      )
-      // append to temporary file
-      await streamProm.pipeline(
-        fs.createReadStream(newChunkPath),
-        fs.createWriteStream(tempPath, {flags: 'a'})
-      )
-      // overwrite existing file
-      await fsProm.rename(tempPath, newChunkPath)
-
-      return incompletePartSize
-    } catch (err) {
-      fsProm.rm(tempPath).catch(() => {
-        /* ignore */
-      })
-      throw err
-    }
-  }
-
   /**
    * Uploads a stream to s3 using multiple parts
    */
-  private async processUpload(
+  private async uploadParts(
     metadata: MetadataValue,
-    readStream: http.IncomingMessage | fs.ReadStream,
+    readStream: stream.Readable,
     currentPartNumber: number,
     offset: number
   ): Promise<number> {
     const size = metadata.file.size
     const promises: Promise<void>[] = []
     let pendingChunkFilepath: string | null = null
     let bytesUploaded = 0
-    let currentChunkNumber = 0
+    let permit: Permit | undefined = undefined
 
     const splitterStream = new StreamSplitter({
       chunkSize: this.calcOptimalPartSize(size),
      directory: os.tmpdir(),
     })
+      .on('beforeChunkStarted', async () => {
+        permit = await this.partUploadSemaphore.acquire()
+      })
      .on('chunkStarted', (filepath) => {
        pendingChunkFilepath = filepath
      })
      .on('chunkFinished', ({path, size: partSize}) => {
        pendingChunkFilepath = null

        const partNumber = currentPartNumber++
-        const chunkNumber = currentChunkNumber++
+        const acquiredPermit = permit

        offset += partSize

-        const isFirstChunk = chunkNumber === 0
        const isFinalPart = size === offset

        // eslint-disable-next-line no-async-promise-executor
        const deferred = new Promise<void>(async (resolve, reject) => {
          try {
-            let incompletePartSize = 0
            // Only the first chunk of each PATCH request can prepend
            // an incomplete part (last chunk) from the previous request.
-            if (isFirstChunk) {
-              // If we received a chunk under the minimum part size in a previous iteration,
-              // we used a regular S3 upload to save it in the bucket. We try to get the incomplete part here.
-
-              const incompletePart = await this.getIncompletePart(metadata.file.id)
-              if (incompletePart) {
-                // We found an incomplete part, prepend it to the chunk on disk we were about to upload,
-                // and delete the incomplete part from the bucket. This can be done in parallel.
-                incompletePartSize = await this.prependIncompletePart(
-                  path,
-                  incompletePart
-                )
-                await this.deleteIncompletePart(metadata.file.id)
-              }
-            }
-
            const readable = fs.createReadStream(path)
            readable.on('error', reject)
-            if (partSize + incompletePartSize >= this.minPartSize || isFinalPart) {
+
+            if (partSize >= this.minPartSize || isFinalPart) {
              await this.uploadPart(metadata, readable, partNumber)
            } else {
              await this.uploadIncompletePart(metadata.file.id, readable)
@@ -380,11 +390,15 @@ export class S3Store extends DataStore {
            fsProm.rm(path).catch(() => {
              /* ignore */
            })
+            acquiredPermit?.release()
          }
        })

        promises.push(deferred)
      })
+      .on('chunkError', () => {
+        permit?.release()
+      })

    try {
      await streamProm.pipeline(readStream, splitterStream)
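The backpressure added above is easier to see in isolation. A minimal sketch of the same pattern, assuming only the `@shopify/semaphore` API used in this diff (`acquire()` resolving to a `Permit` with `release()`); `fakeS3Upload` is a hypothetical stand-in for the real part upload:

```ts
import {Semaphore, Permit} from '@shopify/semaphore'

// Allow at most 3 uploads in flight; acquire() blocks once the limit is hit,
// which in turn stalls the stream splitter instead of piling up temp files.
const semaphore = new Semaphore(3)

async function uploadChunk(chunk: Buffer): Promise<void> {
  const permit: Permit = await semaphore.acquire()
  try {
    await fakeS3Upload(chunk)
  } finally {
    // Always release, mirroring the finally/chunkError handling above.
    permit.release()
  }
}

async function fakeS3Upload(_chunk: Buffer): Promise<void> {
  // Placeholder for the real S3 UploadPart call.
  await new Promise((resolve) => setTimeout(resolve, 100))
}
```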
@@ -533,26 +547,30 @@
   /**
    * Write to the file, starting at the provided offset
    */
-  public async write(
-    readable: http.IncomingMessage | fs.ReadStream,
-    id: string,
-    offset: number
-  ): Promise<number> {
+  public async write(src: stream.Readable, id: string, offset: number): Promise<number> {
     // Metadata request needs to happen first
     const metadata = await this.getMetadata(id)
     const parts = await this.retrieveParts(id)
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
     const partNumber: number = parts.length > 0 ? parts[parts.length - 1].PartNumber! : 0
     const nextPartNumber = partNumber + 1
 
-    const bytesUploaded = await this.processUpload(
-      metadata,
-      readable,
-      nextPartNumber,
-      offset
-    )
+    const incompletePart = await this.downloadIncompletePart(id)
+    const requestedOffset = offset
+
+    if (incompletePart) {
+      // once the file is on disk, we delete the incomplete part
+      await this.deleteIncompletePart(id)
+
+      offset = requestedOffset - incompletePart.size
+      src = new MultiStream([incompletePart.createReader({cleanUpOnEnd: true}), src])
+    }
+
+    const bytesUploaded = await this.uploadParts(metadata, src, nextPartNumber, offset)
 
-    const newOffset = offset + bytesUploaded
+    // The size of the incomplete part should not be counted, because the
+    // process of the incomplete part should be fully transparent to the user.
+    const newOffset = requestedOffset + bytesUploaded - (incompletePart?.size ?? 0)
 
     if (metadata.file.size === newOffset) {
       try {
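To make the offset bookkeeping in `write` concrete, here is a small worked example with hypothetical numbers: a previous PATCH left a 2MiB incomplete part in the bucket and the client resumes at a 10MiB offset.

```ts
// Hypothetical numbers illustrating the arithmetic in write():
const MiB = 1024 * 1024
const requestedOffset = 10 * MiB // offset sent by the client
const incompletePartSize = 2 * MiB // size of the downloaded incomplete part

// The incomplete part is prepended to the request body via MultiStream,
// so multipart uploading restarts where that part originally began:
const uploadOffset = requestedOffset - incompletePartSize // 8 MiB

// If uploadParts then writes 7MiB (2MiB prepended + 5MiB of new data),
// the offset reported to the client only advances by the new bytes:
const bytesUploaded = 7 * MiB
const newOffset = requestedOffset + bytesUploaded - incompletePartSize // 15 MiB

console.log({uploadOffset, newOffset}) // { uploadOffset: 8388608, newOffset: 15728640 }
```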
@@ -741,4 +759,29 @@ export class S3Store extends DataStore {
 
     return deleted
   }
+
+  private async uniqueTmpFileName(template: string): Promise<string> {
+    let tries = 0
+    const maxTries = 10
+
+    while (tries < maxTries) {
+      const fileName =
+        template + crypto.randomBytes(10).toString('base64url').slice(0, 10)
+      const filePath = path.join(os.tmpdir(), fileName)
+
+      try {
+        await fsProm.lstat(filePath)
+        // If no error, file exists, so try again
+        tries++
+      } catch (e) {
+        if (e.code === 'ENOENT') {
+          // File does not exist, return the path
+          return filePath
+        }
+        throw e // For other errors, rethrow
+      }
+    }
+
+    throw new Error(`Could not find a unique file name after ${maxTries} tries`)
+  }
 }

packages/s3-store/package.json

Lines changed: 4 additions & 1 deletion
@@ -22,12 +22,15 @@
   },
   "dependencies": {
     "@aws-sdk/client-s3": "^3.490.0",
+    "@shopify/semaphore": "^3.0.2",
     "@tus/utils": "workspace:*",
-    "debug": "^4.3.4"
+    "debug": "^4.3.4",
+    "multistream": "^4.1.0"
   },
   "devDependencies": {
     "@types/debug": "^4.1.12",
     "@types/mocha": "^10.0.6",
+    "@types/multistream": "^4.1.3",
     "@types/node": "^20.11.5",
     "eslint": "^8.56.0",
     "eslint-config-custom": "workspace:*",

packages/s3-store/test.ts

Lines changed: 0 additions & 12 deletions
@@ -1,5 +1,4 @@
 import path from 'node:path'
-import fs from 'node:fs/promises'
 import assert from 'node:assert/strict'
 import {Readable} from 'node:stream'
 
@@ -39,17 +38,6 @@ describe('S3DataStore', function () {
     assert.strictEqual(Number.isFinite(store.calcOptimalPartSize(undefined)), true)
   })
 
-  it('should correctly prepend a buffer to a file', async function () {
-    const p = path.resolve(fixturesPath, 'foo.txt')
-    await fs.writeFile(p, 'world!')
-    await this.datastore.prependIncompletePart(
-      p,
-      Readable.from([new TextEncoder().encode('Hello, ')])
-    )
-    assert.strictEqual(await fs.readFile(p, 'utf8'), 'Hello, world!')
-    await fs.unlink(p)
-  })
-
   it('should store in between chunks under the minimum part size and prepend it to the next call', async function () {
     const store = this.datastore
     const size = 1024

packages/server/src/handlers/BaseHandler.ts

Lines changed: 4 additions & 2 deletions
@@ -149,9 +149,11 @@ export class BaseHandler extends EventEmitter {
       reject(err.name === 'AbortError' ? ERRORS.ABORTED : err)
     })
 
-    req.on('error', (err) => {
+    req.on('error', () => {
       if (!proxy.closed) {
-        proxy.destroy(err)
+        // we end the stream gracefully here so that we can upload the remaining bytes to the store
+        // as an incompletePart
+        proxy.end()
       }
     })
 
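For context on the change above, a standalone sketch (not the actual `BaseHandler` code) of why `proxy.end()` is preferred over `proxy.destroy(err)` when the incoming request errors: ending the PassThrough flushes whatever bytes were already received so the store can persist them as an incomplete part, while destroying it would discard them and error the downstream writer.

```ts
import {PassThrough, Readable} from 'node:stream'

// Hypothetical helper mirroring the proxying pattern in BaseHandler.
function proxyRequest(req: Readable): PassThrough {
  const proxy = new PassThrough()

  req.on('error', () => {
    if (!proxy.closed) {
      proxy.end() // graceful: flush the bytes we already have
      // proxy.destroy(err) would instead drop buffered bytes and error the reader
    }
  })

  req.pipe(proxy)
  return proxy
}
```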
