-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy path_runtime.py
More file actions
422 lines (346 loc) · 15.3 KB
/
Copy path_runtime.py
File metadata and controls
422 lines (346 loc) · 15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
"""Reusable runtime base classes for common pack-author patterns.
These are optional. Packs can implement the ``RuntimeHandle`` Protocol
directly. Use one of these when your runtime fits the pattern.
Two siblings share a filesystem-lifecycle base (``env_root`` /
``solver_root`` / ``pack_root``, file-snapshot checkpoint/restore,
``result.json`` terminal signal):
* :class:`SubprocessRuntime` — spawns and supervises a
long-running child the solver interacts with (e.g. a webapp, a
simulator). Owns the SIGTERM→SIGKILL grace period and the startup
handshake.
* :class:`OnDemandRuntime` — no persistent child. Agent acts on
files; the pack exposes on-demand callables (e.g. ``run_tests``) via
``surface_extras``. Fits SWE-style packs where the world is a code
workspace.
"""
from __future__ import annotations
import json
import os
import shutil
import signal
import subprocess
import tempfile
from abc import ABC, abstractmethod
from collections.abc import Mapping, Sequence
from contextlib import suppress
from pathlib import Path
from typing import Any
from graphschema import WorldGraph
from openrange_pack_sdk._errors import OpenRangeError
from openrange_pack_sdk._helpers import write_tree
class _FilesystemRuntime(ABC):
    """Shared lifecycle base for filesystem-backed RuntimeHandles.

    Owns the tempdir trio (``env_root``, ``solver_root``, ``pack_root``;
    each ``None`` before the first ``reset()`` and after ``stop()``),
    file-snapshot ``checkpoint`` / ``restore`` of the solver's workspace,
    ``terminal()`` via ``result.json``, and the default ``collect()``
    shape. Packs subclass one of the public siblings, not this directly.
    """

    # Name of the file (relative to ``solver_root``) whose existence marks
    # the episode as terminal and whose JSON body feeds ``collect()``.
    RESULT_FILE = "result.json"

    def __init__(self, graph: WorldGraph) -> None:
        """Store ``graph``; no directories are created until ``reset()``."""
        self._graph = graph
        self._env_root: Path | None = None
        self._solver_root: Path | None = None
        self._pack_root: Path | None = None
        # Snapshot directories handed out by checkpoint(); removed by
        # _drop_checkpoints().
        self._checkpoint_dirs: list[Path] = []

    @property
    def env_root(self) -> Path | None:
        """Top-level tempdir of the current episode, or ``None``."""
        return self._env_root

    @property
    def solver_root(self) -> Path | None:
        """``env_root / "solver"`` — the solver's workspace, or ``None``."""
        return self._solver_root

    @property
    def pack_root(self) -> Path | None:
        """``env_root / "pack"`` — pack-provided files, or ``None``."""
        return self._pack_root

    @abstractmethod
    def prepare_env_files(self, graph: WorldGraph) -> Mapping[str, str]:
        """Return ``{relative_path: file_contents}`` written under ``pack_root``
        on each ``reset()``."""

    def surface_extras(self) -> Mapping[str, Any]:
        """Override to add keys to ``surface()`` (callables, URLs, etc.)."""
        return {}

    def collect_extras(self) -> Mapping[str, Any]:
        """Override to add keys to ``collect()`` (parsed logs, metrics)."""
        return {}

    def poll_events(self) -> tuple[Mapping[str, Any], ...]:
        """Return pending events; the default reports none."""
        return ()

    @abstractmethod
    def reset(self) -> None:
        """Subclasses do their own setup, calling :meth:`_init_env` to
        prepare the tempdir trio."""

    @abstractmethod
    def stop(self) -> None:
        """Fully tear down. Subclasses must call :meth:`_teardown_env`
        and drop ``_checkpoint_dirs``."""

    def surface(self) -> Mapping[str, Any]:
        """Return the solver-facing mapping: ``solver_root`` plus extras.

        Raises:
            OpenRangeError: if called before ``reset()``.
        """
        if self._solver_root is None:
            raise OpenRangeError("surface() called before reset()")
        return {
            "solver_root": str(self._solver_root),
            **self.surface_extras(),
        }

    def terminal(self) -> tuple[bool, str | None]:
        """Return ``(True, reason)`` once ``RESULT_FILE`` exists under
        ``solver_root``; ``(False, None)`` otherwise (including before
        ``reset()``)."""
        if self._solver_root is None:
            return False, None
        if (self._solver_root / self.RESULT_FILE).exists():
            return True, "solver wrote result"
        return False, None

    def checkpoint(self) -> Any:
        """Snapshot ``solver_root`` into a fresh tempdir.

        Returns:
            ``{"solver_root_snapshot": <path>}`` — pass it to ``restore()``.

        Raises:
            OpenRangeError: if called before ``reset()``.
        """
        if self._solver_root is None:
            raise OpenRangeError("checkpoint() called before reset()")
        snap = Path(tempfile.mkdtemp(prefix=f"{self._tempdir_prefix()}-ckpt-"))
        try:
            shutil.copytree(self._solver_root, snap / "solver", dirs_exist_ok=True)
        except BaseException:
            # The snapshot is not tracked yet, so nothing else would ever
            # reclaim a half-written directory; remove it before propagating.
            shutil.rmtree(snap, ignore_errors=True)
            raise
        self._checkpoint_dirs.append(snap)
        return {"solver_root_snapshot": str(snap)}

    def restore(self, state: Any) -> None:
        """Replace the contents of ``solver_root`` with a prior snapshot.

        Args:
            state: the mapping returned by ``checkpoint()``.

        Raises:
            OpenRangeError: if ``state`` is malformed, the snapshot directory
                is gone, or ``reset()`` has not been called.
        """
        if not isinstance(state, Mapping):
            raise OpenRangeError(
                f"restore() expects a mapping, got {type(state).__name__}"
            )
        snap_path = state.get("solver_root_snapshot")
        if not isinstance(snap_path, str):
            raise OpenRangeError(
                "restore() payload missing 'solver_root_snapshot' (str)"
            )
        solver_snap = Path(snap_path) / "solver"
        if not solver_snap.exists():
            raise OpenRangeError(f"restore() snapshot missing: {solver_snap}")
        if self._solver_root is None:
            raise OpenRangeError("restore() called before reset()")
        for child in self._solver_root.iterdir():
            # is_dir() follows symlinks, but shutil.rmtree refuses to operate
            # on a symlink; remove the link itself and leave its target alone.
            if child.is_dir() and not child.is_symlink():
                shutil.rmtree(child)
            else:
                child.unlink()
        shutil.copytree(solver_snap, self._solver_root, dirs_exist_ok=True)

    def collect(self) -> Mapping[str, Any]:
        """Return final-state data: ``solver_root``, the parsed result file
        under ``"result"``, plus ``collect_extras()``. Empty before
        ``reset()``."""
        if self._solver_root is None:
            return {}
        result = self._read_result()
        return {
            "solver_root": str(self._solver_root),
            "result": dict(result),
            **self.collect_extras(),
        }

    def _init_env(self) -> None:
        """Create ``env_root`` with ``solver/`` and ``pack/`` children and
        write ``prepare_env_files()`` under ``pack/``."""
        env_root = Path(tempfile.mkdtemp(prefix=f"{self._tempdir_prefix()}-"))
        # Record before the mkdirs below so a failure still leaves it reclaimable.
        self._env_root = env_root
        solver_root = env_root / "solver"
        solver_root.mkdir(parents=True, exist_ok=True)
        pack_root = env_root / "pack"
        pack_root.mkdir(parents=True, exist_ok=True)
        self._solver_root = solver_root
        self._pack_root = pack_root
        write_tree(pack_root, self.prepare_env_files(self._graph))

    def _teardown_env(self) -> None:
        """Best-effort removal of ``env_root``; resets the trio to ``None``."""
        if self._env_root is not None and self._env_root.exists():
            shutil.rmtree(self._env_root, ignore_errors=True)
        self._env_root = None
        self._solver_root = None
        self._pack_root = None

    def _drop_checkpoints(self) -> None:
        """Best-effort removal of every snapshot made by ``checkpoint()``."""
        for ckpt in self._checkpoint_dirs:
            shutil.rmtree(ckpt, ignore_errors=True)
        self._checkpoint_dirs.clear()

    def _read_result(self) -> Mapping[str, Any]:
        """Parse ``RESULT_FILE`` as a JSON object.

        Returns ``{}`` when the file is absent, unreadable, not valid JSON,
        or its top-level value is not a mapping.
        """
        assert self._solver_root is not None
        result_path = self._solver_root / self.RESULT_FILE
        if not result_path.exists():
            return {}
        try:
            data = json.loads(result_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):  # ValueError also covers a non-UTF-8 read
            return {}
        return dict(data) if isinstance(data, Mapping) else {}

    def _tempdir_prefix(self) -> str:
        """Prefix for tempdir names: the lower-cased concrete class name."""
        return type(self).__name__.lower()
