Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions rust/examples/ws_stream_mcap/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ fn main() -> Result<()> {
info!("Starting stream");
let mut last_status = PlaybackStatus::Paused;
while !done.load(Ordering::Relaxed) {
{
let mut player = mcap_player.lock().unwrap();
if player.needs_backfill() {
if let Err(err) = player.log_backfill() {
tracing::warn!("backfill failed: {err:?}");
}
}
}

let status = {
let player = mcap_player.lock().unwrap();
let status = player.status();
Expand Down
64 changes: 63 additions & 1 deletion rust/examples/ws_stream_mcap/src/mcap_player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use std::time::{Duration, Instant};
use anyhow::{anyhow, Context, Result};
use foxglove::websocket::PlaybackStatus;
use foxglove::{ChannelBuilder, PartialMetadata, RawChannel, Schema, WebSocketServerHandle};
use mcap::sans_io::indexed_reader::{IndexedReadEvent, IndexedReader, IndexedReaderOptions};
use mcap::sans_io::indexed_reader::{
IndexedReadEvent, IndexedReader, IndexedReaderOptions, ReadOrder,
};
use mcap::sans_io::summary_reader::{SummaryReadEvent, SummaryReader};
use mcap::Summary;

Expand All @@ -29,6 +31,8 @@ pub struct McapPlayer {
playback_speed: f32,
/// Buffered message that was read but not yet ready to emit.
pending_message: Option<(mcap::records::MessageHeader, Vec<u8>)>,
/// Whether a backfill is needed after a seek.
needs_backfill: bool,
}

impl McapPlayer {
Expand Down Expand Up @@ -69,6 +73,7 @@ impl McapPlayer {
chunk_buffer: Vec::new(),
time_tracker: None,
pending_message: None,
needs_backfill: false,
})
}

Expand Down Expand Up @@ -115,6 +120,62 @@ impl McapPlayer {
}
}
}

/// Returns whether a backfill is needed after a seek.
pub fn needs_backfill(&self) -> bool {
self.needs_backfill
}

/// Logs the most recent message on each channel at or before the current seek time.
///
/// This populates panels with "the state of the world" at the seek position so they
/// don't appear blank until forward playback catches up.
pub fn log_backfill(&mut self) -> Result<()> {
self.needs_backfill = false;

for mcap_channel in self.summary.channels.values() {
let mut reader = IndexedReader::new_with_options(
&self.summary,
IndexedReaderOptions::new()
.with_order(ReadOrder::ReverseLogTime)
.log_time_before(self.current_time.saturating_add(1))
.include_topics([&mcap_channel.topic]),
)
.map_err(|e| anyhow!("failed to create backfill reader: {e}"))?;

loop {
match reader.next_event() {
None => break,
Some(Err(e)) => return Err(anyhow!("backfill reader error: {e}")),
Some(Ok(IndexedReadEvent::ReadChunkRequest { offset, length })) => {
self.file
.seek(SeekFrom::Start(offset))
.context("backfill seek to chunk")?;
self.chunk_buffer.resize(length, 0);
self.file
.read_exact(&mut self.chunk_buffer)
.context("backfill read chunk")?;
reader
.insert_chunk_record_data(offset, &self.chunk_buffer)
.map_err(|e| anyhow!("failed to insert backfill chunk: {e}"))?;
}
Some(Ok(IndexedReadEvent::Message { header, data })) => {
if let Some(channel) = self.channels.get(&header.channel_id) {
channel.log_with_meta(
data,
PartialMetadata {
log_time: Some(header.log_time),
},
);
}
break;
}
}
}
}

Ok(())
}
}

impl PlaybackSource for McapPlayer {
Expand Down Expand Up @@ -168,6 +229,7 @@ impl PlaybackSource for McapPlayer {
if self.status == PlaybackStatus::Ended {
self.status = PlaybackStatus::Paused;
}
self.needs_backfill = true;
Ok(())
}

Expand Down
Loading