diff --git a/rtc-datachannel/src/data_channel/data_channel_test.rs b/rtc-datachannel/src/data_channel/data_channel_test.rs index 7987f3d7..00854af3 100644 --- a/rtc-datachannel/src/data_channel/data_channel_test.rs +++ b/rtc-datachannel/src/data_channel/data_channel_test.rs @@ -208,6 +208,100 @@ fn test_data_channel_channel_type_reliable_ordered() -> Result<()> { Ok(()) } +/// Pre-negotiated (out-of-band) channels rely on the `SctpHandler` to suppress +/// the DCEP DataChannelOpen payload on the wire. This test verifies the +/// `dial()` side of that contract: the outbound `DataChannelMessage` must carry +/// `negotiated = true` so the SCTP handler knows to open the stream without +/// sending the DCEP payload to the remote peer. +/// +/// Note: the handler-side suppression is exercised by integration tests in the +/// `rtc` crate; this unit test only covers the flagging produced by `dial()`. +/// +/// Regression test for . +#[test] +fn test_data_channel_negotiated_dial_flags_message() -> Result<()> { + let (a0, a1) = create_new_association_pair()?; + let stream_id = 42; + + let cfg = DataChannelConfig { + channel_type: ChannelType::Reliable, + negotiated: true, + label: "negotiated-ch".to_string(), + protocol: "test-proto".to_string(), + ..Default::default() + }; + + let mut dc = DataChannel::dial(cfg.clone(), a0, stream_id)?; + + // dial() must still produce exactly one outbound DCEP message so the SCTP + // handler can register the stream. + let msg = dc.poll_write().ok_or(Error::ErrAssociationNotExisted)?; + assert_eq!(msg.ppi, PayloadProtocolIdentifier::Dcep); + assert_eq!(msg.stream_id, stream_id); + + // The message must be flagged as negotiated so the SCTP handler opens the + // stream but does NOT send the DCEP payload to the peer. + assert!( + msg.negotiated, + "negotiated DataChannelOpen must be marked as internal-only" + ); + + // No further outbound messages should be queued. + assert!( + dc.poll_write().is_none(), + "negotiated dial should produce exactly one outbound message" + ); + + // Verify the config round-trips correctly. + assert_eq!(dc.config(), &cfg); + + dc.close()?; + close_association_pair(a0, a1); + + Ok(()) +} + +/// Non-negotiated (in-band) channels must send a regular DCEP DataChannelOpen +/// that can be accepted by the peer. +#[test] +fn test_data_channel_non_negotiated_sends_dcep() -> Result<()> { + let (a0, a1) = create_new_association_pair()?; + + let cfg = DataChannelConfig { + channel_type: ChannelType::Reliable, + negotiated: false, + label: "in-band".to_string(), + ..Default::default() + }; + + let mut dc0 = DataChannel::dial(cfg.clone(), a0, 100)?; + + let msg = dc0.poll_write().ok_or(Error::ErrAssociationNotExisted)?; + assert_eq!(msg.ppi, PayloadProtocolIdentifier::Dcep); + // In-band channel: negotiated flag must be false so the SCTP handler sends + // the DCEP payload over the wire. + assert!( + !msg.negotiated, + "in-band DataChannelOpen must NOT be marked as internal-only" + ); + + // The peer should be able to accept this DCEP message. + let mut dc1 = DataChannel::accept( + DataChannelConfig::default(), + a1, + msg.stream_id, + PayloadProtocolIdentifier::Dcep, + &msg.payload, + )?; + assert_eq!(dc1.config(), &cfg, "remote config should match"); + + dc0.close()?; + dc1.close()?; + close_association_pair(a0, a1); + + Ok(()) +} + /* #[tokio::test] async fn test_data_channel_channel_type_reliable_unordered() -> Result<()> { diff --git a/rtc-datachannel/src/data_channel/mod.rs b/rtc-datachannel/src/data_channel/mod.rs index 1411abd1..6e91e500 100644 --- a/rtc-datachannel/src/data_channel/mod.rs +++ b/rtc-datachannel/src/data_channel/mod.rs @@ -25,13 +25,41 @@ pub struct DataChannelConfig { pub protocol: String, } -/// DataChannelMessage is used to data sent over SCTP +/// DataChannelMessage is used for data sent over SCTP. +// Note: #[non_exhaustive] is intentional — this crate is pre-1.0 (0.20.0-alpha) +// and adding the `negotiated` field would otherwise be a semver-breaking change +// for downstream code that constructs `DataChannelMessage` via struct literals. +#[non_exhaustive] #[derive(Debug, Default, Clone)] pub struct DataChannelMessage { pub association_handle: usize, pub stream_id: u16, pub ppi: PayloadProtocolIdentifier, pub payload: BytesMut, + /// When `true`, the channel was created via out-of-band (pre-negotiated) + /// negotiation. The SCTP handler opens the stream but does **not** send + /// the DCEP open message over the wire, because both peers already agree + /// on the channel parameters. + pub negotiated: bool, +} + +impl DataChannelMessage { + /// Creates a new `DataChannelMessage` with all fields specified. + pub fn new( + association_handle: usize, + stream_id: u16, + ppi: PayloadProtocolIdentifier, + payload: BytesMut, + negotiated: bool, + ) -> Self { + Self { + association_handle, + stream_id, + ppi, + payload, + negotiated, + } + } } /// DataChannel represents a data channel @@ -52,6 +80,8 @@ pub struct DataChannel { } impl DataChannel { + /// Creates a new `DataChannel` with the given configuration, association + /// handle, and SCTP stream identifier. Counters start at zero. fn new(config: DataChannelConfig, association_handle: usize, stream_id: u16) -> Self { Self { config, @@ -63,7 +93,12 @@ impl DataChannel { } } - /// Dial opens a data channels over SCTP + /// Dial opens a data channel over SCTP. + /// + /// A DCEP `DataChannelOpen` message is always constructed so that the SCTP + /// handler registers the underlying stream. For **in-band** channels the + /// message is also sent over the wire (RFC 8832 sec. 3); for + /// **pre-negotiated** channels it stays internal-only (see issue #61). pub fn dial( config: DataChannelConfig, association_handle: usize, @@ -71,23 +106,34 @@ impl DataChannel { ) -> Result { let mut data_channel = DataChannel::new(config.clone(), association_handle, stream_id); - if !config.negotiated { - let msg = Message::DataChannelOpen(DataChannelOpen { - channel_type: config.channel_type, - priority: config.priority, - reliability_parameter: config.reliability_parameter, - label: config.label.bytes().collect(), - protocol: config.protocol.bytes().collect(), - }) - .marshal()?; - - data_channel.write_outs.push_back(DataChannelMessage { - association_handle, - stream_id, - ppi: PayloadProtocolIdentifier::Dcep, - payload: msg, - }); - } + // Build a DCEP DataChannelOpen that the SCTP handler intercepts to + // register the underlying SCTP stream. + // + // For in-band channels (negotiated=false) the message is also sent over + // the wire to initiate the DCEP handshake per RFC 8832 sec. 3. + // + // For pre-negotiated channels (negotiated=true) the message is + // internal-only: the SCTP handler opens the stream and sets its + // reliability parameters but does NOT send DCEP to the peer. Without + // this the SCTP association never registers the stream, causing every + // subsequent write to fail with "Stream not existed" (issue #61). + let msg = Message::DataChannelOpen(DataChannelOpen { + channel_type: config.channel_type, + priority: config.priority, + reliability_parameter: config.reliability_parameter, + label: config.label.bytes().collect(), + protocol: config.protocol.bytes().collect(), + }) + .marshal()?; + + data_channel.write_outs.push_back(DataChannelMessage { + association_handle, + stream_id, + ppi: PayloadProtocolIdentifier::Dcep, + payload: msg, + // Forward the negotiated flag so the SCTP handler can skip the DCEP write. + negotiated: config.negotiated, + }); Ok(data_channel) } @@ -189,6 +235,7 @@ impl DataChannel { stream_id: self.stream_id, ppi: PayloadProtocolIdentifier::Dcep, payload: ack, + ..Default::default() }); Ok(()) } @@ -200,6 +247,7 @@ impl DataChannel { stream_id: self.stream_id, ppi: PayloadProtocolIdentifier::Dcep, payload: close, + ..Default::default() }); Ok(()) } @@ -212,6 +260,7 @@ impl DataChannel { stream_id: self.stream_id, ppi: PayloadProtocolIdentifier::Dcep, payload: low_threshold, + ..Default::default() }); Ok(()) } @@ -224,6 +273,7 @@ impl DataChannel { stream_id: self.stream_id, ppi: PayloadProtocolIdentifier::Dcep, payload: low_threshold, + ..Default::default() }); Ok(()) } diff --git a/rtc/src/peer_connection/handler/sctp.rs b/rtc/src/peer_connection/handler/sctp.rs index 8f1d4f41..06b1f34b 100644 --- a/rtc/src/peer_connection/handler/sctp.rs +++ b/rtc/src/peer_connection/handler/sctp.rs @@ -152,14 +152,15 @@ impl<'a> sansio::Protocol { @@ -287,6 +288,11 @@ impl<'a> sansio::Protocol { is_dcep_internal_control_message = true; @@ -485,3 +491,178 @@ fn split_transmit(transmit: TransportMessage) -> Vec (SctpHandlerContext, usize) { + let mut sctp_transport = RTCSctpTransport::new( + crate::peer_connection::configuration::setting_engine::SctpMaxMessageSize::default(), + ); + sctp_transport + .start( + RTCDtlsRole::Client, + SCTPTransportCapabilities { + max_message_size: 65536, + }, + 5000, + 5000, + ) + .expect("start sctp transport"); + + // Trigger a client-side connect to create an association. + let endpoint = sctp_transport.sctp_endpoint.as_mut().unwrap(); + let config = sctp_transport.sctp_transport_config.clone().unwrap(); + let (ch, conn) = endpoint + .connect( + sctp::ClientConfig::new(config), + TransportContext::default().peer_addr, + ) + .expect("connect"); + let association_handle = ch.0; + sctp_transport.sctp_associations.insert(ch, conn); + + (SctpHandlerContext::new(sctp_transport), association_handle) + } + + /// Build a serialised DCEP DataChannelOpen payload. + fn make_dcep_open_payload() -> BytesMut { + let msg = Message::DataChannelOpen(DataChannelOpen { + channel_type: datachannel::message::message_channel_open::ChannelType::Reliable, + priority: 0, + reliability_parameter: 0, + label: b"test-label".to_vec(), + protocol: b"".to_vec(), + }); + msg.marshal().expect("marshal DataChannelOpen") + } + + /// A negotiated (pre-negotiated / out-of-band) DataChannelOpen must set + /// `is_dcep_internal_control_message = true` so the SCTP handler does NOT + /// forward the DCEP payload over the wire. + /// + /// With a not-yet-established association the stream is writable (default + /// `ReadWritable` state) but `send_payload_data` would fail with + /// `ErrPayloadDataStateNotExist` if the handler tried to write the DCEP + /// payload. The fact that `handle_write` returns `Ok(())` proves the + /// negotiated branch suppressed the wire write. + #[test] + fn negotiated_datachannel_open_suppresses_wire_write() { + let (mut ctx, association_handle) = make_ctx_with_association(); + let payload = make_dcep_open_payload(); + + let msg = TaggedRTCMessageInternal { + now: Instant::now(), + transport: TransportContext::default(), + message: RTCMessageInternal::Dtls(DTLSMessage::Sctp(DataChannelMessage::new( + association_handle, + 42, + PayloadProtocolIdentifier::Dcep, + payload, + true, + ))), + }; + + let mut handler = SctpHandler::new(&mut ctx); + handler + .handle_write(msg) + .expect("handle_write must succeed for negotiated channel (wire write suppressed)"); + } + + /// A non-negotiated (in-band) DataChannelOpen does NOT suppress the wire + /// write. Because the stream defaults to `ReadWritable`, `write_with_ppi` + /// is attempted but fails with `ErrPayloadDataStateNotExist` since the + /// SCTP association has not completed its handshake. + /// + /// This confirms that the `negotiated` flag is the deciding factor: the + /// negotiated test above succeeds precisely because the write is suppressed. + #[test] + fn non_negotiated_datachannel_open_attempts_wire_write() { + let (mut ctx, association_handle) = make_ctx_with_association(); + let payload = make_dcep_open_payload(); + + let msg = TaggedRTCMessageInternal { + now: Instant::now(), + transport: TransportContext::default(), + message: RTCMessageInternal::Dtls(DTLSMessage::Sctp(DataChannelMessage::new( + association_handle, + 43, + PayloadProtocolIdentifier::Dcep, + payload, + false, + ))), + }; + + let mut handler = SctpHandler::new(&mut ctx); + let result = handler.handle_write(msg); + + // The non-negotiated path tries to write the DCEP payload over the + // wire, which fails because the association is not yet established. + assert!( + matches!( + result, + Err(shared::error::Error::ErrPayloadDataStateNotExist) + ), + "in-band dial must fail with ErrPayloadDataStateNotExist when the \ + association is not yet established, but got: {result:?}" + ); + } + + /// When both peers open the same negotiated stream, the second + /// `open_stream` call must fail with `ErrStreamAlreadyExist`. This + /// exercises the peer-race scenario described in RFC 8832 where both + /// sides send `DATA_CHANNEL_OPEN` for the same pre-negotiated stream ID. + #[test] + fn negotiated_dial_duplicate_stream_returns_already_exist() { + let (mut ctx, association_handle) = make_ctx_with_association(); + let payload = make_dcep_open_payload(); + + // First dial: opens stream 50 successfully. + let msg1 = TaggedRTCMessageInternal { + now: Instant::now(), + transport: TransportContext::default(), + message: RTCMessageInternal::Dtls(DTLSMessage::Sctp(DataChannelMessage::new( + association_handle, + 50, + PayloadProtocolIdentifier::Dcep, + payload.clone(), + true, + ))), + }; + + let mut handler = SctpHandler::new(&mut ctx); + handler + .handle_write(msg1) + .expect("first negotiated open must succeed"); + + // Second dial on the same stream ID: simulates the peer race where + // both sides send DATA_CHANNEL_OPEN for the same negotiated channel. + let msg2 = TaggedRTCMessageInternal { + now: Instant::now(), + transport: TransportContext::default(), + message: RTCMessageInternal::Dtls(DTLSMessage::Sctp(DataChannelMessage::new( + association_handle, + 50, + PayloadProtocolIdentifier::Dcep, + payload, + true, + ))), + }; + + let mut handler = SctpHandler::new(&mut ctx); + let result = handler.handle_write(msg2); + + assert!( + matches!(result, Err(shared::error::Error::ErrStreamAlreadyExist)), + "expected ErrStreamAlreadyExist for duplicate negotiated stream, got: {result:?}" + ); + } +} diff --git a/rtc/tests/negotiated_data_channel_rtc2rtc.rs b/rtc/tests/negotiated_data_channel_rtc2rtc.rs new file mode 100644 index 00000000..dcd54b53 --- /dev/null +++ b/rtc/tests/negotiated_data_channel_rtc2rtc.rs @@ -0,0 +1,448 @@ +/// Integration test for negotiated DataChannels between two rtc (sansio) peers +/// +/// This test verifies that two rtc peers can create negotiated DataChannels +/// (out-of-band negotiation with matching channel IDs) and exchange messages. +/// Unlike in-band channels, negotiated channels must be created on BOTH peers +/// with the same channel ID before the connection is established. +use anyhow::Result; +use bytes::BytesMut; +use sansio::Protocol; +use shared::{TaggedBytesMut, TransportContext, TransportProtocol}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::net::UdpSocket; +use tokio::sync::Mutex; + +use rtc::data_channel::RTCDataChannelInit; +use rtc::peer_connection::RTCPeerConnectionBuilder; +use rtc::peer_connection::configuration::RTCConfigurationBuilder; +use rtc::peer_connection::configuration::setting_engine::SettingEngine; +use rtc::peer_connection::event::RTCDataChannelEvent; +use rtc::peer_connection::event::RTCPeerConnectionEvent; +use rtc::peer_connection::message::RTCMessage; +use rtc::peer_connection::state::RTCIceConnectionState; +use rtc::peer_connection::state::RTCPeerConnectionState; +use rtc::peer_connection::transport::RTCDtlsRole; +use rtc::peer_connection::transport::RTCIceCandidateInit; +use rtc::peer_connection::transport::RTCIceServer; +use rtc::peer_connection::transport::{CandidateConfig, CandidateHostConfig, RTCIceCandidate}; + +const DEFAULT_TIMEOUT_DURATION: Duration = Duration::from_secs(30); +const NEGOTIATED_CHANNEL_ID: u16 = 5; +const TEST_MESSAGE_FROM_OFFERER: &str = "Hello from offerer (negotiated)!"; +const TEST_MESSAGE_FROM_ANSWERER: &str = "Hello from answerer (negotiated)!"; + +/// Test negotiated DataChannel communication between two rtc (sansio) peers. +/// +/// Both peers create a DataChannel with `negotiated: Some(5)` (same channel ID). +/// After connecting, both sides send a message and verify the other received it. +#[tokio::test] +async fn test_negotiated_data_channel_rtc_to_rtc() -> Result<()> { + env_logger::builder() + .filter_level(log::LevelFilter::Info) + .is_test(true) + .try_init() + .ok(); + + log::info!("Starting negotiated DataChannel test: rtc offer <-> rtc answer"); + + // Track received messages + let offerer_received_messages = Arc::new(Mutex::new(Vec::::new())); + let answerer_received_messages = Arc::new(Mutex::new(Vec::::new())); + + // --- Create offerer peer --- + let offerer_socket = UdpSocket::bind("127.0.0.1:0").await?; + let offerer_local_addr = offerer_socket.local_addr()?; + log::info!("Offerer peer bound to {}", offerer_local_addr); + + let mut offerer_setting_engine = SettingEngine::default(); + offerer_setting_engine.set_answering_dtls_role(RTCDtlsRole::Server)?; + + let offerer_config = RTCConfigurationBuilder::new() + .with_ice_servers(vec![RTCIceServer { + urls: vec!["stun:stun.l.google.com:19302".to_owned()], + ..Default::default() + }]) + .build(); + + let mut offerer_pc = RTCPeerConnectionBuilder::new() + .with_configuration(offerer_config) + .with_setting_engine(offerer_setting_engine) + .build()?; + log::info!("Created offerer peer connection"); + + // Create negotiated DataChannel on offerer side + let dc_label = "negotiated-channel"; + let _offerer_dc = offerer_pc.create_data_channel( + dc_label, + Some(RTCDataChannelInit { + negotiated: Some(NEGOTIATED_CHANNEL_ID), + ..Default::default() + }), + )?; + log::info!( + "Offerer created negotiated DataChannel '{}' with id={}", + dc_label, + NEGOTIATED_CHANNEL_ID + ); + + // Add local candidate for offerer + let offerer_candidate = CandidateHostConfig { + base_config: CandidateConfig { + network: "udp".to_owned(), + address: offerer_local_addr.ip().to_string(), + port: offerer_local_addr.port(), + component: 1, + ..Default::default() + }, + ..Default::default() + } + .new_candidate_host()?; + let offerer_candidate_init = RTCIceCandidate::from(&offerer_candidate).to_json()?; + offerer_pc.add_local_candidate(offerer_candidate_init)?; + + // Create offer + let offer = offerer_pc.create_offer(None)?; + log::info!("Offerer created offer"); + offerer_pc.set_local_description(offer.clone())?; + log::info!("Offerer set local description"); + + // --- Create answerer peer --- + let answerer_socket = UdpSocket::bind("127.0.0.1:0").await?; + let answerer_local_addr = answerer_socket.local_addr()?; + log::info!("Answerer peer bound to {}", answerer_local_addr); + + let mut answerer_setting_engine = SettingEngine::default(); + answerer_setting_engine.set_answering_dtls_role(RTCDtlsRole::Client)?; + + let answerer_config = RTCConfigurationBuilder::new() + .with_ice_servers(vec![RTCIceServer { + urls: vec!["stun:stun.l.google.com:19302".to_owned()], + ..Default::default() + }]) + .build(); + + let mut answerer_pc = RTCPeerConnectionBuilder::new() + .with_configuration(answerer_config) + .with_setting_engine(answerer_setting_engine) + .build()?; + log::info!("Created answerer peer connection"); + + // Create negotiated DataChannel on answerer side (same label and ID) + let _answerer_dc = answerer_pc.create_data_channel( + dc_label, + Some(RTCDataChannelInit { + negotiated: Some(NEGOTIATED_CHANNEL_ID), + ..Default::default() + }), + )?; + log::info!( + "Answerer created negotiated DataChannel '{}' with id={}", + dc_label, + NEGOTIATED_CHANNEL_ID + ); + + // Set remote description on answerer (the offer) + answerer_pc.set_remote_description(offer)?; + log::info!("Answerer set remote description"); + + // Add local candidate for answerer + let answerer_candidate = CandidateHostConfig { + base_config: CandidateConfig { + network: "udp".to_owned(), + address: answerer_local_addr.ip().to_string(), + port: answerer_local_addr.port(), + component: 1, + ..Default::default() + }, + ..Default::default() + } + .new_candidate_host()?; + let answerer_candidate_init = RTCIceCandidate::from(&answerer_candidate).to_json()?; + answerer_pc.add_local_candidate(answerer_candidate_init)?; + + // Create and set answer + let answer = answerer_pc.create_answer(None)?; + log::info!("Answerer created answer"); + answerer_pc.set_local_description(answer.clone())?; + log::info!("Answerer set local description"); + + // Set remote description on offerer (the answer) + offerer_pc.set_remote_description(answer)?; + log::info!("Offerer set remote description"); + + // Exchange ICE candidates between peers + let offerer_remote_candidate = RTCIceCandidateInit { + candidate: format!( + "candidate:1 1 udp 2130706431 {} {} typ host", + answerer_local_addr.ip(), + answerer_local_addr.port() + ), + ..Default::default() + }; + offerer_pc.add_local_candidate(offerer_remote_candidate)?; + log::info!("Offerer added answerer's candidate"); + + let answerer_remote_candidate = RTCIceCandidateInit { + candidate: format!( + "candidate:1 1 udp 2130706431 {} {} typ host", + offerer_local_addr.ip(), + offerer_local_addr.port() + ), + ..Default::default() + }; + answerer_pc.add_local_candidate(answerer_remote_candidate)?; + log::info!("Answerer added offerer's candidate"); + + // --- Run event loops --- + let mut offerer_buf = vec![0u8; 2000]; + let mut answerer_buf = vec![0u8; 2000]; + let mut offerer_connected = false; + let mut answerer_connected = false; + let mut offerer_dc_opened = false; + let mut answerer_dc_opened = false; + let mut offerer_message_sent = false; + let mut answerer_message_sent = false; + + let start_time = Instant::now(); + let test_timeout = Duration::from_secs(30); + + while start_time.elapsed() < test_timeout { + // --- Process offerer --- + while let Some(msg) = offerer_pc.poll_write() { + match offerer_socket + .send_to(&msg.message, msg.transport.peer_addr) + .await + { + Ok(n) => { + log::trace!("Offerer sent {} bytes to {}", n, msg.transport.peer_addr); + } + Err(err) => { + log::error!("Offerer socket write error: {}", err); + } + } + } + + while let Some(event) = offerer_pc.poll_event() { + match event { + RTCPeerConnectionEvent::OnIceConnectionStateChangeEvent(state) => { + log::info!("Offerer ICE connection state: {}", state); + if state == RTCIceConnectionState::Failed { + return Err(anyhow::anyhow!("Offerer ICE connection failed")); + } + } + RTCPeerConnectionEvent::OnConnectionStateChangeEvent(state) => { + log::info!("Offerer peer connection state: {}", state); + if state == RTCPeerConnectionState::Failed { + return Err(anyhow::anyhow!("Offerer peer connection failed")); + } + if state == RTCPeerConnectionState::Connected { + log::info!("Offerer peer connection connected!"); + offerer_connected = true; + } + } + RTCPeerConnectionEvent::OnDataChannel(dc_event) => match dc_event { + RTCDataChannelEvent::OnOpen(channel_id) => { + log::info!("Offerer data channel {} opened", channel_id); + if channel_id == NEGOTIATED_CHANNEL_ID { + offerer_dc_opened = true; + } + } + _ => {} + }, + _ => {} + } + } + + while let Some(message) = offerer_pc.poll_read() { + match message { + RTCMessage::RtpPacket(_, _) => {} + RTCMessage::RtcpPacket(_, _) => {} + RTCMessage::DataChannelMessage(channel_id, data_channel_message) => { + let msg_str = String::from_utf8(data_channel_message.data.to_vec())?; + log::info!( + "Offerer received message on channel {}: '{}'", + channel_id, + msg_str + ); + let mut msgs = offerer_received_messages.lock().await; + msgs.push(msg_str); + } + } + } + + // --- Process answerer --- + while let Some(msg) = answerer_pc.poll_write() { + match answerer_socket + .send_to(&msg.message, msg.transport.peer_addr) + .await + { + Ok(n) => { + log::trace!("Answerer sent {} bytes to {}", n, msg.transport.peer_addr); + } + Err(err) => { + log::error!("Answerer socket write error: {}", err); + } + } + } + + while let Some(event) = answerer_pc.poll_event() { + match event { + RTCPeerConnectionEvent::OnIceConnectionStateChangeEvent(state) => { + log::info!("Answerer ICE connection state: {}", state); + if state == RTCIceConnectionState::Failed { + return Err(anyhow::anyhow!("Answerer ICE connection failed")); + } + } + RTCPeerConnectionEvent::OnConnectionStateChangeEvent(state) => { + log::info!("Answerer peer connection state: {}", state); + if state == RTCPeerConnectionState::Failed { + return Err(anyhow::anyhow!("Answerer peer connection failed")); + } + if state == RTCPeerConnectionState::Connected { + log::info!("Answerer peer connection connected!"); + answerer_connected = true; + } + } + RTCPeerConnectionEvent::OnDataChannel(dc_event) => match dc_event { + RTCDataChannelEvent::OnOpen(channel_id) => { + log::info!("Answerer data channel {} opened", channel_id); + if channel_id == NEGOTIATED_CHANNEL_ID { + answerer_dc_opened = true; + } + } + _ => {} + }, + _ => {} + } + } + + while let Some(message) = answerer_pc.poll_read() { + match message { + RTCMessage::RtpPacket(_, _) => {} + RTCMessage::RtcpPacket(_, _) => {} + RTCMessage::DataChannelMessage(channel_id, data_channel_message) => { + let msg_str = String::from_utf8(data_channel_message.data.to_vec())?; + log::info!( + "Answerer received message on channel {}: '{}'", + channel_id, + msg_str + ); + let mut msgs = answerer_received_messages.lock().await; + msgs.push(msg_str); + } + } + } + + // Send messages once both are connected and both channels are open + if offerer_connected + && answerer_connected + && offerer_dc_opened + && answerer_dc_opened + && !offerer_message_sent + { + if let Some(mut dc) = offerer_pc.data_channel(NEGOTIATED_CHANNEL_ID) { + log::info!("Offerer sending message: '{}'", TEST_MESSAGE_FROM_OFFERER); + dc.send_text(TEST_MESSAGE_FROM_OFFERER.to_string())?; + offerer_message_sent = true; + } + } + + if offerer_connected + && answerer_connected + && offerer_dc_opened + && answerer_dc_opened + && !answerer_message_sent + { + if let Some(mut dc) = answerer_pc.data_channel(NEGOTIATED_CHANNEL_ID) { + log::info!("Answerer sending message: '{}'", TEST_MESSAGE_FROM_ANSWERER); + dc.send_text(TEST_MESSAGE_FROM_ANSWERER.to_string())?; + answerer_message_sent = true; + } + } + + // Check if both sides received the expected messages + if offerer_message_sent && answerer_message_sent { + let offerer_msgs = offerer_received_messages.lock().await; + let answerer_msgs = answerer_received_messages.lock().await; + + let offerer_got_msg = offerer_msgs.iter().any(|m| m == TEST_MESSAGE_FROM_ANSWERER); + let answerer_got_msg = answerer_msgs.iter().any(|m| m == TEST_MESSAGE_FROM_OFFERER); + + if offerer_got_msg && answerer_got_msg { + log::info!( + "Test complete - both peers received messages via negotiated DataChannel" + ); + log::info!(" Offerer received: {:?}", offerer_msgs.as_slice()); + log::info!(" Answerer received: {:?}", answerer_msgs.as_slice()); + + assert!( + answerer_msgs.iter().any(|m| m == TEST_MESSAGE_FROM_OFFERER), + "Answerer should have received offerer's message" + ); + assert!( + offerer_msgs.iter().any(|m| m == TEST_MESSAGE_FROM_ANSWERER), + "Offerer should have received answerer's message" + ); + + offerer_pc.close()?; + answerer_pc.close()?; + return Ok(()); + } + } + + // Handle timeouts + let offerer_timeout = offerer_pc + .poll_timeout() + .unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION); + let answerer_timeout = answerer_pc + .poll_timeout() + .unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION); + let next_timeout = offerer_timeout.min(answerer_timeout); + let delay = next_timeout.saturating_duration_since(Instant::now()); + + if delay.is_zero() { + offerer_pc.handle_timeout(Instant::now()).ok(); + answerer_pc.handle_timeout(Instant::now()).ok(); + continue; + } + + let sleep = tokio::time::sleep(delay.min(Duration::from_millis(10))); + tokio::pin!(sleep); + + tokio::select! { + _ = sleep => { + offerer_pc.handle_timeout(Instant::now()).ok(); + answerer_pc.handle_timeout(Instant::now()).ok(); + } + Ok((n, peer_addr)) = offerer_socket.recv_from(&mut offerer_buf) => { + offerer_pc.handle_read(TaggedBytesMut { + now: Instant::now(), + transport: TransportContext { + local_addr: offerer_local_addr, + peer_addr, + ecn: None, + transport_protocol: TransportProtocol::UDP, + }, + message: BytesMut::from(&offerer_buf[..n]), + }).ok(); + } + Ok((n, peer_addr)) = answerer_socket.recv_from(&mut answerer_buf) => { + answerer_pc.handle_read(TaggedBytesMut { + now: Instant::now(), + transport: TransportContext { + local_addr: answerer_local_addr, + peer_addr, + ecn: None, + transport_protocol: TransportProtocol::UDP, + }, + message: BytesMut::from(&answerer_buf[..n]), + }).ok(); + } + } + } + + Err(anyhow::anyhow!( + "Test timeout - negotiated DataChannel bidirectional message exchange did not complete in time" + )) +}