Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/sdk/e2e/.gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
build/
.qvac-cache/
qvac.config.json
qvac.config.e2e.generated.json
.env
.env.bak-*
rag-hyperdb/
32 changes: 25 additions & 7 deletions packages/sdk/e2e/tests/desktop/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
GEMMA4_2B_MULTIMODAL_Q4_K_M,
BCI_WINDOWED,
} from "@qvac/sdk";
import * as fs from "node:fs";
import * as path from "node:path";
import { ResourceManager } from "../shared/resource-manager.js";
import { collectTestDeps } from "../shared/collect-test-deps.js";
Expand Down Expand Up @@ -82,7 +83,9 @@ import { NoLingeringBareExecutor } from "./executors/no-lingering-bare-executor.
import { MultiGpuExecutor } from "../shared/executors/multi-gpu-executor.js";
import { DesktopCancellationExecutor } from "./executors/cancellation-executor.js";

const resources = new ResourceManager();
const resources = new ResourceManager({
downloadTarget: "desktop",
});
const isMacosCi = process.platform === "darwin" && process.env["GITHUB_ACTIONS"] === "true";

resources.define("llm", {
Expand Down Expand Up @@ -449,16 +452,31 @@ resources.define("upscaler-cpu", {
},
});

function readJsonConfig(configPath: string) {
return JSON.parse(fs.readFileSync(configPath, "utf8")) as Record<string, unknown>;
}

// Exercises registryDownloadMaxRetries + registryStreamTimeoutMs end-to-end (see config-tests.ts).
function ensureDesktopE2EConfig() {
if (!process.env["QVAC_CONFIG_PATH"]) {
process.env["QVAC_CONFIG_PATH"] = path.resolve(
process.cwd(),
"fixtures/qvac.config.e2e.json",
const fixturePath = path.resolve(process.cwd(), "fixtures/qvac.config.e2e.json");
const existingPath = process.env["QVAC_CONFIG_PATH"];
const fixtureConfig = readJsonConfig(fixturePath);
const existingConfig = existingPath ? readJsonConfig(existingPath) : {};
const mergedConfig = {
...fixtureConfig,
...existingConfig,
};
const generatedPath = path.resolve(process.cwd(), "qvac.config.e2e.generated.json");

fs.writeFileSync(generatedPath, `${JSON.stringify(mergedConfig, null, 2)}\n`);
process.env["QVAC_CONFIG_PATH"] = generatedPath;

if (existingPath) {
console.log(
`📦 Desktop e2e config merged ${fixturePath} with ${existingPath}; using ${generatedPath}`,
);
console.log(`📦 Desktop e2e config set to ${process.env["QVAC_CONFIG_PATH"]}`);
} else {
console.log(`📦 Desktop e2e config: QVAC_CONFIG_PATH already set to ${process.env["QVAC_CONFIG_PATH"]}, skipping`);
console.log(`📦 Desktop e2e config set to ${generatedPath}`);
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/sdk/e2e/tests/mobile/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import { ConfigExecutor } from "../shared/executors/config-executor.js";
import { MobileCancellationExecutor } from "./executors/cancellation-executor.js";

const resources = new ResourceManager({
downloadTarget: "mobile",
// Mobile (iOS + Android) needs a tick after each unloadModel for the
// kernel to actually release pages / reclaim mmap regions — without
// it, the next test's load arrives while the previous model's RSS is
Expand Down
191 changes: 191 additions & 0 deletions packages/sdk/e2e/tests/shared/bootstrap-downloads.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
export type BootstrapDownloadTarget = "desktop" | "mobile";

export interface BootstrapDownloadEnv {
QVAC_E2E_DOWNLOAD_CONCURRENCY?: string;
}

export interface BootstrapDownloadItem {
id: string;
name: string;
ownerLabel: string;
run: () => Promise<void>;
}

export interface BootstrapDownloadOptions {
concurrency: number;
retryConcurrency: number;
log?: (message: string) => void;
}

export interface BootstrapDownloadResult {
maxConcurrent: number;
}

interface FailedDownload {
item: BootstrapDownloadItem;
reason: unknown;
}

interface QueueState {
nextIndex: number;
}

const DEFAULT_DOWNLOAD_CONCURRENCY: Record<BootstrapDownloadTarget, number> = {
desktop: 6,
mobile: 4,
};

function positiveIntegerOrNull(value: string | undefined): number | null {
if (!value) return null;
const parsed = Number(value);
if (!Number.isInteger(parsed) || parsed < 1) return null;
return parsed;
}

function normalizeConcurrency(value: number, fallback: number) {
if (!Number.isInteger(value) || value < 1) return fallback;
return value;
}

export function resolveBootstrapDownloadConcurrency(
env: BootstrapDownloadEnv = {},
target: BootstrapDownloadTarget = "desktop",
) {
return (
positiveIntegerOrNull(env.QVAC_E2E_DOWNLOAD_CONCURRENCY) ??
DEFAULT_DOWNLOAD_CONCURRENCY[target]
);
}

export function resolveBootstrapRetryConcurrency(concurrency: number) {
return Math.min(2, Math.max(1, concurrency));
}

async function runQueueWorker<T>(
items: readonly T[],
results: PromiseSettledResult<void>[],
state: QueueState,
worker: (item: T) => Promise<void>,
) {
while (state.nextIndex < items.length) {
const index = state.nextIndex;
state.nextIndex++;
try {
await worker(items[index]);
results[index] = { status: "fulfilled", value: undefined };
} catch (reason) {
results[index] = { status: "rejected", reason };
}
}
}

async function mapSettledWithConcurrency<T>(
items: readonly T[],
concurrency: number,
worker: (item: T) => Promise<void>,
) {
const results = new Array<PromiseSettledResult<void>>(items.length);
const state: QueueState = { nextIndex: 0 };

const workerCount = Math.min(concurrency, items.length);
await Promise.all(
Array.from({ length: workerCount }, () =>
runQueueWorker(items, results, state, worker),
),
);
return results;
}

function collectFailures(
items: readonly BootstrapDownloadItem[],
results: readonly PromiseSettledResult<void>[],
) {
const failed: FailedDownload[] = [];
for (let i = 0; i < results.length; i++) {
const result = results[i];
if (result.status === "rejected") {
failed.push({ item: items[i], reason: result.reason });
}
}
return failed;
}

function formatReason(reason: unknown) {
if (reason instanceof Error) return reason.message;
return String(reason);
}

export async function runBootstrapDownloads(
items: readonly BootstrapDownloadItem[],
options: BootstrapDownloadOptions,
) {
const concurrency = normalizeConcurrency(
options.concurrency,
DEFAULT_DOWNLOAD_CONCURRENCY.desktop,
);
const retryConcurrency = normalizeConcurrency(
options.retryConcurrency,
resolveBootstrapRetryConcurrency(concurrency),
);
const log = options.log;
let active = 0;
let maxConcurrent = 0;
let leftToCheck = items.length;
let parallelDetected = false;

async function runItem(item: BootstrapDownloadItem, retry: boolean) {
const prefix = retry ? "🔁 retry" : "📥";
log?.(`${prefix} ${item.name} (used by: ${item.ownerLabel})...`);
active++;
maxConcurrent = Math.max(maxConcurrent, active);
if (!parallelDetected && active >= 2) {
parallelDetected = true;
log?.(`🔀 Parallel downloads confirmed (active: ${active})`);
}
try {
await item.run();
leftToCheck--;
log?.(`✅ ${item.name} cached - still processing: ${leftToCheck}`);
} finally {
active--;
}
}

const firstPassResults = await mapSettledWithConcurrency(
items,
concurrency,
(item) => runItem(item, false),
);
const firstPassFailed = collectFailures(items, firstPassResults);
if (firstPassFailed.length === 0) return { maxConcurrent };

for (const failure of firstPassFailed) {
log?.(
`❌ download failed: ${failure.item.name}: ${formatReason(failure.reason)}`,
);
}

log?.(
`🔁 Retrying ${firstPassFailed.length} failed download(s) with concurrency ${retryConcurrency}`,
);

const retryItems = firstPassFailed.map((failure) => failure.item);
const retryResults = await mapSettledWithConcurrency(
retryItems,
retryConcurrency,
(item) => runItem(item, true),
);
const finalFailed = collectFailures(retryItems, retryResults);
if (finalFailed.length > 0) {
for (const failure of finalFailed) {
log?.(
`❌ retry failed: ${failure.item.name}: ${formatReason(failure.reason)}`,
);
}
throw new Error(
`${finalFailed.length}/${items.length} downloads failed after retry pass`,
);
}

return { maxConcurrent };
}
69 changes: 32 additions & 37 deletions packages/sdk/e2e/tests/shared/resource-manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import { loadModel, downloadAsset, unloadModel, cancel } from "@qvac/sdk";
import type { ModelConstant } from "@qvac/sdk";
import {
resolveBootstrapDownloadConcurrency,
resolveBootstrapRetryConcurrency,
runBootstrapDownloads,
type BootstrapDownloadItem,
type BootstrapDownloadTarget,
} from "./bootstrap-downloads.js";

type ModelConfig = Record<string, unknown>;
type ModelConfigResolver = () => Promise<ModelConfig>;
Expand Down Expand Up @@ -80,6 +87,7 @@ export interface ResourceManagerOptions {
* Default 0 (off).
*/
unloadSettleMs?: number;
downloadTarget?: BootstrapDownloadTarget;
}

export class ResourceManager {
Expand All @@ -89,9 +97,11 @@ export class ResourceManager {
private testCount = 0;
private downloaded = false;
private readonly unloadSettleMs: number;
private readonly downloadTarget: BootstrapDownloadTarget;

constructor(options: ResourceManagerOptions = {}) {
this.unloadSettleMs = options.unloadSettleMs ?? 0;
this.downloadTarget = options.downloadTarget ?? "desktop";
}

private async resolveConfig(dep: string, def: ModelDefinition): Promise<ModelConfig | undefined> {
Expand Down Expand Up @@ -176,52 +186,37 @@ export class ResourceManager {
}

const downloadList = Array.from(constants.values());
const env = typeof process !== "undefined" ? process.env : {};
const concurrency = resolveBootstrapDownloadConcurrency(
env,
this.downloadTarget,
);
const retryConcurrency = resolveBootstrapRetryConcurrency(concurrency);
log?.(
`📥 Pre-downloading ${downloadList.length} unique model constant(s) from ${contributors.length} def(s) in parallel...`,
`📥 Pre-downloading ${downloadList.length} unique model constant(s) from ${contributors.length} def(s) ` +
`(concurrency=${concurrency}, retryConcurrency=${retryConcurrency})...`,
);

const active = new Set<string>();
let leftToCheck = downloadList.length;
let maxConcurrent = 0;
let parallelDetected = false;

const results = await Promise.allSettled(
downloadList.map(async (constant) => {
const ownerLabel = (owners.get(constant.modelId) ?? []).join(",") || "?";
log?.(`📥 ${constant.name} (used by: ${ownerLabel})...`);
const downloadItems: BootstrapDownloadItem[] = downloadList.map((constant) => ({
id: constant.modelId,
name: constant.name,
ownerLabel: (owners.get(constant.modelId) ?? []).join(",") || "?",
run: async () => {
await downloadAsset({
assetSrc: constant as never,
onProgress: () => {
active.add(constant.modelId);
if (active.size > maxConcurrent) {
maxConcurrent = active.size;
}
if (!parallelDetected && active.size >= 2) {
parallelDetected = true;
const names = Array.from(active)
.map((id) => constants.get(id)?.name ?? id)
.join(", ");
log?.(`🔀 Parallel downloads confirmed (active: ${names})`);
}
},
onProgress: () => {},
});
active.delete(constant.modelId);
leftToCheck--;
log?.(`✅ ${constant.name} cached - still processing: ${leftToCheck}`);
return constant.modelId;
}),
);
},
}));

const failed = results.filter((r) => r.status === "rejected");
if (failed.length > 0) {
for (const f of failed) {
log?.(`❌ download failed: ${(f as PromiseRejectedResult).reason}`);
}
throw new Error(`${failed.length}/${downloadList.length} downloads failed`);
}
const result = await runBootstrapDownloads(downloadItems, {
concurrency,
retryConcurrency,
log,
});

log?.(
`📦 All ${downloadList.length} constant(s) pre-cached (max concurrent: ${maxConcurrent})`,
`📦 All ${downloadList.length} constant(s) pre-cached (max concurrent: ${result.maxConcurrent})`,
);
}

Expand Down
Loading