Skip to content
This repository was archived by the owner on May 28, 2026. It is now read-only.

Commit 29dd8c8

Browse files
committed
feat: Enable QUIC to TCP data forwarding and enhance DNS response processing to handle fragmented payloads.
1 parent 1be2ce3 commit 29dd8c8

3 files changed

Lines changed: 95 additions & 12 deletions

File tree

crates/slipstream-client/src/runtime/mod.rs

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ use crate::pacing::{cwnd_target_polls, inflight_packet_estimate};
1515
use crate::streams::{spawn_acceptor, Command};
1616
use slipstream_core::ResolverMode;
1717
use slipstream_dns::{
18-
build_qname, encode_query, fragment_packet, max_payload_len_for_domain, QueryParams, CLASS_IN,
19-
RR_TXT,
18+
build_qname, decode_response, encode_query, fragment_packet, is_fragmented,
19+
max_payload_len_for_domain, FragmentBuffer, QueryParams, CLASS_IN, RR_TXT,
2020
};
2121
use slipstream_quic::{Client, ClientConnection, Config as QuicConfig};
2222
use std::collections::HashMap;
@@ -25,7 +25,7 @@ use std::time::Duration;
2525
use tokio::net::{TcpListener as TokioTcpListener, UdpSocket};
2626
use tokio::sync::{mpsc, Notify};
2727
use tokio::time::sleep;
28-
use tracing::{debug, info, warn};
28+
use tracing::{debug, info, trace, warn};
2929

