-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathorchestrator.py
More file actions
452 lines (385 loc) · 20 KB
/
orchestrator.py
File metadata and controls
452 lines (385 loc) · 20 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
"""
任务 Orchestrator:驱动完整的任务生命周期。
职责范围(Phase 2):
1. 从 DB 读取 TaskRequest
2. 调用 workspace/handler.py 准备工作区
3. 构建容器规格(env 注入、broker policy、opencode 配置)
4. 调用 sandbox/manager.py 启动容器
5. 等待 opencode /global/health 就绪
6. 更新 DB 状态到 starting_opencode(占位,Phase 3 opencode adapter 替换)
7. Phase 2 stub:等待容器自然退出,收集 exit code
8. 终态清理:停止/删除容器 + 清理 workspace + 移除 broker policy
Phase 3 集成点:
此 Orchestrator 的 _drive_opencode() 方法是 Phase 3 的主扩展点。
Phase 2 中该方法为 stub(等容器退出),Phase 3 会替换为真实的
opencode HTTP adapter 调用(SSE 订阅 + session 管理 + HITL 路由)。
注册方式:
from worker.orchestrator.orchestrator import run_task
from worker.orchestrator.queue import set_executor
set_executor(run_task)
"""
from __future__ import annotations
import json
import logging
import socket
import time
from pathlib import Path
from typing import Optional
from worker.config import get_settings
from worker.contract.event import TaskEventKind
from worker.contract.task import TaskMode, TaskRequest, TaskStatus
from worker.sandbox.manager import (
ContainerSpec,
ensure_worker_network,
get_container,
remove_container,
reap_orphaned_containers,
start_container,
stop_container,
wait_for_opencode_health,
)
from worker.storage.db import get_db
from worker.storage.repo import (
insert_event,
update_task_status,
)
from worker.workspace.handler import cleanup_workspace, prepare_workspace
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────────────────────
# 全局常量
# ──────────────────────────────────────────────────────────────────────────────
WORKER_DOCKER_NETWORK = "worker-sandbox-net"
OPENCODE_PLUGIN_ENTRY = "oh-my-openagent@latest"
# ──────────────────────────────────────────────────────────────────────────────
# 主入口(由 queue._run_one 调用)
# ──────────────────────────────────────────────────────────────────────────────
async def run_task(task_id: str) -> None:
"""完整执行一个任务的生命周期(Phase 2)。
调用方(queue._run_one)已负责:
- 进入 semaphore 槽位(并发控制)
- 异常捕获 → task_failed / task_aborted / task_timed_out 事件
本函数负责:
- 状态机驱动:preparing_workspace → starting_container →
starting_opencode → ... → completed
- 工作区准备、container 启动、健康检查
- opencode 驱动
- 终态写入(completed)
- 清理:container + workspace + broker policy
[REVIEW: P1-16] queue 不再代写起始状态,本函数为状态机唯一写者。
若出现任何未捕获异常,都会由 queue._run_one 的 except 块写入对应终态事件。
"""
settings = get_settings()
db = await get_db()
# ── 读取完整 TaskRequest ──────────────────────────────────────────────────
request = await _load_request(db, task_id)
if request is None:
raise RuntimeError(f"TaskRequest not found for task_id={task_id}")
workspaces_base = settings.data_dir / "workspaces"
try:
# ── Step 0: 安全门——拒绝未授权的 host bind mount ───────────────────
if request.workspace.kind == "local" and not settings.allow_host_mount:
raise PermissionError(
"workspace.kind='local' is disabled by policy. "
"Set WORKER_ALLOW_HOST_MOUNT=true to allow host bind-mount workspaces "
"(this disables non-root + read-only FS isolation for that task)."
)
# ── Step 1: 准备 workspace ────────────────────────────────────────────
await update_task_status(db, task_id, TaskStatus.preparing_workspace)
await insert_event(db, task_id, TaskEventKind.task_started,
{"phase": "preparing_workspace"})
workspace_dir = await prepare_workspace(
task_id=task_id,
base_dir=workspaces_base,
kind=request.workspace.kind,
tarball_url=request.workspace.tarball_url,
tarball_inline_b64=request.workspace.tarball_inline_b64,
git_url=request.workspace.git.url if request.workspace.git else None,
git_sha=request.workspace.git.sha if request.workspace.git else None,
git_subpath=request.workspace.git.subpath if request.workspace.git else None,
local_path=request.workspace.local_path,
)
logger.info("task %s: workspace ready at %s", task_id, workspace_dir)
# ── Step 2: 确保 Docker network 存在 ─────────────────────────────────
await ensure_worker_network(WORKER_DOCKER_NETWORK)
# ── Step 3: 构建 broker policy ────────────────────────────────────────
if settings.broker_enabled:
from broker.policy import set_task_policy
allow_hosts = request.broker_policy.allow_egress_hosts if request.broker_policy else []
set_task_policy(task_id, allow_hosts)
else:
logger.debug("task %s: broker disabled, skipping policy setup", task_id)
# ── Step 4: 构建容器 env ──────────────────────────────────────────────
container_env = _build_container_env(task_id, request, settings)
# ── Step 5: 找一个可用宿主端口 ────────────────────────────────────────
host_port = _find_free_port()
# ── Step 6: 启动容器 ──────────────────────────────────────────────────
await update_task_status(db, task_id, TaskStatus.starting_container)
resource_limits = request.resource_limits
# local workspace:以 root 运行、关闭只读 FS(开发/测试模式,用户已知悉安全风险)
is_local = request.workspace.kind == "local"
spec = ContainerSpec(
task_id=task_id,
image=settings.sandbox_image,
workspace_dir=workspace_dir,
env=container_env,
opencode_port_host=host_port,
cpu_limit=str(resource_limits.cpu) if resource_limits else "2",
memory_limit=str(resource_limits.memory) if resource_limits else "4g",
pids_limit=resource_limits.pids if resource_limits else 512,
network_name=WORKER_DOCKER_NETWORK,
broker_host=settings.broker_host if settings.broker_enabled else None,
broker_port=settings.broker_port if settings.broker_enabled else None,
timeout_sec=resource_limits.timeout_sec if resource_limits else 1800,
container_user="0:0" if is_local else "1000:1000",
read_only=not is_local,
)
container = await start_container(spec)
# ── Step 7: 更新 DB container_id ─────────────────────────────────────
await update_task_status(
db, task_id, TaskStatus.starting_container,
container_id=container.id,
)
await insert_event(db, task_id, TaskEventKind.container_started,
{"container_id": container.id[:12]})
# ── Step 8: 等待 opencode 健康 ────────────────────────────────────────
await update_task_status(db, task_id, TaskStatus.starting_opencode)
opencode_password = container_env.get("OPENCODE_SERVER_PASSWORD", "")
await wait_for_opencode_health(
host="127.0.0.1",
port=host_port,
password=opencode_password,
)
await insert_event(db, task_id, TaskEventKind.opencode_ready,
{"port": host_port})
# ── Step 9: 驱动 opencode(Phase 2 stub)────────────────────────────
await _drive_opencode(task_id, request, container, host_port, container_env)
# ── Step 10: 完成 ─────────────────────────────────────────────────────
await update_task_status(db, task_id, TaskStatus.completed)
await insert_event(db, task_id, TaskEventKind.task_completed)
logger.info("task %s: completed", task_id)
finally:
# ── 清理:无论成功失败都执行 ──────────────────────────────────────────
# [REVIEW: P1-18] 工作区清理按 task_id 顶层目录整体删,避免 git_subpath
# 模式只删 subpath 留下顶层壳导致 inode 缓慢泄漏。
await _cleanup(
task_id,
workspace_kind=request.workspace.kind,
workspaces_base=workspaces_base,
)
# ──────────────────────────────────────────────────────────────────────────────
# Phase 3:opencode HTTP adapter 驱动
# ──────────────────────────────────────────────────────────────────────────────
async def _drive_opencode(
task_id: str,
request: TaskRequest,
container,
host_port: int,
container_env: dict[str, str],
) -> None:
"""驱动容器内 opencode 执行任务(Phase 3 实现)。
创建 OpenCodeDriver 实例并委托其完整生命周期:
- SSE 订阅 /global/event
- session 创建 + prompt_async(按 mode 路由 agent)
- 权限 HITL(tool_permission DecisionRequest)
- plan_first HITL 审批
- artifact 收集(diff + transcript)
超时或 abort 时抛出 RuntimeError,由 queue._run_one 写入 task_failed 事件。
"""
from worker.adapters.opencode.driver import OpenCodeDriver
from worker.adapters.opencode.interceptors import build_interceptors_from_config
from worker.storage.db import get_db
db = await get_db()
# W2-1:根据 opencode_profile.interceptors 实例化拦截器列表
# 未注册的工厂名被静默跳过;拦截器构造异常向上抛 → task 进 failed 终态
interceptor_configs = (
request.opencode_profile.interceptors
if request.opencode_profile else []
)
interceptors = build_interceptors_from_config(interceptor_configs)
driver = OpenCodeDriver(
task_id=task_id,
request=request,
host_port=host_port,
container_env=container_env,
db=db,
interceptors=interceptors,
)
await driver.run()
# ──────────────────────────────────────────────────────────────────────────────
# 清理
# ──────────────────────────────────────────────────────────────────────────────
async def _cleanup(
task_id: str,
*,
workspace_kind: str,
workspaces_base: Path,
) -> None:
"""任务终态后清理容器 + workspace + broker policy。
[REVIEW: P1-18] 按 `workspaces_base / task_id` 顶层目录整体删除,
`git_subpath` 模式下既清子目录也清父壳;workspace 从未创建(早失败)
或 local 模式下 cleanup_workspace 自身能容忍不存在的路径。
"""
# 停止并删除容器
try:
await stop_container(task_id, timeout=10)
await remove_container(task_id, force=True)
except Exception as exc:
logger.warning("task %s: container cleanup error: %s", task_id, exc)
# 清理工作区(local 模式跳过,避免删除宿主机原始目录)
if workspace_kind != "local":
task_root = workspaces_base / task_id
try:
await cleanup_workspace(task_root)
except Exception as exc:
logger.warning("task %s: workspace cleanup error: %s", task_id, exc)
# 移除 broker policy(仅在 broker 启用时)
settings = get_settings()
if settings.broker_enabled:
try:
from broker.policy import remove_task_policy
remove_task_policy(task_id)
except Exception as exc:
logger.warning("task %s: broker policy cleanup error: %s", task_id, exc)
# ──────────────────────────────────────────────────────────────────────────────
# 辅助函数
# ──────────────────────────────────────────────────────────────────────────────
async def _load_request(db, task_id: str) -> Optional[TaskRequest]:
"""从 DB 读取任务的 TaskRequest(反序列化 request_json 列)。"""
import aiosqlite
async with db.execute(
"SELECT request_json FROM tasks WHERE id = ?", (task_id,)
) as cur:
row = await cur.fetchone()
if row is None:
return None
return TaskRequest.model_validate_json(row["request_json"])
def _build_container_env(
task_id: str,
request: TaskRequest,
settings,
) -> dict[str, str]:
"""构建注入容器的环境变量字典。
包含:
- OPENCODE_SERVER_PASSWORD(随机生成,每任务唯一)
- OPENCODE_CONFIG_CONTENT(内联 JSON,注入 model/provider/permission)
- OPENCODE_DISABLE_AUTOUPDATE=1
- OPENCODE_PERMISSION(权限模板 JSON)
- worker 侧 proxy 设置
- TaskRequest.env_policy.extra_env(用户自定义 env)
- provider API keys(从宿主 env 透传)
"""
import os
import secrets
opencode_password = secrets.token_hex(16)
# 构建 OPENCODE_CONFIG_CONTENT
profile = request.opencode_profile
config_content: dict = {
"autoupdate": False,
# oh-my-openagent 需要在 opencode 配置中显式声明,单靠 cache 目录不会加载。
"plugin": [OPENCODE_PLUGIN_ENTRY],
}
if profile:
if profile.model:
config_content["model"] = profile.model
# 构建 provider 块:先用 provider_extra_config 作为基础,再注入 apiKey
providers_block: dict = {}
# 合并 provider_extra_config(自定义 npm/name/options/models 等)
if profile.provider_extra_config:
import copy
for p, extra in profile.provider_extra_config.items():
providers_block[p] = copy.deepcopy(extra)
# 为 providers 列表中声明的 provider 注入 apiKey(若有环境变量映射)
for provider in (profile.providers or []):
key_env_var = _provider_key_env_var(provider)
if key_env_var:
entry = providers_block.setdefault(provider, {})
opts = entry.setdefault("options", {})
# 仅在未明确设置 apiKey 时才注入
if "apiKey" not in opts:
opts["apiKey"] = f"{{env:{key_env_var}}}"
if providers_block:
# opencode 配置 key 为 "provider"(单数)
config_content["provider"] = providers_block
# 权限模板
permission_json = _build_permission_json(
profile.permission_template if profile else None,
profile.permission_overrides if profile else None,
)
env: dict[str, str] = {
"OPENCODE_SERVER_PASSWORD": opencode_password,
"OPENCODE_CONFIG_CONTENT": json.dumps(config_content),
"OPENCODE_DISABLE_AUTOUPDATE": "1",
"OPENCODE_PERMISSION": permission_json,
}
# broker proxy(仅当 broker_enabled=True 时注入,否则容器直连外网)
if settings.broker_enabled:
env["HTTP_PROXY"] = f"http://broker:{settings.broker_port}"
env["HTTPS_PROXY"] = f"http://broker:{settings.broker_port}"
# task_id 供 broker 白名单检查使用
env["WORKER_TASK_ID"] = task_id
# 透传 provider API keys(从 Worker 进程 env 复制到容器 env)
if request.env_policy:
for key_name in request.env_policy.provider_keys:
val = os.environ.get(key_name)
if val:
env[key_name] = val
# 用户自定义 extra_env(不允许覆盖 OPENCODE_* 系统变量)
for k, v in (request.env_policy.extra_env or {}).items():
if not k.startswith("OPENCODE_"):
env[k] = v
return env
def _provider_key_env_var(provider: str) -> Optional[str]:
"""将 provider 名称映射到 API key 环境变量名。"""
mapping = {
"anthropic": "ANTHROPIC_API_KEY",
"openai": "OPENAI_API_KEY",
"alibaba": "ALIBABA_API_KEY",
"alibaba-cn": "ALIBABA_CN_API_KEY",
"alibabacloud": "DASHSCOPE_API_KEY",
"dashscope": "DASHSCOPE_API_KEY",
"deepseek": "DEEPSEEK_API_KEY",
"gemini": "GEMINI_API_KEY",
}
return mapping.get(provider.lower())
def _build_permission_json(
template: Optional[str],
overrides: Optional[dict],
) -> str:
"""根据权限模板 + overrides 构建 OPENCODE_PERMISSION JSON 字符串。"""
# 内置默认模板
templates: dict[str, dict] = {
"plan_first_default": {
"bash": "ask",
"write": "ask",
"edit": "ask",
"webfetch": "ask",
"external_directory": "deny",
},
"direct_execute_default": {
"bash": "ask",
"write": "allow",
"edit": "allow",
"webfetch": "ask",
"external_directory": "deny",
},
}
# 选择基础模板
if template and template in templates:
base = dict(templates[template])
elif template == "custom":
base = {}
else:
# 默认使用 plan_first_default
base = dict(templates["plan_first_default"])
# 合并 overrides(只允许合法值)
VALID_VALUES = {"allow", "ask", "deny"}
if overrides:
for k, v in overrides.items():
if v in VALID_VALUES:
base[k] = v
return json.dumps(base)
def _find_free_port() -> int:
"""在宿主上找一个空闲端口用于容器 port mapping。"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]