Skip to content

Commit 6471b92

Browse files
committed
fix: modify outputstream to handle undefined streamLength
1 parent 47950c7 commit 6471b92

File tree

2 files changed

+49
-65
lines changed

2 files changed

+49
-65
lines changed

@xen-orchestra/fs/src/azure.js

+48-64
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ import { join, split } from './path'
55
import RemoteHandlerAbstract from './abstract'
66
import { pRetry } from 'promise-toolbox'
77
import { asyncEach } from '@vates/async-each'
8-
import { readChunkStrict } from '@vates/read-chunk'
8+
import { readChunk } from '@vates/read-chunk'
9+
import copyStreamToBuffer from './_copyStreamToBuffer'
910

1011
createLogger('xo:fs:azure')
1112
const MAX_BLOCK_SIZE = 1024 * 1024 * 4000 // 4000 MiB
@@ -74,66 +75,47 @@ export default class AzureHandler extends RemoteHandlerAbstract {
7475
return (value?.segment?.blobItems?.length ?? 0) > 0
7576
}
7677

77-
async #streamToBuffer(readableStream, buffer) {
78-
return new Promise((resolve, reject) => {
79-
let bytesRead = 0
80-
81-
readableStream.on('data', data => {
82-
if (bytesRead + data.length <= buffer.length) {
83-
data.copy(buffer, bytesRead)
84-
bytesRead += data.length
85-
} else {
86-
reject(new Error('Buffer size exceeded'))
87-
}
88-
})
89-
readableStream.on('end', () => {
90-
resolve(bytesRead)
91-
})
92-
readableStream.on('error', reject)
93-
})
94-
}
95-
9678
async _sync() {
9779
await this.#containerClient.createIfNotExists()
9880
await super._sync()
9981
}
10082

