Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rtc-turn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,8 @@ harness = false
name = "turn_client_udp"
path = "examples/turn_client_udp.rs"
bench = false

[[example]]
name = "turn_client_tcp"
path = "examples/turn_client_tcp.rs"
bench = false
247 changes: 247 additions & 0 deletions rtc-turn/examples/turn_client_tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
use bytes::BytesMut;
use clap::Parser;
use log::trace;
use rtc_turn::client::*;
use sansio::Protocol;
use shared::error::{Error, Result};
use shared::tcp_framing::{TcpFrameDecoder, frame_packet};
use shared::{TransportContext, TransportMessage, TransportProtocol};
use std::io::{ErrorKind, Read, Write};
use std::net::TcpStream;
use std::str::FromStr;
use std::time::{Duration, Instant};

// First, start turn server with TCP support:
//
// Option 1: webrtc-rs/webrtc/turn/examples/turn_server_tcp:
// RUST_LOG=trace cargo run --color=always --package turn --example turn_server_tcp -- --public-ip 127.0.0.1 --users user=pass
//
// Option 2: coturn (reference TURN server):
// turnserver --lt-cred-mech --user user:pass --realm webrtc.rs --no-dtls --no-tls
//
// Then, start this example:
// RUST_LOG=trace cargo run --color=always --package rtc-turn --example turn_client_tcp -- --host 127.0.0.1 --user user=pass

#[derive(Parser)]
#[command(name = "TURN Client TCP")]
#[command(author = "Brainwires <brainwires@github.com>")]
#[command(version = "0.1.0")]
#[command(about = "An example of TURN Client over TCP (RFC 6062)", long_about = None)]
struct Cli {
#[arg(long, default_value_t = format!("127.0.0.1"))]
host: String,
#[arg(long, default_value_t = 3478)]
port: u16,
#[arg(long)]
user: String,
#[arg(long, default_value_t = format!("webrtc.rs"))]
realm: String,

#[arg(short, long)]
debug: bool,
#[arg(long, default_value_t = format!("INFO"))]
log_level: String,
}