class OnDemandRuntime(_FilesystemRuntime):
    """Filesystem-only RuntimeHandle: no long-running child process.

    The solver works directly on files under ``solver_root``. Anything
    that needs to execute (e.g. ``run_tests(name)``, ``run_cmd(argv)``)
    is offered as a callable through ``surface_extras`` and shells out
    on each call. This suits SWE-style packs whose world is a code
    workspace, where the actions are file edits plus the occasional
    command. There is no startup-line contract and no subprocess
    supervision.

    Required override:

    * ``prepare_env_files(graph)`` → initial files under ``pack_root``.

    Optional overrides:

    * ``surface_extras()`` — callables the solver can invoke.
    * ``collect_extras()`` — computed-from-workspace keys merged into
      ``collect()``.
    * ``poll_events()`` — per-tick events (default = no events).
    """

    def reset(self) -> None:
        """Discard the current ``env_root`` (if any) and build a fresh one.

        Checkpoint directories are left in place.
        """
        self._teardown_env()
        self._init_env()

    def stop(self) -> None:
        """Final teardown: remove ``env_root`` and every checkpoint.

        ``reset()`` keeps checkpoints between episodes; call ``stop()``
        only when the runtime is finished for good.
        """
        self._teardown_env()
        self._drop_checkpoints()
