diff --git a/cli/Cargo.toml b/cli/Cargo.toml index ab80776e4..c3284e57c 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -16,7 +16,7 @@ serde_json = "1.0" dirs = "5.0" base64 = "0.22" getrandom = "0.2" -tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "time", "sync", "signal"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "time", "sync", "signal", "process"] } tokio-tungstenite = { version = "0.24", features = ["rustls-tls-webpki-roots"] } futures-util = "0.3" url = "2" diff --git a/cli/src/native/actions.rs b/cli/src/native/actions.rs index 40ab43681..6b2e0ad21 100644 --- a/cli/src/native/actions.rs +++ b/cli/src/native/actions.rs @@ -1,8 +1,9 @@ use serde_json::{json, Value}; use std::env; use std::io::Write; +use std::sync::atomic::AtomicU64; use std::sync::Arc; -use tokio::sync::{broadcast, RwLock}; +use tokio::sync::{broadcast, oneshot, RwLock}; use super::auth; use super::browser::{BrowserManager, WaitUntil}; @@ -166,6 +167,31 @@ impl DaemonState { } } + /// Spawn a background task that polls screenshots and pipes them to ffmpeg. + async fn start_recording_task( + &mut self, + client: Arc, + session_id: String, + ) -> Result<(), String> { + let shared_count = Arc::new(AtomicU64::new(0)); + let (cancel_tx, cancel_rx) = oneshot::channel(); + let handle = recording::spawn_recording_task( + client, + session_id, + self.recording_state.output_path.clone(), + shared_count.clone(), + cancel_rx, + ); + self.recording_state.capture_task = Some(handle); + self.recording_state.shared_frame_count = Some(shared_count); + self.recording_state.cancel_tx = Some(cancel_tx); + Ok(()) + } + + async fn stop_recording_task(&mut self) -> Result<(), String> { + recording::stop_recording_task(&mut self.recording_state).await + } + fn drain_cdp_events( &mut self, ) -> ( @@ -346,21 +372,6 @@ impl DaemonState { } } "Page.screencastFrame" => { - if self.recording_state.active { - if let Some(data) = - event.params.get("data").and_then(|v| v.as_str()) - { - if let Ok(bytes) = base64::Engine::decode( - &base64::engine::general_purpose::STANDARD, - data, - ) { - recording::recording_add_frame( - &mut self.recording_state, - &bytes, - ); - } - } - } if let Some(sid) = event.params.get("sessionId").and_then(|v| v.as_i64()) { @@ -2574,132 +2585,111 @@ async fn handle_recording_start(cmd: &Value, state: &mut DaemonState) -> Result< .and_then(|v| v.as_str()) .filter(|s| !s.is_empty()); - let mgr = state.browser.as_mut().ok_or("Browser not launched")?; - let old_session_id = mgr.active_session_id()?.to_string(); + let (client, new_session_id) = { + let mgr = state.browser.as_mut().ok_or("Browser not launched")?; + let old_session_id = mgr.active_session_id()?.to_string(); - // Capture current URL if no URL specified - let nav_url = if let Some(u) = recording_url { - u.to_string() - } else { - mgr.get_url() - .await - .unwrap_or_else(|_| "about:blank".to_string()) - }; + // Capture current URL if no URL specified + let nav_url = if let Some(u) = recording_url { + u.to_string() + } else { + mgr.get_url() + .await + .unwrap_or_else(|_| "about:blank".to_string()) + }; - // Capture current cookies - let cookies_result = mgr - .client - .send_command_no_params("Network.getAllCookies", Some(&old_session_id)) - .await - .ok(); + // Capture current cookies + let cookies_result = mgr + .client + .send_command_no_params("Network.getAllCookies", Some(&old_session_id)) + .await + .ok(); - // Create new browser context - let ctx_result = mgr - .client - .send_command_no_params("Target.createBrowserContext", None) - .await?; - let context_id = ctx_result - .get("browserContextId") - .and_then(|v| v.as_str()) - .ok_or("Failed to get browserContextId")? - .to_string(); + // Create new browser context + let ctx_result = mgr + .client + .send_command_no_params("Target.createBrowserContext", None) + .await?; + let context_id = ctx_result + .get("browserContextId") + .and_then(|v| v.as_str()) + .ok_or("Failed to get browserContextId")? + .to_string(); - // Create page in new context - let create_result: CreateTargetResult = mgr - .client - .send_command_typed( - "Target.createTarget", - &json!({ "url": "about:blank", "browserContextId": context_id }), - None, - ) - .await?; + // Create page in new context + let create_result: CreateTargetResult = mgr + .client + .send_command_typed( + "Target.createTarget", + &json!({ "url": "about:blank", "browserContextId": context_id }), + None, + ) + .await?; - let attach_result: AttachToTargetResult = mgr - .client - .send_command_typed( - "Target.attachToTarget", - &AttachToTargetParams { - target_id: create_result.target_id.clone(), - flatten: true, - }, - None, - ) - .await?; + let attach_result: AttachToTargetResult = mgr + .client + .send_command_typed( + "Target.attachToTarget", + &AttachToTargetParams { + target_id: create_result.target_id.clone(), + flatten: true, + }, + None, + ) + .await?; - let new_session_id = attach_result.session_id.clone(); - mgr.enable_domains_pub(&new_session_id).await?; + let new_session_id = attach_result.session_id.clone(); + mgr.enable_domains_pub(&new_session_id).await?; - // Transfer cookies to new context - if let Some(ref cr) = cookies_result { - if let Some(cookie_arr) = cr.get("cookies").and_then(|v| v.as_array()) { - if !cookie_arr.is_empty() { - let _ = mgr - .client - .send_command( - "Network.setCookies", - Some(json!({ "cookies": cookie_arr })), - Some(&new_session_id), - ) - .await; + // Transfer cookies to new context + if let Some(ref cr) = cookies_result { + if let Some(cookie_arr) = cr.get("cookies").and_then(|v| v.as_array()) { + if !cookie_arr.is_empty() { + let _ = mgr + .client + .send_command( + "Network.setCookies", + Some(json!({ "cookies": cookie_arr })), + Some(&new_session_id), + ) + .await; + } } } - } - // Add page and switch to it - mgr.add_page(super::browser::PageInfo { - target_id: create_result.target_id, - session_id: new_session_id.clone(), - url: nav_url.clone(), - title: String::new(), - target_type: "page".to_string(), - }); + // Add page and switch to it + mgr.add_page(super::browser::PageInfo { + target_id: create_result.target_id, + session_id: new_session_id.clone(), + url: nav_url.clone(), + title: String::new(), + target_type: "page".to_string(), + }); - // Navigate to URL - if nav_url != "about:blank" { - let _ = mgr - .client - .send_command( - "Page.navigate", - Some(json!({ "url": nav_url })), - Some(&new_session_id), - ) - .await; - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; - } + // Navigate to URL + if nav_url != "about:blank" { + let _ = mgr + .client + .send_command( + "Page.navigate", + Some(json!({ "url": nav_url })), + Some(&new_session_id), + ) + .await; + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + } - let result = recording::recording_start(&mut state.recording_state, path)?; + (mgr.client.clone(), new_session_id) + }; - // Start screencast on new page - stream::start_screencast(&mgr.client, &new_session_id, "jpeg", 80, 1280, 720).await?; - state.screencasting = true; + let result = recording::recording_start(&mut state.recording_state, path)?; + state.start_recording_task(client, new_session_id).await?; Ok(result) } async fn handle_recording_stop(state: &mut DaemonState) -> Result { - // Stop screencast - if state.screencasting { - if let Some(ref browser) = state.browser { - if let Ok(session_id) = browser.active_session_id() { - let _ = stream::stop_screencast(&browser.client, session_id).await; - } - } - state.screencasting = false; - } - - // Drain remaining frames before stopping - let (ack_ids, _, _, _) = state.drain_cdp_events(); - if !ack_ids.is_empty() { - if let Some(ref browser) = state.browser { - if let Ok(session_id) = browser.active_session_id() { - for ack_sid in ack_ids { - let _ = - stream::ack_screencast_frame(&browser.client, session_id, ack_sid).await; - } - } - } - } - + state.stop_recording_task().await?; recording::recording_stop(&mut state.recording_state) } @@ -2709,22 +2699,14 @@ async fn handle_recording_restart(cmd: &Value, state: &mut DaemonState) -> Resul .and_then(|v| v.as_str()) .ok_or("Missing 'path' parameter")?; - // Stop screencast, restart recording, start screencast again - if state.screencasting { - if let Some(ref browser) = state.browser { - if let Ok(session_id) = browser.active_session_id() { - let _ = stream::stop_screencast(&browser.client, session_id).await; - } - } - state.screencasting = false; - } - + let _ = state.stop_recording_task().await; let result = recording::recording_restart(&mut state.recording_state, path)?; if let Some(ref browser) = state.browser { let session_id = browser.active_session_id()?.to_string(); - stream::start_screencast(&browser.client, &session_id, "jpeg", 80, 1280, 720).await?; - state.screencasting = true; + state + .start_recording_task(browser.client.clone(), session_id) + .await?; } Ok(result) @@ -4351,8 +4333,9 @@ async fn handle_video_start(cmd: &Value, state: &mut DaemonState) -> Result Result { })); } - if state.screencasting { - if let Some(ref browser) = state.browser { - if let Ok(session_id) = browser.active_session_id() { - let _ = stream::stop_screencast(&browser.client, session_id).await; - } - } - state.screencasting = false; - } - - let (ack_ids, _, _, _) = state.drain_cdp_events(); - if !ack_ids.is_empty() { - if let Some(ref browser) = state.browser { - if let Ok(session_id) = browser.active_session_id() { - for ack_sid in ack_ids { - let _ = - stream::ack_screencast_frame(&browser.client, session_id, ack_sid).await; - } - } - } - } - + state.stop_recording_task().await?; recording::recording_stop(&mut state.recording_state) } diff --git a/cli/src/native/recording.rs b/cli/src/native/recording.rs index e4cbd68d4..cdd51c206 100644 --- a/cli/src/native/recording.rs +++ b/cli/src/native/recording.rs @@ -1,12 +1,24 @@ use serde_json::{json, Value}; -use std::path::PathBuf; -use std::process::Command; +use std::process::Stdio; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::io::AsyncWriteExt; +use tokio::sync::oneshot; + +use super::cdp::client::CdpClient; +use super::cdp::types::{CaptureScreenshotParams, CaptureScreenshotResult}; + +const CAPTURE_INTERVAL_MS: u64 = 100; +const CAPTURE_FPS: u32 = 10; pub struct RecordingState { pub active: bool, pub output_path: String, - pub temp_dir: PathBuf, pub frame_count: u64, + pub capture_task: Option>>, + pub shared_frame_count: Option>, + pub cancel_tx: Option>, } impl RecordingState { @@ -14,8 +26,10 @@ impl RecordingState { Self { active: false, output_path: String::new(), - temp_dir: PathBuf::new(), frame_count: 0, + capture_task: None, + shared_frame_count: None, + cancel_tx: None, } } } @@ -25,34 +39,13 @@ pub fn recording_start(state: &mut RecordingState, path: &str) -> Result Result { if !state.active { return Err("No recording in progress".to_string()); @@ -61,51 +54,10 @@ pub fn recording_stop(state: &mut RecordingState) -> Result { state.active = false; if state.frame_count == 0 { - let _ = std::fs::remove_dir_all(&state.temp_dir); return Err("No frames captured".to_string()); } - let frame_pattern = state - .temp_dir - .join("frame_%06d.jpg") - .to_string_lossy() - .to_string(); - - let output = &state.output_path; - - let codec_args: &[&str] = if output.ends_with(".webm") { - &["-c:v", "libvpx-vp9", "-crf", "30", "-b:v", "0"] - } else { - &["-c:v", "libx264", "-preset", "fast"] - }; - - let result = Command::new("ffmpeg") - .args(["-y", "-framerate", "30", "-i", &frame_pattern]) - .args(["-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2"]) - .args(codec_args) - .args(["-pix_fmt", "yuv420p"]) - .arg(output) - .output(); - - let _ = std::fs::remove_dir_all(&state.temp_dir); - - match result { - Ok(output_result) => { - if output_result.status.success() { - Ok(json!({ "path": output, "frames": state.frame_count })) - } else { - let stderr = String::from_utf8_lossy(&output_result.stderr); - Err(format!( - "ffmpeg failed: {}", - stderr.chars().take(200).collect::() - )) - } - } - Err(e) => Err(format!( - "ffmpeg not found or failed to execute: {}. Install ffmpeg to enable recording.", - e - )), - } + Ok(json!({ "path": &state.output_path, "frames": state.frame_count })) } pub fn recording_restart(state: &mut RecordingState, path: &str) -> Result { @@ -127,6 +79,160 @@ pub fn recording_restart(state: &mut RecordingState, path: &str) -> Result tokio::process::Command { + let mut cmd = tokio::process::Command::new("ffmpeg"); + + cmd.args(["-y"]) + .args(["-avioflags", "direct"]) + .args([ + "-fpsprobesize", + "0", + "-probesize", + "32", + "-analyzeduration", + "0", + ]) + .args([ + "-f", + "image2pipe", + "-c:v", + "mjpeg", + "-framerate", + &CAPTURE_FPS.to_string(), + "-i", + "pipe:0", + ]) + .args(["-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2"]); + + if output_path.ends_with(".webm") { + cmd.args(["-c:v", "libvpx", "-crf", "30", "-b:v", "1M"]); + } else { + cmd.args(["-c:v", "libx264", "-preset", "ultrafast"]); + } + + cmd.args(["-pix_fmt", "yuv420p", "-threads", "1"]) + .arg(output_path) + .stdin(Stdio::piped()) + .stdout(Stdio::null()) + .stderr(Stdio::piped()) + .kill_on_drop(true); + + cmd +} + +/// Spawn a background task that captures screenshots at a fixed interval +/// and pipes them to ffmpeg in real-time. +pub fn spawn_recording_task( + client: Arc, + session_id: String, + output_path: String, + shared_count: Arc, + cancel_rx: oneshot::Receiver<()>, +) -> tokio::task::JoinHandle> { + tokio::spawn(async move { + let mut cancel_rx = std::pin::pin!(cancel_rx); + + let mut ffmpeg = build_ffmpeg_command(&output_path).spawn().map_err(|e| { + format!( + "ffmpeg not found or failed to execute: {}. Install ffmpeg to enable recording.", + e + ) + })?; + + let mut stdin = ffmpeg + .stdin + .take() + .ok_or_else(|| "Failed to open ffmpeg stdin".to_string())?; + + let mut interval = tokio::time::interval(Duration::from_millis(CAPTURE_INTERVAL_MS)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + let params = CaptureScreenshotParams { + format: Some("jpeg".to_string()), + quality: Some(80), + clip: None, + from_surface: Some(true), + capture_beyond_viewport: None, + }; + + loop { + tokio::select! { + _ = &mut cancel_rx => break, + _ = interval.tick() => {} + } + + let result: Result = client + .send_command_typed("Page.captureScreenshot", ¶ms, Some(&session_id)) + .await; + + let screenshot = match result { + Ok(s) => s, + Err(e) => { + if e.contains("Target closed") || e.contains("not found") { + break; + } + continue; + } + }; + + let bytes = match base64::Engine::decode( + &base64::engine::general_purpose::STANDARD, + &screenshot.data, + ) { + Ok(b) => b, + Err(_) => continue, + }; + + if stdin.write_all(&bytes).await.is_err() { + break; + } + shared_count.fetch_add(1, Ordering::Relaxed); + } + + drop(stdin); + + let output = ffmpeg + .wait_with_output() + .await + .map_err(|e| format!("ffmpeg wait failed: {}", e))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(format!( + "ffmpeg failed: {}", + stderr.chars().take(300).collect::() + )); + } + + Ok(()) + }) +} + +pub async fn stop_recording_task(state: &mut RecordingState) -> Result<(), String> { + if let Some(tx) = state.cancel_tx.take() { + let _ = tx.send(()); + } + + let counter = state.shared_frame_count.take(); + let handle = state.capture_task.take(); + + let result = if let Some(h) = handle { + match h.await { + Ok(Ok(())) => Ok(()), + Ok(Err(e)) => Err(e), + Err(e) => Err(format!("Recording task panicked: {}", e)), + } + } else { + Ok(()) + }; + + if let Some(c) = counter { + state.frame_count = c.load(Ordering::Relaxed); + } + + result +} + #[cfg(test)] mod tests { use super::*; @@ -147,19 +253,15 @@ mod tests { assert!(state.active); assert_eq!(state.output_path, "/tmp/test.mp4"); assert_eq!(state.frame_count, 0); - // Cleanup - let _ = std::fs::remove_dir_all(&state.temp_dir); } #[test] fn test_recording_start_while_active() { let mut state = RecordingState::new(); recording_start(&mut state, "/tmp/test1.mp4").unwrap(); - let temp_dir = state.temp_dir.clone(); let result = recording_start(&mut state, "/tmp/test2.mp4"); assert!(result.is_err()); assert!(result.unwrap_err().contains("already active")); - let _ = std::fs::remove_dir_all(&temp_dir); } #[test] @@ -181,19 +283,41 @@ mod tests { } #[test] - fn test_recording_add_frame_inactive() { + fn test_recording_restart_while_inactive() { let mut state = RecordingState::new(); - recording_add_frame(&mut state, b"fake-frame"); - assert_eq!(state.frame_count, 0); + let result = recording_restart(&mut state, "/tmp/new.webm"); + assert!(result.is_ok()); + assert!(state.active); + assert_eq!(state.output_path, "/tmp/new.webm"); } #[test] - fn test_recording_add_frame_active() { + fn test_recording_restart_while_active() { let mut state = RecordingState::new(); - recording_start(&mut state, "/tmp/test.mp4").unwrap(); - recording_add_frame(&mut state, b"fake-frame-1"); - recording_add_frame(&mut state, b"fake-frame-2"); - assert_eq!(state.frame_count, 2); - let _ = std::fs::remove_dir_all(&state.temp_dir); + recording_start(&mut state, "/tmp/old.webm").unwrap(); + state.frame_count = 10; + let result = recording_restart(&mut state, "/tmp/new.webm").unwrap(); + assert!(state.active); + assert_eq!(state.output_path, "/tmp/new.webm"); + assert_eq!(state.frame_count, 0); + assert_eq!(result["previousPath"], "/tmp/old.webm"); + } + + #[test] + fn test_build_ffmpeg_command_webm() { + let cmd = build_ffmpeg_command("/tmp/out.webm"); + let args: Vec<&std::ffi::OsStr> = cmd.as_std().get_args().collect(); + let args_str: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect(); + assert!(args_str.contains(&"libvpx")); + assert!(args_str.contains(&"/tmp/out.webm")); + } + + #[test] + fn test_build_ffmpeg_command_mp4() { + let cmd = build_ffmpeg_command("/tmp/out.mp4"); + let args: Vec<&std::ffi::OsStr> = cmd.as_std().get_args().collect(); + let args_str: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect(); + assert!(args_str.contains(&"libx264")); + assert!(args_str.contains(&"/tmp/out.mp4")); } }