Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions src/cpp/server/model_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4294,6 +4294,79 @@ void ModelManager::download_from_manifest(const json& manifest, std::map<std::st
}
}

// Resume an interrupted download across an upstream commit-hash change without
// restarting from zero. The HF API returns the latest commit SHA on every call,
// so pausing a download and resuming after the repo received new commits yields
// a different commit_hash and a fresh snapshots/<new_hash>/ — orphaning the
// previous attempt's .partial / completed files. refs/main is sticky (advanced
// only on successful completion), so it does not point at an interrupted
// snapshot; the interrupted attempt is located by its in-progress
// .download_manifest.json marker instead (at most one per repo cache, removed on
// completion). Rename that snapshot forward to the new hash so
// download_from_manifest resumes its files in place. Carrying bytes forward is
// safe because download_from_manifest verifies each file against the new
// commit's hash and re-fetches any that changed (see expected_hash handling).
//
// Note: when the selected artifacts are unchanged across the commit, the later
// reuse path (can_reuse_previous_hf_snapshot) may instead keep the completed
// previous snapshot and drop this migrated copy — that is fine (the previous
// snapshot is complete and verified); this migration simply becomes a no-op.
//
// Scope: this is applied to the MAIN repo only. The single download manifest is
// written to the main snapshot, so auxiliary checkpoint repos (text encoder, VAE,
// mmproj, ...) have no .download_manifest.json marker in their own snapshot for
// this to find — an interrupted auxiliary snapshot is therefore not migrated and
// is re-downloaded fresh under the new hash (safe; #1950 still verifies what is
// fetched). Resuming interrupted auxiliary snapshots would require detecting them
// by a recursive .partial scan instead and is left as a follow-up.
static void resume_snapshot_across_commit(const fs::path& cache_path,
const std::string& new_hash) {
if (new_hash.empty() || new_hash == "main") return;

fs::path snapshots_dir = cache_path / "snapshots";
if (!safe_exists(snapshots_dir)) return;

fs::path new_snapshot = snapshots_dir / new_hash;
// A same-commit resume already lands in the right directory — nothing to do.
if (safe_exists(new_snapshot / ".download_manifest.json")) return;

// Find the interrupted snapshot (a different hash dir carrying an in-progress
// manifest). In normal operation at most one exists, since the manifest is
// written before the download and removed on completion; if a crash ever left
// more than one, the first by iteration order is taken (still safe — per-file
// hash verification re-fetches anything that does not match the new commit).
fs::path interrupted;
std::error_code ec;
for (const auto& entry : fs::directory_iterator(snapshots_dir, safe_dir_options, ec)) {
if (ec) break;
if (!safe_is_directory(entry.path())) continue;
if (entry.path().filename() == new_hash) continue;
if (safe_exists(entry.path() / ".download_manifest.json")) {
interrupted = entry.path();
break;
}
}
if (interrupted.empty()) return;

// Take the new-hash directory's place. download_from_huggingface may have
// just created it empty; remove it so the rename can land. Never clobber a
// new snapshot that already has content.
if (safe_exists(new_snapshot)) {
if (!fs::is_empty(new_snapshot, ec) || ec) return;
fs::remove(new_snapshot, ec);
if (ec) return;
}
fs::rename(interrupted, new_snapshot, ec);
if (ec) {
LOG(WARNING, "ModelManager") << "Could not migrate interrupted snapshot "
<< interrupted.filename().string() << " -> " << new_hash
<< " to resume download: " << ec.message() << std::endl;
return;
}
LOG(INFO, "ModelManager") << "Resuming interrupted download: migrated snapshot "
<< interrupted.filename().string() << " -> " << new_hash << std::endl;
}

// Download model files from HuggingFace
// =====================================
// IMPORTANT: This function ALWAYS queries the HuggingFace API to get the repository
Expand Down Expand Up @@ -4369,6 +4442,11 @@ void ModelManager::download_from_huggingface(const ModelInfo& info,
LOG(INFO, "ModelManager") << "Warning: No commit hash found in API response, using 'main'" << std::endl;
}

