Commit 64207d1

MilkClouds and claude authored
refactor: extract per-episode video recorder into shared module (#52)
* refactor: extract per-episode video recorder into shared module

#53 (RoboMME) wires episode mp4 saving directly into `RoboMMEBenchmark` — a buffered list of frames + an `imageio.mimsave` at the end of every episode. That pattern is going to recur in every benchmark that wants agent-view captures (failure-case debugging, demo browsing, public showcase, qualitative analysis), so lift it out of RoboMME and into `vla_eval.benchmarks.recording.EpisodeVideoRecorder`.

Improvements over the inline implementation in #53:

- **Streaming write** via `imageio.get_writer().append_data()` — O(1) memory regardless of episode length. A 1300-step 256×256×3 episode that previously held ~250 MB of pending frames now holds one. A side benefit: a partially-written mp4 is left on disk if the process is killed mid-episode, playable up to the last completed frame — useful for debugging crashes.
- **Atomic finalize** via `tempfile.mkstemp` in the output dir + `os.replace` on `save()`. Concurrent jobs sharing a directory don't collide on partially-written files.
- **Logging-style filename templating**: a `str.format` template (or callable) over a context dict the caller passes at `start()`, plus a `status` key injected at `save()`. `filename` and `required_context` are required at construction time — every benchmark identifies tasks differently (`env_id`, `task_id`, `suite/task`, …), so there is no universal default. Required-context validation catches mismatched keys at `start()` rather than as a silently dropped mp4 at `save()` time.
- **Latched failure**: the first `record()` failure disables the recorder for the rest of the episode, so a wedged writer subprocess doesn't flood the log with one warning per step.

`RoboMMEBenchmark` now constructs an `EpisodeVideoRecorder` (when `save_episode_video=True`) and routes the same `reset → step → save` hooks through it.
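The memory figure in the streaming-write bullet can be sanity-checked with quick arithmetic (assuming uint8 RGB frames, i.e. 1 byte per value):

```python
steps, height, width, channels = 1300, 256, 256, 3

# Old pattern: buffer every frame in a Python list, then imageio.mimsave at episode end.
buffered_bytes = steps * height * width * channels
print(f"{buffered_bytes / 2**20:.0f} MiB pending")  # ~244 MiB, i.e. the "~250 MB" in the commit

# Streaming pattern: only the frame currently being encoded is held.
streaming_bytes = height * width * channels
print(f"{streaming_bytes / 2**10:.0f} KiB pending")  # one 192 KiB frame at a time
```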
The previous public flags (`save_episode_video`, `video_dir`) and behavior are preserved; the internal `_episode_frames` buffer + `_save_episode_video` method are removed. `imageio[ffmpeg]` is added as a runtime dep so the recorder works on a base install (it was previously an undeclared transitive dep of the RoboMME image only).

`tests/test_recording.py` (16 tests, 0.8 s) covers the lifecycle: happy path, str + callable filename templating, subdirectory templates, missing template key at save, required-context validation, no-op semantics for record/save/discard before start, writer-open failure, mid-episode start-again cleanup, lazy output-dir creation, str path support.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(recording): cut overengineering, fix UX papercuts

Follow-ups after a self-review pass and reviewer feedback:

## Drop overclaimed atomic-write machinery

The previous code used `tempfile.mkstemp` + scattered cleanup paths across every `save()` / `discard()` failure mode, justified by claims that mostly didn't hold:

- "Concurrent jobs sharing a directory don't collide" — an atomic write doesn't actually prevent collisions when two writers target the same final filename; it just controls *when* the overwrite happens.
- "Partially-written mp4 left on disk if killed mid-episode" — arguably worse than the alternative: the leftover is a random `.recorder-XXXX.mp4` with no indication of which episode it was.
- "Atomic vs readers" — there are no concurrent readers; eval mp4s are post-run artifacts.

Renaming at `save()` *is* still necessary because the filename encodes status (success/fail), which only resolves at episode end. That part is kept — but the working file is now a single deterministic per-instance path (`.recorder-<uid>.mp4`, set at `__init__`) and cleanup goes through a single `_safe_unlink` helper. Result: ~30 fewer lines, simpler error paths, same behaviour for the cases that actually matter.
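The retained rename-at-save flow reduces to a few stdlib calls. A minimal sketch (the `pick_cube` task id and filenames are illustrative, not taken from the diff):

```python
import os
import tempfile
import uuid
from pathlib import Path

output_dir = Path(tempfile.mkdtemp())

# One deterministic working file per recorder instance: hidden, uuid-suffixed so
# concurrent recorders sharing output_dir don't collide with each other.
working = output_dir / f".recorder-{uuid.uuid4().hex[:12]}.mp4"
working.write_bytes(b"encoded frames")  # stand-in for the streaming writer's output

# The final name encodes status, which only resolves at episode end — hence the rename at save().
status = "success"
final = output_dir / f"pick_cube_ep0001_{status}.mp4"
if final.exists():  # overwrite=False semantics: refuse, keep the working file for recovery
    raise FileExistsError(final)
os.replace(working, final)  # one rename, one cleanup path

print(final.name)  # pick_cube_ep0001_success.mp4
```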
## Real collision detection (overwrite=False)

Previously, two episodes that resolved to the same final filename silently overwrote — a programmer error (e.g. a duplicate `episode_idx`) lost frames with no warning. `save()` now checks the final path before renaming and raises `FileExistsError` by default, leaving the working file on disk so the caller can recover. `EpisodeVideoRecorder(..., overwrite=True)` keeps the old behaviour for callers that want it.

## Auto-derive `required_context` for str templates

Repeating `("env_id", "episode_idx")` next to a `"{env_id}_ep{episode_idx:04d}_{status}.mp4"` template was pure duplication. When `required_context` is omitted on a str template, it's now derived from the template's field names via `string.Formatter().parse()` (`status` excluded; format specs and attribute/index access stripped). Callable templates still require an explicit value — there's no way to introspect a callable's key dependencies. Explicit `required_context` is still allowed and is treated as deliberate, so the existing "subset = some keys are optional" semantics still work.

## Filename UX papercuts

- Zero-pad `episode_idx` in the docstring example and the RoboMME wiring (`{episode_idx:04d}`, not `{episode_idx}`). Without this, ep10 sorts before ep2 in any directory listing.
- New "Filename layout" section calling out the two field-ordering cases that bite users: episode-first for multi-camera (front/wrist views of the same episode stay adjacent), task-first for multi-task single-camera.

## Tests

- Updated existing tests to use the `:04d` template + auto-derived required_context where appropriate.
- New tests: direct `_fields_from_template` tests, callable filename rejects `required_context=None`, save raises on collision by default, save overwrites with `overwrite=True`.
- 20 tests passing in 1.05 s.
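The auto-derivation is pure stdlib. A self-contained sketch mirroring the documented behaviour of the `_fields_from_template` helper this commit adds (not copied verbatim from the diff):

```python
import string

def fields_from_template(template: str) -> tuple[str, ...]:
    """Top-level field names of a str.format template, in first-occurrence order."""
    seen: list[str] = []
    for _, field_name, _, _ in string.Formatter().parse(template):
        if not field_name:
            continue
        # "name", "name.attr", "name[0]" all reduce to the bare top-level key.
        bare = field_name.split(".", 1)[0].split("[", 1)[0]
        if bare and bare != "status" and bare not in seen:  # save() injects status itself
            seen.append(bare)
    return tuple(seen)

print(fields_from_template("{env_id}_ep{episode_idx:04d}_{status}.mp4"))
# → ('env_id', 'episode_idx')
```

Note how the `:04d` format spec and the injected `status` key are both stripped, so the derived tuple is exactly what the caller must pass at `start()`.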
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(recording): drop writer_kwargs YAGNI; widen docstring wraps

## Drop `writer_kwargs`

The constructor accepted ``writer_kwargs: Mapping[str, Any]`` and forwarded it verbatim to ``imageio.get_writer``. Two reasons to remove it:

1. **It's the only imageio-shaped knob in the public API.** Every other recorder method (`start`, `record`, `save`, `discard`) is library-agnostic — only ``writer_kwargs`` would break callers if we ever swapped the encode backend. Removing it makes the surface fully encapsulated, so the imageio dep is a pure implementation detail.
2. **Currently unused.** RoboMME's wiring doesn't pass it; tests don't pass it; no caller in the tree depends on it. Speculative API surface that doesn't carry its weight. If a real codec/quality-tuning need lands later, it's better added back with a deliberate design (e.g. a backend selector + a neutralized kwarg shape) than left as an imageio leak.

## Widen docstring line wraps

Project ``[tool.ruff].line-length`` is 119, but the recording.py docstrings were wrapping at ~70 chars (commit-message habit). That made the docstrings noisier than they needed to be — every paragraph broke 3-4 times where it could have been 1-2. Reflowed to land near ~100 chars (well under the 119 limit, leaving margin for further edits). Same content, ~30% fewer lines.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4e89c6d commit 64207d1

4 files changed

Lines changed: 666 additions & 34 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -12,6 +12,7 @@ license = {text = "Apache-2.0"}
 dependencies = [
     "anyio>=4.0.0",
     "filelock>=3.0",
+    "imageio[ffmpeg]",  # Streaming mp4 writer for benchmarks/recording.py
     "lazyregistry>=0.4.0",  # Python 3.8 support
     "msgpack>=1.0.0",
     "numpy>=1.24",
```
Lines changed: 309 additions & 0 deletions
@@ -0,0 +1,309 @@

```python
"""Per-episode video recording helper, shared across benchmarks.

Most benchmarks have a "save the agent's view of each episode as an mp4" need (failure-case debugging,
demo browsing, qualitative analysis). This module is one home for the pattern so each benchmark doesn't
reinvent it.

## Design notes

* **Streaming write**: frames are encoded to disk as they arrive (``imageio.get_writer`` + ``append_data``).
  Memory is O(1) regardless of episode length — a 1300-step 256×256×3 episode that would have buffered
  ~250 MB now holds one frame at a time.
* **Filename can encode success/fail status**: the final filename often depends on whether the episode
  succeeded, which isn't known until the episode ends. ``record()`` therefore writes to a hidden working
  file (``.recorder-<uid>.mp4`` in ``output_dir``); ``save()`` resolves the final filename and renames the
  working file into place.
* **Required-context fail-fast**: ``filename`` declares the template, and ``required_context`` declares
  the keys the caller will pass at ``start()``. Missing keys raise at ``start()``, before frames are
  recorded — not as a silent ``KeyError`` -> dropped mp4 at ``save()``. For ``str.format`` templates,
  ``required_context`` is auto-derived from the field names so callers don't have to repeat themselves.
* **Collision detection at save**: if the resolved final path already exists and the recorder wasn't
  constructed with ``overwrite=True``, ``save()`` raises ``FileExistsError`` and leaves the working file
  on disk so the caller can recover the frames.

## Filename layout

Two non-obvious things that catch users out:

1. **Zero-pad ``episode_idx``**: ``"ep{episode_idx:04d}"`` not ``"ep{episode_idx}"`` — alphabetic sort
   otherwise puts ``ep10`` before ``ep2``.
2. **Put the field you'd want to scan adjacent files by first.** For multi-camera recording (front +
   wrist + ...), the views of the same episode are usually what you want to compare side-by-side, so
   ``"ep{episode_idx:04d}_{view}_{status}.mp4"`` keeps them adjacent. For multi-task single-camera, the
   task is more useful first: ``"{task}_ep{episode_idx:04d}_{status}.mp4"``.

## Caller pattern

    recorder = EpisodeVideoRecorder(
        output_dir="/workspace/results/videos",
        filename="{env_id}_ep{episode_idx:04d}_{status}.mp4",
        # required_context is auto-derived for str templates → ("env_id", "episode_idx")
        fps=20,
    )

    # In benchmark.reset(task):
    recorder.start({"env_id": task["env_id"], "episode_idx": task["episode_idx"]})
    recorder.record(initial_frame)

    # In benchmark.step(action):
    recorder.record(frame)

    # In benchmark.get_step_result(step_result):
    recorder.save(status="success" if success else "fail")

    # In benchmark.cleanup():
    recorder.discard()  # drops any in-flight working file

Each recorder records a single stream. To capture multiple views (e.g. front + wrist), construct one
recorder per view; they don't share state.
"""

from __future__ import annotations

import logging
import os
import string
import uuid
from collections.abc import Callable, Mapping, Sequence
from pathlib import Path
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    import numpy as np

logger = logging.getLogger(__name__)

# A `str.format`-style template, or a callable taking the resolved context (caller's start() context +
# injected ``status``) and returning a filename relative to ``output_dir``.
FilenameSpec = str | Callable[[Mapping[str, Any]], str]


class EpisodeVideoRecorder:
    """Streaming per-episode video recorder.

    Lifecycle: ``start()`` → ``record()`` × N → ``save()`` (or ``discard()``). ``start()`` may be called
    again to begin a new episode; if a previous episode never reached ``save()``/``discard()``, its
    working file is dropped first.

    Inactive (no episode in progress) is a valid state: ``record()`` / ``save()`` / ``discard()`` are
    no-ops then, so callers don't need defensive ``if recorder.active`` checks.
    """

    def __init__(
        self,
        output_dir: str | os.PathLike[str],
        filename: FilenameSpec,
        required_context: Sequence[str] | None = None,
        fps: int = 20,
        overwrite: bool = False,
    ) -> None:
        """
        Args:
            output_dir: Directory the final mp4 lands in. Created on first ``start()`` if missing.
                Filename templates may include subdirectories (``"{suite}/{task}_..."``); intermediate
                dirs are created at ``save()`` time.
            filename: ``str.format`` template or callable producing the filename relative to
                ``output_dir``. Resolved at ``save()`` time over ``{**start_context, "status": status}``.
                Required because every benchmark identifies tasks differently (``env_id``, ``task_id``,
                ``suite/task``); there is no universally safe default.
            required_context: Keys that must be present in the dict passed to ``start()``. ``ValueError``
                is raised at ``start()`` if any are missing. When ``None`` (the default) and ``filename``
                is a ``str.format`` template, this is auto-derived from the template's field names
                (``status`` excluded, since ``save()`` injects it) — callers don't have to repeat
                themselves. When the ``filename`` is a callable, this must be specified explicitly:
                there's no way to introspect a callable's key dependencies. An explicit value is allowed
                to be a subset of the template's keys (i.e. some keys can be optional — they'll fail at
                ``save()`` time if ultimately missing).
            fps: Output framerate.
            overwrite: When False (default), ``save()`` raises ``FileExistsError`` if the resolved final
                path is already taken. When True, an existing file is replaced.
        """
        self.output_dir = Path(output_dir)
        self._filename_spec = filename
        self.fps = fps
        if required_context is None:
            if not isinstance(filename, str):
                raise ValueError(
                    "required_context must be specified when filename is a callable; "
                    "the recorder can't introspect callables to discover key dependencies."
                )
            required_context = _fields_from_template(filename)
        self._required_context = tuple(required_context)
        self._overwrite = overwrite

        # One working file per recorder instance, reused across episodes. Hidden (`.recorder-`) so it
        # doesn't show up in casual listings, uuid-suffixed so concurrent recorders sharing output_dir
        # don't collide.
        self._working_path = self.output_dir / f".recorder-{uuid.uuid4().hex[:12]}.mp4"

        # Lifecycle state — None whenever no episode is in progress.
        self._writer: Any = None
        self._context: dict[str, Any] | None = None
        self._frames_written = 0
        # Latched on the first record() failure so the writer being wedged (corrupt subprocess pipe, etc.)
        # doesn't produce one warning per step.
        self._record_failed = False

    @property
    def active(self) -> bool:
        return self._writer is not None

    def start(self, context: Mapping[str, Any]) -> None:
        """Begin a new episode.

        Validates required context keys, opens a streaming writer to the working file. If a previous
        episode is still in flight (no ``save()`` / ``discard()``) it is dropped first. On writer-open
        failure the recorder stays inactive and subsequent ``record()`` / ``save()`` are no-ops; the
        failure is logged.
        """
        missing = [k for k in self._required_context if k not in context]
        if missing:
            raise ValueError(f"EpisodeVideoRecorder.start: missing required context keys: {missing}")

        if self.active:
            self.discard()

        self._context = dict(context)
        self._frames_written = 0
        self._record_failed = False
        try:
            self.output_dir.mkdir(parents=True, exist_ok=True)
            import imageio

            self._writer = imageio.get_writer(str(self._working_path), fps=self.fps)
        except Exception as e:
            logger.warning("Failed to open video writer for context=%r: %s", self._context, e)
            self._writer = None
            self._context = None
            _safe_unlink(self._working_path)

    def record(self, frame: np.ndarray) -> None:
        """Append a frame to the in-flight episode.

        On the default ``.mp4`` / ffmpeg path, imageio serializes the frame via ``ndarray.tobytes()``
        before piping it to the encoder subprocess — that's a synchronous copy, so the caller can mutate
        the underlying buffer once this returns. If you configure a non-ffmpeg writer that retains
        references (e.g. the pillow plugin appends ``Image.fromarray(arr)`` to a list flushed at close),
        pass copies yourself.

        No-op if no episode is in progress. The first encode failure latches the recorder so subsequent
        ``record()`` calls become no-ops rather than flooding the log.
        """
        if not self.active or self._record_failed:
            return
        try:
            self._writer.append_data(frame)
            self._frames_written += 1
        except Exception as e:
            logger.warning(
                "record() failed for context=%r at frame %d: %s; remaining frames will be dropped",
                self._context,
                self._frames_written,
                e,
            )
            self._record_failed = True

    def save(self, status: str = "success") -> Path | None:
        """Finalize the in-flight episode.

        Closes the writer, resolves the final filename from ``{**context, "status": status}``, and moves
        the working file into place. Returns the final ``Path``, or ``None`` if the recorder was inactive
        or filename resolution / writer close failed.

        Raises:
            FileExistsError: if the resolved final path already exists and the recorder was constructed
                with ``overwrite=False`` (the default). The working file is left on disk so the caller
                can recover the frames manually.
        """
        if not self.active:
            return None

        writer, context = self._writer, self._context
        frames_written = self._frames_written
        # Reset state up front: any return path below leaves the recorder inactive.
        self._writer = None
        self._context = None
        self._frames_written = 0
        self._record_failed = False

        try:
            writer.close()
        except Exception as e:
            logger.warning("Failed to close video writer for context=%r: %s", context, e)
            _safe_unlink(self._working_path)
            return None

        try:
            relative_name = self._resolve_filename({**(context or {}), "status": status})
        except Exception as e:
            logger.warning("Failed to resolve filename for context=%r status=%r: %s", context, status, e)
            _safe_unlink(self._working_path)
            return None

        final_path = self.output_dir / relative_name
        if final_path.exists() and not self._overwrite:
            raise FileExistsError(
                f"{final_path} already exists. Recorded frames are at {self._working_path}. "
                f"Pass overwrite=True to replace, or rename the working file manually."
            )

        try:
            final_path.parent.mkdir(parents=True, exist_ok=True)
            os.replace(str(self._working_path), str(final_path))
        except Exception as e:
            logger.warning("Failed to move working file to %s for context=%r: %s", final_path, context, e)
            _safe_unlink(self._working_path)
            return None

        logger.info("Saved episode video: %s (%d frames)", final_path, frames_written)
        return final_path

    def discard(self) -> None:
        """Abandon the in-flight episode without producing an mp4.

        Closes the writer (best-effort) and removes the working file. Safe to call when no episode is in
        progress (no-op).
        """
        writer = self._writer
        self._writer = None
        self._context = None
        self._frames_written = 0
        self._record_failed = False
        if writer is not None:
            try:
                writer.close()
            except Exception:
                pass
        _safe_unlink(self._working_path)

    def _resolve_filename(self, context: Mapping[str, Any]) -> str:
        spec = self._filename_spec
        if isinstance(spec, str):
            return spec.format(**context)
        return spec(context)


def _safe_unlink(path: Path) -> None:
    try:
        path.unlink(missing_ok=True)
    except Exception:
        pass


def _fields_from_template(template: str) -> tuple[str, ...]:
    """Extract top-level field names from a ``str.format`` template.

    ``status`` is excluded — ``save()`` always injects it and it would just cause a spurious "missing
    required context key" failure if treated as required. Format specs (``:04d``) and attribute /
    indexing access are stripped: ``"{episode_idx:04d}"`` → ``"episode_idx"``, ``"{task.name}"`` →
    ``"task"``. Order is preserved (first occurrence wins) so error messages are deterministic.
    """
    seen: list[str] = []
    for _, field_name, _, _ in string.Formatter().parse(template):
        if not field_name:
            continue
        # `field_name` can be "name", "name.attr", "name[0]", or just "0" for positional.
        bare = field_name.split(".", 1)[0].split("[", 1)[0]
        if bare and bare != "status" and bare not in seen:
            seen.append(bare)
    return tuple(seen)
```
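The zero-padding advice in the "Filename layout" docstring section is easy to check directly: lexicographic sort breaks as soon as an unpadded index reaches two digits.

```python
# Without zero-padding, '0' sorts before '_' so ep10 lands ahead of ep2.
unpadded = sorted(f"ep{i}_success.mp4" for i in (1, 2, 10))
print(unpadded)  # ['ep10_success.mp4', 'ep1_success.mp4', 'ep2_success.mp4']

# With {episode_idx:04d}, lexicographic order matches episode order.
padded = sorted(f"ep{i:04d}_success.mp4" for i in (1, 2, 10))
print(padded)  # ['ep0001_success.mp4', 'ep0002_success.mp4', 'ep0010_success.mp4']
```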
