Skip to content

Commit 27e370f

Browse files
authored
Merge pull request #63 from s1seven/62-fixfile-storage-allow-to-properly-handle-upload-error-with-s3
fix: file storage allow to properly handle upload error with S3
2 parents 7e85d28 + 952b34a commit 27e370f

6 files changed

+37
-13
lines changed

packages/file-storage/src/file-storage-fs.class.ts

+6-3
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import {
1313
} from 'fs';
1414
import { readdir, readFile, rm, writeFile } from 'node:fs/promises';
1515
import { resolve as resolvePath } from 'node:path';
16-
import { Readable, Writable } from 'node:stream';
16+
import { finished, Readable } from 'node:stream';
1717

1818
import { MethodTypes } from './constants';
1919
import {
@@ -23,6 +23,7 @@ import {
2323
FileStorageConfigFactory,
2424
FileStorageDirBaseArgs,
2525
} from './file-storage.class';
26+
import { FileStorageWritable } from './types';
2627

2728
export type StreamOptions = {
2829
flags?: string;
@@ -115,10 +116,12 @@ export class FileStorageLocal implements FileStorage {
115116
return writeFile(fileName, content, options);
116117
}
117118

118-
async uploadStream(args: FileStorageLocalUploadStream): Promise<Writable> {
119+
async uploadStream(args: FileStorageLocalUploadStream): Promise<FileStorageWritable> {
119120
const { filePath, options, request } = args;
120121
const fileName = await this.transformFilePath(filePath, MethodTypes.WRITE, request, options);
121-
return createWriteStream(fileName, options);
122+
const writeStream = createWriteStream(fileName, options);
123+
finished(writeStream, (err) => writeStream.emit('done', err));
124+
return writeStream;
122125
}
123126

124127
downloadFile(args: {

packages/file-storage/src/file-storage-s3.class.ts

+7-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
} from '@aws-sdk/client-s3';
1010
import { Upload } from '@aws-sdk/lib-storage';
1111
import type { Request } from 'express';
12-
import { PassThrough, Readable, Writable } from 'node:stream';
12+
import { PassThrough, Readable } from 'node:stream';
1313

1414
import { MethodTypes } from './constants';
1515
import {
@@ -19,6 +19,7 @@ import {
1919
FileStorageConfigFactory,
2020
FileStorageDirBaseArgs,
2121
} from './file-storage.class';
22+
import { FileStorageWritable } from './types';
2223

2324
/**
2425
* Either region or endpoint must be provided
@@ -140,7 +141,7 @@ export class FileStorageS3 implements FileStorage {
140141
}).done();
141142
}
142143

143-
async uploadStream(args: FileStorageS3UploadStream): Promise<Writable> {
144+
async uploadStream(args: FileStorageS3UploadStream): Promise<FileStorageWritable> {
144145
const { filePath, options = {}, request } = args;
145146
const Key = await this.transformFilePath(filePath, MethodTypes.WRITE, request, options);
146147
const { s3, bucket: Bucket } = this.config;
@@ -155,8 +156,11 @@ export class FileStorageS3 implements FileStorage {
155156
},
156157
})
157158
.done()
159+
.then(() => {
160+
writeStream.emit('done');
161+
})
158162
.catch((err) => {
159-
writeStream.destroy(err);
163+
writeStream.emit('done', err);
160164
});
161165
return writeStream;
162166
}

packages/file-storage/src/file-storage.class.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
/* eslint-disable @typescript-eslint/no-unused-vars */
22
import type { Request } from 'express';
3-
import type { Readable, Writable } from 'node:stream';
3+
import type { Readable } from 'node:stream';
44

55
import { MethodTypes } from './constants';
6+
import { FileStorageWritable } from './types';
67

78
// TODO: extend configuration
89
export interface FileStorageConfig {
@@ -71,7 +72,7 @@ export abstract class FileStorage {
7172
args: FileStorageBaseArgs & {
7273
options?: string | any;
7374
},
74-
): Promise<Writable> {
75+
): Promise<FileStorageWritable> {
7576
throw new Error(defaultErrorMessage);
7677
}
7778

packages/file-storage/src/file-storage.service.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Injectable } from '@nestjs/common';
2-
import type { Readable, Writable } from 'node:stream';
2+
import type { Readable } from 'node:stream';
33

44
import { InjectFileStorageStrategy } from './decorators';
55
import { FileStorage, FileStorageBaseArgs, FileStorageDirBaseArgs } from './file-storage.class';
@@ -18,6 +18,7 @@ import {
1818
FileStorageS3UploadFile,
1919
FileStorageS3UploadStream,
2020
} from './file-storage-s3.class';
21+
import type { FileStorageWritable } from './types';
2122

2223
@Injectable()
2324
export class FileStorageService implements Omit<FileStorage, 'transformFilePath'> {
@@ -31,7 +32,7 @@ export class FileStorageService implements Omit<FileStorage, 'transformFilePath'
3132
return this.fileStorage.uploadFile(args);
3233
}
3334

34-
uploadStream(args: FileStorageLocalUploadStream | FileStorageS3UploadStream): Promise<Writable> {
35+
uploadStream(args: FileStorageLocalUploadStream | FileStorageS3UploadStream): Promise<FileStorageWritable> {
3536
return this.fileStorage.uploadStream(args);
3637
}
3738

packages/file-storage/src/types.ts

+13
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { InjectionToken, ModuleMetadata } from '@nestjs/common';
2+
import { Writable } from 'node:stream';
23

34
import { FileStorage, FileStorageConfigFactory } from './file-storage.class';
45
import { FileStorageLocalSetup } from './file-storage-fs.class';
@@ -30,3 +31,15 @@ export interface FileStorageModuleAsyncOptions extends Pick<ModuleMetadata, 'imp
3031
// useExisting?: FileStorage;
3132
inject?: InjectionToken[];
3233
}
34+
35+
interface WritableWithDoneEvent {
36+
emit(event: 'done', error?: Error): boolean;
37+
addListener(event: 'done', listener: (error?: Error) => void): this;
38+
on(event: 'done', listener: (error?: Error) => void): this;
39+
once(event: 'done', listener: (error?: Error) => void): this;
40+
prependOnceListener(event: 'done', listener: (error?: Error) => void): this;
41+
prependListener(event: 'done', listener: (error?: Error) => void): this;
42+
removeListener(event: 'done', listener: () => void): this;
43+
}
44+
45+
export type FileStorageWritable = Writable & WritableWithDoneEvent;

packages/file-storage/test/file-storage.e2e-spec.ts

+5-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { Test, TestingModule } from '@nestjs/testing';
44
import * as dotenv from 'dotenv';
55
import { mkdir, rm } from 'node:fs/promises';
66
import { resolve } from 'node:path';
7-
import { Readable } from 'node:stream';
7+
import { once, Readable } from 'node:stream';
88
import { pipeline } from 'node:stream/promises';
99

1010
import { FileStorage, FileStorageModule, FileStorageModuleOptions, StorageType } from '../src';
@@ -110,9 +110,11 @@ testMap.forEach((testSuite) => {
110110
it('uploadStream uploads a file', async () => {
111111
const upload = await fileStorage.uploadStream({ filePath: testFileName });
112112
const entry = Readable.from(Buffer.from(testFileContent));
113+
const ac = new AbortController();
114+
const t = setTimeout(() => ac.abort(), 300);
115+
const listener = once(upload, 'done', { signal: ac.signal });
113116
await pipeline(entry, upload);
114-
// add delay, otherwise test is flaky
115-
await new Promise<void>((resolve) => setTimeout(resolve, 100));
117+
await listener.finally(() => clearTimeout(t));
116118
const result = await fileStorage.readDir({ dirPath });
117119
expect(result.length).toBe(1);
118120
});

0 commit comments

Comments
 (0)