// If a download under a previous commit hash was interrupted, move it forward
// to the current hash so its partial/complete files resume instead of being
// orphaned. Must run before the snapshot dir below is created.
resume_snapshot_across_commit(model_cache_path, commit_hash);

// Create snapshot directory using commit hash
fs::path snapshot_path = model_cache_path / "snapshots" / commit_hash;
ensure_create_directories(snapshot_path);
Expand Down
165 changes: 165 additions & 0 deletions test/server_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -3259,6 +3259,171 @@ def test_033_pull_variants_valid_checkpoint_returns_variant_list(self):
f"{[v['name'] for v in variants]}"
)

def _get_snapshot_hash_from_refs(self, repo_cache_dir):
"""Read the snapshot hash recorded in refs/main, if present."""
refs_path = os.path.join(repo_cache_dir, "refs", "main")
if os.path.isfile(refs_path):
with open(refs_path, encoding="utf-8") as f:
return f.read().strip()
return None

def test_034_pull_resumes_interrupted_snapshot_across_commit_change(self):
"""An interrupted download whose snapshot sits under a stale commit hash
is migrated forward to the current hash and resumed in place, not
re-downloaded from zero.

Reproduces the HuggingFace "sha changed during a paused download" case:
the API returns the latest commit SHA on every call, so a pause/resume
after the repo received new commits yields a different commit_hash. The
interrupted attempt is located by its .download_manifest.json marker
(refs/main is sticky and does not point at it), renamed to the new hash,
and its files resume in place — proven by an unchanged file mtime.
"""
self._ensure_model_pulled()

repo_id = "unsloth/gemma-3-270m-it-GGUF" # main repo for ENDPOINT_TEST_MODEL (Tiny-Test-Model-GGUF)
hf_cache = get_hf_cache_dir()
repo_cache_dir = os.path.join(hf_cache, "models--" + repo_id.replace("/", "--"))
snapshots_dir = os.path.join(repo_cache_dir, "snapshots")
if not os.path.isdir(snapshots_dir):
self.skipTest("HF cache snapshots directory not found")

real_hash = self._get_snapshot_hash_from_refs(repo_cache_dir)
if not real_hash or not os.path.isdir(os.path.join(snapshots_dir, real_hash)):
self.skipTest("current snapshot not found")
real_snapshot = os.path.join(snapshots_dir, real_hash)

# Largest real file is the "resumed in place" witness (rename preserves
# mtime; a re-download would rewrite it).
witness_rel, witness_size = None, -1
for root, _dirs, files in os.walk(real_snapshot):
for f in files:
if f.startswith(".") or f.endswith(".partial"):
continue
size = os.path.getsize(os.path.join(root, f))
if size > witness_size:
witness_size = size
witness_rel = os.path.relpath(os.path.join(root, f), real_snapshot)
if witness_rel is None:
self.skipTest("no witness file in snapshot")

fake_old = "0" * 40
old_snapshot = os.path.join(snapshots_dir, fake_old)
# Clear any leftover fake dir from a previously interrupted run so the
# rename below starts clean.
if os.path.isdir(old_snapshot):
shutil.rmtree(old_snapshot, ignore_errors=True)

# All cache mutation happens inside the try so the finally always restores
# the active snapshot if anything fails before the migration completes.
try:
# Simulate an interrupted download under an older commit hash: move the
# snapshot to a fake hash and drop an in-progress manifest marker into
# it. (No .partial needed — the manifest is the interrupted gate.)
os.rename(real_snapshot, old_snapshot)
with open(
os.path.join(old_snapshot, ".download_manifest.json"),
"w",
encoding="utf-8",
) as mf:
json.dump(
{"repo_id": repo_id, "commit_hash": fake_old, "files": []}, mf
)
witness_mtime_before = os.stat(
os.path.join(old_snapshot, witness_rel)
).st_mtime

