Skip to content

Commit 31ff8d8

Browse files
core: services: recoder_extractor: Add mcap recover
Signed-off-by: Patrick José Pereira <patrickelectric@gmail.com>
1 parent 27207a5 commit 31ff8d8

File tree

1 file changed

+92
-0
lines changed
  • core/services/recorder_extractor

1 file changed

+92
-0
lines changed

core/services/recorder_extractor/main.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import asyncio
44
import contextlib
55
import logging
6+
import shutil
7+
import tempfile
68
from functools import wraps
79
from io import BytesIO
810
from pathlib import Path
@@ -101,6 +103,93 @@ def parse_duration_ns(discover_output: str) -> int:
101103
return duration_ns
102104

103105

106+
async def check_and_recover_mcap(mcap_path: Path) -> None:
107+
"""
108+
Check if mcap binary is available, run mcap doctor on the file,
109+
and if it fails, run mcap recover to fix the file.
110+
"""
111+
# Check if mcap binary exists
112+
mcap_binary = shutil.which("mcap")
113+
if not mcap_binary:
114+
logger.warning("mcap binary not found, skipping doctor/recover check")
115+
return
116+
117+
# Ensure path exists and is a file
118+
if not mcap_path.exists() or not mcap_path.is_file():
119+
logger.debug(f"MCAP file not found or not a file: {mcap_path}")
120+
return
121+
122+
logger.info(f"Running mcap doctor on {mcap_path}")
123+
# Run mcap doctor
124+
doctor_cmd = ["mcap", "doctor", str(mcap_path)]
125+
doctor_proc = await asyncio.create_subprocess_exec(
126+
*doctor_cmd,
127+
stdout=asyncio.subprocess.PIPE,
128+
stderr=asyncio.subprocess.PIPE,
129+
text=False,
130+
)
131+
stdout_bytes, stderr_bytes = await doctor_proc.communicate()
132+
stdout = stdout_bytes.decode("utf-8", "ignore")
133+
stderr = stderr_bytes.decode("utf-8", "ignore")
134+
135+
if doctor_proc.returncode == 0:
136+
logger.info(f"mcap doctor passed for {mcap_path}: {stdout.strip()}")
137+
return
138+
139+
logger.warning(f"mcap doctor failed for {mcap_path} (code={doctor_proc.returncode}): {stderr.strip()}")
140+
logger.info(f"Attempting to recover {mcap_path}")
141+
142+
# Create a temporary file path in the same directory as the mcap file
143+
# This ensures atomic replacement on the same filesystem
144+
tmp_path = None
145+
try:
146+
with tempfile.NamedTemporaryFile(delete=False, dir=mcap_path.parent, suffix=".recover") as tmpfile:
147+
tmp_path = Path(tmpfile.name)
148+
149+
recover_cmd = ["mcap", "recover", str(mcap_path), "-o", str(tmp_path)]
150+
recover_proc = await asyncio.create_subprocess_exec(
151+
*recover_cmd,
152+
stdout=asyncio.subprocess.PIPE,
153+
stderr=asyncio.subprocess.PIPE,
154+
text=False,
155+
)
156+
recover_stdout_bytes, recover_stderr_bytes = await recover_proc.communicate()
157+
recover_stdout = recover_stdout_bytes.decode("utf-8", "ignore")
158+
recover_stderr = recover_stderr_bytes.decode("utf-8", "ignore")
159+
160+
# Check if recovery succeeded
161+
if recover_proc.returncode != 0:
162+
logger.error(
163+
f"mcap recover command failed for {mcap_path} (code={recover_proc.returncode}): {recover_stderr.strip()}",
164+
)
165+
return
166+
167+
if not tmp_path.exists():
168+
logger.error(f"mcap recover did not create output file: {tmp_path}")
169+
return
170+
171+
if tmp_path.stat().st_size == 0:
172+
logger.error(f"mcap recover produced empty file: {tmp_path}")
173+
return
174+
175+
# Atomically replace the original file with the recovered one
176+
# Using replace ensures atomic operation
177+
tmp_path.replace(mcap_path)
178+
logger.info(f"Successfully recovered {mcap_path} (recovered size: {mcap_path.stat().st_size} bytes)")
179+
tmp_path = None # Mark as successfully moved to prevent cleanup
180+
except OSError as exception:
181+
logger.error(f"Failed to replace original file after mcap recover: {exception}")
182+
except Exception as exception:
183+
logger.exception(f"Unexpected error during mcap recover: {exception}")
184+
finally:
185+
# Clean up temporary file if it still exists
186+
if tmp_path is not None and tmp_path.exists():
187+
try:
188+
tmp_path.unlink()
189+
except OSError as exception:
190+
logger.error(f"Failed to clean up temporary file {tmp_path}: {exception}")
191+
192+
104193
@cached()
105194
async def build_thumbnail_bytes(path: Path) -> bytes:
106195
"""
@@ -187,6 +276,9 @@ async def extract_mcap_recordings() -> None:
187276
logger.info(f"Skipping MCAP extract, file in use: {mcap_path}")
188277
continue
189278

279+
# Check and recover MCAP file if mcap binary is available
280+
await check_and_recover_mcap(mcap_path)
281+
190282
command = [
191283
"mcap-foxglove-video-extract",
192284
str(mcap_path),

0 commit comments

Comments
 (0)