2929# Prevent thumbnails from being generated while MCAP extraction is running
3030thumbnail_lock = asyncio .Lock ()
3131
32+ # Track MCAP files currently being processed
33+ processing_mcap_files : set [str ] = set ()
34+
3235logging .basicConfig (handlers = [InterceptHandler ()], level = logging .DEBUG )
3336init_logger (SERVICE_NAME )
3437logger .info ("Starting Recorder Extractor service" )
@@ -44,6 +47,15 @@ class RecordingFile(BaseModel):
4447 thumbnail_url : str
4548
4649
50+ class ProcessingFile (BaseModel ):
51+ name : str
52+ path : str
53+
54+
55+ class ProcessingStatus (BaseModel ):
56+ processing : List [ProcessingFile ]
57+
58+
4759def ensure_recorder_dir () -> Path :
4860 RECORDER_DIR .mkdir (parents = True , exist_ok = True )
4961 return RECORDER_DIR .resolve ()
@@ -183,16 +195,21 @@ async def extract_mcap_recordings() -> None:
183195 str (output_dir ),
184196 ]
185197 logger .info (f"Extracting MCAP video to { output_dir } with command: { ' ' .join (command )} " )
186- async with thumbnail_lock :
187- process = await asyncio .create_subprocess_exec (
188- * command ,
189- stdout = asyncio .subprocess .PIPE ,
190- stderr = asyncio .subprocess .PIPE ,
191- text = False ,
192- )
193- stdout_bytes , stderr_bytes = await process .communicate ()
194- stdout = stdout_bytes .decode ("utf-8" , "ignore" )
195- stderr = stderr_bytes .decode ("utf-8" , "ignore" )
198+ mcap_relative = str (mcap_path .relative_to (base ))
199+ processing_mcap_files .add (mcap_relative )
200+ try :
201+ async with thumbnail_lock :
202+ process = await asyncio .create_subprocess_exec (
203+ * command ,
204+ stdout = asyncio .subprocess .PIPE ,
205+ stderr = asyncio .subprocess .PIPE ,
206+ text = False ,
207+ )
208+ stdout_bytes , stderr_bytes = await process .communicate ()
209+ stdout = stdout_bytes .decode ("utf-8" , "ignore" )
210+ stderr = stderr_bytes .decode ("utf-8" , "ignore" )
211+ finally :
212+ processing_mcap_files .discard (mcap_relative )
196213 if process .returncode != 0 :
197214 logger .error (
198215 f"MCAP extract failed for { mcap_path } (code={ process .returncode } ): { stderr } " ,
@@ -259,6 +276,19 @@ async def list_recordings() -> List[RecordingFile]:
259276 return files
260277
261278
279+ @recorder_router .get (
280+ "/status" ,
281+ response_model = ProcessingStatus ,
282+ summary = "Get MCAP extraction processing status." ,
283+ )
284+ @to_http_exception
285+ async def get_processing_status () -> ProcessingStatus :
286+ """Return MCAP files currently being processed."""
287+ # Snapshot the set with list to avoid RuntimeError from concurrent mutation
288+ processing = [ProcessingFile (name = Path (path ).name , path = path ) for path in list (processing_mcap_files )]
289+ return ProcessingStatus (processing = processing )
290+
291+
262292@recorder_router .get (
263293 "/files/{filename:path}/thumbnail" ,
264294 summary = "Grab a thumbnail from a recording." ,
0 commit comments