Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions @vates/generator-toolbox/src/throttle.mts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export class Throttle {
} else {
speed = this.#bytesPerSecond
}
assert.ok(speed > 0, `speed must be greater than zero, ${speed} computed`)
assert.ok(speed >= 0, `speed must be positive, ${speed} computed`)
return speed
}
constructor(speed: number | (() => number)) {
Expand All @@ -29,7 +29,9 @@ export class Throttle {
getNextSlot(length: number): { timeout?: ReturnType<typeof setTimeout>; promise?: Promise<unknown> } {
assert.notStrictEqual(length, undefined, `throttled stream need to expose a length property }`)
assert.ok(length > 0, `throttled stream must expose a positive length property , ${length} given }`)

if (this.speed === 0) {
return {}
}
const previous = this.#previousSlot
const nextSlot = Math.round(previous + (length * 1000) / this.speed)
if (nextSlot < Date.now()) {
Expand Down
7 changes: 5 additions & 2 deletions @xen-orchestra/backups/_runners/VmsRemote.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
import { FullRemote } from './_vmRunners/FullRemote.mjs'
import { IncrementalRemote } from './_vmRunners/IncrementalRemote.mjs'
import { Throttle } from '@vates/generator-toolbox'
import createStreamThrottle from './_createStreamThrottle.mjs'

const noop = Function.prototype

Expand Down Expand Up @@ -41,7 +42,8 @@ export const VmsRemote = class RemoteVmsBackupRunner extends Abstract {
const schedule = this._schedule
const settings = this._settings

const throttleGenerator = new Throttle()
const throttleGenerator = new Throttle(settings.maxExportRate)
const throttleStream = createStreamThrottle(settings.maxExportRate)

const config = this._config

Expand Down Expand Up @@ -89,7 +91,8 @@ export const VmsRemote = class RemoteVmsBackupRunner extends Abstract {
schedule,
settings: vmSettings,
sourceRemoteAdapter,
throttleGenerator,
throttleGenerator, // for incrementals
throttleStream, // for full
vmUuid,
}
let vmBackup
Expand Down
5 changes: 4 additions & 1 deletion @xen-orchestra/backups/_runners/VmsXapi.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
import { IncrementalXapi } from './_vmRunners/IncrementalXapi.mjs'
import { FullXapi } from './_vmRunners/FullXapi.mjs'
import { Throttle } from '@vates/generator-toolbox'
import createStreamThrottle from './_createStreamThrottle.mjs'

const noop = Function.prototype

Expand Down Expand Up @@ -55,7 +56,8 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
const schedule = this._schedule
const settings = this._settings

const throttleGenerator = new Throttle()
const throttleGenerator = new Throttle(settings.maxExportRate)
const throttleStream = createStreamThrottle(settings.maxExportRate)

const config = this._config

Expand Down Expand Up @@ -148,6 +150,7 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
settings: vmSettings,
srs,
throttleGenerator,
throttleStream,
vm,
}

Expand Down
2 changes: 1 addition & 1 deletion @xen-orchestra/backups/_runners/_vmRunners/FullRemote.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote
const transferList = await this._computeTransferList(({ mode }) => mode === 'full')

for (const metadata of transferList) {
const stream = await this._sourceRemoteAdapter.readFullVmBackup(metadata)
const stream = this._throttleStream(await this._sourceRemoteAdapter.readFullVmBackup(metadata))
const sizeContainer = watchStreamSize(stream)

// @todo shouldn't transfer backup if it will be deleted by retention policy (higher retention on source than destination)
Expand Down
6 changes: 2 additions & 4 deletions @xen-orchestra/backups/_runners/_vmRunners/FullXapi.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,14 @@ export const FullXapi = class FullXapiVmBackupRunner extends AbstractXapi {
const { compression } = this.job
const vm = this._vm
const exportedVm = this._exportedVm
// @todo put back throttle for full backup/Replication
const stream =
/* this._throttleStream( */
const stream = this._throttleStream(
(
await this._xapi.VM_export(exportedVm.$ref, {
compress: Boolean(compression) && (compression === 'native' ? 'gzip' : 'zstd'),
useSnapshot: false,
})
).body
/* ) */
)

const vdis = await exportedVm.$getDisks()
let maxStreamLength = 1024 * 1024 // Ovf file and tar headers are a few KB, let's stay safe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs
import { Disposable } from 'promise-toolbox'
import { openVhd } from 'vhd-lib'
import { getVmBackupDir } from '../../_getVmBackupDir.mjs'
import { SynchronizedDisk } from '@xen-orchestra/disk-transform'
import { SynchronizedDisk, ThrottledDisk } from '@xen-orchestra/disk-transform'

const { warn } = createLogger('xo:backups:Incrementalremote')
class IncrementalRemoteVmBackupRunner extends AbstractRemote {
Expand Down Expand Up @@ -71,8 +71,9 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote {
const isVhdDifferencing = {}

for (const key in incrementalExport.disks) {
const disk = incrementalExport.disks[key]
let disk = incrementalExport.disks[key]
isVhdDifferencing[key] = disk.isDifferencing()
disk = new ThrottledDisk(disk, this._throttleGenerator)
incrementalExport.disks[key] = new SynchronizedDisk(disk)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
setVmDeltaChainLength,
markExportSuccessfull,
} from '../../_otherConfig.mjs'
import { SynchronizedDisk } from '@xen-orchestra/disk-transform'
import { ThrottledDisk, SynchronizedDisk } from '@xen-orchestra/disk-transform'

const { debug } = createLogger('xo:backups:IncrementalXapiVmBackup')

Expand Down Expand Up @@ -43,13 +43,14 @@ export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends Abstr
const isVhdDifferencing = {}
let useNbd = false
for (const key in deltaExport.disks) {
const disk = deltaExport.disks[key]
let disk = deltaExport.disks[key]
isVhdDifferencing[key] = disk.isDifferencing()
if (!isFull && !isVhdDifferencing[key] && key !== exportedVm.$suspend_VDI?.$ref) {
Task.warning('Backup fell back to a full')
}
deltaExport.disks[key] = new SynchronizedDisk(disk)
useNbd = useNbd || disk.useNbd()
disk = new ThrottledDisk(disk, this._throttleGenerator)
deltaExport.disks[key] = new SynchronizedDisk(disk)
}
if (useNbd) {
Task.info('Transfer data using NBD')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export const AbstractRemote = class AbstractRemoteVmBackupRunner extends Abstrac
settings,
sourceRemoteAdapter,
throttleGenerator,
throttleStream,
vmUuid,
}) {
super()
Expand All @@ -37,6 +38,7 @@ export const AbstractRemote = class AbstractRemoteVmBackupRunner extends Abstrac
this._healthCheckSr = healthCheckSr
this._sourceRemoteAdapter = sourceRemoteAdapter
this._throttleGenerator = throttleGenerator
this._throttleStream = throttleStream
this._vmUuid = vmUuid

const allSettings = job.settings
Expand Down
2 changes: 2 additions & 0 deletions @xen-orchestra/backups/_runners/_vmRunners/_AbstractXapi.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
settings,
srs,
throttleGenerator,
throttleStream,
vm,
}) {
super()
Expand Down Expand Up @@ -63,6 +64,7 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
this._jobId = job.id
this._jobSnapshotVdis = undefined
this._throttleGenerator = throttleGenerator
this._throttleStream = throttleStream
this._xapi = vm.$xapi

// Base VM for the export
Expand Down
26 changes: 26 additions & 0 deletions @xen-orchestra/disk-transform/src/Throttled.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Throttle } from '@vates/generator-toolbox'
import { Disk, DiskBlock } from './Disk.mjs'
import { DiskPassthrough } from './DiskPassthrough.mjs'

export class ThrottledDisk extends DiskPassthrough {
#throttle: Throttle
constructor(source: Disk, throttle: Throttle) {
super(source)
this.#throttle = throttle
}
async buildDiskBlockGenerator(): Promise<AsyncGenerator<DiskBlock>> {
const generator = await this.source.buildDiskBlockGenerator()
//throttle want to be able to know the length of the data
async function* generatorWithLength() {
for await (const { index, data } of generator) {
yield {
index,
data,
length: data.length,
}
}
}
const throttledGenerator = this.#throttle.createThrottledGenerator(generatorWithLength())
return throttledGenerator as AsyncGenerator<DiskBlock>
}
}
16 changes: 16 additions & 0 deletions @xen-orchestra/disk-transform/src/Timeout.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Timeout } from '@vates/generator-toolbox'
import { Disk, DiskBlock } from './Disk.mjs'
import { DiskPassthrough } from './DiskPassthrough.mjs'

export class TimeoutDisk extends DiskPassthrough {
#timeout: number
constructor(source: Disk, timeout: number) {
super(source)
this.#timeout = timeout
}
async buildDiskBlockGenerator(): Promise<AsyncGenerator<DiskBlock>> {
const generator = await this.source.buildDiskBlockGenerator()
const timeoutedGenerator = new Timeout(generator, this.#timeout)
return timeoutedGenerator
}
}
2 changes: 2 additions & 0 deletions @xen-orchestra/disk-transform/src/index.mts
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ export { DiskPassthrough, RandomDiskPassthrough } from './DiskPassthrough.mjs'
export { RawDisk } from './RawDisk.mjs'
export { ReadAhead } from './ReadAhead.mjs'
export { SynchronizedDisk } from './SynchronizedDisk.mjs'
export { TimeoutDisk } from './Timeout.mjs'
export { ThrottledDisk } from './Throttled.mjs'
export { ProgressHandler } from './ProgressHandler.mjs'
23 changes: 18 additions & 5 deletions @xen-orchestra/xapi/disks/Xapi.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* @typedef {import('@xen-orchestra/disk-transform').Disk} Disk
*/

import { DiskLargerBlock, DiskPassthrough, ReadAhead } from '@xen-orchestra/disk-transform'
import { DiskLargerBlock, DiskPassthrough, ReadAhead, TimeoutDisk } from '@xen-orchestra/disk-transform'
import { createLogger } from '@xen-orchestra/log'
import { XapiVhdCbtSource } from './XapiVhdCbt.mjs'
import { XapiStreamNbdSource } from './XapiStreamNbd.mjs'
Expand Down Expand Up @@ -42,6 +42,8 @@ export class XapiDiskSource extends DiskPassthrough {
#blockSize
#useNbd = false
#useCbt = false
/** @type {number} */
#timeout

/**
* @param {Object} params
Expand All @@ -51,13 +53,23 @@ export class XapiDiskSource extends DiskPassthrough {
* @param {boolean} [params.preferNbd=true]
* @param {number} [params.nbdConcurrency=2]
* @param {number} [params.blockSize=2*1024*1024]
* @param {number} [params.timeout=20*60*1000]
*/
constructor({ xapi, vdiRef, baseRef, preferNbd = xapi._preferNbd, nbdConcurrency = 2, blockSize = 2 * 1024 * 1024 }) {
constructor({
xapi,
vdiRef,
baseRef,
preferNbd = xapi._preferNbd,
nbdConcurrency = 2,
blockSize = 2 * 1024 * 1024,
timeout = 20 * 60 * 1000,
}) {
super(undefined)
this.#baseRef = baseRef
this.#blockSize = blockSize
this.#nbdConcurrency = nbdConcurrency
this.#preferNbd = preferNbd
this.#timeout = timeout
this.#vdiRef = vdiRef
this.#xapi = xapi
}
Expand All @@ -84,7 +96,6 @@ export class XapiDiskSource extends DiskPassthrough {
}
source = new XapiStreamNbdSource(streamSource, { vdiRef, baseRef, xapi, nbdConcurrency: this.#nbdConcurrency })
await source.init()

if (source.getBlockSize() < this.#blockSize) {
source = new DiskLargerBlock(source, this.#blockSize)
}
Expand All @@ -101,6 +112,7 @@ export class XapiDiskSource extends DiskPassthrough {
}
this.#useNbd = true
const readAhead = new ReadAhead(source)
source = new TimeoutDisk(source, this.#timeout)
const label = await xapi.getField('VDI', vdiRef, 'name_label')
readAhead.progressHandler = new XapiProgressHandler(xapi, `Exporting content of VDI ${label} through NBD`)
return readAhead
Expand All @@ -110,7 +122,7 @@ export class XapiDiskSource extends DiskPassthrough {
* Create a disk source using stream export.
* On failure, fall back to a full export.
*
* @returns {Promise<XapiVhdStreamSource|XapiQcow2StreamSource | ReadAhead>}
* @returns {Promise<Disk>}
*/
async #openExportStream() {
const xapi = this.#xapi
Expand All @@ -126,6 +138,7 @@ export class XapiDiskSource extends DiskPassthrough {
source = new XapiQcow2StreamSource({ vdiRef, baseRef, xapi })
}
await source.init()
source = new TimeoutDisk(source, this.#timeout)
} catch (error) {
await source?.close()
if (baseRef !== undefined) {
Expand Down Expand Up @@ -158,7 +171,7 @@ export class XapiDiskSource extends DiskPassthrough {
this.#useNbd = true
this.#useCbt = true
const readAhead = new ReadAhead(source)

source = new TimeoutDisk(source, this.#timeout)
if (source.getBlockSize() < this.#blockSize) {
source = new DiskLargerBlock(source, this.#blockSize)
}
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,16 @@

<!--packages-start-->

- @vates/generator-toolbox minor
- @vates/types minor
- @xen-orchestra/backups minor
- @xen-orchestra/disk-transform minor
- @xen-orchestra/immutable-backups patch
- @xen-orchestra/rest-api minor
- @xen-orchestra/web minor
- @xen-orchestra/web-core minor
- @xen-orchestra/xapi patch
- vhd-lib patch
- xo-server minor
- xo-server-auth-saml minor
- xo-web patch
Expand Down
4 changes: 3 additions & 1 deletion packages/vhd-lib/disk-consumer/DiskConsumerVhdDirectory.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ export class DiskConsumerVhdDirectory extends BaseVhd {
const { handler, path, compression, flags, validator, concurrency } = this.#target
const dataPath = `${dirname(path)}/data/${uuidv4()}.vhd`
const uid = 'to stream ' + Math.random()
const generator = this.source.diskBlocks(uid)
let generator
try {
generator = this.source.diskBlocks(uid)
await handler.mktree(dataPath)
const vhd = new VhdDirectory(handler, dataPath, { flags, compression })
vhd.footer = unpackFooter(this.computeVhdFooter())
Expand All @@ -62,6 +63,7 @@ export class DiskConsumerVhdDirectory extends BaseVhd {
await validator(dataPath)
await VhdAbstract.createAlias(handler, path, dataPath)
} catch (err) {
await this.source.close().catch(() => {}) // close this disk in error
await handler.rmtree(dataPath).catch(() => {}) // data
await handler.unlink(path).catch(() => {}) // alias
throw err
Expand Down
Loading