diff --git a/Cargo.lock b/Cargo.lock index fb4ac0df9..e19042caa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -865,7 +865,6 @@ name = "example_ws_stream_mcap" version = "0.0.0" dependencies = [ "anyhow", - "bytes", "clap", "ctrlc", "env_logger", diff --git a/rust/examples/ws_stream_mcap/Cargo.toml b/rust/examples/ws_stream_mcap/Cargo.toml index 861fa1394..c1e24c8f7 100644 --- a/rust/examples/ws_stream_mcap/Cargo.toml +++ b/rust/examples/ws_stream_mcap/Cargo.toml @@ -8,7 +8,6 @@ workspace = true [dependencies] anyhow = "1.0" -bytes = "1.9" clap = { version = "4.5", features = ["derive"] } ctrlc = "3.4" env_logger = "0.11" diff --git a/rust/examples/ws_stream_mcap/src/main.rs b/rust/examples/ws_stream_mcap/src/main.rs index 9aaab20a3..ec7476eb4 100644 --- a/rust/examples/ws_stream_mcap/src/main.rs +++ b/rust/examples/ws_stream_mcap/src/main.rs @@ -1,26 +1,80 @@ //! Streams an mcap file over a websocket. -use std::borrow::Cow; -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::fs::File; -use std::io::{BufReader, Read, Seek, SeekFrom}; -use std::path::{Path, PathBuf}; +mod mcap_player; +mod playback_source; + +use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use mcap_player::McapPlayer; +use playback_source::PlaybackSource; -use anyhow::{anyhow, Context, Result}; -use bytes::Buf; +use anyhow::Result; use clap::Parser; -use foxglove::websocket::Capability; -use foxglove::{ - ChannelBuilder, PartialMetadata, RawChannel, Schema, WebSocketServer, WebSocketServerHandle, +use foxglove::websocket::{ + Capability, PlaybackCommand, PlaybackControlRequest, PlaybackState, PlaybackStatus, + ServerListener, }; -use mcap::records::{MessageHeader, Record, SchemaHeader}; -use mcap::sans_io::linear_reader::{LinearReadEvent, LinearReader, LinearReaderOptions}; +use foxglove::WebSocketServer; use tracing::info; +struct Listener { + player: Arc>, +} + +impl Listener { + fn new(player: Arc>) -> Self { + Self { player } + } +} + +/// Implement RangedPlayback-specific listener logic for responding to PlaybackControlRequests +impl ServerListener for Listener { + /// Respond to a PlaybackControlRequest from Foxglove and send an updated PlaybackState. + /// First we process the fields in the request (seeking, updating the playback speed, and + /// handling play/pause PlaybackCommands by calling functions on our MCAP-specific PlaybackSource. + /// Then we query the PlaybackSource to fill out the PlaybackState message sent in response to + /// update Foxglove's UI. + /// + /// The intent of PlaybackSource is to let you implement the trait with your own data + /// format, then reuse the structure of this function in your own player application. + fn on_playback_control_request( + &self, + request: PlaybackControlRequest, + ) -> Option { + let mut player = self.player.lock().unwrap(); + + // Handle seek first, before play/pause. This is important for looping behavior, + // where Foxglove sends a seek to the beginning followed by a Play command. + // Setting this flag to true clears panels in the Foxglove player. For simplicity, we set this every time a seek is requested from Foxglove. In your application, consider implementing logic that determines whether a seek represents a significant jump in time for the data you're playing back. + let mut did_seek = request.seek_time.is_some(); + + if let Some(seek_time) = request.seek_time { + if let Err(err) = player.seek(seek_time) { + did_seek = false; + tracing::warn!("failed to seek: {err:?}"); + } + } + + player.set_playback_speed(request.playback_speed); + + match request.playback_command { + PlaybackCommand::Play => player.play(), + PlaybackCommand::Pause => player.pause(), + }; + + Some(PlaybackState { + current_time: player.current_time(), + playback_speed: player.playback_speed(), + status: player.status(), + did_seek, + request_id: Some(request.request_id), + }) + } +} + #[derive(Debug, Parser)] struct Cli { /// Server TCP port. @@ -32,13 +86,10 @@ struct Cli { /// MCAP file to read. #[arg(short, long)] file: PathBuf, - /// Whether to loop. - #[arg(long)] - r#loop: bool, } fn main() -> Result<()> { - let env = env_logger::Env::default().default_filter_or("debug"); + let env = env_logger::Env::default().default_filter_or("info"); env_logger::init_from_env(env); let args = Cli::parse(); @@ -57,256 +108,61 @@ fn main() -> Result<()> { }) .expect("Failed to set SIGINT handler"); + info!("Loading mcap summary"); + + let mcap_player = McapPlayer::new(&args.file)?; + let (start_time, end_time) = mcap_player.time_range(); + + let mcap_player = Arc::new(Mutex::new(mcap_player)); + let listener = Arc::new(Listener::new(mcap_player.clone())); + let server = WebSocketServer::new() .name(file_name) - .capabilities([Capability::Time]) + .capabilities([Capability::RangedPlayback, Capability::Time]) + .playback_time_range(start_time, end_time) + .listener(listener) .bind(&args.host, args.port) .start_blocking() .expect("Server failed to start"); - info!("Loading mcap summary"); - let summary = Summary::load_from_mcap(&args.file)?; - info!("Waiting for client"); std::thread::sleep(Duration::from_secs(1)); info!("Starting stream"); + let mut last_status = PlaybackStatus::Paused; while !done.load(Ordering::Relaxed) { - summary.file_stream().stream_until(&server, &done)?; - if !args.r#loop { - done.store(true, Ordering::Relaxed); - } else { - info!("Looping"); - server.clear_session(None); - } - } - - server.stop().wait_blocking(); - Ok(()) -} - -/// Helper function to advance the mcap reader. -fn advance_reader( - reader: &mut LinearReader, - file: &mut R, - mut handle_record: F, -) -> Result -where - R: Read + Seek, - F: FnMut(Record<'_>) -> Result<()>, -{ - if let Some(event) = reader.next_event() { - match event? { - LinearReadEvent::ReadRequest(count) => { - let count = file.read(reader.insert(count))?; - reader.notify_read(count); - } - LinearReadEvent::Record { data, opcode } => { - let record = mcap::parse_record(opcode, data)?; - handle_record(record)?; + let status = { + let player = mcap_player.lock().unwrap(); + let status = player.status(); + + // Broadcast state change when playback ends + if status == PlaybackStatus::Ended && last_status != PlaybackStatus::Ended { + server.broadcast_playback_state(PlaybackState { + current_time: player.current_time(), + playback_speed: player.playback_speed(), + status, + did_seek: false, + request_id: None, + }); } - } - Ok(true) - } else { - Ok(false) - } -} - -#[derive(Default)] -struct Summary { - path: PathBuf, - schemas: HashMap, - channels: HashMap>, -} - -impl Summary { - fn load_from_mcap(path: &Path) -> Result { - let mut file = BufReader::new(File::open(path)?); - - // Read the last 28 bytes of the file to validate the trailing magic (8 bytes) and obtain - // the summary start value, which is the first u64 in the footer record (20 bytes). - let mut buf = Vec::with_capacity(28); - file.seek(SeekFrom::End(-28)).context("seek footer")?; - file.read_to_end(&mut buf).context("read footer")?; - if !buf.ends_with(mcap::MAGIC) { - return Err(anyhow!("bad footer magic")); - } - - // Seek to summary section. - let summary_start = buf.as_slice().get_u64_le(); - if summary_start == 0 { - return Err(anyhow!("missing summary section")); - } - file.seek(SeekFrom::Start(summary_start)) - .context("seek summary")?; - let mut reader = LinearReader::new_with_options(LinearReaderOptions { - skip_start_magic: true, - ..Default::default() - }); - - let mut summary = Summary { - path: path.to_owned(), - schemas: HashMap::new(), - channels: HashMap::new(), + status }; - while advance_reader(&mut reader, &mut file, |rec| summary.handle_record(rec)) - .context("read summary")? - {} - - Ok(summary) - } - - /// Creates a new file stream. - fn file_stream(&self) -> FileStream<'_> { - FileStream::new(&self.path, &self.channels) - } - - // Handles a record from the summary section. - fn handle_record(&mut self, record: Record<'_>) -> Result<()> { - match record { - Record::Schema { header, data } => self.handle_schema(&header, data), - Record::Channel(channel) => self.handle_channel(channel), - _ => Ok(()), - } - } - - /// Caches schema information. - fn handle_schema( - &mut self, - header: &SchemaHeader, - data: Cow<'_, [u8]>, - ) -> Result<(), anyhow::Error> { - if header.id == 0 { - return Err(anyhow!("invalid schema id"))?; - } - if let Entry::Vacant(entry) = self.schemas.entry(header.id) { - let schema = Schema::new(&header.name, &header.encoding, data.into_owned()); - entry.insert(schema); - } - Ok(()) - } - - /// Registers a new channel. - fn handle_channel(&mut self, record: mcap::records::Channel) -> Result<(), anyhow::Error> { - if let Entry::Vacant(entry) = self.channels.entry(record.id) { - let schema = self.schemas.get(&record.schema_id).cloned(); - let channel = ChannelBuilder::new(record.topic) - .message_encoding(&record.message_encoding) - .schema(schema) - .build_raw()?; - entry.insert(channel); - } - Ok(()) - } -} - -struct FileStream<'a> { - path: PathBuf, - channels: &'a HashMap>, - time_tracker: Option, -} - -impl<'a> FileStream<'a> { - /// Creates a new file stream. - fn new(path: &Path, channels: &'a HashMap>) -> Self { - Self { - path: path.to_owned(), - channels, - time_tracker: None, - } - } - - /// Streams the file content until `done` is set. - fn stream_until( - mut self, - server: &WebSocketServerHandle, - done: &Arc, - ) -> Result<()> { - let mut file = BufReader::new(File::open(&self.path)?); - let mut reader = LinearReader::new(); - while !done.load(Ordering::Relaxed) - && advance_reader(&mut reader, &mut file, |rec| { - self.handle_record(server, rec); - Ok(()) - }) - .context("read data")? - {} - Ok(()) - } - - /// Handles an mcap record parsed from the file. - fn handle_record(&mut self, server: &WebSocketServerHandle, record: Record<'_>) { - if let Record::Message { header, data } = record { - self.handle_message(server, header, &data); - } - } - - /// Streams the message data to the server. - fn handle_message( - &mut self, - server: &WebSocketServerHandle, - header: MessageHeader, - data: &[u8], - ) { - let tt = self - .time_tracker - .get_or_insert_with(|| TimeTracker::start(header.log_time)); - - tt.sleep_until(header.log_time); - - if let Some(timestamp) = tt.notify() { - server.broadcast_time(timestamp); - } + last_status = status; - if let Some(channel) = self.channels.get(&header.channel_id) { - channel.log_with_meta( - data, - PartialMetadata { - log_time: Some(header.log_time), - }, - ); + if status != PlaybackStatus::Playing { + std::thread::sleep(Duration::from_millis(10)); + continue; } - } -} - -/// Helper for keep tracking of the relationship between a file timestamp and the wallclock. -struct TimeTracker { - start: Instant, - offset_ns: u64, - now_ns: u64, - notify_interval_ns: u64, - notify_last: u64, -} -impl TimeTracker { - /// Initializes a new time tracker, treating "now" as the specified offset from epoch. - fn start(offset_ns: u64) -> Self { - Self { - start: Instant::now(), - offset_ns, - now_ns: offset_ns, - notify_interval_ns: 1_000_000_000 / 60, - notify_last: 0, - } - } - /// Sleeps until the specified offset. - fn sleep_until(&mut self, offset_ns: u64) { - let abs = Duration::from_nanos(offset_ns.saturating_sub(self.offset_ns)); - let delta = abs.saturating_sub(self.start.elapsed()); - if delta >= Duration::from_micros(1) { - std::thread::sleep(delta); + // Log next message, sleeping outside the lock if needed + let sleep_duration = mcap_player.lock().unwrap().log_next_message(&server)?; + if let Some(duration) = sleep_duration { + // Upper-bound sleep time to avoid the player from becoming unresponsive for too long + std::thread::sleep(std::cmp::min(duration, Duration::from_secs(1))); } - self.now_ns = offset_ns; } - /// Periodically returns a timestamp reference to broadcast to clients. - fn notify(&mut self) -> Option { - if self.now_ns.saturating_sub(self.notify_last) >= self.notify_interval_ns { - self.notify_last = self.now_ns; - Some(self.now_ns) - } else { - None - } - } + server.stop().wait_blocking(); + Ok(()) } diff --git a/rust/examples/ws_stream_mcap/src/mcap_player.rs b/rust/examples/ws_stream_mcap/src/mcap_player.rs new file mode 100644 index 000000000..6e15a8706 --- /dev/null +++ b/rust/examples/ws_stream_mcap/src/mcap_player.rs @@ -0,0 +1,372 @@ +use std::collections::HashMap; +use std::fs::File; +use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::path::Path; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use anyhow::{anyhow, Context, Result}; +use foxglove::websocket::PlaybackStatus; +use foxglove::{ChannelBuilder, PartialMetadata, RawChannel, Schema, WebSocketServerHandle}; +use mcap::sans_io::indexed_reader::{IndexedReadEvent, IndexedReader, IndexedReaderOptions}; +use mcap::sans_io::summary_reader::{SummaryReadEvent, SummaryReader}; +use mcap::Summary; + +use crate::playback_source::{Nanoseconds, PlaybackSource}; + +const MIN_PLAYBACK_SPEED: f32 = 0.01; + +pub struct McapPlayer { + summary: Summary, + channels: HashMap>, + reader: IndexedReader, + file: BufReader, + chunk_buffer: Vec, + time_tracker: Option, + time_range: (Nanoseconds, Nanoseconds), + status: PlaybackStatus, + current_time: Nanoseconds, + playback_speed: f32, + /// Buffered message that was read but not yet ready to emit. + pending_message: Option<(mcap::records::MessageHeader, Vec)>, +} + +/// An implementation of the `PlaybackSource` trait for the MCAP file format. Handles playback +/// operations (play/pause, seeking, adjusting playback speed), time tracking, and logging messages +/// when called from a playback loop. +impl McapPlayer { + /// Creates a new MCAP player. + pub(crate) fn new(path: &Path) -> Result { + let mut file = BufReader::new(File::open(path)?); + + // Read the summary using SummaryReader + let summary = load_summary(&mut file)?.ok_or_else(|| anyhow!("missing summary section"))?; + + let stats = summary + .stats + .as_ref() + .ok_or_else(|| anyhow!("MCAP summary section missing stats record"))?; + + let time_range = (stats.message_start_time, stats.message_end_time); + let current_time = stats.message_start_time; + + // Create foxglove channels from the summary + let channels = create_channels(&summary)?; + + // Create the indexed reader + let reader = IndexedReader::new_with_options( + &summary, + IndexedReaderOptions::new().log_time_on_or_after(current_time), + ) + .map_err(|e| anyhow!("failed to create indexed reader: {e}"))?; + + Ok(Self { + time_range, + current_time, + status: PlaybackStatus::Paused, + playback_speed: 1.0, + summary, + channels, + reader, + file, + chunk_buffer: Vec::new(), + time_tracker: None, + pending_message: None, + }) + } + + /// Re-creates the indexed reader starting from the given time. + fn reset_reader(&mut self, start_time: Nanoseconds) -> Result<()> { + self.reader = IndexedReader::new_with_options( + &self.summary, + IndexedReaderOptions::new().log_time_on_or_after(start_time), + ) + .map_err(|e| anyhow!("failed to create indexed reader: {e}"))?; + self.current_time = start_time; + self.time_tracker = None; + self.pending_message = None; + Ok(()) + } + + /// Processes reader events until a message is available or EOF. + /// Returns the next message header and data, or None if no more messages. + fn next_message(&mut self) -> Result)>> { + // Return buffered message first if available + if let Some(msg) = self.pending_message.take() { + return Ok(Some(msg)); + } + + loop { + match self.reader.next_event() { + None => return Ok(None), + Some(Err(e)) => return Err(anyhow!("indexed reader error: {e}")), + Some(Ok(IndexedReadEvent::ReadChunkRequest { offset, length })) => { + self.file + .seek(SeekFrom::Start(offset)) + .context("seek to chunk")?; + self.chunk_buffer.resize(length, 0); + self.file + .read_exact(&mut self.chunk_buffer) + .context("read chunk")?; + self.reader + .insert_chunk_record_data(offset, &self.chunk_buffer) + .map_err(|e| anyhow!("failed to insert chunk: {e}"))?; + } + Some(Ok(IndexedReadEvent::Message { header, data })) => { + return Ok(Some((header, data.to_vec()))); + } + } + } + } +} + +impl PlaybackSource for McapPlayer { + fn time_range(&self) -> (Nanoseconds, Nanoseconds) { + self.time_range + } + + fn status(&self) -> PlaybackStatus { + self.status + } + + fn current_time(&self) -> Nanoseconds { + self.current_time + } + + fn playback_speed(&self) -> f32 { + self.playback_speed + } + + fn set_playback_speed(&mut self, speed: f32) { + let speed = TimeTracker::clamp_speed(speed); + if let Some(tt) = &mut self.time_tracker { + tt.set_speed(speed); + } + self.playback_speed = speed; + } + + fn play(&mut self) { + // Don't transition to Playing if playback has ended. + // To restart playback, the caller must seek first. + if self.status == PlaybackStatus::Ended { + return; + } + if let Some(tt) = &mut self.time_tracker { + tt.resume(); + } + self.status = PlaybackStatus::Playing; + } + + fn pause(&mut self) { + // Don't transition from Ended state. Once playback has ended, + // the caller must seek to a new position to resume. + if self.status == PlaybackStatus::Ended { + return; + } + if let Some(tt) = &mut self.time_tracker { + tt.pause(); + } + self.status = PlaybackStatus::Paused; + } + + fn seek(&mut self, log_time: Nanoseconds) -> Result<()> { + let log_time = log_time.clamp(self.time_range.0, self.time_range.1); + self.reset_reader(log_time)?; + // If playback had ended, reset to Paused so play() can transition to Playing + if self.status == PlaybackStatus::Ended { + self.status = PlaybackStatus::Paused; + } + Ok(()) + } + + fn log_next_message(&mut self, server: &WebSocketServerHandle) -> Result> { + if self.status != PlaybackStatus::Playing { + return Ok(None); + } + + // Get the next message to log + let Some((header, data)) = self.next_message()? else { + // No more messages, playback has ended + self.status = PlaybackStatus::Ended; + self.current_time = self.time_range.1; + return Ok(None); + }; + + let tt = self + .time_tracker + .get_or_insert_with(|| TimeTracker::start(header.log_time, self.playback_speed)); + + // Check if we need to wait before emitting this message + let wakeup = tt.wakeup_for(header.log_time); + if let Some(sleep_duration) = wakeup.checked_duration_since(Instant::now()) { + // Buffer the message and return the sleep duration + self.pending_message = Some((header, data)); + return Ok(Some(sleep_duration)); + } + + // Update current time + self.current_time = header.log_time; + + // Broadcast time update periodically + if let Some(timestamp) = tt.notify(header.log_time) { + server.broadcast_time(timestamp); + } + + // Log the message to the appropriate channel + if let Some(channel) = self.channels.get(&header.channel_id) { + channel.log_with_meta( + &data, + PartialMetadata { + log_time: Some(header.log_time), + }, + ); + } + + Ok(None) + } +} + +/// Helper for keeping track of the relationship between a file timestamp and the wallclock. +struct TimeTracker { + /// Wall-clock time when playback started/resumed + start: Instant, + /// Log time corresponding to the start instant + offset_ns: Nanoseconds, + /// Current playback speed multiplier + speed: f32, + /// Whether playback is paused + paused: bool, + /// Elapsed log time when paused + paused_elapsed_ns: Nanoseconds, + /// Interval for time broadcast notifications + notify_interval_ns: Nanoseconds, + /// Last log time that was broadcast + notify_last: Nanoseconds, +} + +impl TimeTracker { + /// Initializes a new time tracker, treating "now" as the specified log time. + fn start(offset_ns: Nanoseconds, speed: f32) -> Self { + let speed = Self::clamp_speed(speed); + Self { + start: Instant::now(), + offset_ns, + speed, + paused: false, + paused_elapsed_ns: 0, + notify_interval_ns: 1_000_000_000 / 60, + notify_last: 0, + } + } + + /// Returns the current playback log time based on elapsed wall time and speed. + fn current_log_time(&self) -> Nanoseconds { + if self.paused { + self.offset_ns + self.paused_elapsed_ns + } else { + let elapsed_wall = self.start.elapsed(); + let elapsed_log_ns = (elapsed_wall.as_nanos() as f64 * self.speed as f64) as u64; + self.offset_ns + self.paused_elapsed_ns + elapsed_log_ns + } + } + + /// Returns the wall-clock instant when the given log_time will be ready. + fn wakeup_for(&self, log_time: Nanoseconds) -> Instant { + let current = self.current_log_time(); + if log_time <= current { + return Instant::now(); + } + let log_diff_ns = log_time - current; + let wall_diff_ns = if self.speed > 0.0 { + (log_diff_ns as f64 / self.speed as f64) as u64 + } else { + // If speed is 0, we'll never reach the time; return a long delay + 1_000_000_000 // 1 second + }; + Instant::now() + Duration::from_nanos(wall_diff_ns) + } + + /// Pauses playback, recording the current elapsed time. + fn pause(&mut self) { + if !self.paused { + let elapsed_wall = self.start.elapsed(); + let elapsed_log_ns = (elapsed_wall.as_nanos() as f64 * self.speed as f64) as u64; + self.paused_elapsed_ns += elapsed_log_ns; + self.paused = true; + } + } + + /// Resumes playback from the paused position. + fn resume(&mut self) { + if self.paused { + self.start = Instant::now(); + self.paused = false; + } + } + + /// Updates the playback speed. + fn set_speed(&mut self, speed: f32) { + let speed = Self::clamp_speed(speed); + if !self.paused { + // Accumulate elapsed time at the old speed before changing + let elapsed_wall = self.start.elapsed(); + let elapsed_log_ns = (elapsed_wall.as_nanos() as f64 * self.speed as f64) as u64; + self.paused_elapsed_ns += elapsed_log_ns; + self.start = Instant::now(); + } + self.speed = speed; + } + + fn clamp_speed(speed: f32) -> f32 { + if speed.is_finite() && speed >= MIN_PLAYBACK_SPEED { + speed + } else { + MIN_PLAYBACK_SPEED + } + } + + /// Periodically returns a timestamp reference to broadcast to clients. + fn notify(&mut self, current_ns: Nanoseconds) -> Option { + if current_ns.saturating_sub(self.notify_last) >= self.notify_interval_ns { + self.notify_last = current_ns; + Some(current_ns) + } else { + None + } + } +} + +/// Loads the MCAP summary using the sans-io SummaryReader. +fn load_summary(file: &mut R) -> Result> { + let mut reader = SummaryReader::new(); + while let Some(event) = reader.next_event() { + match event.map_err(|e| anyhow!("summary read error: {e}"))? { + SummaryReadEvent::ReadRequest(n) => { + let read = file.read(reader.insert(n)).context("read summary")?; + reader.notify_read(read); + } + SummaryReadEvent::SeekRequest(pos) => { + let pos = file.seek(pos).context("seek summary")?; + reader.notify_seeked(pos); + } + } + } + Ok(reader.finish()) +} + +/// Creates foxglove channels from the MCAP summary. +fn create_channels(summary: &Summary) -> Result>> { + let mut channels = HashMap::new(); + for (&id, mcap_channel) in &summary.channels { + let schema = mcap_channel + .schema + .as_ref() + .map(|s| Schema::new(s.name.as_str(), s.encoding.as_str(), s.data.to_vec())); + let channel = ChannelBuilder::new(&mcap_channel.topic) + .message_encoding(&mcap_channel.message_encoding) + .schema(schema) + .build_raw()?; + channels.insert(id, channel); + } + Ok(channels) +} diff --git a/rust/examples/ws_stream_mcap/src/playback_source.rs b/rust/examples/ws_stream_mcap/src/playback_source.rs new file mode 100644 index 000000000..f03bbcf36 --- /dev/null +++ b/rust/examples/ws_stream_mcap/src/playback_source.rs @@ -0,0 +1,66 @@ +use std::time::Duration; + +use anyhow::Result; +use foxglove::{websocket::PlaybackStatus, WebSocketServerHandle}; + +/// A timestamp in nanoseconds since epoch. +pub type Nanoseconds = u64; + +/// A data source that supports ranged playback with play/pause, seek, and variable speed. +/// +/// Implementations are responsible for: +/// - Tracking playback state (playing/paused/ended) and current position +/// - Pacing message delivery according to timestamps and playback speed +/// - Logging messages to channels and broadcasting time updates to the server +pub trait PlaybackSource { + /// Returns the (start, end) time bounds of the data. + /// + /// Determining this is dependent on the format of data you are loading. + fn time_range(&self) -> (Nanoseconds, Nanoseconds); + + /// Sets the playback speed multiplier (e.g., 1.0 for real-time, 2.0 for double speed). + /// + /// Called by a ServerListener when it receives a PlaybackControlRequest from Foxglove + fn set_playback_speed(&mut self, speed: f32); + + /// Begins or resumes playback. + /// + /// Called by a ServerListener when it receives a PlaybackControlRequest from Foxglove + fn play(&mut self); + + /// Pauses playback. + /// + /// Called by a ServerListener when it receives a PlaybackControlRequest from Foxglove + fn pause(&mut self); + + /// Seeks to the specified timestamp. + /// + /// Called by a ServerListener when it receives a PlaybackControlRequest from Foxglove + fn seek(&mut self, log_time: Nanoseconds) -> Result<()>; + + /// Returns the current playback status. + /// + /// Used to send a PlaybackState to Foxglove + fn status(&self) -> PlaybackStatus; + + /// Returns the current playback position. + /// + /// Used to send a PlaybackState to Foxglove + fn current_time(&self) -> Nanoseconds; + + /// Returns the current playback speed multiplier. + /// + /// Used to send a PlaybackState to Foxglove + fn playback_speed(&self) -> f32; + + /// Logs the next message for playback if it's ready, or returns a wall duration to wait + /// (accounting for the current playback speed). This should be called by your main playback + /// loop. + /// + /// Returns `Ok(Some(duration))` if the caller should sleep before calling again. + /// Returns `Ok(None)` if a message was logged or playback is not active. + /// + /// The caller should sleep outside of any lock to allow control requests to be processed. + /// This method also broadcasts time updates via `server.broadcast_time()`. + fn log_next_message(&mut self, server: &WebSocketServerHandle) -> Result>; +}