implement a flush capability for data channels #63
Codecov Report
Coverage diff, master vs. #63:
| Metric | master | #63 | +/- |
|---|---|---|---|
| Coverage | 71.17% | 71.12% | -0.06% |
| Files | 442 | 442 | |
| Lines | 67330 | 67452 | +122 |
| Hits | 47922 | 47972 | +50 |
| Misses | 19408 | 19480 | +72 |
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds a “flush” signal to the data channel send pipeline so callers can observe when all previously queued data channel messages have been converted into socket datagrams (i.e., become pollable via poll_write), without requiring peer delivery/ACK.
Changes:
- Introduces FlushId / FlushMessage and threads a new RTCMessageInternal::Flush through the RTC pipeline.
- Extends the SCTP crate payload pipeline with a Payload::Flush(FlushIds) meta-message and pending-queue support.
- Adds RTCPeerConnection::{flush, poll_flush} and RTCDataChannel::flush APIs to initiate and observe flush completion.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| rtc/src/peer_connection/mod.rs | Adds flush / poll_flush methods and introduces FlushId type. |
| rtc/src/peer_connection/message/internal.rs | Adds FlushMessage and RTCMessageInternal::Flush plumbing. |
| rtc/src/peer_connection/handler/sctp.rs | Sends flush into SCTP stream and propagates flush completion back up via write outs. |
| rtc/src/peer_connection/handler/mod.rs | Adds flush_outs queue to pipeline context and captures flush completions. |
| rtc/src/peer_connection/handler/demuxer.rs | Passes flush messages through the demuxer stage. |
| rtc/src/peer_connection/handler/datachannel.rs | Enriches flush with association/stream details based on data channel id. |
| rtc/src/data_channel/mod.rs | Adds RTCDataChannel::flush that calls into peer connection. |
| rtc-sctp/src/queue/queue_test.rs | Adjusts queue tests for QueueEntry wrapper around payloads. |
| rtc-sctp/src/queue/pending_queue.rs | Adds QueueEntry / FlushEntry support to pending queue. |
| rtc-sctp/src/lib.rs | Adds public Payload::Flush variant and public FlushIds struct. |
| rtc-sctp/src/association/stream.rs | Adds Stream::flush to enqueue a flush signal. |
| rtc-sctp/src/association/mod.rs | Emits flush meta-messages via poll_transmit and enqueues flush into pending queue. |
| rtc-datachannel/src/data_channel/mod.rs | Changes DCEP “open” message enqueueing behavior during channel creation. |
| use crate::peer_connection::event::RTCPeerConnectionEvent; | ||
| use crate::peer_connection::event::data_channel_event::RTCDataChannelEvent; | ||
| use crate::peer_connection::handler::DEFAULT_TIMEOUT_DURATION; | ||
| use crate::peer_connection::handler::{FlushId, DEFAULT_TIMEOUT_DURATION}; |
FlushId is defined in crate::peer_connection (added in rtc/src/peer_connection/mod.rs), but this import pulls it from crate::peer_connection::handler, which (as shown) does not re-export FlushId. This should be imported from the correct module (e.g., use crate::peer_connection::FlushId;) or explicitly re-exported in handler/mod.rs if that’s intentional.
Suggested change
| - use crate::peer_connection::handler::{FlushId, DEFAULT_TIMEOUT_DURATION}; | |
| + use crate::peer_connection::handler::DEFAULT_TIMEOUT_DURATION; | |
| + use crate::peer_connection::FlushId; |
| let msg = Message::DataChannelOpen(DataChannelOpen { | ||
| channel_type: config.channel_type, | ||
| priority: config.priority, | ||
| reliability_parameter: config.reliability_parameter, | ||
| label: config.label.bytes().collect(), | ||
| protocol: config.protocol.bytes().collect(), | ||
| }) | ||
| .marshal()?; | ||
| data_channel.write_outs.push_back(DataChannelMessage { | ||
| association_handle, | ||
| stream_id, | ||
| ppi: PayloadProtocolIdentifier::Dcep, | ||
| payload: msg, | ||
| }); |
This change removes the if !config.negotiated guard and now always queues a DCEP DataChannelOpen message. For negotiated data channels (RFC 8832), the open handshake must not be sent because the channel is externally negotiated. This is a behavior regression; restore the conditional and only send DataChannelOpen when negotiated == false.
Suggested change
| - let msg = Message::DataChannelOpen(DataChannelOpen { | |
| - channel_type: config.channel_type, | |
| - priority: config.priority, | |
| - reliability_parameter: config.reliability_parameter, | |
| - label: config.label.bytes().collect(), | |
| - protocol: config.protocol.bytes().collect(), | |
| - }) | |
| - .marshal()?; | |
| - data_channel.write_outs.push_back(DataChannelMessage { | |
| - association_handle, | |
| - stream_id, | |
| - ppi: PayloadProtocolIdentifier::Dcep, | |
| - payload: msg, | |
| - }); | |
| + if !config.negotiated { | |
| + let msg = Message::DataChannelOpen(DataChannelOpen { | |
| + channel_type: config.channel_type, | |
| + priority: config.priority, | |
| + reliability_parameter: config.reliability_parameter, | |
| + label: config.label.bytes().collect(), | |
| + protocol: config.protocol.bytes().collect(), | |
| + }) | |
| + .marshal()?; | |
| + data_channel.write_outs.push_back(DataChannelMessage { | |
| + association_handle, | |
| + stream_id, | |
| + ppi: PayloadProtocolIdentifier::Dcep, | |
| + payload: msg, | |
| + }); | |
| + } |
| pub enum Payload { | ||
| PartialDecode(PartialDecode), | ||
| RawEncode(Vec<Bytes>), | ||
| Flush(FlushIds) | ||
| } |
Adding a new variant to a public enum is a SemVer-breaking change for downstream crates that exhaustively match on Payload. If this crate is published or used externally, SemVer stability requires one of the following: (1) mark Payload as #[non_exhaustive] (note: adding the attribute is itself a breaking change), (2) keep the flush meta-message out of the public enum by confining it to the rtc layer, or (3) bump the major version to reflect the breaking change.
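For reference, option (1) could look roughly like the sketch below. The types are stand-ins rather than the real rtc-sctp definitions: PartialDecode is reduced to a unit struct, Vec<Bytes> is replaced by Vec<Vec<u8>>, the u64 type of flush_id is an assumption, and describe is a hypothetical downstream function.

```rust
// Stand-ins for the real rtc-sctp types; only the shape matters here.
#[derive(Debug)]
pub struct PartialDecode;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FlushIds {
    pub flush_id: u64, // assumed field type
}

/// `#[non_exhaustive]` forces *other* crates to include a wildcard arm,
/// so adding variants later no longer breaks their `match` expressions.
#[non_exhaustive]
#[derive(Debug)]
pub enum Payload {
    PartialDecode(PartialDecode),
    RawEncode(Vec<Vec<u8>>), // Vec<Bytes> in the real crate
    Flush(FlushIds),
}

/// How a downstream consumer would match once the enum is non-exhaustive.
pub fn describe(payload: &Payload) -> &'static str {
    match payload {
        Payload::PartialDecode(_) => "partial-decode",
        Payload::RawEncode(_) => "raw-encode",
        // Mandatory in downstream crates; here it also absorbs `Flush`
        // for consumers that do not care about it.
        _ => "other",
    }
}
```

Inside the defining crate the attribute has no effect, so internal matches can stay exhaustive and still get compiler help when a variant is added.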
| pub fn as_payload(&self) -> &ChunkPayloadData { | ||
| match self { | ||
| Self::Payload(data) => data, | ||
| Self::Flush(_) => panic!("Expected QueueEntry::Payload, but was QueueEntry::Flush instead") | ||
| } | ||
| } | ||
| pub fn into_payload(self) -> ChunkPayloadData { | ||
| match self { | ||
| Self::Payload(data) => data, | ||
| Self::Flush(_) => panic!("Expected QueueEntry::Payload, but was QueueEntry::Flush instead") | ||
| } | ||
| } |
These are pub methods that will panic at runtime if called on a flush entry. Since QueueEntry now has multiple variants, exposing panic-based accessors makes it easy for callers/tests to accidentally introduce crashes. Prefer making these pub(crate) (if only for internal tests), or returning Option<&ChunkPayloadData> / Result<_, _> to avoid panics.
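A minimal sketch of the non-panicking variant, assuming simplified stand-ins for ChunkPayloadData and FlushEntry (the real types carry more fields). Returning the flush entry in Err from into_payload is one possible design, not something the PR does.

```rust
// Stand-ins for the real rtc-sctp types (assumptions for illustration).
#[derive(Debug, PartialEq)]
pub struct ChunkPayloadData(pub Vec<u8>);

#[derive(Debug, PartialEq)]
pub struct FlushEntry {
    pub flush_id: u64, // assumed field
}

#[derive(Debug, PartialEq)]
pub enum QueueEntry {
    Payload(ChunkPayloadData),
    Flush(FlushEntry),
}

impl QueueEntry {
    /// Borrow the payload, or `None` if this entry is a flush marker.
    pub fn as_payload(&self) -> Option<&ChunkPayloadData> {
        match self {
            Self::Payload(data) => Some(data),
            Self::Flush(_) => None,
        }
    }

    /// Consume the entry. A flush marker is handed back in `Err` so the
    /// caller can still act on it instead of losing it.
    pub fn into_payload(self) -> Result<ChunkPayloadData, FlushEntry> {
        match self {
            Self::Payload(data) => Ok(data),
            Self::Flush(flush) => Err(flush),
        }
    }
}
```

Tests that only ever enqueue payloads can call .unwrap() or .expect(...) at the call site, which keeps the panic visible where the assumption is made.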
| // if the first queue entry is a flush signal, pop it off | ||
| if let Some(QueueEntry::Flush(e)) = self.pending_queue.peek() { | ||
| let unordered = e.unordered; | ||
| match self.pending_queue.pop(true, unordered) { | ||
| Some(QueueEntry::Flush(e)) => Some(e.ids), | ||
| _ => None | ||
| } | ||
| } else { | ||
| None | ||
| } | ||
| // TODO: is popping off the pending queue enough to guarantee all the previous messages | ||
| // have been written to the final output queue? | ||
| // TODO: pop multiple consecutive flush signals? |
There are two concrete issues here: (1) the function contains TODOs that question whether the implementation meets the intended guarantee; this should be resolved (or converted into a tracked issue) before merging, because it directly affects the correctness contract of flush. (2) Only a single flush is popped per poll_transmit call; if multiple flushes are queued consecutively, this forces extra polling cycles and complicates consumers. Consider popping and emitting all consecutive pending flush entries (or documenting why emitting one-per-poll is required).
Suggested change
| - // if the first queue entry is a flush signal, pop it off | |
| - if let Some(QueueEntry::Flush(e)) = self.pending_queue.peek() { | |
| - let unordered = e.unordered; | |
| - match self.pending_queue.pop(true, unordered) { | |
| - Some(QueueEntry::Flush(e)) => Some(e.ids), | |
| - _ => None | |
| - } | |
| - } else { | |
| - None | |
| - } | |
| - // TODO: is popping off the pending queue enough to guarantee all the previous messages | |
| - // have been written to the final output queue? | |
| - // TODO: pop multiple consecutive flush signals? | |
| + // A flush marker is only considered ready once it reaches the head of the pending | |
| + // queue, so removing it here preserves the ordering established by the queue itself. | |
| + // Drain all consecutive flush markers at the head so callers do not need extra poll | |
| + // cycles when multiple flush requests were queued back-to-back. | |
| + let Some(QueueEntry::Flush(e)) = self.pending_queue.peek() else { | |
| + return None; | |
| + }; | |
| + let unordered = e.unordered; | |
| + let mut ids = match self.pending_queue.pop(true, unordered) { | |
| + Some(QueueEntry::Flush(e)) => e.ids, | |
| + _ => return None, | |
| + }; | |
| + while let Some(QueueEntry::Flush(e)) = self.pending_queue.peek() { | |
| + let unordered = e.unordered; | |
| + match self.pending_queue.pop(true, unordered) { | |
| + Some(QueueEntry::Flush(e)) => ids.extend(e.ids), | |
| + _ => break, | |
| + } | |
| + } | |
| + Some(ids) |
| // first, see if the next queue entry is a flush signal | ||
| if let Some(ids) = self.pop_pending_flush() { | ||
| trace!("polled flush({})", ids.flush_id); | ||
| return Some(TransportMessage { | ||
| now, | ||
| transport: TransportContext { | ||
| local_addr: self.local_addr, | ||
| peer_addr: self.remote_addr, | ||
| ecn: None, | ||
| transport_protocol: Default::default(), | ||
| }, | ||
| message: Payload::Flush(ids), | ||
| }); | ||
| } |
New behavior is introduced where poll_transmit can return Payload::Flush. Since this changes ordering/observable outputs, it should be covered by tests (the crate already has queue tests). Add tests asserting: (a) flush is not emitted while earlier payload entries are still pending, (b) flush becomes emit-able once it reaches the head of the pending queue, and (c) multiple flushes preserve FIFO ordering relative to enqueued payloads on the same stream.
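To make the three properties concrete, here is a toy model of the ordering contract. It is deliberately not the real association: ToyAssociation, Entry, and Output are hypothetical names, a single VecDeque stands in for the pending queue, and each payload is assumed to become exactly one datagram. It also assumes consecutive flush markers are drained together.

```rust
use std::collections::VecDeque;

// Toy model only: the real PendingQueue and poll_transmit are more involved.
#[derive(Debug, PartialEq)]
pub enum Entry {
    Payload(&'static str),
    Flush(u64), // hypothetical flush id
}

#[derive(Debug, PartialEq)]
pub enum Output {
    Datagram(&'static str),
    Flush(Vec<u64>),
}

#[derive(Default)]
pub struct ToyAssociation {
    pending: VecDeque<Entry>,
}

impl ToyAssociation {
    pub fn send(&mut self, data: &'static str) {
        self.pending.push_back(Entry::Payload(data));
    }

    pub fn flush(&mut self, id: u64) {
        self.pending.push_back(Entry::Flush(id));
    }

    /// Emits one output per call. A flush is only emitted once it is at the
    /// head of the queue; consecutive flush markers are drained together.
    pub fn poll_transmit(&mut self) -> Option<Output> {
        match self.pending.pop_front()? {
            Entry::Payload(data) => Some(Output::Datagram(data)),
            Entry::Flush(first) => {
                let mut ids = vec![first];
                while let Some(&Entry::Flush(id)) = self.pending.front() {
                    ids.push(id);
                    self.pending.pop_front();
                }
                Some(Output::Flush(ids))
            }
        }
    }
}
```

A test against the real crate would follow the same shape: enqueue payloads and flushes in a known order, poll until empty, and assert that each flush appears only after every payload queued before it.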
rainliu left a comment
Thanks for the contribution — the use case makes sense, but I don’t think this approach is suitable.
Injecting a “flush meta-message” into the pipeline breaks abstraction boundaries and introduces behavior that isn’t part of WebRTC or SCTP semantics. It can also affect ordering/reliability and makes the API dependent on internal implementation details.
Given the cross-layer impact and potential side effects, I don’t think we should merge this approach.
Possible alternative: buffered amount signals.
Happy to help iterate on an alternative design.
What does that mean?
Also, it's not necessary to know that the outgoing queue is empty to satisfy the semantics of a flush, only that messages sent before the flush are no longer in the outgoing queue. If more messages are queued after the flush, that's fine. In a busy application, the outgoing queue may never actually be empty, so a flush based on some kind of empty signal might never be fulfilled.
However, we may not need a true flush to satisfy the needs of my app in particular. I mostly just use a flush to guarantee it's safe to tear down the event loop after sending a message. Instead of a true flush, some kind of simpler quiescence signal would probably get the job done, i.e., an is-outgoing-queue-empty boolean signal. How might one go about implementing something like that?
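The difference between the two signals can be sketched with a sequence-number watermark. Everything here is hypothetical (OutgoingQueue, FlushPoint, and their methods are not part of this crate), and the sketch ignores SCTP fragmentation and retransmission. It only shows that "flushed up to this point" and "queue is empty" are separate conditions.

```rust
use std::collections::VecDeque;

/// Hypothetical outgoing queue that numbers every message it accepts.
#[derive(Default)]
pub struct OutgoingQueue {
    queue: VecDeque<(u64, Vec<u8>)>,
    next_seq: u64, // sequence number the next enqueued message will get
    drained: u64,  // every message with seq < drained has left the queue
}

/// Token capturing the sequence boundary at the time `flush_point` was called.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FlushPoint(u64);

impl OutgoingQueue {
    pub fn enqueue(&mut self, msg: Vec<u8>) {
        self.queue.push_back((self.next_seq, msg));
        self.next_seq += 1;
    }

    /// Record "everything enqueued so far" without touching the queue.
    pub fn flush_point(&self) -> FlushPoint {
        FlushPoint(self.next_seq)
    }

    /// Hand the next message to the transport layer.
    pub fn pop(&mut self) -> Option<Vec<u8>> {
        let (seq, msg) = self.queue.pop_front()?;
        self.drained = seq + 1;
        Some(msg)
    }

    /// True once every message enqueued before `point` has been popped,
    /// even if newer messages are still waiting.
    pub fn is_flushed(&self, point: FlushPoint) -> bool {
        self.drained >= point.0
    }

    /// The simpler quiescence signal: nothing is waiting at all.
    pub fn is_idle(&self) -> bool {
        self.queue.is_empty()
    }
}
```

With this shape no marker travels through the pipeline: the caller keeps the FlushPoint and compares it against a counter the queue already maintains, while is_idle covers the teardown case when nothing else is sending.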
I made another attempt at this in #91, so I can close this one.
Hello again,
In my application, it is useful to know when a data channel message has "finished" sending (but has not necessarily been received by the remote peer), so I've implemented a flush-like capability. It works by inserting a special flush meta-message into the processing pipeline (including the SCTP stream), which can be polled later using a specialized function. If I've implemented this correctly, the flush message should not be pollable until all the socket datagrams corresponding to all previous data channel messages are also pollable.
This may not be the simplest way to implement a feature like this, but it's the first way I've found so far that works. It modifies a lot of different layers of the processing pipeline, so I realize it may not be such a welcome change.
Would you be interested in adding a flush feature to data channels? The attached PR is a draft implementation to illustrate the idea, but I don't know if it's really the best way to do this. Maybe you know of a better way? Or maybe this is a good enough starting point and we work on it from here.
Thanks,
Jeff