Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 54 additions & 17 deletions moq-transport/src/serve/datagram.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::{fmt, sync::Arc};
use std::{collections::VecDeque, fmt, sync::Arc};

use crate::watch::State;

use super::{ServeError, Track};

/// Maximum number of datagrams to buffer before dropping old ones.
/// This prevents unbounded memory growth while allowing burst handling.
const MAX_DATAGRAM_BUFFER: usize = 1024;

pub struct Datagrams {
pub track: Arc<Track>,
}
Expand All @@ -20,11 +24,14 @@ impl Datagrams {
}

struct DatagramsState {
// The latest datagram
latest: Option<Datagram>,
// Queue of pending datagrams (FIFO)
queue: VecDeque<Datagram>,

// Global write counter - incremented each time a datagram is added
write_count: u64,

// Increased each time datagram changes.
epoch: u64,
// Number of datagrams dropped from front due to buffer overflow
dropped_count: u64,

// Set when the writer or all readers are dropped.
closed: Result<(), ServeError>,
Expand All @@ -33,8 +40,9 @@ struct DatagramsState {
impl Default for DatagramsState {
fn default() -> Self {
Self {
latest: None,
epoch: 0,
queue: VecDeque::with_capacity(256),
write_count: 0,
dropped_count: 0,
closed: Ok(()),
}
}
Expand All @@ -53,8 +61,14 @@ impl DatagramsWriter {
pub fn write(&mut self, datagram: Datagram) -> Result<(), ServeError> {
let mut state = self.state.lock_mut().ok_or(ServeError::Cancel)?;

state.latest = Some(datagram);
state.epoch += 1;
// If queue is full, drop oldest datagram to make room
if state.queue.len() >= MAX_DATAGRAM_BUFFER {
state.queue.pop_front();
state.dropped_count += 1;
}

state.queue.push_back(datagram);
state.write_count += 1;

Ok(())
}
Expand All @@ -75,25 +89,48 @@ pub struct DatagramsReader {
state: State<DatagramsState>,
pub track: Arc<Track>,

epoch: u64,
// Track how many datagrams this reader has consumed (absolute count)
// This allows us to calculate our position in the queue
consumed_count: u64,
}

impl DatagramsReader {
fn new(state: State<DatagramsState>, track: Arc<Track>) -> Self {
// Initialize consumed_count to current dropped_count so we start from current position
let initial_dropped = {
let state = state.lock();
state.dropped_count
};

Self {
state,
track,
epoch: 0,
consumed_count: initial_dropped,
}
}

pub async fn read(&mut self) -> Result<Option<Datagram>, ServeError> {
loop {
{
let state = self.state.lock();
if self.epoch < state.epoch {
self.epoch = state.epoch;
return Ok(state.latest.clone());

// Calculate our index in the current queue
// queue index = consumed_count - dropped_count
// If consumed_count < dropped_count, we missed some datagrams (they were dropped)
let queue_index = if self.consumed_count >= state.dropped_count {
(self.consumed_count - state.dropped_count) as usize
} else {
// We're behind - some datagrams were dropped that we haven't seen
// Skip to the beginning of current queue
self.consumed_count = state.dropped_count;
0
};

// Check if there's a datagram we haven't read yet
if queue_index < state.queue.len() {
let datagram = state.queue.get(queue_index).cloned();
self.consumed_count += 1;
return Ok(datagram);
}

state.closed.clone()?;
Expand All @@ -106,12 +143,12 @@ impl DatagramsReader {
}
}

// Returns the largest group/sequence
// Returns the largest group/sequence from the most recent datagram
pub fn latest(&self) -> Option<(u64, u64)> {
let state = self.state.lock();
state
.latest
.as_ref()
.queue
.back()
.map(|datagram| (datagram.group_id, datagram.object_id))
}
}
Expand Down
80 changes: 60 additions & 20 deletions moq-transport/src/session/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,18 +207,50 @@ impl Subscriber {

/// Handle the reception of a SubscribeOk message from the publisher.
fn recv_subscribe_ok(&mut self, msg: &message::SubscribeOk) -> Result<(), SessionError> {
if let Some(subscribe) = self.subscribes.lock().unwrap().get_mut(&msg.id) {
let subscribes = self.subscribes.lock().unwrap();
let subscribe_ids: Vec<u64> = subscribes.keys().cloned().collect();
log::debug!(
"[SUBSCRIBER] recv_subscribe_ok: id={}, track_alias={}, existing_subscribe_ids={:?}",
msg.id,
msg.track_alias,
subscribe_ids
);

if let Some(subscribe) = subscribes.get(&msg.id) {
// Map track alias to subscription id for quick lookup when receiving streams/datagrams
log::debug!(
"[SUBSCRIBER] recv_subscribe_ok: MAPPING track_alias={} -> subscribe_id={}",
msg.track_alias,
msg.id
);
drop(subscribes); // Release the lock before acquiring another

self.subscribe_alias_map
.lock()
.unwrap()
.insert(msg.track_alias, msg.id);

// Log current alias map state
let alias_map = self.subscribe_alias_map.lock().unwrap();
log::debug!(
"[SUBSCRIBER] recv_subscribe_ok: alias_map now contains: {:?}",
alias_map.iter().collect::<Vec<_>>()
);
drop(alias_map);

// Notify waiting tasks that the alias map has been updated
self.subscribe_alias_notify.notify_waiters();

// Notify the subscribe of the successful subscription
subscribe.ok(msg.track_alias)?;
let mut subscribes = self.subscribes.lock().unwrap();
if let Some(subscribe) = subscribes.get_mut(&msg.id) {
subscribe.ok(msg.track_alias)?;
}
} else {
log::warn!(
"[SUBSCRIBER] recv_subscribe_ok: subscribe_id={} NOT FOUND in subscribes map!",
msg.id
);
}

Ok(())
Expand Down Expand Up @@ -487,7 +519,7 @@ impl Subscriber {

// Check for Immutable Extensions (type 0xB = 11)
if object.extension_headers.has(0xB) {
log::info!(
log::debug!(
"[SUBSCRIBER] recv_subgroup: object #{} contains IMMUTABLE EXTENSIONS (type 0xB) - will be forwarded",
object_count + 1
);
Expand All @@ -501,7 +533,7 @@ impl Subscriber {

// Check for Prior Group ID Gap (type 0x3C = 60)
if object.extension_headers.has(0x3C) {
log::info!(
log::debug!(
"[SUBSCRIBER] recv_subgroup: object #{} contains PRIOR GROUP ID GAP (type 0x3C)",
object_count + 1
);
Expand Down Expand Up @@ -624,7 +656,7 @@ impl Subscriber {
object_count += 1;
}

log::info!(
log::debug!(
"[SUBSCRIBER] recv_subgroup: completed subgroup (group_id={}, subgroup_id={}, {} objects received)",
subgroup_writer.info.group_id,
subgroup_writer.info.subgroup_id,
Expand Down Expand Up @@ -659,7 +691,7 @@ impl Subscriber {

// Check for Immutable Extensions (type 0xB = 11)
if ext_headers.has(0xB) {
log::info!(
log::debug!(
"[SUBSCRIBER] recv_datagram: datagram contains IMMUTABLE EXTENSIONS (type 0xB)"
);
if let Some(immutable_ext) = ext_headers.get(0xB) {
Expand All @@ -672,7 +704,7 @@ impl Subscriber {

// Check for Prior Group ID Gap (type 0x3C = 60)
if ext_headers.has(0x3C) {
log::info!(
log::debug!(
"[SUBSCRIBER] recv_datagram: datagram contains PRIOR GROUP ID GAP (type 0x3C)"
);
if let Some(gap_ext) = ext_headers.get(0x3C) {
Expand All @@ -684,32 +716,40 @@ impl Subscriber {
}
}

// Log current alias map state before lookup
{
let alias_map = self.subscribe_alias_map.lock().unwrap();
log::debug!(
"[SUBSCRIBER] recv_datagram: looking up track_alias={}, current alias_map={:?}",
datagram.track_alias,
alias_map.iter().collect::<Vec<_>>()
);
}

// Look up the subscribe id for this track alias
if let Some(subscribe_id) = self
.get_subscribe_id_by_alias(datagram.track_alias, Some(DEFAULT_ALIAS_WAIT_TIME_MS))
.await
{
// Look up the subscribe by id
if let Some(subscribe) = self.subscribes.lock().unwrap().get_mut(&subscribe_id) {
log::trace!(
"[SUBSCRIBER] recv_datagram: track_alias={}, group_id={}, object_id={}, publisher_priority={}, status={}, payload_length={}",
log::debug!(
"[SUBSCRIBER] recv_datagram: FOUND track_alias={} -> subscribe_id={}, forwarding datagram",
datagram.track_alias,
datagram.group_id,
datagram.object_id.unwrap_or(0),
datagram.publisher_priority,
datagram.status.as_ref().map_or("None".to_string(), |s| format!("{:?}", s)),
datagram.payload.as_ref().map_or(0, |p| p.len()));
subscribe_id
);
subscribe.datagram(datagram)?;
}
} else {
// Log the full state for debugging
let alias_map = self.subscribe_alias_map.lock().unwrap();
let subscribes = self.subscribes.lock().unwrap();
log::warn!(
"[SUBSCRIBER] recv_datagram: discarded due to unknown track_alias: track_alias={}, group_id={}, object_id={}, publisher_priority={}, status={}, payload_length={}",
"[SUBSCRIBER] recv_datagram: UNKNOWN track_alias={}, alias_map={:?}, subscribe_ids={:?}",
datagram.track_alias,
datagram.group_id,
datagram.object_id.unwrap_or(0),
datagram.publisher_priority,
datagram.status.as_ref().map_or("None".to_string(), |s| format!("{:?}", s)),
datagram.payload.as_ref().map_or(0, |p| p.len()));
alias_map.iter().collect::<Vec<_>>(),
subscribes.keys().collect::<Vec<_>>()
);
}

Ok(())
Expand Down