Skip to content

Commit 8173cd5

Browse files
bartlomiejuclaude
andcommitted
fix: extract head bytes for CONNECT event
Add a dedicated CONNECT upgrade path that sends the 200 response via hyper, awaits the upgrade, and captures any trailing bytes (data pipelined after the CONNECT request headers). These bytes are passed as the `head` parameter to the "connect" event, matching Node.js behavior. The new UpgradeStream `ConsumeResponse` write state absorbs the redundant HTTP response the application writes through the socket (since hyper already sent 200), then switches to raw network mode. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 967bfa2 commit 8173cd5

File tree

4 files changed

+167
-8
lines changed

4 files changed

+167
-8
lines changed

ext/http/00_serve.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import {
2929
op_http_set_response_trailers,
3030
op_http_try_wait,
3131
op_http_upgrade_raw,
32+
op_http_upgrade_raw_connect,
33+
op_http_upgrade_raw_get_head,
3234
op_http_upgrade_websocket_next,
3335
op_http_wait,
3436
} from "ext:core/ops";
@@ -151,6 +153,16 @@ function upgradeHttpRaw(req) {
151153
throw new TypeError("'upgradeHttpRaw' may only be used with Deno.serve");
152154
}
153155

156+
function upgradeHttpRawConnect(req) {
157+
const inner = toInnerRequest(req);
158+
if (inner?._wantsUpgrade) {
159+
return inner._wantsUpgrade("upgradeConnect");
160+
}
161+
throw new TypeError(
162+
"'upgradeHttpRawConnect' may only be used with Deno.serve",
163+
);
164+
}
165+
154166
function addTrailers(resp, headerList) {
155167
const inner = toInnerResponse(resp);
156168
op_http_set_response_trailers(inner.external, headerList);
@@ -226,6 +238,31 @@ class InnerRequest {
226238
return { response: UPGRADE_RESPONSE_SENTINEL, conn };
227239
}
228240

241+
if (upgradeType == "upgradeConnect") {
242+
const external = this.#external;
243+
const remoteAddr = this.remoteAddr;
244+
const localAddr = this.#context.listener.addr;
245+
246+
this.url();
247+
this.headerList;
248+
this.close();
249+
250+
this.#upgraded = true;
251+
252+
return (async () => {
253+
const upgradeRid = await op_http_upgrade_raw_connect(external);
254+
const head = op_http_upgrade_raw_get_head(upgradeRid);
255+
256+
const conn = new UpgradedConn(
257+
upgradeRid,
258+
remoteAddr,
259+
localAddr,
260+
);
261+
262+
return { response: UPGRADE_RESPONSE_SENTINEL, conn, head };
263+
})();
264+
}
265+
229266
if (upgradeType == "upgradeWebSocket") {
230267
const external = this.#external;
231268

@@ -1101,6 +1138,7 @@ function serveHttpOn(context, addr, callback) {
11011138

11021139
internals.addTrailers = addTrailers;
11031140
internals.upgradeHttpRaw = upgradeHttpRaw;
1141+
internals.upgradeHttpRawConnect = upgradeHttpRawConnect;
11041142
internals.serveHttpOnListener = serveHttpOnListener;
11051143
internals.serveHttpOnConnection = serveHttpOnConnection;
11061144

@@ -1178,4 +1216,5 @@ export {
11781216
serveHttpOnConnection,
11791217
serveHttpOnListener,
11801218
upgradeHttpRaw,
1219+
upgradeHttpRawConnect,
11811220
};

ext/http/http_next.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,48 @@ pub fn op_http_upgrade_raw(
218218
Ok(state.resource_table.add(UpgradeStream::new(read, write)))
219219
}
220220

221+
/// Upgrade a CONNECT request by sending a 200 response, awaiting the
222+
/// upgrade, and returning the stream resource with head bytes captured.
223+
#[op2]
224+
#[smi]
225+
pub async fn op_http_upgrade_raw_connect(
226+
state: Rc<RefCell<OpState>>,
227+
external: *const c_void,
228+
) -> Result<ResourceId, HttpNextError> {
229+
let (http, upgrade) = {
230+
// SAFETY: external is deleted before calling this op.
231+
let http =
232+
unsafe { take_external!(external, "op_http_upgrade_raw_connect") };
233+
let upgrade = http.upgrade()?;
234+
(http, upgrade)
235+
};
236+
237+
// Send a 200 response to complete the CONNECT handshake.
238+
http.response_parts().status = StatusCode::OK;
239+
http.complete();
240+
241+
let upgraded = upgrade.await?;
242+
let (stream, head_bytes) = extract_network_stream(upgraded);
243+
let (read_half, write_half) = stream.into_split();
244+
245+
let resource =
246+
UpgradeStream::new_connected(read_half, write_half, head_bytes);
247+
Ok(state.borrow_mut().resource_table.add(resource))
248+
}
249+
250+
/// Return the head bytes captured during a CONNECT upgrade.
251+
/// The bytes are returned exactly once; subsequent calls return empty.
252+
#[op2]
253+
#[buffer]
254+
pub fn op_http_upgrade_raw_get_head(
255+
state: &mut OpState,
256+
#[smi] rid: ResourceId,
257+
) -> Result<Vec<u8>, HttpNextError> {
258+
let resource = state.resource_table.get::<UpgradeStream>(rid)?;
259+
let bytes = resource.head_bytes.borrow_mut().take();
260+
Ok(bytes.map(|b| b.to_vec()).unwrap_or_default())
261+
}
262+
221263
#[op2]
222264
#[smi]
223265
pub async fn op_http_upgrade_websocket_next(
@@ -1367,6 +1409,10 @@ enum UpgradeStreamWriteState {
13671409
OnUpgrade,
13681410
AsyncMut<Option<(NetworkStreamReadHalf, Bytes)>>,
13691411
),
1412+
/// Used after a CONNECT upgrade where the 200 response was already sent
1413+
/// by hyper. Consumes and discards the HTTP response the application
1414+
/// writes (since it's redundant), then switches to Network mode.
1415+
ConsumeResponse(BytesMut, NetworkStreamWriteHalf),
13701416
Network(NetworkStreamWriteHalf),
13711417
Failed,
13721418
}
@@ -1375,6 +1421,9 @@ struct UpgradeStream {
13751421
read: Rc<AsyncRefCell<Option<(NetworkStreamReadHalf, Bytes)>>>,
13761422
write: AsyncRefCell<UpgradeStreamWriteState>,
13771423
cancel_handle: CancelHandle,
1424+
/// Head bytes extracted during a CONNECT upgrade, available via
1425+
/// `op_http_upgrade_raw_get_head`.
1426+
head_bytes: RefCell<Option<Bytes>>,
13781427
}
13791428

13801429
impl UpgradeStream {
@@ -1386,6 +1435,24 @@ impl UpgradeStream {
13861435
read,
13871436
write: AsyncRefCell::new(write),
13881437
cancel_handle: CancelHandle::new(),
1438+
head_bytes: RefCell::new(None),
1439+
}
1440+
}
1441+
1442+
pub fn new_connected(
1443+
read_half: NetworkStreamReadHalf,
1444+
write_half: NetworkStreamWriteHalf,
1445+
head_bytes: Bytes,
1446+
) -> Self {
1447+
let read = Rc::new(AsyncRefCell::new(Some((read_half, Bytes::new()))));
1448+
Self {
1449+
read,
1450+
write: AsyncRefCell::new(UpgradeStreamWriteState::ConsumeResponse(
1451+
BytesMut::with_capacity(128),
1452+
write_half,
1453+
)),
1454+
cancel_handle: CancelHandle::new(),
1455+
head_bytes: RefCell::new(Some(head_bytes)),
13891456
}
13901457
}
13911458

@@ -1478,6 +1545,44 @@ impl UpgradeStream {
14781545
Err(e) => Err(std::io::Error::other(e)),
14791546
}
14801547
}
1548+
UpgradeStreamWriteState::ConsumeResponse(mut bytes, mut stream) => {
1549+
bytes.extend_from_slice(buf);
1550+
1551+
let mut headers = [httparse::EMPTY_HEADER; 16];
1552+
let mut response = httparse::Response::new(&mut headers);
1553+
match response.parse(&bytes) {
1554+
Ok(httparse::Status::Partial) => {
1555+
*wr = UpgradeStreamWriteState::ConsumeResponse(bytes, stream);
1556+
Ok(buf.len())
1557+
}
1558+
Ok(httparse::Status::Complete(n)) => {
1559+
// Response consumed. Forward any trailing bytes after
1560+
// the response headers to the network.
1561+
let trailing = &bytes[n..];
1562+
if !trailing.is_empty() {
1563+
let mut written = 0;
1564+
while written < trailing.len() {
1565+
written +=
1566+
Pin::new(&mut stream).write(&trailing[written..]).await?;
1567+
}
1568+
}
1569+
let consumed_from_buf = n - (bytes.len() - buf.len());
1570+
*wr = UpgradeStreamWriteState::Network(stream);
1571+
Ok(consumed_from_buf)
1572+
}
1573+
Err(_) => {
1574+
// Not an HTTP response — treat as raw data.
1575+
// Write everything accumulated so far to the network.
1576+
let all = bytes.freeze();
1577+
let mut written = 0;
1578+
while written < all.len() {
1579+
written += Pin::new(&mut stream).write(&all[written..]).await?;
1580+
}
1581+
*wr = UpgradeStreamWriteState::Network(stream);
1582+
Ok(buf.len())
1583+
}
1584+
}
1585+
}
14811586
UpgradeStreamWriteState::Network(mut stream) => {
14821587
let r = Pin::new(&mut stream).write(buf).await;
14831588
*wr = UpgradeStreamWriteState::Network(stream);
@@ -1503,6 +1608,9 @@ impl UpgradeStream {
15031608
UpgradeStreamWriteState::Parsing(..) => {
15041609
self.write(if buf1.is_empty() { buf2 } else { buf1 }).await
15051610
}
1611+
UpgradeStreamWriteState::ConsumeResponse(..) => {
1612+
self.write(if buf1.is_empty() { buf2 } else { buf1 }).await
1613+
}
15061614
UpgradeStreamWriteState::Network(stream) => {
15071615
let bufs = [std::io::IoSlice::new(buf1), std::io::IoSlice::new(buf2)];
15081616
stream.write_vectored(&bufs).await

ext/http/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ deno_core::extension!(
171171
http_next::op_http_set_response_trailers,
172172
http_next::op_http_upgrade_websocket_next,
173173
http_next::op_http_upgrade_raw,
174+
http_next::op_http_upgrade_raw_connect,
175+
http_next::op_http_upgrade_raw_get_head,
174176
http_next::op_raw_write_vectored,
175177
http_next::op_can_write_vectored,
176178
http_next::op_http_try_wait,
@@ -221,6 +223,8 @@ deno_core::extension!(
221223
http_next::op_http_set_response_trailers,
222224
http_next::op_http_upgrade_websocket_next,
223225
http_next::op_http_upgrade_raw,
226+
http_next::op_http_upgrade_raw_connect,
227+
http_next::op_http_upgrade_raw_get_head,
224228
http_next::op_raw_write_vectored,
225229
http_next::op_can_write_vectored,
226230
http_next::op_http_try_wait,

ext/node/polyfills/http.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,11 @@ import {
7070
} from "ext:deno_node/internal/errors.ts";
7171
import { getTimerDuration } from "ext:deno_node/internal/timers.mjs";
7272
import { getIPFamily } from "ext:deno_node/internal/net.ts";
73-
import { serveHttpOnListener, upgradeHttpRaw } from "ext:deno_http/00_serve.ts";
73+
import {
74+
serveHttpOnListener,
75+
upgradeHttpRaw,
76+
upgradeHttpRawConnect,
77+
} from "ext:deno_http/00_serve.ts";
7478
import { listen as listenDeno } from "ext:deno_net/01_net.js";
7579
import { headersEntries } from "ext:deno_fetch/20_headers.js";
7680
import { Response } from "ext:deno_fetch/23_response.js";
@@ -2283,13 +2287,17 @@ export class ServerImpl extends EventEmitter {
22832287
req[kRawHeaders] = request.headers;
22842288

22852289
if (this.listenerCount("connect") > 0) {
2286-
const { conn, response } = upgradeHttpRaw(request);
2287-
const socket = new Socket({
2288-
handle: new TCP(constants.SERVER, conn),
2289-
});
2290-
req.socket = socket;
2291-
this.emit("connect", req, socket, Buffer.from([]));
2292-
return response;
2290+
return (async () => {
2291+
const { conn, response, head } = await upgradeHttpRawConnect(
2292+
request,
2293+
);
2294+
const socket = new Socket({
2295+
handle: new TCP(constants.SERVER, conn),
2296+
});
2297+
req.socket = socket;
2298+
this.emit("connect", req, socket, Buffer.from(head));
2299+
return response;
2300+
})();
22932301
} else {
22942302
return new Response(null, { status: 405 });
22952303
}

0 commit comments

Comments
 (0)