Skip to content

Commit 7573b8d

Browse files
fix: stabilize sdk e2e bootstrap downloads
Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent ce7d3d8 commit 7573b8d

5 files changed

Lines changed: 250 additions & 44 deletions

File tree

packages/sdk/e2e/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
build/
22
.qvac-cache/
33
qvac.config.json
4+
qvac.config.e2e.generated.json
45
.env
56
.env.bak-*
67
rag-hyperdb/

packages/sdk/e2e/tests/desktop/consumer.ts

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import {
4242
GEMMA4_2B_MULTIMODAL_Q4_K_M,
4343
BCI_WINDOWED,
4444
} from "@qvac/sdk";
45+
import * as fs from "node:fs";
4546
import * as path from "node:path";
4647
import { ResourceManager } from "../shared/resource-manager.js";
4748
import { collectTestDeps } from "../shared/collect-test-deps.js";
@@ -82,7 +83,9 @@ import { NoLingeringBareExecutor } from "./executors/no-lingering-bare-executor.
8283
import { MultiGpuExecutor } from "../shared/executors/multi-gpu-executor.js";
8384
import { DesktopCancellationExecutor } from "./executors/cancellation-executor.js";
8485

85-
const resources = new ResourceManager();
86+
const resources = new ResourceManager({
87+
downloadTarget: "desktop",
88+
});
8689
const isMacosCi = process.platform === "darwin" && process.env["GITHUB_ACTIONS"] === "true";
8790