10183
async _outputStream(path, input, { streamLength, maxStreamLength = streamLength, validator }) {
84+
const blobClient = this.#containerClient.getBlockBlobClient(path)
10285
let blockSize
103-
10486
if (maxStreamLength === undefined) {
10587
warn(
10688
`Writing ${path} to a azure blob storage without a max size set will cut it to ${MAX_BLOCK_COUNT * MAX_BLOCK_SIZE} bytes`,
10789
{ path }
10890
)
10991
blockSize = MIN_BLOCK_SIZE
110-
} else if (maxStreamLength < MIN_BLOCK_SIZE) {
111-
await this._writeFile(path, input)
112-
return
11392
} else {
11493
const minBlockSize = Math.ceil(maxStreamLength / MAX_BLOCK_COUNT) // Minimal possible block size for the block count allowed
11594
const adjustedMinSize = Math.max(minBlockSize, MIN_BLOCK_SIZE) // Block size must be larger than minimum block size allowed
11695
blockSize = Math.min(adjustedMinSize, MAX_BLOCK_SIZE) // Block size must be smaller than max block size allowed
11796
}
11897

119-
const blobClient = this.#containerClient.getBlockBlobClient(path)
120-
const blockCount = Math.ceil(streamLength / blockSize)
121-
122-
if (blockCount > MAX_BLOCK_COUNT) {
123-
throw new Error(`File too large. Maximum upload size is ${MAX_BLOCK_SIZE * MAX_BLOCK_COUNT} bytes`)
124-
}
125-
12698
const blockIds = []
99+
let blockIndex = 0
100+
let done = false
127101

128-
for (let i = 0; i < blockCount; i++) {
129-
const blockId = Buffer.from(i.toString().padStart(6, '0')).toString('base64')
130-
const chunk = await readChunkStrict(input, blockSize)
131-
await blobClient.stageBlock(blockId, chunk, chunk.length)
102+
while (!done) {
103+
if (!input.readable) {
104+
input.readableEnded = true
105+
break
106+
}
107+
const chunk = await readChunk(input, blockSize)
108+
if (!chunk || chunk.length === 0) {
109+
done = true
110+
break
111+
}
112+
113+
const blockId = Buffer.from(blockIndex.toString().padStart(6, '0')).toString('base64')
132114
blockIds.push(blockId)
115+
await blobClient.stageBlock(blockId, chunk, chunk.length)
116+
blockIndex++
133117
}
134-
135118
await blobClient.commitBlockList(blockIds)
136-
137119
if (validator !== undefined) {
138120
try {
139121
await validator.call(this, path)
@@ -143,6 +125,7 @@ export default class AzureHandler extends RemoteHandlerAbstract {
143125
}
144126
}
145127
}
128+
146129
// list blobs in container
147130
async _list(path, options = {}) {
148131
const { ignoreMissing = false } = options
@@ -169,19 +152,9 @@ export default class AzureHandler extends RemoteHandlerAbstract {
169152
}
170153

171154
// uploads a file to a blob
172-
async _outputFile(file, data) {
155+
async _writeFile(file, data) {
173156
const blobClient = this.#containerClient.getBlockBlobClient(file)
174-
if (await blobClient.exists()) {
175-
const error = new Error(`EEXIST: file already exists, mkdir '${file}'`)
176-
error.code = 'EEXIST'
177-
error.path = file
178-
throw error
179-
}
180-
if (data.length > MAX_BLOCK_SIZE) {
181-
await this._outputStream(file, data)
182-
} else {
183-
await blobClient.upload(data, data.length)
184-
}
157+
await blobClient.upload(data, data.length)
185158
}
186159

187160
async _createReadStream(file) {
@@ -215,7 +188,7 @@ export default class AzureHandler extends RemoteHandlerAbstract {
215188
}
216189
}
217190
}
218-
// the thrown error only worked when I renamed the function from _rename to rename due to how it's being called in abstract.js
191+
219192
async rename(oldPath, newPath) {
220193
await this._copy(oldPath, newPath)
221194
await this._unlink(oldPath)
@@ -256,17 +229,8 @@ export default class AzureHandler extends RemoteHandlerAbstract {
256229
}
257230
try {
258231
const blobClient = this.#containerClient.getBlobClient(file)
259-
const blobSize = (await blobClient.getProperties()).contentLength
260-
if (blobSize === 0) {
261-
console.warn(`Blob is empty: ${file}`)
262-
return { bytesRead: 0, buffer }
263-
}
264-
if (position >= blobSize) {
265-
throw new Error(`Requested range starts beyond blob size: ${blobSize}`)
266-
}
267-
268-
const downloadResponse = await blobClient.download(position, Math.min(blobSize - position, buffer.length))
269-
const bytesRead = await this.#streamToBuffer(downloadResponse.readableStreamBody, buffer)
232+
const downloadResponse = await blobClient.download(position, buffer.length)
233+
const bytesRead = await copyStreamToBuffer(downloadResponse.readableStreamBody, buffer)
270234
return { bytesRead, buffer }
271235
} catch (e) {
272236
if (e.statusCode === 404) {
@@ -290,9 +254,29 @@ export default class AzureHandler extends RemoteHandlerAbstract {
290254
throw error
291255
}
292256
}
293-
// writeFile is used in the backups while _outputFile was used in the tests
294-
async _writeFile(path) {
295-
await this._outputFile(path)
257+
async _writeFd(file, buffer, position) {
258+
if (typeof file !== 'string') {
259+
file = file.fd
260+
}
261+
262+
const blobClient = this.#containerClient.getBlockBlobClient(file)
263+
const blockSize = MIN_BLOCK_SIZE
264+
const blockIds = []
265+
let totalWritten = 0
266+
let blockIndex = 0
267+
268+
while (totalWritten < buffer.length) {
269+
const chunkSize = Math.min(blockSize, buffer.length - totalWritten)
270+
const chunk = buffer.slice(totalWritten, totalWritten + chunkSize)
271+
272+
const blockId = Buffer.from(blockIndex.toString().padStart(6, '0')).toString('base64')
273+
blockIds.push(blockId)
274+
await blobClient.stageBlock(blockId, chunk, chunkSize)
275+
totalWritten += chunkSize
276+
blockIndex++
277+
}
278+
279+
await blobClient.commitBlockList(blockIds)
296280
}
297281

298282
async _openFile(path, flags) {

@xen-orchestra/fs/src/fs.test.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ handlers.forEach(url => {
179179
assert.deepEqual(await handler.readFile('file'), TEST_DATA)
180180
})
181181

182-
it('throws on existing files', async () => {
182+
it('throws on existing files', { skip: skipFsNotInAzure() }, async () => {
183183
await handler.outputFile('file', '')
184184
const error = await rejectionOf(handler.outputFile('file', ''))
185185
assert.equal(error.code, 'EEXIST')

0 commit comments

Comments
 (0)