Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 104 additions & 32 deletions admin/app/jobs/download_model_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,25 @@ export class DownloadModelJob {
return createHash('sha256').update(modelName).digest('hex').slice(0, 16)
}

/** In-memory registry of abort controllers for active model download jobs */
static abortControllers: Map<string, AbortController> = new Map()

/**
* Redis key used to signal cancellation across processes. Uses a `model-cancel` prefix
* so it cannot collide with content download cancel signals (`nomad:download:cancel:*`).
*/
static cancelKey(jobId: string): string {
return `nomad:download:model-cancel:${jobId}`
}

/** Signal cancellation via Redis so the worker process can pick it up on its next poll tick */
static async signalCancel(jobId: string): Promise<void> {
const queueService = new QueueService()
const queue = queueService.getQueue(this.queue)
const client = await queue.client
await client.set(this.cancelKey(jobId), '1', 'EX', 300) // 5 min TTL
}

async handle(job: Job) {
const { modelName } = job.data as DownloadModelJobParams

Expand All @@ -41,43 +60,96 @@ export class DownloadModelJob {
`[DownloadModelJob] Ollama service is ready. Initiating download for ${modelName}`
)

// Services are ready, initiate the download with progress tracking
const result = await ollamaService.downloadModel(modelName, (progressPercent) => {
if (progressPercent) {
job.updateProgress(Math.floor(progressPercent)).catch((err) => {
if (err?.code !== -1) throw err
})
logger.info(
`[DownloadModelJob] Model ${modelName}: ${progressPercent}%`
)
}
// Register abort controller for this job — used both by in-process cancels (same process
// as the API server) and as the target of the Redis poll loop below.
const abortController = new AbortController()
DownloadModelJob.abortControllers.set(job.id!, abortController)

// Store detailed progress in job data for clients to query
job.updateData({
...job.data,
status: 'downloading',
progress: progressPercent,
progress_timestamp: new Date().toISOString(),
}).catch((err) => {
if (err?.code !== -1) throw err
})
})
// Get Redis client for checking cancel signals from the API process
const queueService = new QueueService()
const cancelRedis = await queueService.getQueue(DownloadModelJob.queue).client

// Track whether cancellation was explicitly requested by the user. Only user-initiated
// cancels become UnrecoverableError — other failures (e.g., transient network errors)
// should still benefit from BullMQ's retry logic.
let userCancelled = false

// Poll Redis for cancel signal every 2s — independent of progress events so cancellation
// works even when the pull is mid-blob and not emitting progress updates.
let cancelPollInterval: ReturnType<typeof setInterval> | null = setInterval(async () => {
try {
const val = await cancelRedis.get(DownloadModelJob.cancelKey(job.id!))
if (val) {
await cancelRedis.del(DownloadModelJob.cancelKey(job.id!))
userCancelled = true
abortController.abort('user-cancel')
}
} catch {
// Redis errors are non-fatal; in-process AbortController covers same-process cancels
}
}, 2000)

if (!result.success) {
logger.error(
`[DownloadModelJob] Failed to initiate download for model ${modelName}: ${result.message}`
try {
// Services are ready, initiate the download with progress tracking
const result = await ollamaService.downloadModel(
modelName,
(progressPercent, bytes) => {
if (progressPercent) {
job.updateProgress(Math.floor(progressPercent)).catch((err) => {
if (err?.code !== -1) throw err
})
}

// Store detailed progress in job data for clients to query
job.updateData({
...job.data,
status: 'downloading',
progress: progressPercent,
downloadedBytes: bytes?.downloadedBytes,
totalBytes: bytes?.totalBytes,
progress_timestamp: new Date().toISOString(),
}).catch((err) => {
if (err?.code !== -1) throw err
})
},
abortController.signal,
job.id!
)
// Don't retry errors that will never succeed (e.g., Ollama version too old)
if (result.retryable === false) {
throw new UnrecoverableError(result.message)

if (!result.success) {
logger.error(
`[DownloadModelJob] Failed to initiate download for model ${modelName}: ${result.message}`
)
// User-initiated cancel — must be unrecoverable to avoid the 40-attempt retry storm.
// The downloadModel() catch block returns retryable: false for cancels, so this branch
// catches both Ollama version mismatches (existing) AND user cancels (new).
if (result.retryable === false) {
throw new UnrecoverableError(result.message)
}
throw new Error(`Failed to initiate download for model: ${result.message}`)
}
throw new Error(`Failed to initiate download for model: ${result.message}`)
}

logger.info(`[DownloadModelJob] Successfully completed download for model ${modelName}`)
return {
modelName,
message: result.message,
logger.info(`[DownloadModelJob] Successfully completed download for model ${modelName}`)
return {
modelName,
message: result.message,
}
} catch (error: any) {
// Belt-and-suspenders: if downloadModel didn't recognize the cancel (e.g., the abort
// fired after the response stream completed but before our code returned), the cancel
// flag tells us this was a user action and should be unrecoverable.
if (userCancelled || abortController.signal.reason === 'user-cancel') {
if (!(error instanceof UnrecoverableError)) {
throw new UnrecoverableError(`Model download cancelled: ${error.message ?? error}`)
}
}
throw error
} finally {
if (cancelPollInterval !== null) {
clearInterval(cancelPollInterval)
cancelPollInterval = null
}
DownloadModelJob.abortControllers.delete(job.id!)
}
}

Expand Down
150 changes: 108 additions & 42 deletions admin/app/services/download_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { DownloadModelJob } from '#jobs/download_model_job'
import { DownloadJobWithProgress, DownloadProgressData } from '../../types/downloads.js'
import { normalize } from 'path'
import { deleteFileIfExists } from '../utils/fs.js'
import transmit from '@adonisjs/transmit/services/main'
import { BROADCAST_CHANNELS } from '../../constants/broadcast.js'

@inject()
export class DownloadService {
Expand Down Expand Up @@ -111,14 +113,32 @@ export class DownloadService {
}

async cancelJob(jobId: string): Promise<{ success: boolean; message: string }> {
// Try the file download queue first (the original PR #554 path)
const queue = this.queueService.getQueue(RunDownloadJob.queue)
const job = await queue.getJob(jobId)

if (!job) {
// Job already completed (removeOnComplete: true) or doesn't exist
return { success: true, message: 'Job not found (may have already completed)' }
if (job) {
return await this._cancelFileDownloadJob(jobId, job, queue)
}

// Fall through to the model download queue
const modelQueue = this.queueService.getQueue(DownloadModelJob.queue)
const modelJob = await modelQueue.getJob(jobId)

if (modelJob) {
return await this._cancelModelDownloadJob(jobId, modelJob, modelQueue)
}

// Not found in either queue
return { success: true, message: 'Job not found (may have already completed)' }
}

/** Cancel a content download (zim, map, pmtiles, etc.) — original PR #554 logic */
private async _cancelFileDownloadJob(
jobId: string,
job: any,
queue: any
): Promise<{ success: boolean; message: string }> {
const filepath = job.data.filepath

// Signal the worker process to abort the download via Redis
Expand All @@ -128,45 +148,8 @@ export class DownloadService {
RunDownloadJob.abortControllers.get(jobId)?.abort('user-cancel')
RunDownloadJob.abortControllers.delete(jobId)

// Poll for terminal state (up to 4s at 250ms intervals) — cooperates with BullMQ's lifecycle
// instead of force-removing an active job and losing the worker's failure/cleanup path.
const POLL_INTERVAL_MS = 250
const POLL_TIMEOUT_MS = 4000
const deadline = Date.now() + POLL_TIMEOUT_MS
let reachedTerminal = false

while (Date.now() < deadline) {
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
try {
const state = await job.getState()
if (state === 'failed' || state === 'completed' || state === 'unknown') {
reachedTerminal = true
break
}
} catch {
reachedTerminal = true // getState() throws if job is already gone
break
}
}

if (!reachedTerminal) {
console.warn(`[DownloadService] cancelJob: job ${jobId} did not reach terminal state within timeout, removing anyway`)
}

// Remove the BullMQ job
try {
await job.remove()
} catch {
// Lock contention fallback: clear lock and retry once
try {
const client = await queue.client
await client.del(`bull:${RunDownloadJob.queue}:${jobId}:lock`)
const updatedJob = await queue.getJob(jobId)
if (updatedJob) await updatedJob.remove()
} catch {
// Best effort - job will be cleaned up on next dismiss attempt
}
}
await this._pollForTerminalState(job, jobId)
await this._removeJobWithLockFallback(job, queue, RunDownloadJob.queue, jobId)

// Delete the partial file from disk
if (filepath) {
Expand Down Expand Up @@ -195,4 +178,87 @@ export class DownloadService {

return { success: true, message: 'Download cancelled and partial file deleted' }
}

/** Cancel an Ollama model download — mirrors the file cancel pattern but skips file cleanup */
private async _cancelModelDownloadJob(
jobId: string,
job: any,
queue: any
): Promise<{ success: boolean; message: string }> {
const modelName: string = job.data?.modelName ?? 'unknown'

// Signal the worker process to abort the pull via Redis
await DownloadModelJob.signalCancel(jobId)

// Also try in-memory abort (works if worker is in same process)
DownloadModelJob.abortControllers.get(jobId)?.abort('user-cancel')
DownloadModelJob.abortControllers.delete(jobId)

await this._pollForTerminalState(job, jobId)
await this._removeJobWithLockFallback(job, queue, DownloadModelJob.queue, jobId)

// Broadcast a cancelled event so the frontend hook clears the entry. We use percent: -2
// (distinct from -1 = error) so the hook can route it to a 2s auto-clear instead of the
// 15s error display. The frontend ALSO removes the entry optimistically from the API
// response, so this is belt-and-suspenders for cases where the SSE arrives first.
transmit.broadcast(BROADCAST_CHANNELS.OLLAMA_MODEL_DOWNLOAD, {
model: modelName,
jobId,
percent: -2,
status: 'cancelled',
timestamp: new Date().toISOString(),
})

// Note on partial blob cleanup: Ollama manages model blobs internally at
// /root/.ollama/models/blobs/. We deliberately do NOT call /api/delete here — Ollama's
// expected behavior is to retain partial blobs so a re-pull resumes from where it left
// off. If the user wants to reclaim that space, they can re-pull and let it complete,
// or delete the partially-downloaded model from the AI Settings page.
return { success: true, message: 'Model download cancelled' }
}

/** Wait up to 4s (250ms intervals) for the job to reach a terminal state */
private async _pollForTerminalState(job: any, jobId: string): Promise<void> {
const POLL_INTERVAL_MS = 250
const POLL_TIMEOUT_MS = 4000
const deadline = Date.now() + POLL_TIMEOUT_MS

while (Date.now() < deadline) {
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
try {
const state = await job.getState()
if (state === 'failed' || state === 'completed' || state === 'unknown') {
return
}
} catch {
return // getState() throws if job is already gone
}
}

console.warn(
`[DownloadService] cancelJob: job ${jobId} did not reach terminal state within timeout, removing anyway`
)
}

/** Remove a BullMQ job, clearing a stale worker lock if the first attempt fails */
private async _removeJobWithLockFallback(
job: any,
queue: any,
queueName: string,
jobId: string
): Promise<void> {
try {
await job.remove()
} catch {
// Lock contention fallback: clear lock and retry once
try {
const client = await queue.client
await client.del(`bull:${queueName}:${jobId}:lock`)
const updatedJob = await queue.getJob(jobId)
if (updatedJob) await updatedJob.remove()
} catch {
// Best effort - job will be cleaned up on next dismiss attempt
}
}
}
}
Loading