Skip to content

Commit 9a13d3e

Browse files
authored
fix: Revert "fix: drop head after complete in multipart uploads (#1105)" (#1113)
1 parent 29672d1 commit 9a13d3e

5 files changed

Lines changed: 60 additions & 264 deletions

File tree

src/internal/streams/byte-counter.test.ts

Lines changed: 0 additions & 16 deletions
This file was deleted.

src/internal/streams/monitor.test.ts

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/internal/streams/monitor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import { monitorStreamSpeed } from './stream-speed'
66
/**
77
* Monitor readable streams by tracking their speed and bytes read
88
* @param dataStream
9-
* @param byteCounter optional counter for callers that need the final byte count
109
*/
11-
export function monitorStream(dataStream: Readable, byteCounter = createByteCounterStream()) {
10+
export function monitorStream(dataStream: Readable) {
1211
const speedMonitor = monitorStreamSpeed(dataStream)
12+
const byteCounter = createByteCounterStream()
1313
const span = trace.getActiveSpan()
1414

1515
// Limit measures array to prevent unbounded growth during long uploads

src/storage/backend/s3/adapter.test.ts

Lines changed: 25 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import { GetObjectCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3'
1+
import { GetObjectCommand, HeadObjectCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3'
22
import { Upload } from '@aws-sdk/lib-storage'
33
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
4-
import { ERRORS, ErrorCode, isStorageError } from '@internal/errors'
5-
import { PassThrough, Readable } from 'stream'
4+
import { ErrorCode, isStorageError } from '@internal/errors'
5+
import { Readable } from 'stream'
66
import { type Mock, vi } from 'vitest'
77
import { getConfig } from '../../../config'
88
import { withOptionalVersion } from '../adapter'
@@ -36,9 +36,6 @@ vi.mock('@aws-sdk/s3-request-presigner', () => ({
3636

3737
type UploadOptionsShape = {
3838
queueSize?: number
39-
params?: {
40-
Body?: AsyncIterable<unknown>
41-
}
4239
}
4340

4441
type MockUploadDoneResult = {
@@ -62,28 +59,14 @@ describe('S3Backend', () => {
6259
let mockUploadDone: Mock<(instance: MockUploadInstance) => Promise<MockUploadDoneResult>>
6360
let uploadInstances: MockUploadInstance[]
6461

65-
async function drainUploadBody(instance: MockUploadInstance) {
66-
const body = instance.options.params?.Body
67-
if (!body) {
68-
return
69-
}
70-
71-
for await (const _chunk of body) {
72-
// Drain the body to simulate what Upload does while sending multipart data.
73-
}
74-
}
75-
7662
beforeEach(() => {
7763
vi.clearAllMocks()
7864
mockSend = vi.fn()
79-
mockUploadDone = vi.fn(async (instance) => {
80-
await drainUploadBody(instance)
81-
return {
82-
ETag: '"multipart-etag"',
83-
$metadata: {
84-
httpStatusCode: 200,
85-
},
86-
}
65+
mockUploadDone = vi.fn().mockResolvedValue({
66+
ETag: '"multipart-etag"',
67+
$metadata: {
68+
httpStatusCode: 200,
69+
},
8770
})
8871
uploadInstances = []
8972

@@ -302,50 +285,29 @@ describe('S3Backend', () => {
302285
})
303286
})
304287

305-
test('uses source stream bytes for over-limit multipart upload metadata', async () => {
288+
test('falls back to multipart upload when content length exceeds the single-request limit', async () => {
306289
const overLimit = MAX_PUT_OBJECT_SIZE + 1
290+
const lastModified = new Date('2024-01-01T00:00:00.000Z')
307291

308-
// Emit a progress value that disagrees with both the declared length and the
309-
// request body; metadata should use the bytes read from the source stream.
310292
mockUploadDone.mockImplementationOnce(async (instance) => {
311293
instance.emit('httpUploadProgress', { loaded: 1 })
312-
await drainUploadBody(instance)
313294
return {
314295
ETag: '"multipart-etag"',
315296
$metadata: {
316297
httpStatusCode: 200,
317298
},
318299
}
319300
})
320-
321-
const backend = createBackend()
322-
const result = await backend.uploadObject(
323-
'test-bucket',
324-
'test-key',
325-
undefined,
326-
Readable.from(['hello']),
327-
'text/plain',
328-
'max-age=60',
329-
undefined,
330-
overLimit
331-
)
332-
333-
expect(Upload).toHaveBeenCalledTimes(1)
334-
expect(uploadInstances[0].options.queueSize).toBe(getConfig().storageS3UploadQueueSize)
335-
expect(mockSend).not.toHaveBeenCalled()
336-
expect(result).toMatchObject({
337-
httpStatusCode: 200,
338-
cacheControl: 'max-age=60',
339-
eTag: '"multipart-etag"',
340-
mimetype: 'text/plain',
341-
contentLength: 5,
342-
size: 5,
343-
lastModified: expect.any(Date),
301+
mockSend.mockResolvedValueOnce({
302+
CacheControl: 'max-age=60',
303+
ContentType: 'text/plain',
304+
ContentLength: overLimit,
305+
ETag: '"head-etag"',
306+
LastModified: lastModified,
307+
$metadata: {
308+
httpStatusCode: 200,
309+
},
344310
})
345-
})
346-
347-
test('uses source stream bytes for over-limit multipart upload without progress', async () => {
348-
const overLimit = MAX_PUT_OBJECT_SIZE + 1
349311

350312
const backend = createBackend()
351313
const result = await backend.uploadObject(
@@ -361,30 +323,20 @@ describe('S3Backend', () => {
361323

362324
expect(Upload).toHaveBeenCalledTimes(1)
363325
expect(uploadInstances[0].options.queueSize).toBe(getConfig().storageS3UploadQueueSize)
364-
expect(mockSend).not.toHaveBeenCalled()
326+
expect(mockSend).toHaveBeenCalledTimes(1)
327+
expect(mockSend.mock.calls[0][0]).toBeInstanceOf(HeadObjectCommand)
365328
expect(result).toMatchObject({
366329
httpStatusCode: 200,
367330
cacheControl: 'max-age=60',
368-
eTag: '"multipart-etag"',
331+
eTag: '"head-etag"',
369332
mimetype: 'text/plain',
370-
contentLength: 5,
371-
size: 5,
372-
lastModified: expect.any(Date),
333+
contentLength: overLimit,
334+
size: overLimit,
335+
lastModified,
373336
})
374337
})
375338

376339
test('uses multipart upload when content length is unknown', async () => {
377-
mockUploadDone.mockImplementationOnce(async (instance) => {
378-
instance.emit('httpUploadProgress', { loaded: 42 })
379-
await drainUploadBody(instance)
380-
return {
381-
ETag: '"multipart-etag"',
382-
$metadata: {
383-
httpStatusCode: 200,
384-
},
385-
}
386-
})
387-
388340
const backend = createBackend()
389341
const result = await backend.uploadObject(
390342
'test-bucket',
@@ -403,111 +355,8 @@ describe('S3Backend', () => {
403355
cacheControl: 'max-age=60',
404356
eTag: '"multipart-etag"',
405357
mimetype: 'text/plain',
406-
contentLength: 5,
407-
size: 5,
408-
lastModified: expect.any(Date),
409-
})
410-
})
411-
412-
test('removes multipart success listeners after upload completes', async () => {
413-
const abortController = new AbortController()
414-
const body = Readable.from(['hello'])
415-
416-
const backend = createBackend()
417-
await backend.uploadObject(
418-
'test-bucket',
419-
'test-key',
420-
undefined,
421-
body,
422-
'text/plain',
423-
'max-age=60',
424-
abortController.signal
425-
)
426-
427-
expect(body.listenerCount('error')).toBe(0)
428-
429-
abortController.abort()
430-
expect(uploadInstances[0].abort).not.toHaveBeenCalled()
431-
})
432-
433-
test('aborts multipart upload when the source stream errors after emitting bytes', async () => {
434-
const sourceError = ERRORS.InvalidRequest('Incomplete trailer section')
435-
const body = new Readable({
436-
read() {
437-
this.push(Buffer.from('hello'))
438-
this.destroy(sourceError)
439-
},
440-
})
441-
442-
mockUploadDone.mockImplementationOnce((instance) => {
443-
void drainUploadBody(instance).catch(() => undefined)
444-
445-
return new Promise((_resolve, reject) => {
446-
instance.abort.mockImplementation(() => reject(sourceError))
447-
})
448-
})
449-
450-
const backend = createBackend()
451-
const upload = backend.uploadObject(
452-
'test-bucket',
453-
'test-key',
454-
undefined,
455-
body,
456-
'text/plain',
457-
'max-age=60'
458-
)
459-
const uploadError = upload.catch((error: unknown) => error)
460-
461-
await vi.waitFor(() => {
462-
expect(uploadInstances[0].abort).toHaveBeenCalledTimes(1)
463-
})
464-
await expect(uploadError).resolves.toMatchObject({
465-
code: ErrorCode.InvalidRequest,
466-
message: 'Incomplete trailer section',
467-
})
468-
})
469-
470-
test('rejects an already-errored multipart source stream without starting upload', async () => {
471-
const sourceError = ERRORS.InvalidRequest('Incomplete trailer section')
472-
const body = new PassThrough()
473-
body.on('error', () => undefined)
474-
body.write(Buffer.from('hello'))
475-
body.destroy(sourceError)
476-
477-
const backend = createBackend()
478-
const upload = backend
479-
.uploadObject('test-bucket', 'test-key', undefined, body, 'text/plain', 'max-age=60')
480-
.catch((error: unknown) => error)
481-
482-
await expect(upload).resolves.toMatchObject({
483-
code: ErrorCode.InvalidRequest,
484-
message: 'Incomplete trailer section',
485-
})
486-
expect(Upload).not.toHaveBeenCalled()
487-
})
488-
489-
test('returns zero-byte metadata for unknown-size uploads without progress', async () => {
490-
const backend = createBackend()
491-
const result = await backend.uploadObject(
492-
'test-bucket',
493-
'empty-key',
494-
undefined,
495-
Readable.from([]),
496-
'application/octet-stream',
497-
'no-cache'
498-
)
499-
500-
expect(Upload).toHaveBeenCalledTimes(1)
501-
expect(uploadInstances[0].options.queueSize).toBe(getConfig().storageS3UploadQueueSize)
502-
expect(mockSend).not.toHaveBeenCalled()
503-
expect(result).toMatchObject({
504-
httpStatusCode: 200,
505-
cacheControl: 'no-cache',
506-
eTag: '"multipart-etag"',
507-
mimetype: 'application/octet-stream',
508358
contentLength: 0,
509359
size: 0,
510-
lastModified: expect.any(Date),
511360
})
512361
})
513362

0 commit comments

Comments
 (0)