fn main() -> Result<()> {
let cli = Cli::parse();
if cli.debug {
let log_level = log::LevelFilter::from_str(&cli.log_level)
.map_err(|e| Error::Other(format!("invalid log level '{}': {e}", cli.log_level)))?;
env_logger::Builder::new()
.format(|buf, record| {
writeln!(
buf,
"{}:{} [{}] {} - {}",
record.file().unwrap_or("unknown"),
record.line().unwrap_or(0),
record.level(),
chrono::Local::now().format("%H:%M:%S.%6f"),
record.args()
)
})
.filter(None, log_level)
.init();
}

let host = cli.host;
let port = cli.port;
let user = cli.user;
let (username, password) = user.split_once('=').ok_or_else(|| {
Error::Other("invalid --user format: expected 'username=password'".to_string())
})?;
let realm = cli.realm;

let turn_server_addr = format!("{host}:{port}");

// Connect a TCP socket to the TURN server.
// Unlike UDP, the TURN client over TCP uses a single persistent connection.
let mut stream = TcpStream::connect(&turn_server_addr)?;
let local_addr = stream.local_addr()?;
let peer_addr = stream.peer_addr()?;

println!("TCP connected: {} → {}", local_addr, peer_addr);

// Use a cloned handle for writes (blocking with timeout) and non-blocking for reads.
let mut stream_write = stream.try_clone()?;
stream_write.set_write_timeout(Some(Duration::from_secs(5)))?;
stream.set_nonblocking(true)?;

let cfg = ClientConfig {
stun_serv_addr: turn_server_addr.clone(),
turn_serv_addr: turn_server_addr,
local_addr,
transport_protocol: TransportProtocol::TCP,
username: username.to_string(),
password: password.to_string(),
realm: realm.to_string(),
software: String::new(),
rto_in_ms: 0,
};

let mut client = Client::new(cfg)?;

// Allocate a relay socket on the TURN server (over TCP).
let allocate_tid = client.allocate()?;
let mut relayed_addr = None;

let (stop_tx, stop_rx) = crossbeam_channel::unbounded::<()>();
println!("Press Ctrl-C to stop");
ctrlc::set_handler(move || {
let _ = stop_tx.send(());
})
.expect("Error setting Ctrl-C handler");

// RFC 4571 decoder for inbound TCP frames.
let mut decoder = TcpFrameDecoder::new();
let mut buf = vec![0u8; 4096];

loop {
match stop_rx.try_recv() {
Ok(_) => break,
Err(err) => {
if err.is_disconnected() {
break;
}
}
};

// Flush outbound TURN messages (each wrapped in a 2-byte length prefix per RFC 4571).
while let Some(transmit) = client.poll_write() {
let framed = frame_packet(&transmit.message);
stream_write.write_all(&framed)?;
trace!(
"tcp.sent {} bytes to {}",
transmit.message.len(),
transmit.transport.peer_addr
);
}

// Process TURN events.
while let Some(event) = client.poll_event() {
match event {
Event::TransactionTimeout(_) => return Err(Error::ErrTimeout),
Event::BindingResponse(_, reflexive_addr) => {
println!("reflexive address {}", reflexive_addr);
}
Event::BindingError(_, err) => return Err(err),
Event::AllocateResponse(tid, addr) => {
println!("relayed address {}", addr);
if relayed_addr.is_none() {
assert_eq!(tid, allocate_tid);
relayed_addr = Some(addr);
println!(
"TURN relay allocated over TCP: {} (refresh will keep it alive)",
addr
);
}
}
Event::AllocateError(_, err) => return Err(err),
Event::CreatePermissionResponse(tid, peer_addr) => {
println!(
"CreatePermission for peer addr {} is granted (tid={:?})",
peer_addr, tid
);
}
Event::CreatePermissionError(_, err) => return Err(err),
Event::DataIndicationOrChannelData(_, from, data) => {
println!("relay read: {:?} from {}", &data[..], from);
// Echo back
if let Some(&relay_addr) = relayed_addr.as_ref() {
client.relay(relay_addr)?.send_to(&data[..], from)?;
}
}
}
}

// Compute next timeout.
let mut eto = Instant::now() + Duration::from_millis(100);
if let Some(to) = client.poll_timeout() {
if to < eto {
eto = to;
}
}
let delay_from_now = eto
.checked_duration_since(Instant::now())
.unwrap_or(Duration::from_secs(0));

// Non-blocking read from TCP socket.
// RFC 4571: each TURN message is prefixed with a 2-byte big-endian length.
match read_tcp_input(&mut stream, &mut buf, &mut decoder) {
Ok(Some(data)) => {
trace!("tcp.recv {} bytes from {}", data.len(), peer_addr);
let msg = TransportMessage {
now: Instant::now(),
transport: TransportContext {
local_addr,
peer_addr,
transport_protocol: TransportProtocol::TCP,
ecn: None,
},
message: BytesMut::from(data.as_slice()),
};
client.handle_read(msg)?;
}
Ok(None) => {
// No complete frame yet — sleep briefly to avoid busy-polling.
if !delay_from_now.is_zero() {
std::thread::sleep(std::cmp::min(delay_from_now, Duration::from_millis(5)));
}
}
Err(e) => {
eprintln!("TCP connection closed: {e}");
break;
}
}

// Drive time forward.
client.handle_timeout(Instant::now())?;
}

client.close()
}

/// Read from a non-blocking TCP stream, decode RFC 4571 frames.
/// Returns `Ok(Some(data))` with the next complete TURN message payload,
/// `Ok(None)` if no complete frame is available yet, or `Err` on EOF / fatal error.
fn read_tcp_input(
stream: &mut TcpStream,
buf: &mut Vec<u8>,
decoder: &mut TcpFrameDecoder,
) -> std::result::Result<Option<Vec<u8>>, std::io::Error> {
// Drain available bytes into the decoder.
loop {
match stream.read(buf.as_mut_slice()) {
Ok(0) => {
return Err(std::io::Error::new(
ErrorKind::ConnectionReset,
"TCP connection closed by peer (EOF)",
));
}
Ok(n) => decoder.extend_from_slice(&buf[..n]),
Err(e) if e.kind() == ErrorKind::WouldBlock => break,
Err(e) => return Err(e),
}
}
Ok(decoder.next_packet())
}
76 changes: 76 additions & 0 deletions rtc-turn/src/client/client_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,24 @@ use sansio::Protocol;
use std::collections::HashSet;
use std::net::UdpSocket;

