Description
The current implementation of DatagramsWriter and DatagramsReader in moq-transport/src/serve/datagram.rs uses a "last-write-wins" pattern that causes severe packet loss
when datagrams arrive faster than they can be consumed.
Problem
The DatagramsState struct only stores a single datagram:
struct DatagramsState {
    latest: Option&lt;Datagram&gt;, // ← Only ONE datagram!
    epoch: u64,
    closed: Result&lt;(), ServeError&gt;,
}
When write() is called, it overwrites the previous datagram:
pub fn write(&mut self, datagram: Datagram) -> Result&lt;(), ServeError&gt; {
    let mut state = self.state.lock_mut().ok_or(ServeError::Cancel)?;
    state.latest = Some(datagram); // ← Overwrites previous!
    state.epoch += 1;
    Ok(())
}
Impact: If 4 datagrams are written between two reads, only the newest survives; the other 3 are silently lost. In real-world video streaming scenarios (30fps H.264 with multiple RTP packets per frame, each carried as its own datagram), this causes roughly 75% packet loss.
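To make the failure mode concrete, here is a standalone sketch (plain std types, not the moq-transport API) contrasting the single-slot "last-write-wins" behavior with a FIFO queue:

use std::collections::VecDeque;

fn main() {
    // Single slot ("last-write-wins"): 4 writes between reads -> 3 lost.
    let mut latest: Option&lt;u32&gt; = None;
    for seq in 1..=4 {
        latest = Some(seq); // each write overwrites the previous datagram
    }
    assert_eq!(latest.take(), Some(4)); // the reader only ever sees the last one

    // FIFO queue: the same 4 writes are all delivered, in order.
    let mut queue: VecDeque&lt;u32&gt; = (1..=4).collect();
    assert_eq!(queue.pop_front(), Some(1)); // nothing was dropped
    assert_eq!(queue.len(), 3);
}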
Reproduction
- Use moq-relay-ietf as a relay
- Publish video using datagrams at 30fps
- Subscribe from a browser player
- Observe that only ~25% of datagrams are received
Solution
Replace the single-slot latest: Option&lt;Datagram&gt; with a bounded FIFO queue (VecDeque&lt;Datagram&gt;).
Key changes:
- Writer: Appends to queue instead of overwriting
- Reader: Tracks absolute position via consumed_count (see the index sketch after this list)
- Buffer overflow: When queue reaches max size (1024), drop oldest datagram
- Multi-reader support: Each reader has independent position tracking
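A short worked example of the position arithmetic (values chosen purely for illustration): the queue holds absolute write positions from dropped_count up to write_count, so a reader maps its consumed_count to a queue index by subtracting dropped_count, and snaps to the front if it has fallen behind.

fn queue_index(consumed_count: u64, dropped_count: u64) -> usize {
    // A reader that fell behind (consumed_count < dropped_count) resumes at index 0.
    consumed_count.saturating_sub(dropped_count) as usize
}

fn main() {
    let dropped_count = 476; // 476 oldest datagrams were evicted on overflow
    assert_eq!(queue_index(500, dropped_count), 24); // reader still inside the queue
    assert_eq!(queue_index(470, dropped_count), 0);  // reader fell behind -> oldest entry
}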
Proposed Fix
use std::{collections::VecDeque, fmt, sync::Arc};

use crate::watch::State;

use super::{ServeError, Track};

const MAX_DATAGRAM_BUFFER: usize = 1024;

struct DatagramsState {
    queue: VecDeque&lt;Datagram&gt;, // FIFO queue instead of single slot
    write_count: u64,          // Total datagrams written
    dropped_count: u64,        // Datagrams dropped due to buffer overflow
    closed: Result&lt;(), ServeError&gt;,
}
impl Default for DatagramsState {
    fn default() -> Self {
        Self {
            queue: VecDeque::with_capacity(256),
            write_count: 0,
            dropped_count: 0,
            closed: Ok(()),
        }
    }
}
impl DatagramsWriter {
    pub fn write(&mut self, datagram: Datagram) -> Result&lt;(), ServeError&gt; {
        let mut state = self.state.lock_mut().ok_or(ServeError::Cancel)?;

        // Drop the oldest datagram if the buffer is full
        if state.queue.len() >= MAX_DATAGRAM_BUFFER {
            state.queue.pop_front();
            state.dropped_count += 1;
        }

        state.queue.push_back(datagram);
        state.write_count += 1;

        Ok(())
    }
}
#[derive(Clone)]
pub struct DatagramsReader {
    state: State&lt;DatagramsState&gt;,
    pub track: Arc&lt;Track&gt;,
    consumed_count: u64, // Absolute position (handles multi-reader)
}
impl DatagramsReader {
    fn new(state: State&lt;DatagramsState&gt;, track: Arc&lt;Track&gt;) -> Self {
        // Start at the oldest datagram still available in the queue
        let initial_dropped = {
            let state = state.lock();
            state.dropped_count
        };

        Self {
            state,
            track,
            consumed_count: initial_dropped,
        }
    }
    pub async fn read(&mut self) -> Result&lt;Option&lt;Datagram&gt;, ServeError&gt; {
        loop {
            {
                let state = self.state.lock();

                // Calculate the queue index from the reader's absolute position
                let queue_index = if self.consumed_count >= state.dropped_count {
                    (self.consumed_count - state.dropped_count) as usize
                } else {
                    // Reader fell behind; skip to the front of the queue
                    self.consumed_count = state.dropped_count;
                    0
                };

                if queue_index < state.queue.len() {
                    let datagram = state.queue.get(queue_index).cloned();
                    self.consumed_count += 1;
                    return Ok(datagram);
                }

                state.closed.clone()?;

                match state.modified() {
                    Some(notify) => notify,
                    None => return Ok(None),
                }
            }
            .await;
        }
    }
    pub fn latest(&self) -> Option&lt;(u64, u64)&gt; {
        let state = self.state.lock();
        state.queue.back().map(|d| (d.group_id, d.object_id))
    }
}
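For a rough end-to-end feel of the proposed behavior, here is a standalone simulation (plain std types, not the moq-transport API) of a writer overflowing the buffer while a slow reader catches up:

use std::collections::VecDeque;

const MAX: usize = 4; // stand-in for MAX_DATAGRAM_BUFFER = 1024

fn main() {
    let mut queue: VecDeque&lt;u64&gt; = VecDeque::new();
    let (mut write_count, mut dropped_count) = (0u64, 0u64);

    // Writer: 6 datagrams arrive while the reader is idle; 2 are evicted.
    for seq in 0..6u64 {
        if queue.len() >= MAX {
            queue.pop_front();
            dropped_count += 1;
        }
        queue.push_back(seq);
        write_count += 1;
    }
    assert_eq!((write_count, dropped_count), (6, 2));

    // Reader: consumed_count = 1 is older than anything left in the queue,
    // so it snaps to dropped_count and then drains 2, 3, 4, 5 in order.
    let mut consumed_count = 1u64;
    let mut received = Vec::new();
    while consumed_count < write_count {
        if consumed_count < dropped_count {
            consumed_count = dropped_count; // fell behind: skip to the front
        }
        let index = (consumed_count - dropped_count) as usize;
        received.push(*queue.get(index).unwrap());
        consumed_count += 1;
    }
    assert_eq!(received, vec![2, 3, 4, 5]);
}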