Skip to content

Commit 5702791

Browse files
committed
feat(livestream): embed per-frame SEI timestamps for latency debugging
1 parent be4d4e9 commit 5702791

4 files changed

Lines changed: 96 additions & 11 deletions

File tree

camera_hub/src/fmp4.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ impl TrakTracker {
106106
buf.put_u64(self.fragment_start_time);
107107
});
108108

109-
let mut data_offset_pos: Option<usize> = None;
109+
let data_offset_pos: Option<usize>;
110110

111111
const TRUN_DATA_OFFSET: u32 = 0x000001;
112112
const TRUN_FIRST_SAMPLE_FL: u32 = 0x000004;
@@ -265,7 +265,7 @@ impl<W: AsyncWrite + Unpin, V: CodecParameters, A: CodecParameters> Fmp4Writer<W
265265
}
266266

267267
fn write_video_fragment(&self, buf: &mut BytesMut) -> Result<Option<usize>, Error> {
268-
let mut trun_off: Option<usize> = None;
268+
let trun_off: Option<usize>;
269269
write_box!(buf, b"traf", {
270270
write_box!(buf, b"tfhd", {
271271
buf.put_u32(0x020000); // default-base-is-moof
@@ -278,7 +278,7 @@ impl<W: AsyncWrite + Unpin, V: CodecParameters, A: CodecParameters> Fmp4Writer<W
278278
}
279279

280280
fn write_audio_fragment(&self, buf: &mut BytesMut) -> Result<Option<usize>, Error> {
281-
let mut trun_off: Option<usize> = None;
281+
let trun_off: Option<usize>;
282282
write_box!(buf, b"traf", {
283283
write_box!(buf, b"tfhd", {
284284
buf.put_u32(0x020000); // default-base-is-moof

camera_hub/src/livestream.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::pin::Pin;
1212
use std::sync::mpsc;
1313
use std::sync::mpsc::Sender;
1414
use std::task::{Context, Poll};
15+
use std::time::{SystemTime, UNIX_EPOCH};
1516
use tokio::io::AsyncWrite;
1617

1718
/// Used to determine when to end livestream
@@ -104,12 +105,31 @@ pub fn livestream(
104105
// to prevent a malicious server from reordering the chunks.
105106
let mut data: Vec<u8> = chunk_number.to_be_bytes().to_vec();
106107
data.extend(rx.recv().unwrap());
108+
109+
let start = SystemTime::now();
110+
let since_the_epoch = start
111+
.duration_since(UNIX_EPOCH)
112+
.expect("time should go forward");
113+
debug!("Livestream: Received data for chunk {} at {}", chunk_number, since_the_epoch.as_millis());
114+
107115
let enc_data = mls_client.encrypt(&data)?;
108116

117+
let end = SystemTime::now();
118+
let end_since_epoch = end
119+
.duration_since(UNIX_EPOCH)
120+
.expect("time should go forward");
121+
let diff = end_since_epoch.as_millis() - since_the_epoch.as_millis();
122+
debug!("Livestream: Took {}ms for chunk {} for encryption", diff, chunk_number);
123+
109124
let num_pending_files =
110125
http_client.livestream_upload(&group_name, enc_data, chunk_number)?;
111126
chunk_number += 1;
112127

128+
let end2 = SystemTime::now();
129+
let end_since_epoch2 = end2.duration_since(UNIX_EPOCH).expect("time should go forward");
130+
let diff2 = end_since_epoch2.as_millis() - end_since_epoch.as_millis();
131+
debug!("Livestream: Took {}ms for chunk {} for uploading (curr time = {})", diff2, chunk_number - 1, end_since_epoch2.as_millis());
132+
113133
// The server returns 0 when the app has explicitly ended livestream
114134
if num_pending_files == 0 || num_pending_files > MAX_NUM_PENDING_LIVESTREAM_CHUNKS {
115135
info!("Ending livestream.");

camera_hub/src/raspberry_pi/rpi_camera.rs

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//!
33
//! SPDX-License-Identifier: GPL-3.0-or-later
44
5-
use std::time::Instant;
5+
use std::time::{Instant, SystemTime, UNIX_EPOCH};
66
use std::{
77
collections::VecDeque,
88
io,
@@ -22,7 +22,7 @@ use crate::{
2222
traits::{Camera, CodecParameters},
2323
write_box,
2424
};
25-
use anyhow::{Context, Error};
25+
use anyhow::{Error};
2626
use bytes::{BufMut, BytesMut};
2727
use crossbeam_channel::unbounded;
2828
use image::RgbImage;
@@ -51,15 +51,15 @@ pub enum VideoFrameKind {
5151
pub struct VideoFrame {
5252
pub data: Vec<u8>,
5353
pub kind: VideoFrameKind,
54-
pub timestamp: Instant,
54+
pub timestamp: SystemTime,
5555
}
5656

5757
impl VideoFrame {
5858
pub fn new(data: Vec<u8>, kind: VideoFrameKind) -> Self {
5959
Self {
6060
data,
6161
kind,
62-
timestamp: Instant::now(),
62+
timestamp: SystemTime::now(),
6363
}
6464
}
6565
}
@@ -223,11 +223,19 @@ impl RaspberryPiCamera {
223223
let ts = frame_count * ticks_per_frame;
224224

225225
let avcc = Self::annexb_to_avcc_frame(&frame.data, /*strip_aud*/ true, /*strip_ps*/ true);
226+
227+
// Prepend per-frame SEI carrying capture time
228+
let sei = Self::make_sei_unreg_avcc(frame.timestamp.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64);
229+
let mut sample = Vec::with_capacity(sei.len() + avcc.len());
230+
sample.extend_from_slice(&sei);
231+
sample.extend_from_slice(&avcc);
232+
226233
mp4.video(
227-
&avcc,
234+
&sample,
228235
ts,
229236
frame.kind == VideoFrameKind::IFrame,
230237
).await?;
238+
231239
frame_count += 1;
232240
samples_in_fragment += 1;
233241
}
@@ -285,6 +293,63 @@ impl RaspberryPiCamera {
285293
Ok(())
286294
}
287295

296+
297+
fn make_sei_unreg_avcc(epoch_ms: u64) -> Vec<u8> {
298+
const NAL_TYPE_SEI: u8 = 6;
299+
const PAYLOAD_TYPE_UNREG: u8 = 5;
300+
301+
// 16-byte UUID
302+
const UUID: &[u8; 16] = b"SECLUSO_LATENCY_";
303+
304+
// Build raw RBSP (before EPB insertion)
305+
// payload = UUID(16) + timestamp(8, big-endian)
306+
let mut payload = Vec::with_capacity(24);
307+
payload.extend_from_slice(UUID);
308+
payload.extend_from_slice(&epoch_ms.to_be_bytes()); // 8B BE
309+
310+
// payloadType (5) using 255-extension coding
311+
// payloadSize (24) using 255-extension coding
312+
let mut rbsp = Vec::with_capacity(1 + 1 + payload.len() + 1);
313+
rbsp.push(PAYLOAD_TYPE_UNREG);
314+
315+
let mut size = payload.len() as u32; // must be 24
316+
while size >= 255 {
317+
rbsp.push(255);
318+
size -= 255;
319+
}
320+
rbsp.push(size as u8);
321+
322+
rbsp.extend_from_slice(&payload);
323+
324+
// rbsp_trailing_bits: 1 bit '1' then pad with zeros to byte boundary
325+
rbsp.push(0x80);
326+
327+
// Emulation-prevention bytes
328+
// Scan rbsp and after any 0x00 0x00 {00..03} pattern, insert 0x03 before that
329+
let mut rbsp_epb = Vec::with_capacity(rbsp.len() + 8);
330+
let mut zero_run = 0usize;
331+
for &b in &rbsp {
332+
if zero_run >= 2 && b <= 0x03 {
333+
rbsp_epb.push(0x03);
334+
zero_run = 0; // reset after insertion
335+
}
336+
rbsp_epb.push(b);
337+
if b == 0x00 { zero_run += 1 } else { zero_run = 0 }
338+
}
339+
340+
// Assemble the NAL (header + rbsp_epb)
341+
// forbidden_zero_bit=0, nal_ref_idc=0, nal_unit_type=6 (SEI)
342+
let mut nal = Vec::with_capacity(1 + rbsp_epb.len());
343+
nal.push(0x00 | NAL_TYPE_SEI);
344+
nal.extend_from_slice(&rbsp_epb);
345+
346+
// AVCC length-prefixed output
347+
let mut out = Vec::with_capacity(4 + nal.len());
348+
out.extend_from_slice(&(nal.len() as u32).to_be_bytes());
349+
out.extend_from_slice(&nal);
350+
out
351+
}
352+
288353
/// Streams fmp4 video.
289354
async fn write_fmp4(
290355
livestream_writer: LivestreamWriter,

camera_hub/src/raspberry_pi/rpi_dual_stream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::collections::VecDeque;
77
use std::os::unix::net::UnixStream;
88
use std::sync::{Arc, Mutex};
99
use std::thread::sleep;
10-
use std::time::Instant;
10+
use std::time::{SystemTime};
1111
use std::{
1212
io::{BufReader, Read, Write},
1313
process::{Command, Stdio},
@@ -77,7 +77,7 @@ pub fn start(
7777
Ok(h264_frame2) => {
7878
if let Some(mut frame) = h264_frame2 {
7979
// Update the frame timestamp on extraction.
80-
frame.timestamp = Instant::now();
80+
frame.timestamp = SystemTime::now();
8181

8282
if !sps_sent && frame.kind == VideoFrameKind::Sps {
8383
let _ = ps_tx.send(frame.clone());
@@ -166,7 +166,7 @@ fn add_frame_and_drop_old(frame_queue: Arc<Mutex<VecDeque<VideoFrame>>>, frame:
166166

167167
// Remove frames older than the time window.
168168
while let Some(front) = queue.front() {
169-
if Instant::now().duration_since(front.timestamp) > time_window {
169+
if SystemTime::now().duration_since(front.timestamp).unwrap() > time_window {
170170
queue.pop_front();
171171
} else {
172172
break;

0 commit comments

Comments
 (0)