fn create_test_client_with_turn_server() -> Result<(UdpSocket, Client)> {
let udp_socket = UdpSocket::bind("127.0.0.1:0")?;

let client = Client::new(ClientConfig {
stun_serv_addr: String::new(),
turn_serv_addr: format!("127.0.0.1:{}", udp_socket.local_addr()?.port()),
local_addr: udp_socket.local_addr()?,
transport_protocol: TransportProtocol::UDP,
username: "user".to_string(),
password: "pass".to_string(),
realm: "test".to_string(),
software: "TEST SOFTWARE".to_owned(),
rto_in_ms: 0,
})?;

Ok((udp_socket, client))
}

fn create_listening_test_client(rto_in_ms: u64) -> Result<(UdpSocket, Client)> {
let udp_socket = UdpSocket::bind("0.0.0.0:0")?;

Expand Down Expand Up @@ -160,3 +178,61 @@ fn test_client_with_stun_send_binding_request_to_timeout() -> Result<()> {

client.close()
}

#[test]
fn test_turn_server_addr_returns_some_when_configured() -> Result<()> {
let (_conn, mut client) = create_test_client_with_turn_server()?;
let addr = client.turn_server_addr();
assert!(
addr.is_some(),
"turn_server_addr should return Some when configured"
);
assert_eq!(addr.unwrap().ip(), std::net::IpAddr::from([127, 0, 0, 1]));
client.close()
}

#[test]
fn test_turn_server_addr_returns_none_when_not_configured() -> Result<()> {
let (_conn, mut client) = create_listening_test_client(0)?;
let addr = client.turn_server_addr();
assert!(
addr.is_none(),
"turn_server_addr should return None when not configured"
);
client.close()
}

#[test]
fn test_turn_server_addr_or_err_returns_ok_when_configured() -> Result<()> {
let (_conn, mut client) = create_test_client_with_turn_server()?;
let result = client.turn_server_addr_or_err();
assert!(
result.is_ok(),
"turn_server_addr_or_err should return Ok when configured"
);
client.close()
}

#[test]
fn test_turn_server_addr_or_err_returns_err_when_not_configured() -> Result<()> {
let (_conn, mut client) = create_listening_test_client(0)?;
let result = client.turn_server_addr_or_err();
assert!(
result.is_err(),
"turn_server_addr_or_err should return Err when not configured"
);
assert_eq!(result.unwrap_err(), Error::ErrNilTurnSocket);
client.close()
}

#[test]
fn test_local_addr_returns_bound_address() -> Result<()> {
let (conn, mut client) = create_test_client_with_turn_server()?;
let expected = conn.local_addr()?;
assert_eq!(
client.local_addr(),
expected,
"local_addr should match the configured address"
);
client.close()
}
18 changes: 14 additions & 4 deletions rtc-turn/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ impl Client {
debug!("client.Allocate call PerformTransaction 1");
let mut tid = self.perform_transaction(
&msg,
self.turn_server_addr()?,
self.turn_server_addr_or_err()?,
TransactionType::AllocateAttempt,
);
tid.0[TRANSACTION_ID_SIZE - 1] = tid.0[TRANSACTION_ID_SIZE - 1].wrapping_add(1);
Expand Down Expand Up @@ -514,7 +514,7 @@ impl Client {
debug!("client.Allocate call PerformTransaction 2");
self.perform_transaction(
&msg,
self.turn_server_addr()?,
self.turn_server_addr_or_err()?,
TransactionType::AllocateRequest(nonce),
);
}
Expand Down Expand Up @@ -554,11 +554,21 @@ impl Client {
Ok(())
}

/// turn_server_addr return the TURN server address
fn turn_server_addr(&self) -> Result<SocketAddr> {
/// Returns the TURN server address, if configured.
pub fn turn_server_addr(&self) -> Option<SocketAddr> {
self.turn_serv_addr
}

/// Returns the TURN server address or an error if not configured.
fn turn_server_addr_or_err(&self) -> Result<SocketAddr> {
self.turn_serv_addr.ok_or(Error::ErrNilTurnSocket)
}

/// Returns the local address this client is bound to.
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}

/// username returns username
fn username(&self) -> Username {
self.username.clone()
Expand Down
Loading