class SubprocessRuntime(_FilesystemRuntime):
    """RuntimeHandle scaffold for packs whose realized world is a child
    subprocess the solver interacts with.

    Domains this fits naturally: a webapp serving HTTP, a simulator
    exposing a broker API, an in-pack mock service. Common structure:
    spawn a process, optionally exchange a small startup descriptor
    (URL, port, fd), let the solver act, capture results from the
    solver's filesystem at the end.

    The class owns:

    * The shared filesystem lifecycle (``env_root`` / ``solver_root`` /
      ``pack_root``, ``checkpoint`` / ``restore``, terminal-via-result.json).
    * Subprocess spawn with ``start_new_session=True`` so process-group
      signals reach the child without affecting the harness.
    * SIGTERM → ``GRACE_SECONDS`` → SIGKILL on ``stop()``.
    * A startup-line handshake bounded by ``STARTUP_TIMEOUT_SECONDS`` so
      a misbehaving child can't hang the harness.

    Packs override (minimum):

    * ``prepare_env_files(graph)`` → ``{relative_path: contents}`` for
      ``pack_root`` (e.g., the codegen-rendered app source).
    * ``subprocess_command(env_root, solver_root)`` → the command to spawn.

    Packs override (as needed):

    * ``parse_startup(stdout_line)`` — extract a surface descriptor from
      the subprocess's first stdout line (e.g., ``{"base_url": ...}``).
    * ``subprocess_env()`` — environment variables for the child.
    * ``subprocess_popen_kwargs()`` — extra ``Popen`` kwargs (e.g.
      ``stdin=subprocess.PIPE`` for two-way comms). Additive; don't
      override ``stdout``/``stderr``/``start_new_session`` — the SDK
      relies on those.
    * ``surface_extras()`` — extra keys the solver reads (callables, URLs).
    * ``poll_events()`` — per-tick event drain (default = no events).
    * ``collect_extras()`` — per-pack final-state keys.

    The subprocess's stdout is captured; nothing else is consumed beyond
    the startup line. Packs that need request logs or other side-channel
    state typically write to a file under ``env_root`` and read it in
    ``poll_events`` / ``collect_extras``.

    Contract: the spawned subprocess MUST emit at least one newline on
    stdout before the solver acts. ``reset()`` blocks on ``readline()``
    (bounded by ``STARTUP_TIMEOUT_SECONDS``) to capture optional startup
    info. Packs with no startup info to advertise should print a single
    ``\\n`` immediately.
    """

    # Seconds to wait after SIGTERM (and again after SIGKILL) for the child.
    GRACE_SECONDS = 2.0
    # Upper bound on how long reset() waits for the child's first stdout line.
    STARTUP_TIMEOUT_SECONDS: float = 30.0

    def __init__(self, graph: WorldGraph) -> None:
        """Store ``graph``; nothing is spawned until ``reset()``."""
        super().__init__(graph)
        self._process: subprocess.Popen[str] | None = None
        # Keys produced by parse_startup() for the current child.
        self._startup_info: dict[str, Any] = {}

    @property
    def process(self) -> subprocess.Popen[str] | None:
        """The spawned subprocess; ``None`` before ``reset()`` / after ``stop()``."""
        return self._process

    @abstractmethod
    def subprocess_command(
        self,
        env_root: Path,
        solver_root: Path,
    ) -> Sequence[str]:
        """The argv to ``subprocess.Popen``."""

    def subprocess_env(self) -> Mapping[str, str] | None:
        """Override to set the child's env. Default: inherit parent."""
        return None

    def subprocess_popen_kwargs(self) -> Mapping[str, Any]:
        """Override to add extra ``Popen`` kwargs (e.g.
        ``{"stdin": subprocess.PIPE}`` for two-way comms).

        Additive: the SDK always sets ``stdout=PIPE``, ``stderr=PIPE``,
        ``text=True``, ``start_new_session=True``. Don't override those
        — the startup-line readline, process-group kill, and stderr
        capture depend on them.
        """
        return {}

    def parse_startup(self, stdout_line: str) -> Mapping[str, Any]:
        """Parse the subprocess's first stdout line into surface keys.

        Default: no startup exchange (returns ``{}``). Common override
        parses JSON: ``{"host": "...", "port": 12345}`` → ``{"base_url":
        f"http://{host}:{port}"}``.
        """
        del stdout_line
        return {}

    def reset(self) -> None:
        """Stop any previous child, rebuild the env, spawn a new child and
        wait for its startup line.

        Raises:
            OpenRangeError: if the child neither writes a line nor exits
                within ``STARTUP_TIMEOUT_SECONDS``. The child stays
                recorded in ``process`` so ``stop()`` can still reclaim it.
        """
        self._teardown_subprocess()
        self._teardown_env()
        self._init_env()
        assert self._env_root is not None and self._solver_root is not None
        self._process = self._spawn(self._env_root, self._solver_root)
        assert self._process.stdout is not None
        first_line = _readline_with_timeout(self._process, self.STARTUP_TIMEOUT_SECONDS)
        # An empty string means the child hit EOF without writing anything.
        if first_line:
            self._startup_info = dict(self.parse_startup(first_line))

    def stop(self) -> None:
        """Fully tear down: kill process, wipe env_root, drop all checkpoints.

        ``reset()`` between episodes preserves checkpoints; use ``stop()``
        only for final teardown.
        """
        self._teardown_subprocess()
        self._teardown_env()
        self._drop_checkpoints()

    def surface(self) -> Mapping[str, Any]:
        """Return ``solver_root`` plus startup info plus ``surface_extras()``.

        Precedence on key clashes (lowest to highest): ``solver_root``,
        startup info, ``surface_extras()``.

        Raises:
            OpenRangeError: if called before ``reset()``.
        """
        if self._solver_root is None:
            raise OpenRangeError("surface() called before reset()")
        # Call the hook exactly once: an override may build fresh objects on
        # each call, and merging two different results would be inconsistent.
        extras = self.surface_extras()
        # ``extras`` appears twice on purpose: the first spread fixes the key
        # order (solver_root, extras, startup-only keys); the second makes
        # extras win over startup info on clashes.
        return {
            "solver_root": str(self._solver_root),
            **extras,
            **self._startup_info,
            **extras,
        }

    def _teardown_subprocess(self) -> None:
        """Terminate the child (if any), close its pipes and forget it."""
        proc = self._process
        if proc is not None:
            _terminate_process_group(proc, self.GRACE_SECONDS)
            # Close our ends of the pipes explicitly instead of leaving the
            # descriptors open until the Popen object is garbage-collected.
            for stream in (proc.stdin, proc.stdout, proc.stderr):
                if stream is not None:
                    # Flushing buffered stdin to a dead child can raise
                    # BrokenPipeError; teardown is best-effort.
                    with suppress(OSError, ValueError):
                        stream.close()
        self._process = None
        self._startup_info = {}

    def _spawn(
        self,
        env_root: Path,
        solver_root: Path,
    ) -> subprocess.Popen[str]:
        """Start the child with the SDK's ``Popen`` defaults, then any
        pack-supplied kwargs, then ``subprocess_env()`` if it is not ``None``."""
        cmd = list(self.subprocess_command(env_root, solver_root))
        kwargs: dict[str, Any] = {
            "stdout": subprocess.PIPE,
            "stderr": subprocess.PIPE,
            "text": True,
            "start_new_session": True,
            **dict(self.subprocess_popen_kwargs()),
        }
        env = self.subprocess_env()
        if env is not None:
            kwargs["env"] = dict(env)
        return subprocess.Popen(cmd, **kwargs)
