feat(interceptor): add JitterBuffer receiver-side interceptor#84
feat(interceptor): add JitterBuffer receiver-side interceptor#84nightness wants to merge 4 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
Adds a new receiver-side jitter buffer interceptor to rtc-interceptor, intended to buffer inbound RTP per-SSRC and release packets in sequence-number order after an adaptive playout delay (with RTCP bypass).
Changes:
- Introduces
JitterBufferInterceptor+JitterBufferBuilderwith configurable min/initial/max delay and per-SSRC buffering logic. - Implements per-stream jitter estimation (RFC 3550 A.8) and u16 sequence wrap handling, plus force-release at
max_delay. - Exposes the new interceptor from the crate root and adds unit tests for stream behavior and interceptor-chain integration.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
rtc-interceptor/src/lib.rs |
Exposes the new jitter buffer module and re-exports its builder/interceptor types. |
rtc-interceptor/src/jitter_buffer/mod.rs |
Implements the interceptor wrapper, binding, timeout integration, and chain-level tests. |
rtc-interceptor/src/jitter_buffer/stream.rs |
Implements per-SSRC buffering, jitter adaptation, playout-delay parsing, and stream-level tests. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Insert at the correct sorted position (ascending sequence order). | ||
| let pos = self | ||
| .buffer | ||
| .iter() | ||
| .position(|(s, _, _, _)| Self::seq_is_after(*s, seq)) | ||
| .unwrap_or(self.buffer.len()); | ||
| self.buffer.insert(pos, (seq, now, release, pkt)); | ||
| true |
There was a problem hiding this comment.
insert() currently doesn't deduplicate sequence numbers already present in buffer. If the same RTP packet (same seq) is received twice before it is released, both copies will be buffered and later emitted to the application, causing duplicate delivery. Consider rejecting a packet when its seq already exists in the buffer (or using a map/set keyed by seq).
There was a problem hiding this comment.
Addressed: insert() now rejects duplicate sequence numbers already in the buffer. Should be marked outdated.
| /// Flush ready buffered packets into the inner chain, then poll the inner chain. | ||
| #[overrides] | ||
| fn poll_read(&mut self) -> Option<Self::Rout> { | ||
| self.drain_ready(Instant::now()); | ||
| self.inner.poll_read() | ||
| } |
There was a problem hiding this comment.
poll_read() calls drain_ready(Instant::now()). This makes the interceptor depend on wall-clock time even when a driver is providing its own Time via handle_timeout, and can panic if buffered packet arrival times are in the future relative to real Instant::now(). Consider removing the Instant::now() call (drain only on handle_timeout/poll_timeout), or track a monotonic last_now updated by handle_read/handle_timeout and use that instead.
There was a problem hiding this comment.
Addressed: replaced Instant::now() in poll_read() with tracked last_now field updated from handle_read/handle_timeout, making the interceptor deterministic. Should be marked outdated.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #84 +/- ##
==========================================
+ Coverage 71.17% 71.26% +0.08%
==========================================
Files 442 444 +2
Lines 67330 67524 +194
==========================================
+ Hits 47922 48119 +197
+ Misses 19408 19405 -3 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
- Fix all clippy collapsible_if warnings (stream.rs and mod.rs) - Reject duplicate sequence numbers already in the buffer - Make playout-delay extension per-packet instead of permanently mutating min/max bounds - Guard against non-monotonic time with checked_duration_since - Guard against zero clock_rate to prevent division by zero / NaN panic - Replace Instant::now() in poll_read with tracked last_now from handle_read/handle_timeout - Fix test_jitter_adapts_target_delay to use strictly increasing arrival times - Fix misleading comment in test_unbound_ssrc_passes_through and add assertion Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| /// A packet is ready when `now >= release_time` or it has been held for `>= max_delay`. | ||
| pub(crate) fn pop_ready(&mut self, now: Instant) -> Option<TaggedPacket> { | ||
| if let Some(&(_, arrival, release, _)) = self.buffer.front() { | ||
| let ready = now >= release || now.duration_since(arrival) >= self.max_delay; |
There was a problem hiding this comment.
pop_ready uses now.duration_since(arrival) which will panic if now < arrival (e.g., non-monotonic simulated time or a timeout tick earlier than a packet’s msg.now). Since the interceptor explicitly guards against non-monotonic time elsewhere (checked_duration_since), this should also be made non-panicking (e.g., use checked_duration_since and treat negative durations as “not held long enough”).
| let ready = now >= release || now.duration_since(arrival) >= self.max_delay; | |
| let held_long_enough = now | |
| .checked_duration_since(arrival) | |
| .map(|held| held >= self.max_delay) | |
| .unwrap_or(false); | |
| let ready = now >= release || held_long_enough; |
There was a problem hiding this comment.
Addressed: pop_ready now uses checked_duration_since, consistent with the rest of the module. Should be marked outdated.
| if let Packet::Rtp(ref rtp) = msg.message | ||
| && let Some(stream) = self.streams.get_mut(&rtp.header.ssrc) | ||
| { | ||
| // insert() returns false only for already-released sequences; drop those. |
There was a problem hiding this comment.
The comment says insert() returns false only for already-released sequences, but JitterBufferStream::insert can also return false for duplicate sequence numbers already in the buffer. Please update the comment (or handle the return value explicitly) so the drop behavior is accurately documented.
| // insert() returns false only for already-released sequences; drop those. | |
| // Packets rejected by insert() are dropped, including sequences that have | |
| // already been released and duplicates that are already buffered. |
There was a problem hiding this comment.
Addressed: comment updated to say 'returns false for already-released sequences or duplicates'. Pushed in commit 1e02959. Should be marked outdated.
Adds a new `JitterBufferInterceptor` that buffers incoming RTP packets per SSRC and releases them in sequence order after an adaptive playout delay computed from the RFC 3550 §A.8 jitter formula. - `JitterBufferStream` (stream.rs): per-SSRC packet buffer with adaptive target delay; force-release after max_delay to prevent starvation; playout-delay RTP extension (ietf WebRTC draft) for sender-side hints - `JitterBufferInterceptor<P>` (mod.rs): wraps inner chain; buffers RTP for tracked SSRCs, passes RTCP and untracked-SSRC packets immediately; drains ready packets into inner chain in handle_timeout + poll_read - `JitterBufferBuilder`: configurable min/max/initial delay with sensible defaults (20 ms / 500 ms / 50 ms) - Jitter update skips out-of-order RTP timestamps to prevent spurious delay spikes from reordered packets - 15 unit tests across stream.rs and mod.rs; all 129 interceptor tests pass Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Fix all clippy collapsible_if warnings (stream.rs and mod.rs) - Reject duplicate sequence numbers already in the buffer - Make playout-delay extension per-packet instead of permanently mutating min/max bounds - Guard against non-monotonic time with checked_duration_since - Guard against zero clock_rate to prevent division by zero / NaN panic - Replace Instant::now() in poll_read with tracked last_now from handle_read/handle_timeout - Fix test_jitter_adapts_target_delay to use strictly increasing arrival times - Fix misleading comment in test_unbound_ssrc_passes_through and add assertion Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ates Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1e02959 to
0bdfc8d
Compare
|
Rebased onto upstream/master so this PR contains only its own changes. Previous branch structure caused merge conflicts when PRs were merged in sequence. Each PR is now independently mergeable. |
The JitterBufferStream constructor did not clamp initial_delay to the configured bounds, so setting initial_delay=5s with max_delay=200ms would use the unclamped 5s for the first packet's release time. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
JitterBufferInterceptor— a receiver-side interceptor that buffers incoming RTP packets and releases them in sequence-number order after a playout delayjitter += (d - jitter) / 16.0)wrapping_sub(seq) < 0x8000comparisonsmax_delayto prevent starvationJitterBufferBuilderwith configurablemin_delay(20ms),initial_delay(50ms),max_delay(500ms)Design
In
poll_read: callsstream.pop_ready(now)for all streams, injects ready packets viainner.handle_read(), then returnsinner.poll_read(). This ensures all downstream interceptors see every packet.Review feedback addressed
ifwarnings (3 in stream.rs, 1 in mod.rs)checked_duration_since)clock_rate(skip jitter update to prevent div-by-zero)poll_read(tracklast_nowfromhandle_read/handle_timeout)test_unbound_ssrc_passes_throughTest plan
cargo test -p rtc-interceptor jitter_buffercargo test -p rtc-interceptor(all 129 unit tests pass)cargo clippy -p rtc-interceptor(zero warnings)cargo fmt --check(clean)🤖 Generated with Claude Code