From b555e5dd2eb4fd22c14019a630bcd9c1daa34b05 Mon Sep 17 00:00:00 2001 From: zozo123 Date: Sun, 7 Jun 2026 14:45:20 +0300 Subject: [PATCH] Add crabbox.sh by OpenClaw remote execution integration --- ai/examples/23-crabbox-mcp-run.json | 32 ++ ai/examples/24-crabbox-simple-worker.json | 31 ++ ai/examples/README.md | 49 ++++ docs/devguide/ai/crabbox-sandbox.md | 220 ++++++++++++++ docs/devguide/ai/index.md | 1 + integrations/crabbox/README.md | 136 +++++++++ .../crabbox/conductor_crabbox_worker.py | 186 ++++++++++++ integrations/crabbox/crabbox.islo.yaml | 25 ++ integrations/crabbox/crabbox_bridge.py | 277 ++++++++++++++++++ integrations/crabbox/crabbox_mcp_server.py | 89 ++++++ integrations/crabbox/requirements.txt | 1 + .../crabbox/taskdefs/crabbox_run_taskdef.json | 27 ++ mkdocs.yml | 1 + 13 files changed, 1075 insertions(+) create mode 100644 ai/examples/23-crabbox-mcp-run.json create mode 100644 ai/examples/24-crabbox-simple-worker.json create mode 100644 docs/devguide/ai/crabbox-sandbox.md create mode 100644 integrations/crabbox/README.md create mode 100644 integrations/crabbox/conductor_crabbox_worker.py create mode 100644 integrations/crabbox/crabbox.islo.yaml create mode 100644 integrations/crabbox/crabbox_bridge.py create mode 100644 integrations/crabbox/crabbox_mcp_server.py create mode 100644 integrations/crabbox/requirements.txt create mode 100644 integrations/crabbox/taskdefs/crabbox_run_taskdef.json diff --git a/ai/examples/23-crabbox-mcp-run.json b/ai/examples/23-crabbox-mcp-run.json new file mode 100644 index 0000000000..7848ea9ae5 --- /dev/null +++ b/ai/examples/23-crabbox-mcp-run.json @@ -0,0 +1,32 @@ +{ + "name": "crabbox_mcp_run_workflow", + "description": "Run a trusted command through a Crabbox-backed MCP tool.", + "version": 1, + "schemaVersion": 2, + "inputParameters": [ + "mcpServer", + "command", + "workspaceDir" + ], + "tasks": [ + { + "name": "run_with_crabbox", + "taskReferenceName": "crabbox_run", + "type": "CALL_MCP_TOOL", + "inputParameters": { + "mcpServer": "${workflow.input.mcpServer}", + "method": "crabbox_run_command", + "arguments": { + "provider": "local-container", + "command": "${workflow.input.command}", + "workspace_dir": "${workflow.input.workspaceDir}", + "shell": true, + "timeout_seconds": 900 + } + } + } + ], + "outputParameters": { + "result": "${crabbox_run.output.content}" + } +} diff --git a/ai/examples/24-crabbox-simple-worker.json b/ai/examples/24-crabbox-simple-worker.json new file mode 100644 index 0000000000..664f274384 --- /dev/null +++ b/ai/examples/24-crabbox-simple-worker.json @@ -0,0 +1,31 @@ +{ + "name": "crabbox_simple_worker_workflow", + "description": "Run a trusted command through an external Crabbox SIMPLE worker.", + "version": 1, + "schemaVersion": 2, + "inputParameters": [ + "command", + "workspaceDir" + ], + "tasks": [ + { + "name": "crabbox_run", + "taskReferenceName": "crabbox_run", + "type": "SIMPLE", + "inputParameters": { + "provider": "local-container", + "command": "${workflow.input.command}", + "workspaceDir": "${workflow.input.workspaceDir}", + "shell": true, + "timeoutSeconds": 900, + "keepOnFailure": true + } + } + ], + "outputParameters": { + "status": "${crabbox_run.output.status}", + "exitCode": "${crabbox_run.output.exitCode}", + "reason": "${crabbox_run.output.reason}", + "outputTail": "${crabbox_run.output.outputTail}" + } +} diff --git a/ai/examples/README.md b/ai/examples/README.md index aa9a1970e9..529cb63d81 100644 --- a/ai/examples/README.md +++ b/ai/examples/README.md @@ -56,6 +56,14 @@ mcp-testkit --transport http The server will be available at `http://localhost:3001/mcp`. +### 4. Crabbox Bridge (for Crabbox examples) + +Install Crabbox and start either the example MCP server or the example +`SIMPLE` worker from `integrations/crabbox`. The examples use Crabbox's +`local-container` provider by default, which needs Docker but no cloud +credentials. Set `provider` to `islo` only when `CRABBOX_ISLO_API_KEY` or +`ISLO_API_KEY` is available. + --- ## Available Examples @@ -84,6 +92,8 @@ The server will be available at `http://localhost:3001/mcp`. | `20-extended-thinking.json` | Extended thinking with token budget for reasoning | Anthropic | | `21-web-search-research-agent.json` | Research agent: web search → synthesize → PDF | OpenAI, Anthropic | | `22-multi-turn-chain.json` | Multi-turn conversation chaining with previousResponseId | OpenAI | +| `23-crabbox-mcp-run.json` | Run a trusted command through a Crabbox MCP tool | Crabbox, MCP Server | +| `24-crabbox-simple-worker.json` | Run a trusted command through an external Crabbox worker | Crabbox, SIMPLE Worker | --- @@ -438,6 +448,45 @@ curl -X POST 'http://localhost:8080/api/workflow/multi_turn_chain' \ -d '{"topic": "Real-time collaborative document editor"}' ``` +### 23. Crabbox MCP Run + +```bash +# Start integrations/crabbox/crabbox_mcp_server.py first + +# Register +curl -X POST 'http://localhost:8080/api/metadata/workflow' \ + -H 'Content-Type: application/json' \ + -d @23-crabbox-mcp-run.json + +# Execute +curl -X POST 'http://localhost:8080/api/workflow/crabbox_mcp_run_workflow' \ + -H 'Content-Type: application/json' \ + -d '{ + "mcpServer": "http://localhost:3001/mcp", + "command": "python3 --version", + "workspaceDir": "/path/to/repo" + }' +``` + +### 24. Crabbox SIMPLE Worker + +```bash +# Register the task definition and start integrations/crabbox/conductor_crabbox_worker.py first + +# Register +curl -X POST 'http://localhost:8080/api/metadata/workflow' \ + -H 'Content-Type: application/json' \ + -d @24-crabbox-simple-worker.json + +# Execute +curl -X POST 'http://localhost:8080/api/workflow/crabbox_simple_worker_workflow' \ + -H 'Content-Type: application/json' \ + -d '{ + "command": "python3 --version", + "workspaceDir": "/path/to/repo" + }' +``` + --- ## Register All Workflows at Once diff --git a/docs/devguide/ai/crabbox-sandbox.md b/docs/devguide/ai/crabbox-sandbox.md new file mode 100644 index 0000000000..b199c81d7f --- /dev/null +++ b/docs/devguide/ai/crabbox-sandbox.md @@ -0,0 +1,220 @@ +--- +description: "Run trusted Conductor agent, build, and test steps through Crabbox remote execution using an external worker or MCP bridge." +--- + +# Crabbox Remote Execution + +[Crabbox](https://crabbox.sh/) is a remote software testing and execution +control plane. A `crabbox run` command leases or delegates to a remote box, +syncs the tracked and non-ignored local checkout, executes a command, streams +output back, and releases the target. + +In Conductor, Crabbox fits best as an **external execution provider** for +trusted agent, build, and test workflows. Conductor owns durable orchestration, +state, retries, and task history. Crabbox owns remote execution capacity, +checkout sync, command streaming, and lease cleanup. + +Do not treat Crabbox as Conductor's hard security boundary for hostile code or +mutually untrusted tenants. Crabbox's security model is designed for trusted +operators on a shared team. Per-lease and per-tenant isolation is not the +current security boundary. + +## Architecture + +```mermaid +flowchart LR + workflow["Conductor Workflow"] --> simpleTask["SIMPLE Task"] + simpleTask --> worker["Crabbox Worker"] + worker --> crabboxCli["crabbox run"] + crabboxCli --> remoteRunner["Remote Runner"] + remoteRunner --> worker + worker --> workflow + + workflow --> mcpTask["CALL_MCP_TOOL"] + mcpTask --> mcpServer["Crabbox MCP Server"] + mcpServer --> crabboxCli +``` + +There are two integration modes: + +- Use a `SIMPLE` worker when a workflow has a known build, test, or command step + and the task definition should control timeouts and retries. +- Use an MCP server when an agent workflow already discovers and invokes tools + with `LIST_MCP_TOOLS` and `CALL_MCP_TOOL`. + +The example implementation lives in +[`integrations/crabbox`](https://github.com/conductor-oss/conductor/tree/main/integrations/crabbox). + +## Quick Start + +The simplest smoke test uses Crabbox's `local-container` provider. It requires a +Docker-compatible runtime, but no cloud credentials: + +```bash +crabbox run --provider local-container --shell 'python3 --version' +``` + +Use `provider: islo`, `aws`, `hetzner`, or another Crabbox provider when remote +capacity is configured. + +The bridge passes the `provider` value through to Crabbox, so it supports any +provider available in the installed `crabbox` binary. Provider-specific +credentials stay in the worker environment or Crabbox config files. + +## SIMPLE Worker + +The `conductor_crabbox_worker.py` example polls a task named `crabbox_run`. +For each task, it: + +1. Builds an allowlisted `crabbox run` command from task input. +2. Streams Crabbox output into Conductor task logs. +3. Sends `IN_PROGRESS` updates with `callbackAfterSeconds` while output is + streaming. +4. Maps the final Crabbox result to a Conductor task status. + +Register the task definition: + +```bash +curl -X POST 'http://localhost:8080/api/metadata/taskdefs' \ + -H 'Content-Type: application/json' \ + -d @integrations/crabbox/taskdefs/crabbox_run_taskdef.json +``` + +Start the worker: + +```bash +export CONDUCTOR_SERVER_URL=http://localhost:8080 +python3 integrations/crabbox/conductor_crabbox_worker.py +``` + +Route Crabbox tasks to dedicated worker capacity with task domains: + +```json +{ + "name": "crabbox_simple_worker_workflow", + "version": 1, + "input": { + "command": "python3 -m pytest", + "workspaceDir": "/path/to/repo" + }, + "taskToDomain": { + "crabbox_run": "remote-sandbox" + } +} +``` + +Then start the worker with `CONDUCTOR_CRABBOX_DOMAIN=remote-sandbox`. + +## MCP Bridge + +The `crabbox_mcp_server.py` example exposes two tools: + +| Tool | Purpose | +|---|---| +| `crabbox_run_command` | Run a trusted command through `crabbox run`. | +| `crabbox_doctor` | Check local Crabbox configuration for a provider. | + +Start the MCP server: + +```bash +python3 -m pip install -r integrations/crabbox/requirements.txt +python3 integrations/crabbox/crabbox_mcp_server.py --host 127.0.0.1 --port 3001 +``` + +Conductor can call it with a `CALL_MCP_TOOL` task: + +```json +{ + "name": "run_with_crabbox", + "taskReferenceName": "crabbox_run", + "type": "CALL_MCP_TOOL", + "inputParameters": { + "mcpServer": "http://localhost:3001/mcp", + "method": "crabbox_run_command", + "arguments": { + "provider": "local-container", + "command": "python3 -m pytest", + "workspace_dir": "/path/to/repo", + "shell": true, + "timeout_seconds": 900 + } + } +} +``` + +## Task Contract + +The `crabbox_run` worker accepts this input: + +| Field | Type | Default | Description | +|---|---|---|---| +| `command` | string or string array | required | Command to run remotely. Strings use `--shell` by default. | +| `provider` | string | `local-container` | Any provider supported by the installed `crabbox` binary. | +| `workspaceDir` | string | worker current directory | Checkout to sync. | +| `profile` | string | none | Crabbox profile name. | +| `leaseId` / `id` | string | none | Existing lease or sandbox to reuse. | +| `class` / `machineClass` | string | none | Crabbox machine class. | +| `target` | string | provider default | Crabbox target OS. | +| `ttl` | string | provider default | Maximum lease lifetime. | +| `idleTimeout` | string | provider default | Idle lease timeout. | +| `keep` | boolean | `false` | Keep the lease after success. | +| `keepOnFailure` | boolean | `false` | Keep a failed one-shot lease for debugging. | +| `noSync` | boolean | `false` | Skip sync when the provider supports it. | +| `fullResync` | boolean | `false` | Force fresh sync when the provider supports it. | +| `preflight` | boolean | `false` | Run Crabbox preflight before the command. | +| `timeoutSeconds` | number | none | Local timeout for the Crabbox process. | +| `env` | object | `{}` | Environment for the local Crabbox CLI process. | + +The worker output contains: + +| Field | Description | +|---|---| +| `status` | Conductor terminal status selected by the bridge. | +| `exitCode` | Crabbox process exit code. | +| `reason` | Human-readable completion or failure reason. | +| `retryable` | Whether the bridge classified the failure as retryable. | +| `provider` | Crabbox provider used for the run. | +| `durationSeconds` | Local wall-clock duration. | +| `timing` | Parsed `--timing-json` record when Crabbox emits one. | +| `output` | Captured output, bounded by the bridge. | +| `outputTail` | Last lines of output for workflow summaries. | +| `truncated` | Whether captured output hit the bridge limit. | + +## Failure And Retry Semantics + +The bridge maps failures conservatively: + +| Crabbox outcome | Conductor status | Retry guidance | +|---|---|---| +| Exit code `0` | `COMPLETED` | No retry. | +| Auth, config, usage, or invalid provider errors | `FAILED_WITH_TERMINAL_ERROR` | Fix configuration before rerun. | +| Capacity, provisioning, sync, stream, timeout, or remote command failures | `FAILED` | Conductor may retry if the task definition allows it. | + +Conductor retries can duplicate the remote command. Use retries only for +idempotent commands or commands that are safe to rerun from a fresh remote +sandbox. + +## Operational Guardrails + +- Keep Crabbox broker tokens and provider credentials in worker or user config, + not in workflow definitions. +- Use a self-hosted Crabbox broker for shared teams when using brokered cloud + providers so provider credentials, cost caps, usage accounting, and cleanup + are centralized. +- Make source upload explicit. `crabbox run` syncs tracked and non-ignored files + unless configuration narrows the manifest. +- Prefer narrow `sync.include`, `sync.exclude`, `env.allow`, TTL, and idle + timeout settings in a project-specific Crabbox config. +- Put Crabbox-backed tasks in a Conductor task domain or dedicated worker pool + so remote execution work does not starve normal workers. +- Do not forward secrets by default. Crabbox forwards remote environment values + by name allowlist; keep that allowlist short. +- Review captured logs, artifacts, and failure bundles before sharing them + outside the trusted team. + +## Examples + +- [`ai/examples/23-crabbox-mcp-run.json`](https://github.com/conductor-oss/conductor/blob/main/ai/examples/23-crabbox-mcp-run.json) + calls Crabbox through an MCP tool. +- [`ai/examples/24-crabbox-simple-worker.json`](https://github.com/conductor-oss/conductor/blob/main/ai/examples/24-crabbox-simple-worker.json) + schedules a `SIMPLE` task for the external Crabbox worker. diff --git a/docs/devguide/ai/index.md b/docs/devguide/ai/index.md index f7602419c2..cf9bc00127 100644 --- a/docs/devguide/ai/index.md +++ b/docs/devguide/ai/index.md @@ -82,6 +82,7 @@ Conductor provides all of this as infrastructure. Your agent code focuses on the - **[AI & LLM Recipes](../cookbook/ai-llm.md)** — Ready-to-use recipes: chat completion, RAG, MCP agents, web search, code execution, coding agents, extended thinking, and more. - **[LLM Orchestration](llm-orchestration.md)** — Native LLM providers, built-in tools, vector databases, and content generation. - **[MCP Integration](mcp-guide.md)** — Connect to any MCP server, expose workflows as MCP tools, multi-server agents. +- **[Crabbox Remote Execution](crabbox-sandbox.md)** — Run trusted agent, build, and test steps on remote Crabbox capacity through a worker or MCP bridge. - **[Production Agent Architecture](production-agent-architecture.md)** — The canonical reference architecture for a durable production agent. End-to-end pattern with every primitive mapped. - **[Failure Semantics for AI Agents](failure-semantics.md)** — The exact failure contract: what happens under crashes, retries, duplicates, long waits, and partial side effects. - **[Why Conductor for Agents](why-conductor.md)** — What Conductor gives you out of the box for agentic workflows. diff --git a/integrations/crabbox/README.md b/integrations/crabbox/README.md new file mode 100644 index 0000000000..db6708d271 --- /dev/null +++ b/integrations/crabbox/README.md @@ -0,0 +1,136 @@ +# Crabbox Integration + +This directory contains example bridges for running trusted Conductor workloads +through [Crabbox](https://crabbox.sh/). Crabbox leases or delegates to a remote +Linux sandbox, syncs the local checkout, runs a command, streams output back, +and cleans up. + +Use this integration when Conductor should orchestrate durable agent, build, or +test steps while Crabbox supplies remote execution capacity. Do not use it as a +hard security boundary for mutually untrusted tenants. + +## Quick Start + +The fastest smoke path uses Crabbox's `local-container` provider. It requires a +Docker-compatible runtime, but no cloud or Islo credentials. + +```bash +crabbox run --provider local-container --shell 'python3 --version' +``` + +Run the bridge validation through Crabbox: + +```bash +crabbox run --provider local-container --shell \ + 'python3 -m py_compile integrations/crabbox/*.py && python3 -m json.tool ai/examples/23-crabbox-mcp-run.json >/dev/null && python3 -m json.tool ai/examples/24-crabbox-simple-worker.json >/dev/null && python3 -m json.tool integrations/crabbox/taskdefs/crabbox_run_taskdef.json >/dev/null' +``` + +The bridge passes `provider` through to Crabbox, so it can use any provider the +installed `crabbox` binary supports. Keep provider-specific credentials and +config in the worker environment or Crabbox config files. + +## Trust Boundary + +Crabbox is built for trusted operators on a shared team. A task routed through +this bridge can run arbitrary commands on the remote box, and any environment +value allowed through Crabbox configuration may be visible to that box. Keep +provider tokens and broker credentials in the worker environment or Crabbox user +config, never in workflow definitions. + +Use Conductor task domains or dedicated worker pools for Crabbox tasks, and use +Crabbox `sync.include`, `sync.exclude`, `env.allow`, TTL, and idle-timeout +settings to keep the remote execution scope narrow. Conductor retries can rerun +the same command on a new sandbox, so only enable retries for idempotent work. + +## Integration Modes + +### SIMPLE Worker + +`conductor_crabbox_worker.py` polls a `SIMPLE` task named `crabbox_run`, executes +the task input with `crabbox run`, streams task logs, sends `IN_PROGRESS` +heartbeats, and returns a terminal Conductor task status. + +Register the task definition: + +```bash +curl -X POST 'http://localhost:8080/api/metadata/taskdefs' \ + -H 'Content-Type: application/json' \ + -d @integrations/crabbox/taskdefs/crabbox_run_taskdef.json +``` + +Start the worker: + +```bash +export CONDUCTOR_SERVER_URL=http://localhost:8080 +python3 integrations/crabbox/conductor_crabbox_worker.py +``` + +Set `CONDUCTOR_CRABBOX_DOMAIN` when routing these tasks through a dedicated +Conductor task domain. + +### MCP Server + +`crabbox_mcp_server.py` exposes `crabbox_run_command` and `crabbox_doctor` over +MCP. Conductor workflows can call those tools with `LIST_MCP_TOOLS` and +`CALL_MCP_TOOL`. + +```bash +python3 -m pip install -r integrations/crabbox/requirements.txt +python3 integrations/crabbox/crabbox_mcp_server.py --host 127.0.0.1 --port 3001 +``` + +The MCP endpoint is `http://localhost:3001/mcp`. + +## Task Input Contract + +The `crabbox_run` worker accepts these inputs: + +| Field | Type | Default | Description | +|---|---|---|---| +| `command` | string or string array | required | Command to run remotely. Strings run through `--shell` by default. | +| `provider` | string | `local-container` | Any provider supported by the installed `crabbox` binary, for example `local-container`, `islo`, `aws`, `hetzner`, or `ssh`. | +| `workspaceDir` | string | current directory | Local checkout to sync. | +| `profile` | string | none | Crabbox profile name. | +| `leaseId` / `id` | string | none | Existing lease or provider sandbox to reuse. | +| `class` / `machineClass` | string | none | Crabbox machine class. | +| `target` | string | provider default | Crabbox target OS. | +| `ttl` | string | provider default | Maximum lease lifetime. | +| `idleTimeout` | string | provider default | Idle lease timeout. | +| `keep` | boolean | `false` | Keep the lease after success. | +| `keepOnFailure` | boolean | `false` | Keep failed one-shot leases for debugging. | +| `noSync` | boolean | `false` | Skip sync when supported by the provider. | +| `fullResync` | boolean | `false` | Force a fresh sync when supported by the provider. | +| `preflight` | boolean | `false` | Run Crabbox preflight before the command. | +| `timeoutSeconds` | number | none | Local worker timeout for the Crabbox process. | +| `env` | object | `{}` | Extra environment values for the Crabbox CLI process. Prefer Crabbox `env.allow` for remote forwarding. | + +The worker returns `status`, `exitCode`, `reason`, `retryable`, `provider`, +`durationSeconds`, `timing`, `output`, `outputTail`, and `truncated`. + +## Failure Mapping + +The bridge maps Crabbox results into Conductor task statuses: + +| Crabbox result | Conductor status | +|---|---| +| Exit code `0` | `COMPLETED` | +| Auth, config, usage, or invalid provider errors | `FAILED_WITH_TERMINAL_ERROR` | +| Capacity, provisioning, sync, timeout, stream, or remote command failures | `FAILED` | + +Conductor retries should be reserved for idempotent commands. A task retry may +start a new remote sandbox and rerun the command. + +## Islo Proof Runs + +`crabbox.islo.yaml` is a narrow config for validating this integration through +Crabbox's Islo delegated provider. It syncs only this integration, the example +workflows, the docs page, and `mkdocs.yml`. Use it when `ISLO_API_KEY` or +`CRABBOX_ISLO_API_KEY` is available. + +```bash +tmp_config="$(mktemp)" +install -m 600 integrations/crabbox/crabbox.islo.yaml "$tmp_config" +CRABBOX_CONFIG="$tmp_config" \ + crabbox run --provider islo --shell \ + 'python3 -m py_compile integrations/crabbox/*.py && python3 -m json.tool ai/examples/23-crabbox-mcp-run.json >/dev/null && python3 -m json.tool ai/examples/24-crabbox-simple-worker.json >/dev/null' +``` diff --git a/integrations/crabbox/conductor_crabbox_worker.py b/integrations/crabbox/conductor_crabbox_worker.py new file mode 100644 index 0000000000..ea1b92cfd1 --- /dev/null +++ b/integrations/crabbox/conductor_crabbox_worker.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +"""Minimal Conductor SIMPLE worker backed by `crabbox run`. + +This worker intentionally uses only the standard library. It is an example +bridge, not a replacement for the official Conductor SDK worker runtimes. +""" + +from __future__ import annotations + +import argparse +import json +import os +import socket +import time +import urllib.error +import urllib.parse +import urllib.request +from typing import Any + +from crabbox_bridge import CrabboxRequest, run_crabbox + + +def main() -> None: + parser = argparse.ArgumentParser(description="Poll Conductor tasks and execute them via Crabbox.") + parser.add_argument("--server", default=os.getenv("CONDUCTOR_SERVER_URL", "http://localhost:8080")) + parser.add_argument("--task-type", default=os.getenv("CONDUCTOR_CRABBOX_TASK_TYPE", "crabbox_run")) + parser.add_argument("--domain", default=os.getenv("CONDUCTOR_CRABBOX_DOMAIN")) + parser.add_argument("--worker-id", default=os.getenv("CONDUCTOR_WORKER_ID") or default_worker_id()) + parser.add_argument("--poll-interval", type=float, default=float(os.getenv("CONDUCTOR_POLL_INTERVAL_SECONDS", "2"))) + parser.add_argument("--long-poll-ms", type=int, default=int(os.getenv("CONDUCTOR_LONG_POLL_MS", "1000"))) + parser.add_argument("--heartbeat-seconds", type=int, default=int(os.getenv("CONDUCTOR_HEARTBEAT_SECONDS", "30"))) + parser.add_argument("--once", action="store_true", help="Process at most one task and exit.") + args = parser.parse_args() + + client = ConductorClient(args.server, args.worker_id) + print(f"polling taskType={args.task_type} domain={args.domain or ''} workerId={args.worker_id}") + + while True: + task = client.poll(args.task_type, args.domain, args.long_poll_ms) + if task: + process_task(client, task, args.heartbeat_seconds) + if args.once: + return + else: + if args.once: + return + time.sleep(args.poll_interval) + + +def process_task(client: "ConductorClient", task: dict[str, Any], heartbeat_seconds: int) -> None: + task_id = task["taskId"] + workflow_id = task["workflowInstanceId"] + input_data = task.get("inputData") or {} + last_heartbeat = 0.0 + + def heartbeat(output_tail: str = "") -> None: + nonlocal last_heartbeat + now = time.monotonic() + if now - last_heartbeat < heartbeat_seconds: + return + last_heartbeat = now + client.update_task( + workflow_id, + task_id, + "IN_PROGRESS", + { + "phase": "running", + "provider": input_data.get("provider", "local-container"), + "outputTail": output_tail[-4000:], + }, + callback_after_seconds=heartbeat_seconds * 2, + ) + + try: + request = CrabboxRequest.from_mapping(input_data) + seen_lines: list[str] = [] + + def on_line(line: str) -> None: + seen_lines.append(line) + if len(seen_lines) > 20: + del seen_lines[:-20] + client.add_log(task_id, line[:4000]) + heartbeat("\n".join(seen_lines)) + + heartbeat() + result = run_crabbox(request, on_line=on_line) + client.update_task( + workflow_id, + task_id, + result["status"], + result, + reason_for_incompletion=None if result["status"] == "COMPLETED" else result["reason"], + ) + except Exception as exc: + client.update_task( + workflow_id, + task_id, + "FAILED_WITH_TERMINAL_ERROR", + {"error": str(exc), "errorType": exc.__class__.__name__}, + reason_for_incompletion=str(exc), + ) + + +class ConductorClient: + def __init__(self, server: str, worker_id: str) -> None: + self.server = server.rstrip("/") + self.worker_id = worker_id + self.auth_token = os.getenv("CONDUCTOR_AUTH_TOKEN") + + def poll(self, task_type: str, domain: str | None, timeout_ms: int) -> dict[str, Any] | None: + query = {"workerid": self.worker_id, "timeout": str(timeout_ms)} + if domain: + query["domain"] = domain + path = f"/api/tasks/poll/{urllib.parse.quote(task_type)}?{urllib.parse.urlencode(query)}" + try: + response = self._request("GET", path) + except urllib.error.HTTPError as exc: + if exc.code == 204: + return None + raise + if response is None: + return None + return json.loads(response) + + def update_task( + self, + workflow_id: str, + task_id: str, + status: str, + output_data: dict[str, Any], + *, + reason_for_incompletion: str | None = None, + callback_after_seconds: int | None = None, + ) -> None: + payload: dict[str, Any] = { + "workflowInstanceId": workflow_id, + "taskId": task_id, + "status": status, + "outputData": output_data, + } + if reason_for_incompletion: + payload["reasonForIncompletion"] = reason_for_incompletion + if callback_after_seconds is not None: + payload["callbackAfterSeconds"] = callback_after_seconds + self._request("POST", "/api/tasks", payload) + + def add_log(self, task_id: str, line: str) -> None: + if not line: + return + self._request("POST", f"/api/tasks/{urllib.parse.quote(task_id)}/log", line, content_type="text/plain") + + def _request( + self, + method: str, + path: str, + payload: Any | None = None, + *, + content_type: str = "application/json", + ) -> str | None: + data = None + if payload is not None: + if content_type == "application/json": + data = json.dumps(payload).encode("utf-8") + else: + data = str(payload).encode("utf-8") + request = urllib.request.Request(f"{self.server}{path}", data=data, method=method) + request.add_header("accept", "application/json") + request.add_header("content-type", content_type) + if self.auth_token: + request.add_header("authorization", f"Bearer {self.auth_token}") + try: + with urllib.request.urlopen(request, timeout=60) as response: + body = response.read() + return body.decode("utf-8") if body else None + except urllib.error.HTTPError as exc: + if exc.code == 204: + return None + raise + + +def default_worker_id() -> str: + return f"crabbox-{socket.gethostname()}-{os.getpid()}" + + +if __name__ == "__main__": + main() diff --git a/integrations/crabbox/crabbox.islo.yaml b/integrations/crabbox/crabbox.islo.yaml new file mode 100644 index 0000000000..f399176aca --- /dev/null +++ b/integrations/crabbox/crabbox.islo.yaml @@ -0,0 +1,25 @@ +provider: islo +target: linux +sync: + include: + - integrations/crabbox + - ai/examples/23-crabbox-mcp-run.json + - ai/examples/24-crabbox-simple-worker.json + - docs/devguide/ai/crabbox-sandbox.md + - mkdocs.yml + exclude: + - .git + - build + - .gradle + - ui/node_modules + - ui-next/node_modules +env: + allow: + - CI + - PYTHONPATH +islo: + image: docker.io/library/python:3.12-slim + workdir: conductor-crabbox + vcpus: 2 + memoryMB: 4096 + diskGB: 20 diff --git a/integrations/crabbox/crabbox_bridge.py b/integrations/crabbox/crabbox_bridge.py new file mode 100644 index 0000000000..5b67b5f0ee --- /dev/null +++ b/integrations/crabbox/crabbox_bridge.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python3 +"""Shared Crabbox runner helpers for Conductor integration examples.""" + +from __future__ import annotations + +import json +import os +import shlex +import subprocess +import threading +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + + +DEFAULT_OUTPUT_LIMIT = 200_000 +DEFAULT_TAIL_LINES = 120 + + +@dataclass +class CrabboxRequest: + """Validated subset of the Crabbox run surface exposed to workflows.""" + + command: str | list[str] + provider: str = "local-container" + shell: bool = False + workspace_dir: str | None = None + profile: str | None = None + lease_id: str | None = None + machine_class: str | None = None + target: str | None = None + arch: str | None = None + ttl: str | None = None + idle_timeout: str | None = None + keep: bool = False + keep_on_failure: bool = False + no_sync: bool = False + full_resync: bool = False + preflight: bool = False + timing_json: bool = True + timeout_seconds: int | None = None + extra_env: dict[str, str] = field(default_factory=dict) + + @classmethod + def from_mapping(cls, data: dict[str, Any]) -> "CrabboxRequest": + command = data.get("command") + if not command: + raise ValueError("input 'command' is required") + if not isinstance(command, (str, list)): + raise ValueError("input 'command' must be a string or list of strings") + if isinstance(command, list) and not all(isinstance(part, str) for part in command): + raise ValueError("input 'command' list must contain only strings") + + extra_env = data.get("env") or data.get("extraEnv") or {} + if not isinstance(extra_env, dict): + raise ValueError("input 'env' must be an object when provided") + + timeout_seconds = data.get("timeoutSeconds") + if timeout_seconds is not None: + timeout_seconds = int(timeout_seconds) + + return cls( + command=command, + provider=str(data.get("provider") or "local-container"), + shell=bool(data.get("shell", isinstance(command, str))), + workspace_dir=data.get("workspaceDir") or data.get("repoPath"), + profile=data.get("profile"), + lease_id=data.get("leaseId") or data.get("id"), + machine_class=data.get("class") or data.get("machineClass"), + target=data.get("target"), + arch=data.get("arch"), + ttl=data.get("ttl"), + idle_timeout=data.get("idleTimeout"), + keep=bool(data.get("keep", False)), + keep_on_failure=bool(data.get("keepOnFailure", False)), + no_sync=bool(data.get("noSync", False)), + full_resync=bool(data.get("fullResync", False) or data.get("freshSync", False)), + preflight=bool(data.get("preflight", False)), + timing_json=bool(data.get("timingJson", True)), + timeout_seconds=timeout_seconds, + extra_env={str(key): str(value) for key, value in extra_env.items()}, + ) + + +def build_crabbox_command(request: CrabboxRequest) -> list[str]: + cmd = ["crabbox", "run", "--provider", request.provider] + + optional_flags = [ + ("--profile", request.profile), + ("--id", request.lease_id), + ("--class", request.machine_class), + ("--target", request.target), + ("--arch", request.arch), + ("--ttl", request.ttl), + ("--idle-timeout", request.idle_timeout), + ] + for flag, value in optional_flags: + if value: + cmd.extend([flag, str(value)]) + + if request.keep: + cmd.append("--keep") + if request.keep_on_failure: + cmd.append("--keep-on-failure") + if request.no_sync: + cmd.append("--no-sync") + if request.full_resync: + cmd.append("--full-resync") + if request.preflight: + cmd.append("--preflight") + if request.timing_json: + cmd.append("--timing-json") + + if request.shell: + shell_command = ( + request.command + if isinstance(request.command, str) + else " ".join(shlex.quote(part) for part in request.command) + ) + cmd.extend(["--shell", shell_command]) + else: + if isinstance(request.command, str): + cmd.extend(["--", request.command]) + else: + cmd.extend(["--", *request.command]) + return cmd + + +def run_crabbox( + request: CrabboxRequest, + *, + on_line: Any | None = None, + output_limit: int = DEFAULT_OUTPUT_LIMIT, +) -> dict[str, Any]: + cwd = Path(request.workspace_dir).expanduser() if request.workspace_dir else Path.cwd() + if not cwd.exists() or not cwd.is_dir(): + raise ValueError(f"workspaceDir does not exist or is not a directory: {cwd}") + + env = os.environ.copy() + env.update(request.extra_env) + start = time.monotonic() + argv = build_crabbox_command(request) + + output_parts: list[str] = [] + output_size = 0 + timing: dict[str, Any] | None = None + + try: + process = subprocess.Popen( + argv, + cwd=str(cwd), + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + except FileNotFoundError as exc: + raise RuntimeError("crabbox binary was not found on PATH") from exc + + timed_out = False + timeout_timer: threading.Timer | None = None + + def kill_on_timeout() -> None: + nonlocal timed_out + timed_out = True + process.kill() + + if request.timeout_seconds is not None: + timeout_timer = threading.Timer(request.timeout_seconds, kill_on_timeout) + timeout_timer.daemon = True + timeout_timer.start() + + try: + assert process.stdout is not None + for line in process.stdout: + if on_line: + on_line(line.rstrip("\n")) + if output_size < output_limit: + remaining = output_limit - output_size + chunk = line[:remaining] + output_parts.append(chunk) + output_size += len(chunk) + parsed = _try_parse_json_object(line) + if parsed and "exitCode" in parsed: + timing = parsed + exit_code = process.wait() + finally: + if timeout_timer: + timeout_timer.cancel() + + if timed_out: + output_parts.append(f"\ncrabbox run timed out after {request.timeout_seconds}s\n") + + output = "".join(output_parts) + status, reason = classify_crabbox_exit(exit_code, output) + return { + "status": status, + "exitCode": exit_code, + "reason": reason, + "retryable": status == "FAILED", + "command": redact_command(argv), + "provider": request.provider, + "durationSeconds": round(time.monotonic() - start, 3), + "timing": timing, + "output": output, + "outputTail": tail_lines(output), + "truncated": output_size >= output_limit, + } + + +def classify_crabbox_exit(exit_code: int, output: str) -> tuple[str, str]: + if exit_code == 0: + return "COMPLETED", "crabbox command completed" + + lowered = output.lower() + terminal_markers = [ + "unauthorized", + "forbidden", + "missing api key", + "requires islo_api_key", + "requires crabbox_islo_api_key", + "not logged in", + "unknown flag", + "invalid provider", + "invalid config", + "usage:", + ] + retryable_markers = [ + "capacity", + "rate limit", + "429", + "timeout", + "temporarily", + "provision", + "readiness", + "stream", + "sync", + ] + + if any(marker in lowered for marker in terminal_markers): + return "FAILED_WITH_TERMINAL_ERROR", "crabbox configuration or authentication failed" + if any(marker in lowered for marker in retryable_markers): + return "FAILED", "crabbox infrastructure path failed and may be retryable" + return "FAILED", "remote command failed" + + +def tail_lines(output: str, limit: int = DEFAULT_TAIL_LINES) -> str: + lines = output.splitlines() + return "\n".join(lines[-limit:]) + + +def _try_parse_json_object(line: str) -> dict[str, Any] | None: + text = line.strip() + if not text.startswith("{") or not text.endswith("}"): + return None + try: + value = json.loads(text) + except json.JSONDecodeError: + return None + return value if isinstance(value, dict) else None + + +def redact_command(argv: list[str]) -> list[str]: + redacted: list[str] = [] + redact_next = False + secret_flags = {"--token", "--token-stdin", "--password", "--secret"} + for part in argv: + if redact_next: + redacted.append("") + redact_next = False + continue + redacted.append(part) + if part in secret_flags: + redact_next = True + return redacted diff --git a/integrations/crabbox/crabbox_mcp_server.py b/integrations/crabbox/crabbox_mcp_server.py new file mode 100644 index 0000000000..eeaa1d7142 --- /dev/null +++ b/integrations/crabbox/crabbox_mcp_server.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +"""MCP server exposing Crabbox-backed execution tools to Conductor.""" + +from __future__ import annotations + +import argparse +import os +import shutil +import subprocess +from typing import Any + +from crabbox_bridge import CrabboxRequest, run_crabbox + +try: + from mcp.server.fastmcp import FastMCP +except ImportError as exc: # pragma: no cover - startup guidance + raise SystemExit( + "Missing MCP Python SDK. Install it with: pip install -r integrations/crabbox/requirements.txt" + ) from exc + + +mcp = FastMCP("conductor-crabbox", stateless_http=True, json_response=True) + + +@mcp.tool() +def crabbox_run_command( + command: str, + provider: str = "local-container", + shell: bool = True, + workspace_dir: str | None = None, + profile: str | None = None, + timeout_seconds: int | None = None, + keep_on_failure: bool = False, +) -> dict[str, Any]: + """Run a command through Crabbox and return status, output tail, and timing.""" + + request = CrabboxRequest( + command=command, + provider=provider, + shell=shell, + workspace_dir=workspace_dir, + profile=profile, + timeout_seconds=timeout_seconds, + keep_on_failure=keep_on_failure, + ) + return run_crabbox(request) + + +@mcp.tool() +def crabbox_doctor(provider: str = "local-container") -> dict[str, Any]: + """Run `crabbox doctor` for the configured provider.""" + + if not shutil.which("crabbox"): + return { + "status": "FAILED_WITH_TERMINAL_ERROR", + "reason": "crabbox binary was not found on PATH", + "exitCode": 127, + } + + result = subprocess.run( + ["crabbox", "doctor", "--provider", provider], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + check=False, + ) + return { + "status": "COMPLETED" if result.returncode == 0 else "FAILED", + "exitCode": result.returncode, + "provider": provider, + "output": result.stdout, + } + + +def main() -> None: + parser = argparse.ArgumentParser(description="Serve Crabbox tools over MCP.") + parser.add_argument("--transport", default=os.getenv("MCP_TRANSPORT", "streamable-http")) + parser.add_argument("--host", default=os.getenv("MCP_HOST", "127.0.0.1")) + parser.add_argument("--port", type=int, default=int(os.getenv("MCP_PORT", "3001"))) + args = parser.parse_args() + + if args.transport == "streamable-http": + mcp.run(transport="streamable-http", host=args.host, port=args.port) + else: + mcp.run(transport=args.transport) + + +if __name__ == "__main__": + main() diff --git a/integrations/crabbox/requirements.txt b/integrations/crabbox/requirements.txt new file mode 100644 index 0000000000..6664c6da98 --- /dev/null +++ b/integrations/crabbox/requirements.txt @@ -0,0 +1 @@ +mcp diff --git a/integrations/crabbox/taskdefs/crabbox_run_taskdef.json b/integrations/crabbox/taskdefs/crabbox_run_taskdef.json new file mode 100644 index 0000000000..9e6857f7df --- /dev/null +++ b/integrations/crabbox/taskdefs/crabbox_run_taskdef.json @@ -0,0 +1,27 @@ +[ + { + "name": "crabbox_run", + "description": "Execute a command through Crabbox from an external Conductor worker.", + "retryCount": 1, + "retryLogic": "FIXED", + "retryDelaySeconds": 30, + "timeoutSeconds": 1800, + "responseTimeoutSeconds": 120, + "pollTimeoutSeconds": 60, + "inputKeys": [ + "command", + "provider", + "workspaceDir", + "timeoutSeconds" + ], + "outputKeys": [ + "status", + "exitCode", + "reason", + "provider", + "durationSeconds", + "outputTail", + "timing" + ] + } +] diff --git a/mkdocs.yml b/mkdocs.yml index 6f0fe42a58..bfd72ef7bd 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -53,6 +53,7 @@ nav: - AI & LLM Recipes: devguide/cookbook/ai-llm.md - LLM Orchestration: devguide/ai/llm-orchestration.md - MCP Integration: devguide/ai/mcp-guide.md + - Crabbox Remote Execution: devguide/ai/crabbox-sandbox.md - Production Agent Architecture: devguide/ai/production-agent-architecture.md - Failure Semantics: devguide/ai/failure-semantics.md - Why Conductor for Agents: devguide/ai/why-conductor.md