def _terminate_process_group(
    process: subprocess.Popen[str],
    grace_seconds: float,
) -> None:
    """Stop the child's process group, escalating SIGTERM → SIGKILL.

    Each signal is followed by a wait of up to ``grace_seconds`` for the
    child to exit. Does nothing if the child has already exited.

    Depends on ``_spawn`` passing ``start_new_session=True``: the child is
    then a session/process-group leader, so its pid doubles as the pgid.
    """
    already_exited = process.poll() is not None
    if already_exited:
        return
    for signum in (signal.SIGTERM, signal.SIGKILL):
        # The group may vanish (or be off-limits) between poll and kill;
        # neither case is an error for a best-effort shutdown.
        with suppress(ProcessLookupError, PermissionError):
            os.killpg(process.pid, signum)
        try:
            process.wait(timeout=grace_seconds)
        except subprocess.TimeoutExpired:
            # Still alive: escalate, or give up after the SIGKILL wait.
            continue
        return
def _readline_with_timeout(
    process: subprocess.Popen[str],
    timeout_seconds: float,
) -> str:
    """Read one line from ``process.stdout``, waiting up to ``timeout_seconds``.

    The budget covers the whole line, not just its first byte: a child
    that writes a partial line and then stalls triggers the timeout
    instead of blocking forever inside ``readline()``.

    Bytes are read one at a time straight from the pipe's file descriptor,
    so output after the first newline stays in the pipe and remains
    readable through ``process.stdout``. This assumes nothing has been
    read through ``process.stdout`` yet, and that the stream's encoding
    represents a newline as the single byte ``0x0A`` (true for UTF-8 and
    other ASCII-compatible encodings).

    Args:
        process: child spawned with ``stdout=PIPE`` in text mode.
        timeout_seconds: total time budget for the line to arrive.

    Returns:
        The decoded line including its trailing ``"\\n"``; the partial
        line if the child closed stdout mid-line; ``""`` if the child
        exits without writing (EOF).

    Raises:
        OpenRangeError: if the child neither completes a line nor closes
            stdout within the budget — that case means the pack's
            subprocess violated the "emit at least one newline before
            reset returns" contract.
    """
    import select
    import time

    assert process.stdout is not None
    stream = process.stdout
    fd = stream.fileno()
    deadline = time.monotonic() + timeout_seconds
    raw = bytearray()
    while True:
        # Once the deadline has passed this degrades to a zero-timeout poll,
        # so bytes that are already available are still consumed.
        remaining = max(deadline - time.monotonic(), 0.0)
        ready, _, _ = select.select([fd], [], [], remaining)
        if not ready:
            raise OpenRangeError(
                f"subprocess did not write a startup line within "
                f"{timeout_seconds:.1f}s; pack must emit a newline before "
                "reset() can return (see SubprocessRuntime docstring)"
            )
        # select() reported the fd readable, so a 1-byte read cannot block.
        byte = os.read(fd, 1)
        if not byte:  # EOF: the child closed stdout.
            break
        raw += byte
        if byte == b"\n":
            break
    encoding = getattr(stream, "encoding", None) or "utf-8"
    errors = getattr(stream, "errors", None) or "strict"
    line = raw.decode(encoding, errors)
    # Mirror text-mode universal-newline translation for the line ending.
    if line.endswith("\r\n"):
        line = line[:-2] + "\n"
    return line