3030
// Protocol defaults matching picoquic runtime
3131
const DNS_WAKE_DELAY_MAX_US: u64 = 10_000_000;
@@ -125,6 +125,7 @@ pub async fn run_client(config: &TquicClientConfig<'_>) -> Result<i32, ClientErr
125125

126126
let mut dns_id = 1u16;
127127
let mut packet_id = 0u16; // For fragment tracking
128+
let mut recv_fragment_buffer = FragmentBuffer::new(); // For reassembling fragmented responses
128129
let mut recv_buf = vec![0u8; 4096];
129130
let _send_buf = vec![0u8; MAX_PACKET_SIZE];
130131
let packet_loop_send_max = loop_burst_total(&resolvers, PACKET_LOOP_SEND_MAX);
@@ -229,18 +230,48 @@ pub async fn run_client(config: &TquicClientConfig<'_>) -> Result<i32, ClientErr
229230
recv = udp.recv_from(&mut recv_buf) => {
230231
match recv {
231232
Ok((size, from)) => {
232-
// TODO: Decode DNS response and extract QUIC payload
233-
// For now, try processing raw packet
234-
if let Err(e) = conn.recv(&recv_buf[..size], from) {
235-
debug!("Failed to process packet from {}: {}", from, e);
233+
// Decode DNS response to extract QUIC payload
234+
if let Some(quic_payload) = decode_response(&recv_buf[..size]) {
235+
// Handle fragmented responses
236+
let complete_packet = if is_fragmented(&quic_payload) {
237+
recv_fragment_buffer.receive_fragment(&quic_payload)
238+
} else {
239+
Some(quic_payload)
240+
};
241+
242+
if let Some(data) = complete_packet {
243+
if let Err(e) = conn.recv(&data, from) {
244+
debug!("Failed to process QUIC packet from {}: {}", from, e);
245+
}
246+
}
247+
} else {
248+
// Not a valid DNS response - try as raw QUIC packet
249+
// (fallback for empty responses or direct UDP)
250+
if let Err(e) = conn.recv(&recv_buf[..size], from) {
251+
trace!("Failed to process raw packet from {}: {}", from, e);
252+
}
236253
}
237254

238255
// Try to receive more packets in burst
239256
for _ in 1..packet_loop_recv_max {
240257
match udp.try_recv_from(&mut recv_buf) {
241258
Ok((size, from)) => {
242-
if let Err(e) = conn.recv(&recv_buf[..size], from) {
243-
debug!("Failed to process packet: {}", e);
259+
// Decode DNS response
260+
if let Some(quic_payload) = decode_response(&recv_buf[..size]) {
261+
let complete_packet = if is_fragmented(&quic_payload) {
262+
recv_fragment_buffer.receive_fragment(&quic_payload)
263+
} else {
264+
Some(quic_payload)
265+
};
266+
267+
if let Some(data) = complete_packet {
268+
if let Err(e) = conn.recv(&data, from) {
269+
debug!("Failed to process QUIC packet: {}", e);
270+
}
271+
}
272+
} else {
273+
// Fallback to raw packet
274+
let _ = conn.recv(&recv_buf[..size], from);
244275
}
245276
}
246277
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
@@ -262,6 +293,27 @@ pub async fn run_client(config: &TquicClientConfig<'_>) -> Result<i32, ClientErr
262293
}
263294
}
264295

296+
// Read from QUIC streams and forward to TCP connections
297+
for stream_id in conn.readable_streams() {
298+
let mut read_buf = vec![0u8; 4096];
299+
match conn.stream_read(stream_id, &mut read_buf) {
300+
Ok((n, fin)) if n > 0 => {
301+
if let Some(state) = streams.get(&stream_id) {
302+
// Send data to TCP writer via channel
303+
let _ = state.write_tx.send(read_buf[..n].to_vec());
304+
}
305+
if fin {
306+
streams.remove(&stream_id);
307+
}
308+
}
309+
Ok((_, true)) => {
310+
// Stream finished
311+
streams.remove(&stream_id);
312+
}
313+
_ => {}
314+
}
315+
}
316+
265317
// Drain pending commands
266318
while let Ok(command) = command_rx.try_recv() {
267319
handle_command(
@@ -357,7 +409,7 @@ fn handle_command(
357409
let _ = tcp_stream.set_nodelay(true);
358410
match conn.open_bi() {
359411
Ok(stream_id) => {
360-
let (write_tx, _write_rx) = mpsc::unbounded_channel();
412+
let (write_tx, write_rx) = mpsc::unbounded_channel();
361413
streams.insert(
362414
stream_id,
363415
StreamState {
@@ -373,13 +425,18 @@ fn handle_command(
373425
info!("Accepted TCP stream {}", stream_id);
374426
}
375427

376-
// Split TCP stream and spawn reader to forward TCP→QUIC
377-
let (tcp_read, _tcp_write) = tcp_stream.into_split();
428+
// Split TCP stream and spawn reader/writer for bidirectional forwarding
429+
let (tcp_read, tcp_write) = tcp_stream.into_split();
430+
431+
// TCP→QUIC: Read TCP data and send to QUIC stream
378432
crate::streams::spawn_tcp_to_quic_reader(
379433
stream_id,
380434
tcp_read,
381435
command_tx.clone(),
382436
);
437+
438+
// QUIC→TCP: Write data from QUIC stream to TCP
439+
crate::streams::spawn_quic_to_tcp_writer(tcp_write, write_rx);
383440
}
384441
Err(e) => {
385442
warn!("Failed to open QUIC stream: {}", e);

crates/slipstream-client/src/streams.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,21 @@ pub(crate) fn spawn_tcp_to_quic_reader(
6767
});
6868
}
6969

70+
/// Spawn a task that writes data from QUIC to TCP.
71+
pub(crate) fn spawn_quic_to_tcp_writer(
72+
mut tcp_write: tokio::net::tcp::OwnedWriteHalf,
73+
mut data_rx: mpsc::UnboundedReceiver<Vec<u8>>,
74+
) {
75+
tokio::spawn(async move {
76+
while let Some(data) = data_rx.recv().await {
77+
if tcp_write.write_all(&data).await.is_err() {
78+
break;
79+
}
80+
}
81+
let _ = tcp_write.shutdown().await;
82+
});
83+
}
84+
7085
pub(crate) fn spawn_client_reader(
7186
stream_id: u64,
7287
mut read_half: tokio::net::tcp::OwnedReadHalf,

crates/slipstream-quic/src/client.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,17 @@ impl ClientConnection {
285285
}
286286
}
287287

288+
/// Get stream IDs that have readable data.
289+
pub fn readable_streams(&self) -> Vec<u64> {
290+
self.state
291+
.borrow()
292+
.streams
293+
.iter()
294+
.filter(|(_, s)| s.readable)
295+
.map(|(id, _)| *id)
296+
.collect()
297+
}
298+
288299
/// Drain path events.
289300
pub fn drain_path_events(&mut self) -> Vec<PathEvent> {
290301
std::mem::take(&mut self.state.borrow_mut().path_events)

0 commit comments

Comments
 (0)