response = requests.post(
f"{self.base_url}/pull",
json={"model_name": ENDPOINT_TEST_MODEL, "stream": False},
timeout=TIMEOUT_MODEL_OPERATION,
)
self.assertEqual(
response.status_code, 200, f"Pull failed: {response.text[:500]}"
)
self.assertEqual(response.json().get("status"), "success")

# Interrupted snapshot migrated forward (the stale dir is gone).
self.assertFalse(
os.path.isdir(old_snapshot),
"interrupted snapshot should have been migrated forward",
)
new_hash = self._get_snapshot_hash_from_refs(repo_cache_dir)
self.assertEqual(new_hash, real_hash)
witness_after = os.path.join(snapshots_dir, new_hash, witness_rel)
self.assertTrue(os.path.isfile(witness_after))
# Resume proof: carried in place (mtime unchanged), not re-downloaded.
self.assertEqual(
os.stat(witness_after).st_mtime,
witness_mtime_before,
"witness file was re-downloaded instead of resumed in place",
)

models = requests.get(
f"{self.base_url}/models", timeout=TIMEOUT_DEFAULT
).json()
self.assertIn(ENDPOINT_TEST_MODEL, [m["id"] for m in models["data"]])
print("[OK] interrupted snapshot migrated forward and resumed in place")
finally:
if os.path.isdir(old_snapshot) and not os.path.isdir(
os.path.join(snapshots_dir, real_hash)
):
os.rename(old_snapshot, os.path.join(snapshots_dir, real_hash))

def test_035_pull_does_not_migrate_completed_stale_snapshot(self):
"""A *completed* snapshot under a stale hash (no .download_manifest.json
marker) is NOT migrated by the resume path — only genuinely interrupted
downloads are. Guards the gate so a completed copy is never moved/renamed
out from under a normal upgrade.
"""
self._ensure_model_pulled()

repo_id = "unsloth/gemma-3-270m-it-GGUF"
hf_cache = get_hf_cache_dir()
repo_cache_dir = os.path.join(hf_cache, "models--" + repo_id.replace("/", "--"))
snapshots_dir = os.path.join(repo_cache_dir, "snapshots")
if not os.path.isdir(snapshots_dir):
self.skipTest("HF cache snapshots directory not found")

real_hash = self._get_snapshot_hash_from_refs(repo_cache_dir)
if not real_hash or not os.path.isdir(os.path.join(snapshots_dir, real_hash)):
self.skipTest("current snapshot not found")
real_snapshot = os.path.join(snapshots_dir, real_hash)

fake_old = "1" * 40
old_snapshot = os.path.join(snapshots_dir, fake_old)

# Completed snapshot under a stale hash: copy the files, strip any
# manifest/partial, leave the real snapshot in place.
if os.path.isdir(old_snapshot):
shutil.rmtree(old_snapshot)
shutil.copytree(real_snapshot, old_snapshot)
for root, _dirs, files in os.walk(old_snapshot):
for f in files:
if f == ".download_manifest.json" or f.endswith(".partial"):
os.remove(os.path.join(root, f))

try:
response = requests.post(
f"{self.base_url}/pull",
json={"model_name": ENDPOINT_TEST_MODEL, "stream": False},
timeout=TIMEOUT_MODEL_OPERATION,
)
self.assertEqual(
response.status_code, 200, f"Pull failed: {response.text[:500]}"
)
self.assertEqual(response.json().get("status"), "success")

# The completed stale snapshot must be left intact — not migrated.
self.assertTrue(
os.path.isdir(old_snapshot),
"completed (manifest-less) snapshot must not be migrated by resume",
)
print("[OK] completed stale snapshot left intact (resume gate honored)")
finally:
if os.path.isdir(old_snapshot):
shutil.rmtree(old_snapshot, ignore_errors=True)


if __name__ == "__main__":
run_server_tests(EndpointTests, "ENDPOINT TESTS")
Loading