8891
resources.define("llm", {
@@ -449,16 +452,31 @@ resources.define("upscaler-cpu", {
449452
},
450453
});
451454

455+
function readJsonConfig(configPath: string) {
456+
return JSON.parse(fs.readFileSync(configPath, "utf8")) as Record<string, unknown>;
457+
}
458+
452459
// Exercises registryDownloadMaxRetries + registryStreamTimeoutMs end-to-end (see config-tests.ts).
453460
function ensureDesktopE2EConfig() {
454-
if (!process.env["QVAC_CONFIG_PATH"]) {
455-
process.env["QVAC_CONFIG_PATH"] = path.resolve(
456-
process.cwd(),
457-
"fixtures/qvac.config.e2e.json",
461+
const fixturePath = path.resolve(process.cwd(), "fixtures/qvac.config.e2e.json");
462+
const existingPath = process.env["QVAC_CONFIG_PATH"];
463+
const fixtureConfig = readJsonConfig(fixturePath);
464+
const existingConfig = existingPath ? readJsonConfig(existingPath) : {};
465+
const mergedConfig = {
466+
...fixtureConfig,
467+
...existingConfig,
468+
};
469+
const generatedPath = path.resolve(process.cwd(), "qvac.config.e2e.generated.json");
470+
471+
fs.writeFileSync(generatedPath, `${JSON.stringify(mergedConfig, null, 2)}\n`);
472+
process.env["QVAC_CONFIG_PATH"] = generatedPath;
473+
474+
if (existingPath) {
475+
console.log(
476+
`📦 Desktop e2e config merged ${fixturePath} with ${existingPath}; using ${generatedPath}`,
458477
);
459-
console.log(`📦 Desktop e2e config set to ${process.env["QVAC_CONFIG_PATH"]}`);
460478
} else {
461-
console.log(`📦 Desktop e2e config: QVAC_CONFIG_PATH already set to ${process.env["QVAC_CONFIG_PATH"]}, skipping`);
479+
console.log(`📦 Desktop e2e config set to ${generatedPath}`);
462480
}
463481
}
464482

packages/sdk/e2e/tests/mobile/consumer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ import { ConfigExecutor } from "../shared/executors/config-executor.js";
6464
import { MobileCancellationExecutor } from "./executors/cancellation-executor.js";
6565

6666
const resources = new ResourceManager({
67+
downloadTarget: "mobile",
6768
// Mobile (iOS + Android) needs a tick after each unloadModel for the
6869
// kernel to actually release pages / reclaim mmap regions — without
6970
// it, the next test's load arrives while the previous model's RSS is
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
export type BootstrapDownloadTarget = "desktop" | "mobile";
2+
3+
export interface BootstrapDownloadEnv {
4+
QVAC_E2E_DOWNLOAD_CONCURRENCY?: string;
5+
}
6+
7+
export interface BootstrapDownloadItem {
8+
id: string;
9+
name: string;
10+
ownerLabel: string;
11+
run: () => Promise<void>;
12+
}
13+
14+
export interface BootstrapDownloadOptions {
15+
concurrency: number;
16+
retryConcurrency: number;
17+
log?: (message: string) => void;
18+
}
19+
20+
export interface BootstrapDownloadResult {
21+
maxConcurrent: number;
22+
}
23+
24+
interface FailedDownload {
25+
item: BootstrapDownloadItem;
26+
reason: unknown;
27+
}
28+
29+
interface QueueState {
30+
nextIndex: number;
31+
}
32+
33+
const DEFAULT_DOWNLOAD_CONCURRENCY: Record<BootstrapDownloadTarget, number> = {
34+
desktop: 6,
35+
mobile: 4,
36+
};
37+
38+
function positiveIntegerOrNull(value: string | undefined): number | null {
39+
if (!value) return null;
40+
const parsed = Number(value);
41+
if (!Number.isInteger(parsed) || parsed < 1) return null;
42+
return parsed;
43+
}
44+
45+
function normalizeConcurrency(value: number, fallback: number) {
46+
if (!Number.isInteger(value) || value < 1) return fallback;
47+
return value;
48+
}
49+
50+
export function resolveBootstrapDownloadConcurrency(
51+
env: BootstrapDownloadEnv = {},
52+
target: BootstrapDownloadTarget = "desktop",
53+
) {
54+
return (
55+
positiveIntegerOrNull(env.QVAC_E2E_DOWNLOAD_CONCURRENCY) ??
56+
DEFAULT_DOWNLOAD_CONCURRENCY[target]
57+
);
58+
}
59+
60+
export function resolveBootstrapRetryConcurrency(concurrency: number) {
61+
return Math.min(2, Math.max(1, concurrency));
62+
}
63+
64+
async function runQueueWorker<T>(
65+
items: readonly T[],
66+
results: PromiseSettledResult<void>[],
67+
state: QueueState,
68+
worker: (item: T) => Promise<void>,
69+
) {
70+
while (state.nextIndex < items.length) {
71+
const index = state.nextIndex;
72+
state.nextIndex++;
73+
try {
74+
await worker(items[index]);
75+
results[index] = { status: "fulfilled", value: undefined };
76+
} catch (reason) {
77+
results[index] = { status: "rejected", reason };
78+
}
79+
}
80+
}
81+
82+
async function mapSettledWithConcurrency<T>(
83+
items: readonly T[],
84+
concurrency: number,
85+
worker: (item: T) => Promise<void>,
86+
) {
87+
const results = new Array<PromiseSettledResult<void>>(items.length);
88+
const state: QueueState = { nextIndex: 0 };
89+
90+
const workerCount = Math.min(concurrency, items.length);
91+
await Promise.all(
92+
Array.from({ length: workerCount }, () =>
93+
runQueueWorker(items, results, state, worker),
94+
),
95+
);
96+
return results;
97+
}
98+
99+
function collectFailures(
100+
items: readonly BootstrapDownloadItem[],
101+
results: readonly PromiseSettledResult<void>[],
102+
) {
103+
const failed: FailedDownload[] = [];
104+
for (let i = 0; i < results.length; i++) {
105+
const result = results[i];
106+
if (result.status === "rejected") {
107+
failed.push({ item: items[i], reason: result.reason });
108+
}
109+
}
110+
return failed;
111+
}
112+
113+
function formatReason(reason: unknown) {
114+
if (reason instanceof Error) return reason.message;
115+
return String(reason);
116+
}
117+
118+
export async function runBootstrapDownloads(
119+
items: readonly BootstrapDownloadItem[],
120+
options: BootstrapDownloadOptions,
121+
) {
122+
const concurrency = normalizeConcurrency(
123+
options.concurrency,
124+
DEFAULT_DOWNLOAD_CONCURRENCY.desktop,
125+
);
126+
const retryConcurrency = normalizeConcurrency(
127+
options.retryConcurrency,
128+
resolveBootstrapRetryConcurrency(concurrency),
129+
);
130+
const log = options.log;
131+
let active = 0;
132+
let maxConcurrent = 0;
133+
let leftToCheck = items.length;
134+
let parallelDetected = false;
135+
136+
async function runItem(item: BootstrapDownloadItem, retry: boolean) {
137+
const prefix = retry ? "🔁 retry" : "📥";
138+
log?.(`${prefix} ${item.name} (used by: ${item.ownerLabel})...`);
139+
active++;
140+
maxConcurrent = Math.max(maxConcurrent, active);
141+
if (!parallelDetected && active >= 2) {
142+
parallelDetected = true;
143+
log?.(`🔀 Parallel downloads confirmed (active: ${active})`);
144+
}
145+
try {
146+
await item.run();
147+
leftToCheck--;
148+
log?.(`✅ ${item.name} cached - still processing: ${leftToCheck}`);
149+
} finally {
150+
active--;
151+
}
152+
}
153+
154+
const firstPassResults = await mapSettledWithConcurrency(
155+
items,
156+
concurrency,
157+
(item) => runItem(item, false),
158+
);
159+
const firstPassFailed = collectFailures(items, firstPassResults);
160+
if (firstPassFailed.length === 0) return { maxConcurrent };
161+
162+
for (const failure of firstPassFailed) {
163+
log?.(
164+
`❌ download failed: ${failure.item.name}: ${formatReason(failure.reason)}`,
165+
);
166+
}
167+
168+
log?.(
169+
`🔁 Retrying ${firstPassFailed.length} failed download(s) with concurrency ${retryConcurrency}`,
170+
);
171+
172+
const retryItems = firstPassFailed.map((failure) => failure.item);
173+
const retryResults = await mapSettledWithConcurrency(
174+
retryItems,
175+
retryConcurrency,
176+
(item) => runItem(item, true),
177+
);
178+
const finalFailed = collectFailures(retryItems, retryResults);
179+
if (finalFailed.length > 0) {
180+
for (const failure of finalFailed) {
181+
log?.(
182+
`❌ retry failed: ${failure.item.name}: ${formatReason(failure.reason)}`,
183+
);
184+
}
185+
throw new Error(
186+
`${finalFailed.length}/${items.length} downloads failed after retry pass`,
187+
);
188+
}
189+
190+
return { maxConcurrent };
191+
}

packages/sdk/e2e/tests/shared/resource-manager.ts

Lines changed: 32 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
import { loadModel, downloadAsset, unloadModel, cancel } from "@qvac/sdk";
22
import type { ModelConstant } from "@qvac/sdk";
3+
import {
4+
resolveBootstrapDownloadConcurrency,
5+
resolveBootstrapRetryConcurrency,
6+
runBootstrapDownloads,
7+
type BootstrapDownloadItem,
8+
type BootstrapDownloadTarget,
9+
} from "./bootstrap-downloads.js";
310

411
type ModelConfig = Record<string, unknown>;
512
type ModelConfigResolver = () => Promise<ModelConfig>;
@@ -80,6 +87,7 @@ export interface ResourceManagerOptions {
8087
* Default 0 (off).
8188
*/
8289
unloadSettleMs?: number;
90+
downloadTarget?: BootstrapDownloadTarget;
8391
}
8492

8593
export class ResourceManager {
@@ -89,9 +97,11 @@ export class ResourceManager {
8997
private testCount = 0;
9098
private downloaded = false;
9199
private readonly unloadSettleMs: number;
100+
private readonly downloadTarget: BootstrapDownloadTarget;
92101

93102
constructor(options: ResourceManagerOptions = {}) {
94103
this.unloadSettleMs = options.unloadSettleMs ?? 0;
104+
this.downloadTarget = options.downloadTarget ?? "desktop";
95105
}
96106

97107
private async resolveConfig(dep: string, def: ModelDefinition): Promise<ModelConfig | undefined> {
@@ -176,52 +186,37 @@ export class ResourceManager {
176186
}
177187

178188
const downloadList = Array.from(constants.values());
189+
const env = typeof process !== "undefined" ? process.env : {};
190+
const concurrency = resolveBootstrapDownloadConcurrency(
191+
env,
192+
this.downloadTarget,
193+
);
194+
const retryConcurrency = resolveBootstrapRetryConcurrency(concurrency);
179195
log?.(
180-
`📥 Pre-downloading ${downloadList.length} unique model constant(s) from ${contributors.length} def(s) in parallel...`,
196+
`📥 Pre-downloading ${downloadList.length} unique model constant(s) from ${contributors.length} def(s) ` +
197+
`(concurrency=${concurrency}, retryConcurrency=${retryConcurrency})...`,
181198
);
182199

183-
const active = new Set<string>();
184-
let leftToCheck = downloadList.length;
185-
let maxConcurrent = 0;
186-
let parallelDetected = false;
187-
188-
const results = await Promise.allSettled(
189-
downloadList.map(async (constant) => {
190-
const ownerLabel = (owners.get(constant.modelId) ?? []).join(",") || "?";
191-
log?.(`📥 ${constant.name} (used by: ${ownerLabel})...`);
200+
const downloadItems: BootstrapDownloadItem[] = downloadList.map((constant) => ({
201+
id: constant.modelId,
202+
name: constant.name,
203+
ownerLabel: (owners.get(constant.modelId) ?? []).join(",") || "?",
204+
run: async () => {
192205
await downloadAsset({
193206
assetSrc: constant as never,
194-
onProgress: () => {
195-
active.add(constant.modelId);
196-
if (active.size > maxConcurrent) {
197-
maxConcurrent = active.size;
198-
}
199-
if (!parallelDetected && active.size >= 2) {
200-
parallelDetected = true;
201-
const names = Array.from(active)
202-
.map((id) => constants.get(id)?.name ?? id)
203-
.join(", ");
204-
log?.(`🔀 Parallel downloads confirmed (active: ${names})`);
205-
}
206-
},
207+
onProgress: () => {},
207208
});
208-
active.delete(constant.modelId);
209-
leftToCheck--;
210-
log?.(`✅ ${constant.name} cached - still processing: ${leftToCheck}`);
211-
return constant.modelId;
212-
}),
213-
);
209+
},
210+
}));
214211

215-
const failed = results.filter((r) => r.status === "rejected");
216-
if (failed.length > 0) {
217-
for (const f of failed) {
218-
log?.(`❌ download failed: ${(f as PromiseRejectedResult).reason}`);
219-
}
220-
throw new Error(`${failed.length}/${downloadList.length} downloads failed`);
221-
}
212+
const result = await runBootstrapDownloads(downloadItems, {
213+
concurrency,
214+
retryConcurrency,
215+
log,
216+
});
222217

223218
log?.(
224-
`📦 All ${downloadList.length} constant(s) pre-cached (max concurrent: ${maxConcurrent})`,
219+
`📦 All ${downloadList.length} constant(s) pre-cached (max concurrent: ${result.maxConcurrent})`,
225220
);
226221
}
227222

0 commit comments

Comments
 (0)