
Commit 7470415

refactor: extract per-episode video recorder into shared module
#53 (RoboMME) wires episode mp4 saving directly into `RoboMMEBenchmark` — a buffered list of frames + an `imageio.mimsave` at the end of every episode. That pattern is going to recur in every benchmark that wants agent-view captures (failure-case debugging, demo browsing, public showcase, qualitative analysis), so lift it out of RoboMME and into `vla_eval.benchmarks.recording.EpisodeVideoRecorder`.

Improvements over the inline implementation in #53:

- **Streaming write** via `imageio.get_writer().append_data()` — O(1) memory regardless of episode length. A 1300-step 256×256×3 episode that previously held ~250 MB of pending frames now holds one. A side benefit: a partially-written mp4 is left on disk if the process is killed mid-episode, playable up to the last completed frame, which is useful for debugging crashes.
- **Atomic finalize** via `tempfile.mkstemp` in the output dir + `os.replace` on `save()`. Concurrent jobs sharing a directory don't collide on partially-written files.
- **Logging-style filename templating**: a `str.format` template (or callable) over a context dict the caller passes at `start()`, plus a `status` key injected at `save()`. `filename` and `required_context` are required at construction time — every benchmark identifies tasks differently (`env_id`, `task_id`, `suite/task` …), so there is no universal default. Required-context validation catches mismatched keys at `start()` rather than as a silently dropped mp4 at `save()` time.
- **Latched failure**: the first `record()` failure disables the recorder for the rest of the episode, so a wedged writer subprocess doesn't flood the log with one warning per step.

`RoboMMEBenchmark` now constructs an `EpisodeVideoRecorder` (when `save_episode_video=True`) and routes the same `reset → step → save` hooks through it. The previous public flags (`save_episode_video`, `video_dir`) and behavior are preserved; the internal `_episode_frames` buffer + `_save_episode_video` method are removed.
`imageio[ffmpeg]` is added as a runtime dependency so the recorder works on a base install (previously it was an undeclared transitive dependency of the RoboMME image only).

`tests/test_recording.py` (16 tests, 0.8 s) covers the lifecycle: happy path, str + callable filename templating, subdirectory templates, missing template key at save, required-context validation, no-op semantics for record/save/discard before start, writer-open failure, mid-episode start-again cleanup, lazy output-dir creation, str path support.
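The filename-templating behavior described above can be sketched in a few lines. This is an illustrative standalone sketch, not the module's actual code; `resolve_filename` is a hypothetical helper mirroring what the recorder's `save()` does with its `filename` spec:

```python
def resolve_filename(spec, context, status):
    """Hypothetical helper mirroring save()'s resolution: a str.format
    template, or a callable, applied to {**start_context, "status": status}."""
    resolved = {**context, "status": status}
    if isinstance(spec, str):
        return spec.format(**resolved)
    return spec(resolved)  # callable specs receive the whole resolved dict


# A str.format template, as a benchmark would pass at construction time:
name = resolve_filename(
    "{env_id}_ep{episode_idx}_{status}.mp4",
    {"env_id": "lift", "episode_idx": 3},
    "success",
)

# A callable spec, e.g. routing episodes into per-env subdirectories:
name2 = resolve_filename(
    lambda ctx: f"{ctx['env_id']}/{ctx['status']}.mp4",
    {"env_id": "lift"},
    "fail",
)
```

A missing template key raises `KeyError` here at resolution time, which is exactly the late failure the `required_context` check at `start()` is designed to surface early.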
1 parent 9c7ca4c commit 7470415

4 files changed

Lines changed: 583 additions & 34 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -12,6 +12,7 @@ license = {text = "Apache-2.0"}
 dependencies = [
     "anyio>=4.0.0",
     "filelock>=3.0",
+    "imageio[ffmpeg]>=2.20",  # Streaming mp4 writer for benchmarks/recording.py
     "lazyregistry>=0.4.0",  # Python 3.8 support
     "msgpack>=1.0.0",
     "numpy>=1.24",
```
Lines changed: 299 additions & 0 deletions
@@ -0,0 +1,299 @@

```python
"""Per-episode video recording helper, shared across benchmarks.

Most benchmarks have a "save the agent's view of each episode as an mp4"
need (failure-case debugging, demo browsing, public showcase, qualitative
analysis). This module is a single home for that pattern so each
benchmark doesn't reinvent it.

## Design

* **Streaming**: frames are encoded to disk as they arrive
  (``imageio.get_writer`` + ``append_data``) rather than buffered in RAM.
  Memory is O(1) regardless of episode length; a 1300-step 256×256×3
  episode that previously held ~250 MB now holds one frame at a time.
  A side benefit is that a partially-written mp4 is left on disk if the
  process is killed mid-episode — playable up to the last completed
  frame, useful for debugging crashes.
* **Atomic finalize**: frames stream into a tempfile in the same output
  directory; ``save()`` resolves the final filename (which usually
  depends on success/failure status) and ``os.replace``-s the tempfile
  into place. Concurrent jobs sharing a directory don't collide.
* **Logging-style filename templating**: the filename is a ``str.format``
  template (or callable) over a context dict that the caller passes at
  ``start()`` time, plus a ``status`` key injected at ``save()`` time.
  ``filename`` and ``required_context`` are required at construction time
  — there is no universal default because every benchmark names tasks
  differently (``env_id``, ``task_id``, ``suite/task`` …). Forcing the
  caller to spell it out catches mismatched context keys at ``start()``
  rather than as a silent dropped mp4 at ``save()``.
* **Best-effort**: every encode-side failure logs a warning with the
  context for debuggability and clears state — a corrupted video should
  never bring down an otherwise good eval episode.

## Caller pattern

    from vla_eval.benchmarks.recording import EpisodeVideoRecorder

    recorder = EpisodeVideoRecorder(
        output_dir="/workspace/results/videos",
        filename="{env_id}_ep{episode_idx}_{status}.mp4",
        required_context=("env_id", "episode_idx"),
        fps=20,
    )

    # In benchmark.reset(task):
    recorder.start({"env_id": task["env_id"], "episode_idx": task["episode_idx"]})
    recorder.record(initial_frame)

    # In benchmark.step(action):
    recorder.record(frame)  # default ffmpeg path copies via .tobytes() —
                            # see record() docstring for backend caveats

    # In benchmark.get_step_result(step_result):
    recorder.save(status="success" if success else "fail")

    # In benchmark.cleanup():
    recorder.discard()  # drops any in-flight writer + tempfile

## Multi-camera

The recorder is single-stream by design. Benchmarks with multiple views
(e.g. front + wrist) instantiate one recorder per view with different
``filename`` templates (e.g. ``"{view}_ep{episode_idx}_{status}.mp4"``,
substituting the view name into the context at ``start()``). This keeps
each recorder simple and lets callers mix-and-match resolutions, fps,
and codecs per view.
"""

from __future__ import annotations

import logging
import os
import tempfile
from collections.abc import Callable, Mapping, Sequence
from pathlib import Path
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    import numpy as np

logger = logging.getLogger(__name__)

# Either a `str.format`-style template or a callable that takes the resolved
# context (caller's start() context + injected ``status``) and returns a
# filename relative to ``output_dir``.
FilenameSpec = str | Callable[[Mapping[str, Any]], str]


class EpisodeVideoRecorder:
    """Streaming per-episode video recorder.

    Lifecycle: ``start()`` → ``record()`` × N → ``save()`` (or
    ``discard()``). ``start()`` may be called again to begin a new
    episode; if a previous episode never reached ``save()``/``discard()``
    (e.g. orchestrator crash) the in-flight writer is closed and its
    tempfile cleaned up first.

    Inactive (no episode in progress) is a valid state: ``record()`` /
    ``save()`` / ``discard()`` are no-ops in that case so callers don't
    need defensive ``if recorder.active`` checks.
    """

    def __init__(
        self,
        output_dir: str | os.PathLike[str],
        filename: FilenameSpec,
        required_context: Sequence[str],
        fps: int = 20,
        writer_kwargs: Mapping[str, Any] | None = None,
    ) -> None:
        """
        Args:
            output_dir: Directory the final mp4 lands in. Created if missing.
                Filename templates may include subdirectories (e.g.
                ``"{suite}/{task}_..."``); intermediate dirs are also created.
            filename: ``str.format`` template or callable producing the
                filename relative to ``output_dir``. Resolved at ``save()``
                time over ``{**start_context, "status": status}``. Required
                because every benchmark identifies tasks differently
                (``env_id``, ``task_id``, ``suite/task``) — there is no
                universally safe default.
            required_context: Keys that must be present in the dict passed to
                ``start()``. ``ValueError`` is raised at ``start()`` if any
                are missing. Required so callers explicitly declare the
                template's expectations; failing fast at ``start()`` avoids
                a silent ``KeyError`` -> dropped mp4 at ``save()`` time.
                Should include every key the ``filename`` template references.
            fps: Output framerate.
            writer_kwargs: Extra kwargs forwarded to ``imageio.get_writer``
                (e.g. ``{"codec": "libx264", "quality": 8}``).
        """
        self.output_dir = Path(output_dir)
        self._filename_spec = filename
        self.fps = fps
        self._required_context = tuple(required_context)
        self._writer_kwargs = dict(writer_kwargs or {})

        # Lifecycle state — None whenever no episode is in progress.
        self._writer: Any = None
        self._tempfile: Path | None = None
        self._context: dict[str, Any] | None = None
        self._frames_written = 0
        # Latched on the first record() failure so we don't log-spam every
        # subsequent step with the same warning when the underlying writer
        # is wedged (corrupt subprocess pipe, etc.).
        self._record_failed = False

    @property
    def active(self) -> bool:
        return self._writer is not None

    def start(self, context: Mapping[str, Any]) -> None:
        """Begin a new episode.

        Validates required context keys, opens a streaming writer to a
        tempfile in ``output_dir``. If a previous episode is still in
        flight (no ``save``/``discard`` called) it is discarded first.
        On writer-open failure the recorder stays inactive and subsequent
        ``record()``/``save()`` are no-ops; the failure is logged.
        """
        missing = [k for k in self._required_context if k not in context]
        if missing:
            raise ValueError(f"EpisodeVideoRecorder.start: missing required context keys: {missing}")

        if self.active:
            self.discard()

        self._context = dict(context)
        self._record_failed = False
        try:
            self.output_dir.mkdir(parents=True, exist_ok=True)
            # Tempfile keeps a real `.mp4` suffix so imageio's format
            # auto-detection works; the `.recorder-` prefix marks it as ours.
            fd, temp_path = tempfile.mkstemp(
                prefix=".recorder-",
                suffix=".mp4",
                dir=str(self.output_dir),
            )
            os.close(fd)
            self._tempfile = Path(temp_path)

            import imageio

            self._writer = imageio.get_writer(str(self._tempfile), fps=self.fps, **self._writer_kwargs)
        except Exception as e:
            logger.warning("Failed to open video writer for context=%r: %s", self._context, e)
            _safe_unlink(self._tempfile)
            self._tempfile = None
            self._writer = None
            self._context = None

    def record(self, frame: np.ndarray) -> None:
        """Append a frame to the in-flight episode.

        On the default ``.mp4`` / ffmpeg path, imageio serializes the
        frame via ``np.ndarray.tobytes()`` before piping it to the
        encoder subprocess — that's a synchronous copy, so the caller
        can mutate the underlying buffer once this returns. If you
        configure a non-ffmpeg writer that retains references (e.g.
        the pillow plugin appends ``Image.fromarray(arr)`` to a list
        flushed at close), you must pass copies yourself.

        No-op if no episode is in progress. The first encode failure
        latches the recorder so subsequent ``record()`` calls become
        no-ops rather than flooding the log.
        """
        if not self.active or self._record_failed:
            return
        try:
            self._writer.append_data(frame)
            self._frames_written += 1
        except Exception as e:
            logger.warning(
                "record() failed for context=%r at frame %d: %s; "
                "remaining frames in this episode will be dropped",
                self._context,
                self._frames_written,
                e,
            )
            self._record_failed = True

    def save(self, status: str = "success") -> Path | None:
        """Finalize the in-flight episode.

        Closes the streaming writer, resolves the final filename from
        ``{**context, "status": status}``, and atomically moves the
        tempfile into place. Returns the final ``Path`` on success,
        ``None`` if the recorder was inactive or any finalize step
        failed. After this call the recorder is inactive again.
        """
        if not self.active:
            return None

        writer, tempfile_path, context = self._writer, self._tempfile, self._context
        frames_written = self._frames_written
        try:
            try:
                writer.close()
            except Exception as e:
                logger.warning("Failed to close video writer for context=%r: %s", context, e)
                _safe_unlink(tempfile_path)
                return None

            try:
                relative_name = self._resolve_filename({**(context or {}), "status": status})
            except Exception as e:
                logger.warning("Failed to resolve filename for context=%r status=%r: %s", context, status, e)
                _safe_unlink(tempfile_path)
                return None

            final_path = self.output_dir / relative_name
            try:
                final_path.parent.mkdir(parents=True, exist_ok=True)
                os.replace(str(tempfile_path), str(final_path))
            except Exception as e:
                logger.warning("Failed to finalize video at %s for context=%r: %s", final_path, context, e)
                _safe_unlink(tempfile_path)
                return None

            logger.info("Saved episode video: %s (%d frames)", final_path, frames_written)
            return final_path
        finally:
            self._writer = None
            self._tempfile = None
            self._context = None
            self._frames_written = 0
            self._record_failed = False

    def discard(self) -> None:
        """Abandon the in-flight episode without producing an mp4.

        Closes the writer (best-effort) and removes the tempfile. Safe
        to call when no episode is in progress (no-op).
        """
        writer, tempfile_path = self._writer, self._tempfile
        self._writer = None
        self._tempfile = None
        self._context = None
        self._frames_written = 0
        self._record_failed = False
        if writer is not None:
            try:
                writer.close()
            except Exception:
                pass
        _safe_unlink(tempfile_path)

    def _resolve_filename(self, context: Mapping[str, Any]) -> str:
        spec = self._filename_spec
        if isinstance(spec, str):
            return spec.format(**context)
        return spec(context)


def _safe_unlink(path: Path | None) -> None:
    if path is None:
        return
    try:
        path.unlink(missing_ok=True)
    except Exception:
        pass
```
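The atomic-finalize step above relies only on the stdlib. Here is a minimal standalone sketch of that pattern — `atomic_write` is a hypothetical helper, not part of the module, and it writes raw bytes rather than encoding video:

```python
import os
import tempfile
from pathlib import Path


def atomic_write(output_dir: Path, final_name: str, data: bytes) -> Path:
    """Write data to output_dir/final_name via a same-directory tempfile."""
    output_dir.mkdir(parents=True, exist_ok=True)
    # mkstemp in the *destination* directory, so os.replace stays on one
    # filesystem and rename is atomic (POSIX semantics); concurrent jobs
    # sharing the directory each get a unique tempfile.
    fd, temp_path = tempfile.mkstemp(prefix=".recorder-", dir=str(output_dir))
    try:
        os.write(fd, data)
    finally:
        os.close(fd)
    final_path = output_dir / final_name
    os.replace(temp_path, final_path)  # readers never observe a partial file
    return final_path


with tempfile.TemporaryDirectory() as tmp:
    out = atomic_write(Path(tmp), "ep0_success.mp4", b"demo-bytes")
    saved = out.read_bytes()
    # After a successful finalize no orphaned tempfiles remain.
    leftovers = [f for f in os.listdir(tmp) if f.startswith(".recorder-")]
```

The recorder follows the same shape but streams frames into the tempfile over the whole episode before the final `os.replace`, and unlinks the tempfile instead on any finalize failure.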
