Skip to content

Commit 1ea5ff0

Browse files
bartlomiejuclaude
andauthored
fix(ext/node): emit "connect" event on http.Server for CONNECT requests (denoland#32599)
## Summary - Deno's `node:http` Server polyfill never emitted the `"connect"` event for HTTP CONNECT method requests - This broke HTTP proxy libraries (e.g. `proxy-chain`, used by Crawlee/Playwright) that rely on this event to handle tunnel requests - Chrome launched with `--proxy-server` sends CONNECT requests to establish HTTPS tunnels — without the event, these were silently dropped, causing `net::ERR_TIMED_OUT` ## Changes - Handle CONNECT method in `ServerImpl._serve()` by upgrading to a raw socket and emitting `"connect"` with `(req, socket, head)`, matching Node.js behavior - Strip `http://` prefix from CONNECT request URLs to return authority form (`host:port`) as Node.js does - Relax `UpgradeStream` status code check in `http_next.rs` to accept both `101` (WebSocket upgrade) and `200` (CONNECT tunnel) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 4650330 commit 1ea5ff0

File tree

7 files changed

+264
-9
lines changed

7 files changed

+264
-9
lines changed

ext/http/00_serve.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import {
2929
op_http_set_response_trailers,
3030
op_http_try_wait,
3131
op_http_upgrade_raw,
32+
op_http_upgrade_raw_connect,
33+
op_http_upgrade_raw_get_head,
3234
op_http_upgrade_websocket_next,
3335
op_http_wait,
3436
} from "ext:core/ops";
@@ -151,6 +153,16 @@ function upgradeHttpRaw(req) {
151153
throw new TypeError("'upgradeHttpRaw' may only be used with Deno.serve");
152154
}
153155

156+
function upgradeHttpRawConnect(req) {
157+
const inner = toInnerRequest(req);
158+
if (inner?._wantsUpgrade) {
159+
return inner._wantsUpgrade("upgradeConnect");
160+
}
161+
throw new TypeError(
162+
"'upgradeHttpRawConnect' may only be used with Deno.serve",
163+
);
164+
}
165+
154166
function addTrailers(resp, headerList) {
155167
const inner = toInnerResponse(resp);
156168
op_http_set_response_trailers(inner.external, headerList);
@@ -226,6 +238,31 @@ class InnerRequest {
226238
return { response: UPGRADE_RESPONSE_SENTINEL, conn };
227239
}
228240

241+
if (upgradeType == "upgradeConnect") {
242+
const external = this.#external;
243+
const remoteAddr = this.remoteAddr;
244+
const localAddr = this.#context.listener.addr;
245+
246+
this.url();
247+
this.headerList;
248+
this.close();
249+
250+
this.#upgraded = true;
251+
252+
return (async () => {
253+
const upgradeRid = await op_http_upgrade_raw_connect(external);
254+
const head = op_http_upgrade_raw_get_head(upgradeRid);
255+
256+
const conn = new UpgradedConn(
257+
upgradeRid,
258+
remoteAddr,
259+
localAddr,
260+
);
261+
262+
return { response: UPGRADE_RESPONSE_SENTINEL, conn, head };
263+
})();
264+
}
265+
229266
if (upgradeType == "upgradeWebSocket") {
230267
const external = this.#external;
231268

@@ -1101,6 +1138,7 @@ function serveHttpOn(context, addr, callback) {
11011138

11021139
internals.addTrailers = addTrailers;
11031140
internals.upgradeHttpRaw = upgradeHttpRaw;
1141+
internals.upgradeHttpRawConnect = upgradeHttpRawConnect;
11041142
internals.serveHttpOnListener = serveHttpOnListener;
11051143
internals.serveHttpOnConnection = serveHttpOnConnection;
11061144

@@ -1178,4 +1216,5 @@ export {
11781216
serveHttpOnConnection,
11791217
serveHttpOnListener,
11801218
upgradeHttpRaw,
1219+
upgradeHttpRawConnect,
11811220
};

ext/http/http_next.rs

Lines changed: 118 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,48 @@ pub fn op_http_upgrade_raw(
218218
Ok(state.resource_table.add(UpgradeStream::new(read, write)))
219219
}
220220

221+
/// Upgrade a CONNECT request by sending a 200 response, awaiting the
222+
/// upgrade, and returning the stream resource with head bytes captured.
223+
#[op2]
224+
#[smi]
225+
pub async fn op_http_upgrade_raw_connect(
226+
state: Rc<RefCell<OpState>>,
227+
external: *const c_void,
228+
) -> Result<ResourceId, HttpNextError> {
229+
let (http, upgrade) = {
230+
// SAFETY: external is deleted before calling this op.
231+
let http =
232+
unsafe { take_external!(external, "op_http_upgrade_raw_connect") };
233+
let upgrade = http.upgrade()?;
234+
(http, upgrade)
235+
};
236+
237+
// Send a 200 response to complete the CONNECT handshake.
238+
http.response_parts().status = StatusCode::OK;
239+
http.complete();
240+
241+
let upgraded = upgrade.await?;
242+
let (stream, head_bytes) = extract_network_stream(upgraded);
243+
let (read_half, write_half) = stream.into_split();
244+
245+
let resource =
246+
UpgradeStream::new_connected(read_half, write_half, head_bytes);
247+
Ok(state.borrow_mut().resource_table.add(resource))
248+
}
249+
250+
/// Return the head bytes captured during a CONNECT upgrade.
251+
/// The bytes are returned exactly once; subsequent calls return empty.
252+
#[op2]
253+
#[buffer]
254+
pub fn op_http_upgrade_raw_get_head(
255+
state: &mut OpState,
256+
#[smi] rid: ResourceId,
257+
) -> Result<Vec<u8>, HttpNextError> {
258+
let resource = state.resource_table.get::<UpgradeStream>(rid)?;
259+
let bytes = resource.head_bytes.borrow_mut().take();
260+
Ok(bytes.map(|b| b.to_vec()).unwrap_or_default())
261+
}
262+
221263
#[op2]
222264
#[smi]
223265
pub async fn op_http_upgrade_websocket_next(
@@ -1367,6 +1409,10 @@ enum UpgradeStreamWriteState {
13671409
OnUpgrade,
13681410
AsyncMut<Option<(NetworkStreamReadHalf, Bytes)>>,
13691411
),
1412+
/// Used after a CONNECT upgrade where the 200 response was already sent
1413+
/// by hyper. Consumes and discards the HTTP response the application
1414+
/// writes (since it's redundant), then switches to Network mode.
1415+
ConsumeResponse(BytesMut, NetworkStreamWriteHalf),
13701416
Network(NetworkStreamWriteHalf),
13711417
/// The upgrade was rejected with a non-101 status code.
13721418
/// The response has been sent and the stream is now closed for writing.
@@ -1378,6 +1424,9 @@ struct UpgradeStream {
13781424
read: Rc<AsyncRefCell<Option<(NetworkStreamReadHalf, Bytes)>>>,
13791425
write: AsyncRefCell<UpgradeStreamWriteState>,
13801426
cancel_handle: CancelHandle,
1427+
/// Head bytes extracted during a CONNECT upgrade, available via
1428+
/// `op_http_upgrade_raw_get_head`.
1429+
head_bytes: RefCell<Option<Bytes>>,
13811430
/// Set to true when the upgrade was rejected with a non-101 status.
13821431
/// When rejected, reads return EOF and writes are silently ignored.
13831432
rejected: std::cell::Cell<bool>,
@@ -1392,6 +1441,25 @@ impl UpgradeStream {
13921441
read,
13931442
write: AsyncRefCell::new(write),
13941443
cancel_handle: CancelHandle::new(),
1444+
head_bytes: RefCell::new(None),
1445+
rejected: std::cell::Cell::new(false),
1446+
}
1447+
}
1448+
1449+
pub fn new_connected(
1450+
read_half: NetworkStreamReadHalf,
1451+
write_half: NetworkStreamWriteHalf,
1452+
head_bytes: Bytes,
1453+
) -> Self {
1454+
let read = Rc::new(AsyncRefCell::new(Some((read_half, Bytes::new()))));
1455+
Self {
1456+
read,
1457+
write: AsyncRefCell::new(UpgradeStreamWriteState::ConsumeResponse(
1458+
BytesMut::with_capacity(128),
1459+
write_half,
1460+
)),
1461+
cancel_handle: CancelHandle::new(),
1462+
head_bytes: RefCell::new(Some(head_bytes)),
13951463
rejected: std::cell::Cell::new(false),
13961464
}
13971465
}
@@ -1459,14 +1527,16 @@ impl UpgradeStream {
14591527
Ok(buf.len())
14601528
}
14611529
Ok(httparse::Status::Complete(n)) => {
1462-
let status_code = response.code.unwrap_or(200);
1530+
let status_code = response.code.unwrap_or(0);
14631531

1464-
if status_code == StatusCode::SWITCHING_PROTOCOLS.as_u16() {
1465-
// Upgrade accepted - proceed with upgrade
1466-
http.otel_info_set_status(
1467-
StatusCode::SWITCHING_PROTOCOLS.as_u16(),
1468-
);
1469-
http.response_parts().status = StatusCode::SWITCHING_PROTOCOLS;
1532+
// Accept 101 (WebSocket upgrade) and 200 (CONNECT tunnel)
1533+
if status_code == StatusCode::SWITCHING_PROTOCOLS.as_u16()
1534+
|| status_code == StatusCode::OK.as_u16()
1535+
{
1536+
let status = StatusCode::from_u16(status_code)
1537+
.unwrap_or(StatusCode::SWITCHING_PROTOCOLS);
1538+
http.otel_info_set_status(status.as_u16());
1539+
http.response_parts().status = status;
14701540

14711541
for header in response.headers {
14721542
http.response_parts().headers.append(
@@ -1526,6 +1596,44 @@ impl UpgradeStream {
15261596
Err(e) => Err(std::io::Error::other(e)),
15271597
}
15281598
}
1599+
UpgradeStreamWriteState::ConsumeResponse(mut bytes, mut stream) => {
1600+
bytes.extend_from_slice(buf);
1601+
1602+
let mut headers = [httparse::EMPTY_HEADER; 16];
1603+
let mut response = httparse::Response::new(&mut headers);
1604+
match response.parse(&bytes) {
1605+
Ok(httparse::Status::Partial) => {
1606+
*wr = UpgradeStreamWriteState::ConsumeResponse(bytes, stream);
1607+
Ok(buf.len())
1608+
}
1609+
Ok(httparse::Status::Complete(n)) => {
1610+
// Response consumed. Forward any trailing bytes after
1611+
// the response headers to the network.
1612+
let trailing = &bytes[n..];
1613+
if !trailing.is_empty() {
1614+
let mut written = 0;
1615+
while written < trailing.len() {
1616+
written +=
1617+
Pin::new(&mut stream).write(&trailing[written..]).await?;
1618+
}
1619+
}
1620+
let consumed_from_buf = n - (bytes.len() - buf.len());
1621+
*wr = UpgradeStreamWriteState::Network(stream);
1622+
Ok(consumed_from_buf)
1623+
}
1624+
Err(_) => {
1625+
// Not an HTTP response — treat as raw data.
1626+
// Write everything accumulated so far to the network.
1627+
let all = bytes.freeze();
1628+
let mut written = 0;
1629+
while written < all.len() {
1630+
written += Pin::new(&mut stream).write(&all[written..]).await?;
1631+
}
1632+
*wr = UpgradeStreamWriteState::Network(stream);
1633+
Ok(buf.len())
1634+
}
1635+
}
1636+
}
15291637
UpgradeStreamWriteState::Network(mut stream) => {
15301638
let r = Pin::new(&mut stream).write(buf).await;
15311639
*wr = UpgradeStreamWriteState::Network(stream);
@@ -1556,6 +1664,9 @@ impl UpgradeStream {
15561664
drop(wr);
15571665
self.write(if buf1.is_empty() { buf2 } else { buf1 }).await
15581666
}
1667+
UpgradeStreamWriteState::ConsumeResponse(..) => {
1668+
self.write(if buf1.is_empty() { buf2 } else { buf1 }).await
1669+
}
15591670
UpgradeStreamWriteState::Network(stream) => {
15601671
let bufs = [std::io::IoSlice::new(buf1), std::io::IoSlice::new(buf2)];
15611672
stream.write_vectored(&bufs).await

ext/http/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ deno_core::extension!(
171171
http_next::op_http_set_response_trailers,
172172
http_next::op_http_upgrade_websocket_next,
173173
http_next::op_http_upgrade_raw,
174+
http_next::op_http_upgrade_raw_connect,
175+
http_next::op_http_upgrade_raw_get_head,
174176
http_next::op_raw_write_vectored,
175177
http_next::op_can_write_vectored,
176178
http_next::op_http_try_wait,
@@ -221,6 +223,8 @@ deno_core::extension!(
221223
http_next::op_http_set_response_trailers,
222224
http_next::op_http_upgrade_websocket_next,
223225
http_next::op_http_upgrade_raw,
226+
http_next::op_http_upgrade_raw_connect,
227+
http_next::op_http_upgrade_raw_get_head,
224228
http_next::op_raw_write_vectored,
225229
http_next::op_can_write_vectored,
226230
http_next::op_http_try_wait,

ext/node/polyfills/http.ts

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,11 @@ import {
7070
} from "ext:deno_node/internal/errors.ts";
7171
import { getTimerDuration } from "ext:deno_node/internal/timers.mjs";
7272
import { getIPFamily } from "ext:deno_node/internal/net.ts";
73-
import { serveHttpOnListener, upgradeHttpRaw } from "ext:deno_http/00_serve.ts";
73+
import {
74+
serveHttpOnListener,
75+
upgradeHttpRaw,
76+
upgradeHttpRawConnect,
77+
} from "ext:deno_http/00_serve.ts";
7478
import { op_http_serve_address_override } from "ext:core/ops";
7579
import { listen as listenDeno } from "ext:deno_net/01_net.js";
7680
import { headersEntries } from "ext:deno_fetch/20_headers.js";
@@ -2295,9 +2299,33 @@ export class ServerImpl extends EventEmitter {
22952299
});
22962300

22972301
const req = new IncomingMessageForServer(socket);
2302+
req.method = request.method;
2303+
2304+
if (request.method === "CONNECT") {
2305+
// For CONNECT, the URL should be in authority form (host:port).
2306+
// Deno's server adds an "http://" prefix, so strip it.
2307+
req.url = request.url.replace(/^https?:\/\//, "");
2308+
req[kRawHeaders] = request.headers;
2309+
2310+
if (this.listenerCount("connect") > 0) {
2311+
return (async () => {
2312+
const { conn, response, head } = await upgradeHttpRawConnect(
2313+
request,
2314+
);
2315+
const socket = new Socket({
2316+
handle: new TCP(constants.SERVER, conn),
2317+
});
2318+
req.socket = socket;
2319+
this.emit("connect", req, socket, Buffer.from(head));
2320+
return response;
2321+
})();
2322+
} else {
2323+
return new Response(null, { status: 405 });
2324+
}
2325+
}
2326+
22982327
// Slice off the origin so that we only have pathname + search
22992328
req.url = request.url?.slice(request.url.indexOf("/", 8));
2300-
req.method = request.method;
23012329
req.upgrade =
23022330
request.headers.get("connection")?.toLowerCase().includes("upgrade") &&
23032331
request.headers.get("upgrade");
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"args": "run -A main.ts",
3+
"output": "main.out"
4+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
CONNECT event received: CONNECT 127.0.0.1:[WILDCARD]
2+
Client received status: HTTP/1.1 200 OK
3+
Tunnel data received successfully
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import * as http from "node:http";
2+
import * as net from "node:net";
3+
4+
// Test that http.Server emits the "connect" event for CONNECT requests.
5+
// This is essential for HTTP proxy servers (e.g., proxy-chain used by Crawlee).
6+
7+
// Start a simple TCP echo server to act as the "target"
8+
const target = net.createServer((socket) => {
9+
socket.write("hello from target");
10+
socket.end();
11+
});
12+
13+
target.listen(0, () => {
14+
const targetPort = (target.address() as net.AddressInfo).port;
15+
16+
const server = http.createServer((_req, res) => {
17+
res.writeHead(200);
18+
res.end("ok");
19+
});
20+
21+
server.on("connect", (req, clientSocket, _head) => {
22+
console.log(`CONNECT event received: ${req.method} ${req.url}`);
23+
24+
// Connect to the target
25+
const [hostname, port] = req.url!.split(":");
26+
const targetSocket = net.connect(Number(port), hostname, () => {
27+
clientSocket.write(
28+
"HTTP/1.1 200 Connection Established\r\n\r\n",
29+
);
30+
targetSocket.pipe(clientSocket);
31+
clientSocket.pipe(targetSocket);
32+
});
33+
34+
targetSocket.on("error", (err) => {
35+
clientSocket.end(`HTTP/1.1 502 Bad Gateway\r\n\r\n`);
36+
});
37+
});
38+
39+
server.listen(0, () => {
40+
const proxyPort = (server.address() as net.AddressInfo).port;
41+
42+
// Send a CONNECT request to the proxy
43+
const client = net.connect(proxyPort, "127.0.0.1", () => {
44+
client.write(
45+
`CONNECT 127.0.0.1:${targetPort} HTTP/1.1\r\nHost: 127.0.0.1:${targetPort}\r\n\r\n`,
46+
);
47+
});
48+
49+
let data = "";
50+
client.on("data", (chunk) => {
51+
data += chunk.toString();
52+
});
53+
54+
client.on("end", () => {
55+
// Should have received the 200 Connection Established + target data
56+
const lines = data.split("\r\n");
57+
console.log(`Client received status: ${lines[0]}`);
58+
if (data.includes("hello from target")) {
59+
console.log("Tunnel data received successfully");
60+
}
61+
client.end();
62+
server.close();
63+
target.close();
64+
});
65+
});
66+
});

0 commit comments

Comments
 (0)