1111
1212from urllib .parse import quote
1313
14+ from aiocache import cached
1415from commonwealth .utils .apis import GenericErrorHandlingRoute , PrettyJSONResponse
1516from commonwealth .utils .general import file_is_open_async
1617from commonwealth .utils .logs import InterceptHandler , init_logger
2627RECORDER_DIR = Path ("/usr/blueos/userdata/recorder" )
2728PORT = 9150
2829
30+ # Prevent thumbnails from being generated while MCAP extraction is running
31+ thumbnail_lock = asyncio .Lock ()
32+
2933logging .basicConfig (handlers = [InterceptHandler ()], level = logging .DEBUG )
3034init_logger (SERVICE_NAME )
3135logger .info ("Starting Recorder Extractor service" )
@@ -86,21 +90,31 @@ def parse_duration_ns(discover_output: str) -> int:
8690 return duration_ns
8791
8892
89- def build_thumbnail_bytes (path : Path ) -> bytes :
93+ @cached
94+ async def build_thumbnail_bytes (path : Path ) -> bytes :
9095 """
91- Extract a single JPEG frame from the recording using a raw gst-launch pipeline.
96+ Extract a single JPEG frame from the recording using a raw gst-launch pipeline (ASYNC) .
9297
9398 Seek to the middle of the file, scale to 320x180, and encode as JPEG. If any step fails,
9499 propagate an HTTP 500 so callers can fall back.
95100 """
96101 # 1) Discover duration (nanoseconds) using gst-discoverer
97102 discover_cmd = ["gst-discoverer-1.0" , f"file://{ path } " ]
98- discover = subprocess .run (discover_cmd , capture_output = True , text = True , check = False )
99- if discover .returncode != 0 :
100- logger .error ("gst-discoverer-1.0 failed for %s: %s" , path , discover .stderr .strip ())
101- raise HTTPException (status_code = status .HTTP_500_INTERNAL_SERVER_ERROR , detail = "Failed to inspect recording." )
103+ discover_proc = await asyncio .create_subprocess_exec (
104+ * discover_cmd ,
105+ stdout = asyncio .subprocess .PIPE ,
106+ stderr = asyncio .subprocess .PIPE ,
107+ text = True ,
108+ )
109+ stdout , stderr = await discover_proc .communicate ()
110+ if discover_proc .returncode != 0 :
111+ logger .error (f"gst-discoverer-1.0 failed for { path } : { stderr .strip ()} " )
112+ raise HTTPException (
113+ status_code = status .HTTP_500_INTERNAL_SERVER_ERROR ,
114+ detail = "Failed to inspect recording." ,
115+ )
102116
103- duration_ns = parse_duration_ns (discover . stdout )
117+ duration_ns = parse_duration_ns (stdout )
104118 target_ns = duration_ns // 2 if duration_ns > 0 else 0
105119 target_sec = target_ns / 1_000_000_000
106120
@@ -125,13 +139,21 @@ def build_thumbnail_bytes(path: Path) -> bytes:
125139 f"Thumbnail target: duration_ns={ duration_ns } target_ns={ target_ns } target_sec={ target_sec :.3f} file={ path } "
126140 )
127141 logger .info (f"Thumbnail command: { ' ' .join (play_cmd )} " )
128- result = subprocess .run (play_cmd , capture_output = True , check = False )
129- if result .returncode != 0 or not result .stdout :
130- stderr = result .stderr .decode ("utf-8" , "ignore" )
131- logger .error (f"gst-play-1.0 failed for { path } (code={ result .returncode } ): { stderr } " )
132- raise HTTPException (status_code = status .HTTP_500_INTERNAL_SERVER_ERROR , detail = "Failed to generate thumbnail." )
142+ play_proc = await asyncio .create_subprocess_exec (
143+ * play_cmd ,
144+ stdout = asyncio .subprocess .PIPE ,
145+ stderr = asyncio .subprocess .PIPE ,
146+ )
147+ out_bytes , err_bytes = await play_proc .communicate ()
148+ if play_proc .returncode != 0 or not out_bytes :
149+ stderr = err_bytes .decode ("utf-8" , "ignore" )
150+ logger .error (f"gst-play-1.0 failed for { path } (code={ play_proc .returncode } ): { stderr } " )
151+ raise HTTPException (
152+ status_code = status .HTTP_500_INTERNAL_SERVER_ERROR ,
153+ detail = "Failed to generate thumbnail." ,
154+ )
133155
134- return result . stdout
156+ return out_bytes
135157
136158
137159async def extract_mcap_recordings () -> None :
@@ -159,27 +181,21 @@ async def extract_mcap_recordings() -> None:
159181 str (output_dir ),
160182 ]
161183 logger .info (f"Extracting MCAP video to { output_dir } with command: { ' ' .join (command )} " )
162- process = await asyncio .create_subprocess_exec (
163- * command ,
164- stdout = asyncio .subprocess .PIPE ,
165- stderr = asyncio .subprocess .PIPE ,
166- )
167- stdout , stderr = await process .communicate ()
184+ async with thumbnail_lock :
185+ process = await asyncio .create_subprocess_exec (
186+ * command ,
187+ stdout = asyncio .subprocess .PIPE ,
188+ stderr = asyncio .subprocess .PIPE ,
189+ )
190+ stdout , stderr = await process .communicate ()
168191 if process .returncode != 0 :
169192 logger .error (
170- "MCAP extract failed for %s (code=%s): %s" ,
171- mcap_path ,
172- process .returncode ,
173- stderr .decode ("utf-8" , "ignore" ),
193+ f"MCAP extract failed for { mcap_path } (code={ process .returncode } ): { stderr .decode ('utf-8' , 'ignore' )} " ,
174194 )
175195 else :
176- logger .info (
177- "MCAP extract completed for %s: %s" ,
178- mcap_path ,
179- stdout .decode ("utf-8" , "ignore" ).strip (),
180- )
196+ logger .info (f"MCAP extract completed for { mcap_path } : { stdout .decode ('utf-8' , 'ignore' ).strip ()} " )
181197 except Exception as exception :
182- logger .exception ("MCAP extraction loop failed: %s" , exception )
198+ logger .exception (f "MCAP extraction loop failed: { exception } " )
183199
184200
185201def to_http_exception (endpoint : Callable [..., Any ]) -> Callable [..., Any ]:
@@ -245,7 +261,8 @@ async def list_recordings() -> List[RecordingFile]:
245261@to_http_exception
246262async def get_recording_thumbnail (filename : str ) -> StreamingResponse :
247263 path = resolve_recording (filename )
248- thumbnail_bytes = await asyncio .to_thread (build_thumbnail_bytes , path )
264+ async with thumbnail_lock :
265+ thumbnail_bytes = await build_thumbnail_bytes (path )
249266 return StreamingResponse (BytesIO (thumbnail_bytes ), media_type = "image/jpeg" )
250267
251268
@@ -260,7 +277,7 @@ async def delete_recording(filename: str) -> None:
260277 try :
261278 path .unlink ()
262279 except Exception as exception :
263- logger .exception ("Failed to delete recording %s" , filename )
280+ logger .exception (f "Failed to delete recording { filename } " )
264281 raise HTTPException (
265282 status_code = status .HTTP_500_INTERNAL_SERVER_ERROR ,
266283 detail = "Failed to delete recording." ,
0 commit comments