Skip to content

Commit 8e147f9

Browse files
Murderlon and mitjap authored
Implement file stream splitter for S3 (#245)
Co-authored-by: Mitja Puzigaća <[email protected]>
1 parent 78d8225 commit 8e147f9

File tree

4 files changed

+372
-242
lines changed

4 files changed

+372
-242
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,7 @@ jobs:
2121

2222
strategy:
2323
matrix:
24-
node-version: [12.x, 14.x, 16.x]
24+
node-version: [14.x, 16.x, 18.x]
2525
# See supported Node.js release schedule at https://nodejs.org/en/about/releases/
2626

2727
steps:

lib/models/StreamSplitter.js

Lines changed: 131 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,131 @@
1+
const crypto = require('crypto');
2+
const fs = require('fs');
3+
const path = require('path');
4+
const stream = require('stream');
5+
6+
/**
 * Produce a random string made of URL-safe base64 characters.
 *
 * `size` random bytes are drawn and encoded as base64url; because the encoded
 * form is longer than the byte count, the result is cut down to `size`
 * characters.
 *
 * @param {number} size - number of random bytes to draw and maximum length of the result
 * @returns {string} the first `size` characters of the base64url encoding
 */
function randomString(size) {
  const entropy = crypto.randomBytes(size);
  const encoded = entropy.toString('base64url');

  return encoded.slice(0, size);
}
9+
10+
/**
 * Writable stream that spreads everything written to it over a sequence of
 * files in `directory`, each holding at most `maxChunkSize` bytes.
 *
 * Files are named `<random prefix>-<part>` where `part` starts at 0 and is
 * incremented every time a chunk file is closed.
 *
 * Events:
 *  - `chunkStarted`  (path) emitted after a chunk file has been opened.
 *  - `chunkFinished` (path) emitted after a chunk file has been closed.
 */
class FileStreamSplitter extends stream.Writable {
  /**
   * @param {object} config
   * @param {number} [config.maxChunkSize=Infinity] - upper bound, in bytes, for
   *   a single chunk file; when omitted the data is never split
   * @param {string} config.directory - directory the chunk files are created in
   * @param {object} [options] - passed through to `stream.Writable`
   * @throws {RangeError} when `maxChunkSize` is not a number >= 1
   */
  constructor({ maxChunkSize = Infinity, directory }, options) {
    super(options);

    // A limit below 1 can never be satisfied and would make the split loop
    // in `_processChunk` spin without consuming any data.
    if (!(maxChunkSize >= 1)) {
      throw new RangeError(`maxChunkSize must be a number >= 1, got ${maxChunkSize}`);
    }

    this.maxChunkSize = maxChunkSize;

    this.currentChunkPath = null;
    this.currentChunkSize = null;
    this.fileDescriptor = null;

    this.directory = directory;
    this.filenameTemplate = randomString(10);

    this.part = 0;

    // Close the chunk file that is currently open when the stream errors.
    // The stream is already failing at this point, so a failure to close is
    // deliberately ignored instead of surfacing as an unhandled rejection.
    this.on('error', () => {
      this._finishChunk().catch(() => {});
    });
  }

  /**
   * `stream.Writable` hook: persists `chunk`, splitting it over as many chunk
   * files as needed.
   *
   * @param {Buffer} chunk
   * @param {string} encoding - unused
   * @param {function(Error=): void} callback - invoked exactly once
   */
  _write(chunk, encoding, callback) {
    // Success and failure handlers are siblings so that an exception thrown
    // by `callback` itself cannot trigger a second invocation.
    this._processChunk(chunk).then(() => callback(), callback);
  }

  /**
   * `stream.Writable` hook: closes the chunk file that is still open, if any.
   *
   * @param {function(Error=): void} callback
   */
  _final(callback) {
    if (this.fileDescriptor === null) {
      callback();
      return;
    }

    this._finishChunk().then(() => callback(), callback);
  }

  /**
   * Writes `chunk` to disk, rolling over to a new chunk file every time the
   * current one reaches `maxChunkSize`.
   *
   * @param {Buffer} chunk
   * @returns {Promise<void>}
   */
  async _processChunk(chunk) {
    // In order to start writing a chunk, we must first create
    // a file system reference for it
    if (this.fileDescriptor === null) {
      await this._newChunk();
    }

    let remaining = chunk;
    let overflow = this.currentChunkSize + remaining.length - this.maxChunkSize;

    // Keep filling and closing chunk files for as long as the pending data
    // does not fit in the current one. A single incoming buffer may span any
    // number of chunk files, not just two.
    while (overflow > 0) {
      const room = remaining.length - overflow;

      await this._writeChunk(remaining.slice(0, room));
      await this._finishChunk();
      await this._newChunk();

      remaining = remaining.slice(room);
      overflow = this.currentChunkSize + remaining.length - this.maxChunkSize;
    }

    // Whatever is left fits in the current chunk file
    await this._writeChunk(remaining);
  }

  /**
   * Appends `chunk` to the currently open chunk file and adds its length to
   * `currentChunkSize`.
   *
   * @param {Buffer} chunk
   * @returns {Promise<void>}
   */
  _writeChunk(chunk) {
    return new Promise((resolve, reject) => {
      fs.write(this.fileDescriptor, chunk, (err) => {
        if (err) {
          return reject(err);
        }

        this.currentChunkSize += chunk.length;
        return resolve();
      });
    });
  }

  /**
   * Closes the currently open chunk file, emits `chunkFinished` with its path,
   * resets the per-chunk state and advances `part`. Resolves immediately when
   * no chunk file is open.
   *
   * @returns {Promise<void>}
   */
  _finishChunk() {
    if (this.fileDescriptor === null) {
      return Promise.resolve();
    }

    return new Promise((resolve, reject) => {
      fs.close(this.fileDescriptor, (err) => {
        if (err) {
          return reject(err);
        }

        // Emitted before the state is reset so listeners still see the
        // values that belong to the chunk that was just closed.
        this.emit('chunkFinished', this.currentChunkPath);

        this.currentChunkPath = null;
        this.fileDescriptor = null;
        this.currentChunkSize = null;

        this.part += 1;

        return resolve();
      });
    });
  }

  /**
   * Opens the file for the next part (truncating it if it exists), emits
   * `chunkStarted` with its path and initialises the per-chunk state.
   *
   * @returns {Promise<void>}
   */
  _newChunk() {
    return new Promise((resolve, reject) => {
      this.currentChunkPath = path.join(this.directory, `${this.filenameTemplate}-${this.part}`);
      fs.open(this.currentChunkPath, 'w', (err, fd) => {
        if (err) {
          return reject(err);
        }

        this.emit('chunkStarted', this.currentChunkPath);

        this.currentChunkSize = 0;
        this.fileDescriptor = fd;

        return resolve();
      });
    });
  }
}
130+
131+
// CommonJS export: the module's only export is the FileStreamSplitter class.
module.exports = { FileStreamSplitter };

0 commit comments

Comments
 (0)