diff --git a/packages/cloud-api/v1/eliza/agents/[agentId]/resume/route.ts b/packages/cloud-api/v1/eliza/agents/[agentId]/resume/route.ts index b5806dae289b3..56d5c67d9671f 100644 --- a/packages/cloud-api/v1/eliza/agents/[agentId]/resume/route.ts +++ b/packages/cloud-api/v1/eliza/agents/[agentId]/resume/route.ts @@ -173,16 +173,25 @@ async function __hono_POST( } try { + // Distinct from `agent_provision` so the daemon can tell a user- + // initiated resume from a fresh provision in audit logs, and so a + // future `docker start` fast path can hook in without touching + // the route. Today executeResume always re-provisions to restore + // bridge/health URLs via the sandbox handle. const { job, created } = - await provisioningJobService.enqueueAgentProvisionOnce({ + await provisioningJobService.enqueueAgentResumeOnce({ agentId, organizationId: user.organization_id, userId: user.id, - agentName: agent.agent_name ?? agentId, webhookUrl, - expectedUpdatedAt: agent.updated_at, }); + // Best-effort wake of the orchestrator so the user does not wait for + // the next cron tick. Same pattern as provision/delete/suspend. + void provisioningJobService.triggerImmediate().catch(() => { + // Logged inside the service; nothing actionable here. + }); + return applyCorsHeaders( Response.json( { @@ -195,7 +204,7 @@ async function __hono_POST( jobId: job.id, status: job.status, message: created - ? "Resume job created. Agent will restore from latest snapshot." + ? "Resume job created. Poll the job endpoint for status." : "Resume is already in progress.", }, polling: { diff --git a/packages/cloud-api/v1/eliza/agents/[agentId]/route.ts b/packages/cloud-api/v1/eliza/agents/[agentId]/route.ts index d584c15c38e8e..9ebb97c03c555 100644 --- a/packages/cloud-api/v1/eliza/agents/[agentId]/route.ts +++ b/packages/cloud-api/v1/eliza/agents/[agentId]/route.ts @@ -232,43 +232,56 @@ app.patch("/", async (c) => { }); } - const result = await elizaSandboxService.shutdown( - agentId, - user.organization_id, - ); - if (!result.success) { - const status = - result.error === "Agent not found" - ? 404 - : result.error === "Agent provisioning is in progress" - ? 409 - : 400; + // Enqueue `agent_suspend` job — the orchestrator does the docker stop + // via SSH and flips the DB. Workers can't SSH the cores; the previous + // inline `shutdown()` path silently failed to stop the container and + // left a stale DB row claiming `stopped` while the container kept + // running. See suspend/route.ts for the same refactor. + if (agent.status === "provisioning") { return c.json( - { - success: false, - error: result.error ?? `${parsed.data.action} failed`, - }, - status, + { success: false, error: "Agent provisioning is in progress" }, + 409, ); } - logger.info(`[agent-api] Agent ${parsed.data.action} complete`, { + const enqueueResult = await provisioningJobService.enqueueAgentSuspendOnce({ agentId, - orgId: user.organization_id, + organizationId: user.organization_id, + userId: user.id, }); - return c.json({ - success: true, - data: { + void provisioningJobService.triggerImmediate().catch(() => { + // Logged inside the service. + }); + + logger.info( + `[agent-api] Agent ${parsed.data.action} enqueued (suspend job)`, + { agentId, - action: parsed.data.action, - message: - parsed.data.action === "shutdown" - ? "Agent shutdown complete" - : "Agent suspended with snapshot. Use resume or provision to restart.", - previousStatus: agent.status, + orgId: user.organization_id, + jobId: enqueueResult.job.id, + created: enqueueResult.created, }, - }); + ); + + return c.json( + { + success: true, + created: enqueueResult.created, + alreadyInProgress: !enqueueResult.created, + data: { + agentId, + action: parsed.data.action, + jobId: enqueueResult.job.id, + status: enqueueResult.job.status, + message: enqueueResult.created + ? `${parsed.data.action} job created. Poll the job endpoint for status.` + : `${parsed.data.action} is already in progress.`, + previousStatus: agent.status, + }, + }, + 202, + ); } catch (error) { logger.error("[agent-api] PATCH /agents/:agentId error", { error }); return failureResponse(c, error); diff --git a/packages/cloud-api/v1/eliza/agents/[agentId]/suspend/route.ts b/packages/cloud-api/v1/eliza/agents/[agentId]/suspend/route.ts index 7a7f31efd9952..37437b406494a 100644 --- a/packages/cloud-api/v1/eliza/agents/[agentId]/suspend/route.ts +++ b/packages/cloud-api/v1/eliza/agents/[agentId]/suspend/route.ts @@ -2,6 +2,7 @@ import { Hono } from "hono"; import { errorToResponse } from "@/lib/api/errors"; import { requireAuthOrApiKeyWithOrg } from "@/lib/auth"; import { elizaSandboxService } from "@/lib/services/eliza-sandbox"; +import { provisioningJobService } from "@/lib/services/provisioning-jobs"; import { applyCorsHeaders, handleCorsOptions } from "@/lib/services/proxy/cors"; import { logger } from "@/lib/utils/logger"; import type { AppEnv } from "@/types/cloud-worker-env"; @@ -11,14 +12,20 @@ const CORS_METHODS = "POST, OPTIONS"; /** * POST /api/v1/eliza/agents/[agentId]/suspend * - * Gracefully suspend a running agent: - * 1. Takes a pre-shutdown snapshot (backup) of the agent's state - * 2. Stops and removes the Docker container - * 3. Updates status to "stopped" in DB + * Enqueues an `agent_suspend` job. The Hetzner orchestrator (which has SSH + * access to the cores) picks it up, runs `docker stop` on the container, + * flips the DB row to `stopped`, and clears `bridge_url`/`health_url`. The + * `sandbox_id` is retained so a later `agent_resume` job can `docker start` + * the same container without a full re-provision. * - * The agent can be resumed later via POST /api/v1/eliza/agents/[agentId]/resume - * or POST /api/v1/eliza/agents/[agentId]/provision, which will restore from - * the latest backup automatically. The agent may resume on a different node. + * Previously this route called `elizaSandboxService.shutdown()` inline, + * which only worked from a Node sidecar — Cloudflare Workers can't SSH the + * Hetzner cores, so the inline path silently failed to stop the container + * and the DB row showed `stopped` while the container kept burning RAM. + * + * Returns 202 with the job id; clients poll `/api/v1/jobs/` for the + * final status. Idempotent: a second suspend on the same agent while a job + * is in flight returns the existing job. */ async function __hono_POST( request: Request, @@ -62,43 +69,57 @@ async function __hono_POST( ); } - const result = await elizaSandboxService.shutdown( - agentId, - user.organization_id, - ); - - if (!result.success) { - const status = - result.error === "Agent not found" - ? 404 - : result.error === "Agent provisioning is in progress" - ? 409 - : 500; + if (agent.status === "provisioning") { return applyCorsHeaders( Response.json( - { success: false, error: result.error ?? "Suspend failed" }, - { status }, + { + success: false, + error: "Agent provisioning is in progress", + }, + { status: 409 }, ), CORS_METHODS, ); } - logger.info("[agent-api] Agent suspended", { + const enqueueResult = await provisioningJobService.enqueueAgentSuspendOnce({ + agentId, + organizationId: user.organization_id, + userId: user.id, + }); + + // Best-effort wake of the orchestrator so the user does not wait for the + // next cron tick. Same pattern as provision + delete. + void provisioningJobService.triggerImmediate().catch(() => { + // Logged inside the service; nothing actionable here. + }); + + logger.info("[agent-api] Agent suspend enqueued", { agentId, orgId: user.organization_id, + jobId: enqueueResult.job.id, + created: enqueueResult.created, }); return applyCorsHeaders( - Response.json({ - success: true, - data: { - agentId, - action: "suspend", - message: - "Agent suspended with snapshot. Use resume or provision to restart.", - previousStatus: agent.status, + Response.json( + { + success: true, + created: enqueueResult.created, + alreadyInProgress: !enqueueResult.created, + message: enqueueResult.created + ? "Suspend job created. Poll the job endpoint for status." + : "Suspend is already in progress.", + data: { + agentId, + action: "suspend", + jobId: enqueueResult.job.id, + status: enqueueResult.job.status, + previousStatus: agent.status, + }, }, - }), + { status: 202 }, + ), CORS_METHODS, ); } catch (error) { diff --git a/packages/cloud-shared/src/lib/services/__tests__/provisioning-job-types.test.ts b/packages/cloud-shared/src/lib/services/__tests__/provisioning-job-types.test.ts new file mode 100644 index 0000000000000..a7fd58c1a46d2 --- /dev/null +++ b/packages/cloud-shared/src/lib/services/__tests__/provisioning-job-types.test.ts @@ -0,0 +1,39 @@ +/** + * Smoke tests for the job-type registry. These catch the cheap-to-make + * mistakes that the orchestrator daemon can't recover from at runtime: + * a typo on the wire value (DB rows reference the string, not the + * symbol), a missing entry (daemon sees a job type it doesn't recognize + * and the job rots in `pending`), or accidental duplicates (two symbols + * mapped to the same wire value silently route to the wrong executor). + * + * The actual executor logic (SSH, advisory locks, DB writes) needs a + * test harness that the cloud-shared package doesn't have yet — those + * tests are a follow-up. + */ +import { describe, expect, test } from "bun:test"; +import { JOB_TYPES, type ProvisioningJobType } from "../provisioning-job-types"; + +describe("JOB_TYPES", () => { + test("includes the four registered job types", () => { + expect(JOB_TYPES.AGENT_PROVISION).toBe("agent_provision"); + expect(JOB_TYPES.AGENT_DELETE).toBe("agent_delete"); + expect(JOB_TYPES.AGENT_SUSPEND).toBe("agent_suspend"); + expect(JOB_TYPES.AGENT_RESUME).toBe("agent_resume"); + }); + + test("wire values are unique (no two symbols share a string)", () => { + const values = Object.values(JOB_TYPES); + expect(new Set(values).size).toBe(values.length); + }); + + test("wire values are snake_case (matches DB convention)", () => { + for (const value of Object.values(JOB_TYPES)) { + expect(value).toMatch(/^[a-z]+(?:_[a-z]+)+$/); + } + }); + + test("ProvisioningJobType narrows to the registered set", () => { + const known: ProvisioningJobType = "agent_suspend"; + expect(Object.values(JOB_TYPES)).toContain(known); + }); +}); diff --git a/packages/cloud-shared/src/lib/services/eliza-sandbox.ts b/packages/cloud-shared/src/lib/services/eliza-sandbox.ts index 7f720bcb06df5..ae41af29f757c 100644 --- a/packages/cloud-shared/src/lib/services/eliza-sandbox.ts +++ b/packages/cloud-shared/src/lib/services/eliza-sandbox.ts @@ -2703,6 +2703,111 @@ export class ElizaSandboxService { return result; } + /** + * Daemon-side handler for the `agent_suspend` job. SSH-stops the + * container, flips the DB row to `stopped`, clears bridge/health URLs + * but keeps `sandbox_id` for a subsequent `agent_resume` to docker + * start. Replaces the Worker-callable `shutdown()` path which silently + * failed to stop the container (Workers can't SSH). + */ + async executeSuspend( + agentId: string, + orgId: string, + ): Promise<{ success: boolean; containerStopped: boolean; error?: string }> { + return await dbWrite.transaction(async (tx) => { + await this.lockLifecycle(tx, agentId, orgId); + const rec = await this.getAgentForLifecycleMutation(tx, agentId, orgId); + if (!rec) + return { success: false, containerStopped: false, error: "Agent not found" } as const; + + const hasActiveProvisionJob = await this.hasActiveProvisionJobTx(tx, agentId, orgId); + if (rec.status === "provisioning" || hasActiveProvisionJob) { + return { + success: false, + containerStopped: false, + error: "Agent provisioning is in progress", + } as const; + } + if (rec.status === "stopped") return { success: true, containerStopped: true } as const; + + let containerStopped = false; + if (rec.sandbox_id) { + try { + await (await this.getProvider()).stop(rec.sandbox_id); + containerStopped = true; + } catch (e) { + if (this.isIgnorableSandboxStopError(e)) { + containerStopped = true; + logger.info("[agent-sandbox] Sandbox already absent during suspend", { + sandboxId: rec.sandbox_id, + error: e instanceof Error ? e.message : String(e), + }); + } else { + return { + success: false, + containerStopped: false, + error: e instanceof Error ? e.message : String(e), + } as const; + } + } + } else { + containerStopped = true; + } + + await tx.execute(sql` + UPDATE ${agentSandboxes} + SET status = 'stopped', bridge_url = NULL, health_url = NULL, updated_at = NOW() + WHERE id = ${rec.id} + `); + return { success: true, containerStopped } as const; + }); + } + + /** + * Daemon-side handler for the `agent_resume` job. Delegates to + * `provision()` which restores `bridge_url` / `health_url` from the + * provider's sandbox handle and reuses the existing Neon DB + * (`sandbox_id` is retained across suspend). `provision()` acquires + * its own advisory lock, so two concurrent resume jobs serialize. + * + * A future fast path will `docker start` the existing container (~5s) + * when the provider exposes a standalone `start()` method that + * returns a fresh handle — today the only way to get `bridgeUrl` / + * `healthUrl` back is via the create-or-restart flow inside + * `provision()`, so we always pay that path. + */ + async executeResume( + agentId: string, + orgId: string, + ): Promise<{ + success: boolean; + containerStarted: boolean; + reprovisioned: boolean; + error?: string; + }> { + const rec = await agentSandboxesRepository.findByIdAndOrg(agentId, orgId); + if (!rec) + return { + success: false, + containerStarted: false, + reprovisioned: false, + error: "Agent not found", + }; + if (rec.status === "running") + return { success: true, containerStarted: true, reprovisioned: false }; + + const provisionResult = await this.provision(agentId, orgId); + if (!provisionResult.success) { + return { + success: false, + containerStarted: false, + reprovisioned: true, + error: provisionResult.error, + }; + } + return { success: true, containerStarted: true, reprovisioned: true }; + } + // Private helpers private async lockLifecycle(tx: LifecycleTx, agentId: string, orgId: string): Promise { diff --git a/packages/cloud-shared/src/lib/services/provisioning-job-types.ts b/packages/cloud-shared/src/lib/services/provisioning-job-types.ts index 624cb1701fe6c..4f5b7c960107d 100644 --- a/packages/cloud-shared/src/lib/services/provisioning-job-types.ts +++ b/packages/cloud-shared/src/lib/services/provisioning-job-types.ts @@ -1,6 +1,8 @@ export const JOB_TYPES = { AGENT_PROVISION: "agent_provision", AGENT_DELETE: "agent_delete", + AGENT_SUSPEND: "agent_suspend", + AGENT_RESUME: "agent_resume", } as const; export type ProvisioningJobType = (typeof JOB_TYPES)[keyof typeof JOB_TYPES]; diff --git a/packages/cloud-shared/src/lib/services/provisioning-jobs.ts b/packages/cloud-shared/src/lib/services/provisioning-jobs.ts index 0e181e5f1c522..e56e526eef946 100644 --- a/packages/cloud-shared/src/lib/services/provisioning-jobs.ts +++ b/packages/cloud-shared/src/lib/services/provisioning-jobs.ts @@ -48,6 +48,18 @@ export interface AgentDeleteJobData { userId: string; } +export interface AgentSuspendJobData { + agentId: string; + organizationId: string; + userId: string; +} + +export interface AgentResumeJobData { + agentId: string; + organizationId: string; + userId: string; +} + // --------------------------------------------------------------------------- // Job result shapes (stored in jobs.result JSONB) // --------------------------------------------------------------------------- @@ -67,6 +79,19 @@ export interface AgentDeleteJobResult { error?: string; } +export interface AgentSuspendJobResult { + cloudAgentId: string; + containerStopped: boolean; + error?: string; +} + +export interface AgentResumeJobResult { + cloudAgentId: string; + containerStarted: boolean; + reprovisioned: boolean; + error?: string; +} + function agentProvisionJobDataToRecord(data: AgentProvisionJobData): Record { return { ...data }; } @@ -83,6 +108,22 @@ function agentDeleteJobResultToRecord(result: AgentDeleteJobResult): Record { + return { ...data }; +} + +function agentSuspendJobResultToRecord(result: AgentSuspendJobResult): Record { + return { ...result }; +} + +function agentResumeJobDataToRecord(data: AgentResumeJobData): Record { + return { ...data }; +} + +function agentResumeJobResultToRecord(result: AgentResumeJobResult): Record { + return { ...result }; +} + function isAgentProvisionJobData(value: unknown): value is AgentProvisionJobData { return ( typeof value === "object" && @@ -118,6 +159,40 @@ function readAgentDeleteJobData(job: Job): AgentDeleteJobData { return job.data; } +function isAgentSuspendJobData(value: unknown): value is AgentSuspendJobData { + return ( + typeof value === "object" && + value !== null && + typeof (value as { agentId?: unknown }).agentId === "string" && + typeof (value as { organizationId?: unknown }).organizationId === "string" && + typeof (value as { userId?: unknown }).userId === "string" + ); +} + +function readAgentSuspendJobData(job: Job): AgentSuspendJobData { + if (!isAgentSuspendJobData(job.data)) { + throw new Error(`Invalid agent suspend job data for job ${job.id}`); + } + return job.data; +} + +function isAgentResumeJobData(value: unknown): value is AgentResumeJobData { + return ( + typeof value === "object" && + value !== null && + typeof (value as { agentId?: unknown }).agentId === "string" && + typeof (value as { organizationId?: unknown }).organizationId === "string" && + typeof (value as { userId?: unknown }).userId === "string" + ); +} + +function readAgentResumeJobData(job: Job): AgentResumeJobData { + if (!isAgentResumeJobData(job.data)) { + throw new Error(`Invalid agent resume job data for job ${job.id}`); + } + return job.data; +} + export interface EnqueueAgentProvisionResult { job: Job; created: boolean; @@ -128,6 +203,16 @@ export interface EnqueueAgentDeleteResult { created: boolean; } +export interface EnqueueAgentSuspendResult { + job: Job; + created: boolean; +} + +export interface EnqueueAgentResumeResult { + job: Job; + created: boolean; +} + // --------------------------------------------------------------------------- // Public API // --------------------------------------------------------------------------- @@ -357,6 +442,200 @@ export class ProvisioningJobService { }); } + /** + * Enqueue an Agent suspend job. + * + * Daemon-side execution: SSH `docker stop` on the assigned core, flip + * `agent_sandboxes.status` to "stopped", clear `bridge_url`/`health_url`, + * keep `sandbox_id` so the same container can be resumed. + * + * The Cloudflare Worker code path (cloud-api PATCH /eliza/agents/[id]) + * cannot SSH the Hetzner cores; this queue-based path moves the actual + * docker stop off the Worker so the container is reliably stopped instead + * of silently leaking with a stale DB row. + */ + async enqueueAgentSuspendOnce(params: { + agentId: string; + organizationId: string; + userId: string; + webhookUrl?: string; + }): Promise { + if (params.webhookUrl) { + await assertSafeOutboundUrl(params.webhookUrl); + } + + const jobData: AgentSuspendJobData = { + agentId: params.agentId, + organizationId: params.organizationId, + userId: params.userId, + }; + + const newJob: NewJob = { + type: JOB_TYPES.AGENT_SUSPEND, + status: "pending", + data: agentSuspendJobDataToRecord(jobData), + data_storage: "inline", + organization_id: params.organizationId, + user_id: params.userId, + webhook_url: params.webhookUrl, + max_attempts: 3, + // Same timing as delete: SSH stop is fast. + estimated_completion_at: new Date(Date.now() + 30_000), + }; + + return await dbWrite.transaction(async (tx) => { + await tx.execute(elizaProvisionAdvisoryLockSql(params.organizationId, params.agentId)); + + const [sandbox] = await tx + .select({ id: agentSandboxes.id, status: agentSandboxes.status }) + .from(agentSandboxes) + .where( + and( + eq(agentSandboxes.id, params.agentId), + eq(agentSandboxes.organization_id, params.organizationId), + ), + ) + .limit(1); + + if (!sandbox) { + throw new Error("Agent not found"); + } + + const [existing] = await tx + .select() + .from(jobs) + .where( + and( + eq(jobs.type, JOB_TYPES.AGENT_SUSPEND), + eq(jobs.organization_id, params.organizationId), + eq(jobs.agent_id, params.agentId), + sql`${jobs.status} IN ('pending', 'in_progress')`, + ), + ) + .orderBy(desc(jobs.created_at)) + .limit(1); + + if (existing) { + logger.info("[provisioning-jobs] Reusing active agent_suspend job", { + jobId: existing.id, + agentId: params.agentId, + orgId: params.organizationId, + }); + return { job: await hydrateJob(existing), created: false }; + } + + const [job] = await tx + .insert(jobs) + .values(await prepareJobInsertData(newJob)) + .returning(); + + logger.info("[provisioning-jobs] Enqueued agent_suspend job", { + jobId: job.id, + agentId: params.agentId, + orgId: params.organizationId, + }); + + return { job: await hydrateJob(job), created: true }; + }); + } + + /** + * Enqueue an Agent resume job. + * + * Daemon-side execution re-runs `provision()` against the existing + * sandbox row: this restores `bridge_url` / `health_url` from a fresh + * sandbox handle and reuses the existing Neon DB (the `sandbox_id` is + * retained across suspend). A faster `docker start` path will replace + * the re-provision once `DockerSandboxProvider` exposes a standalone + * `start()` that returns the handle. + */ + async enqueueAgentResumeOnce(params: { + agentId: string; + organizationId: string; + userId: string; + webhookUrl?: string; + }): Promise { + if (params.webhookUrl) { + await assertSafeOutboundUrl(params.webhookUrl); + } + + const jobData: AgentResumeJobData = { + agentId: params.agentId, + organizationId: params.organizationId, + userId: params.userId, + }; + + const newJob: NewJob = { + type: JOB_TYPES.AGENT_RESUME, + status: "pending", + data: agentResumeJobDataToRecord(jobData), + data_storage: "inline", + organization_id: params.organizationId, + user_id: params.userId, + webhook_url: params.webhookUrl, + max_attempts: 3, + // docker start is fast (~5s) unless the resume falls back to a full + // re-provision; budget the longer path so the UI doesn't show a + // misleading "stuck" estimate. + estimated_completion_at: new Date(Date.now() + 90_000), + }; + + return await dbWrite.transaction(async (tx) => { + await tx.execute(elizaProvisionAdvisoryLockSql(params.organizationId, params.agentId)); + + const [sandbox] = await tx + .select({ id: agentSandboxes.id, status: agentSandboxes.status }) + .from(agentSandboxes) + .where( + and( + eq(agentSandboxes.id, params.agentId), + eq(agentSandboxes.organization_id, params.organizationId), + ), + ) + .limit(1); + + if (!sandbox) { + throw new Error("Agent not found"); + } + + const [existing] = await tx + .select() + .from(jobs) + .where( + and( + eq(jobs.type, JOB_TYPES.AGENT_RESUME), + eq(jobs.organization_id, params.organizationId), + eq(jobs.agent_id, params.agentId), + sql`${jobs.status} IN ('pending', 'in_progress')`, + ), + ) + .orderBy(desc(jobs.created_at)) + .limit(1); + + if (existing) { + logger.info("[provisioning-jobs] Reusing active agent_resume job", { + jobId: existing.id, + agentId: params.agentId, + orgId: params.organizationId, + }); + return { job: await hydrateJob(existing), created: false }; + } + + const [job] = await tx + .insert(jobs) + .values(await prepareJobInsertData(newJob)) + .returning(); + + logger.info("[provisioning-jobs] Enqueued agent_resume job", { + jobId: job.id, + agentId: params.agentId, + orgId: params.organizationId, + }); + + return { job: await hydrateJob(job), created: true }; + }); + } + /** * Best-effort kick of the provisioning worker without waiting for the * next cron tick. Fire-and-forget — the cron is the safety net. @@ -585,11 +864,116 @@ export class ProvisioningJobService { case JOB_TYPES.AGENT_DELETE: await this.executeAgentDelete(job); break; + case JOB_TYPES.AGENT_SUSPEND: + await this.executeAgentSuspend(job); + break; + case JOB_TYPES.AGENT_RESUME: + await this.executeAgentResume(job); + break; default: throw new Error(`Unknown job type: ${job.type}`); } } + private async executeAgentSuspend(job: Job): Promise { + const data = readAgentSuspendJobData(job); + + if (data.organizationId !== job.organization_id) { + throw new Error( + `Organization ID mismatch: job.data.organizationId (${data.organizationId}) !== job.organization_id (${job.organization_id})`, + ); + } + + logger.info("[provisioning-jobs] Executing agent_suspend", { + jobId: job.id, + agentId: data.agentId, + }); + + const result = await elizaSandboxService.executeSuspend(data.agentId, data.organizationId); + + if (!result.success) { + await jobsRepository.update(job.id, { + result: agentSuspendJobResultToRecord({ + cloudAgentId: data.agentId, + containerStopped: result.containerStopped, + error: result.error, + }), + }); + throw new Error(result.error ?? "Unknown agent_suspend failure"); + } + + const jobResult: AgentSuspendJobResult = { + cloudAgentId: data.agentId, + containerStopped: result.containerStopped, + }; + + await jobsRepository.updateStatus(job.id, "completed", { + result: agentSuspendJobResultToRecord(jobResult), + completed_at: new Date(), + }); + + if (job.webhook_url) { + await this.fireWebhook(job, jobResult); + } + + logger.info("[provisioning-jobs] agent_suspend completed", { + jobId: job.id, + agentId: data.agentId, + containerStopped: result.containerStopped, + }); + } + + private async executeAgentResume(job: Job): Promise { + const data = readAgentResumeJobData(job); + + if (data.organizationId !== job.organization_id) { + throw new Error( + `Organization ID mismatch: job.data.organizationId (${data.organizationId}) !== job.organization_id (${job.organization_id})`, + ); + } + + logger.info("[provisioning-jobs] Executing agent_resume", { + jobId: job.id, + agentId: data.agentId, + }); + + const result = await elizaSandboxService.executeResume(data.agentId, data.organizationId); + + if (!result.success) { + await jobsRepository.update(job.id, { + result: agentResumeJobResultToRecord({ + cloudAgentId: data.agentId, + containerStarted: result.containerStarted, + reprovisioned: result.reprovisioned, + error: result.error, + }), + }); + throw new Error(result.error ?? "Unknown agent_resume failure"); + } + + const jobResult: AgentResumeJobResult = { + cloudAgentId: data.agentId, + containerStarted: result.containerStarted, + reprovisioned: result.reprovisioned, + }; + + await jobsRepository.updateStatus(job.id, "completed", { + result: agentResumeJobResultToRecord(jobResult), + completed_at: new Date(), + }); + + if (job.webhook_url) { + await this.fireWebhook(job, jobResult); + } + + logger.info("[provisioning-jobs] agent_resume completed", { + jobId: job.id, + agentId: data.agentId, + containerStarted: result.containerStarted, + reprovisioned: result.reprovisioned, + }); + } + private async executeAgentDelete(job: Job): Promise { const data = readAgentDeleteJobData(job); @@ -753,7 +1137,11 @@ export class ProvisioningJobService { private async fireWebhook( job: Job, - result: AgentProvisionJobResult | AgentDeleteJobResult, + result: + | AgentProvisionJobResult + | AgentDeleteJobResult + | AgentSuspendJobResult + | AgentResumeJobResult, ): Promise { if (!job.webhook_url) return;