From 6a5e2e6471cab185f2802b760bb4ef7d7d677151 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Mon, 9 Mar 2026 20:33:56 +0100 Subject: [PATCH 1/3] fix(ext/node): initialize debuglog testEnabled with safe default Fixes a crash where `testEnabled is not a function` is thrown during bootstrap when internal stream code triggers debug logging before `initializeDebugEnv()` has been called. This occurs when stdin is unavailable, such as in compiled binaries run as Windows services (via nssm) or processes detached with nohup/disown. Initialize `testEnabled` and `debugImpls` with safe defaults (`() => false` and empty object) so early calls to `debuglog()` gracefully disable debug logging instead of crashing. Closes #24208 Co-Authored-By: Claude Opus 4.6 --- ext/node/polyfills/internal/util/debuglog.ts | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/ext/node/polyfills/internal/util/debuglog.ts b/ext/node/polyfills/internal/util/debuglog.ts index a20bf3e5aa2e94..b4c2a418ad789b 100644 --- a/ext/node/polyfills/internal/util/debuglog.ts +++ b/ext/node/polyfills/internal/util/debuglog.ts @@ -4,10 +4,15 @@ // TODO(petamoriken): enable prefer-primordials for node polyfills // deno-lint-ignore-file prefer-primordials -// `debugImpls` and `testEnabled` are deliberately not initialized so any call -// to `debuglog()` before `initializeDebugEnv()` is called will throw. -let debugImpls: Record void>; -let testEnabled: (str: string) => boolean; +// `debugImpls` and `testEnabled` are initialized with safe defaults so that +// calls to `debuglog()` before `initializeDebugEnv()` do not crash. This can +// happen when internal stream code triggers debug logging during bootstrap +// before the Node process is fully initialized (e.g. when stdin is unavailable +// in compiled binaries run as Windows services or detached processes). +let debugImpls: Record void> = Object.create( + null, +); +let testEnabled: (str: string) => boolean = () => false; // `debugEnv` is initial value of process.env.NODE_DEBUG export function initializeDebugEnv(debugEnv: string) { From 967bfa202342ba94a5a1d634598939baeafd40f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Mon, 9 Mar 2026 21:25:57 +0100 Subject: [PATCH 2/3] fix(ext/node): emit "connect" event on http.Server for CONNECT requests Deno's node:http Server polyfill never emitted the "connect" event for HTTP CONNECT method requests, causing HTTP proxy libraries (e.g. proxy-chain, used by Crawlee) to silently drop tunnel requests. This made Playwright/Crawlee's browser unable to navigate (net::ERR_TIMED_OUT) since Chrome's CONNECT requests to the local proxy were never handled. - Handle CONNECT method in ServerImpl._serve() by upgrading to a raw socket and emitting "connect" with (req, socket, head), matching Node.js behavior - Strip "http://" prefix from CONNECT request URLs to return authority form (host:port) as Node.js does - Relax UpgradeStream status code check to accept both 101 (WebSocket) and 200 (CONNECT tunnel) Co-Authored-By: Claude Opus 4.6 --- ext/http/http_next.rs | 12 ++-- ext/node/polyfills/http.ts | 22 ++++++- .../http_server_connect_event/__test__.jsonc | 4 ++ .../node/http_server_connect_event/main.out | 3 + .../node/http_server_connect_event/main.ts | 66 +++++++++++++++++++ 5 files changed, 102 insertions(+), 5 deletions(-) create mode 100644 tests/specs/node/http_server_connect_event/__test__.jsonc create mode 100644 tests/specs/node/http_server_connect_event/main.out create mode 100644 tests/specs/node/http_server_connect_event/main.ts diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index a3d040956e5583..1f6bb0850af8e5 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -1440,16 +1440,20 @@ impl UpgradeStream { Ok(buf.len()) } Ok(httparse::Status::Complete(n)) => { - if response.code != Some(StatusCode::SWITCHING_PROTOCOLS.as_u16()) + let status_code = response.code.unwrap_or(0); + // Accept 101 (WebSocket upgrade) and 200 (CONNECT tunnel) + if status_code != StatusCode::SWITCHING_PROTOCOLS.as_u16() + && status_code != StatusCode::OK.as_u16() { return Err(std::io::Error::other( HttpNextError::InvalidHttpStatusLine, )); } - http - .otel_info_set_status(StatusCode::SWITCHING_PROTOCOLS.as_u16()); - http.response_parts().status = StatusCode::SWITCHING_PROTOCOLS; + let status = StatusCode::from_u16(status_code) + .unwrap_or(StatusCode::SWITCHING_PROTOCOLS); + http.otel_info_set_status(status.as_u16()); + http.response_parts().status = status; for header in response.headers { http.response_parts().headers.append( diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 6b65fbd791594e..57b6aa9b9113d9 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -2274,9 +2274,29 @@ export class ServerImpl extends EventEmitter { }); const req = new IncomingMessageForServer(socket); + req.method = request.method; + + if (request.method === "CONNECT") { + // For CONNECT, the URL should be in authority form (host:port). + // Deno's server adds an "http://" prefix, so strip it. + req.url = request.url.replace(/^https?:\/\//, ""); + req[kRawHeaders] = request.headers; + + if (this.listenerCount("connect") > 0) { + const { conn, response } = upgradeHttpRaw(request); + const socket = new Socket({ + handle: new TCP(constants.SERVER, conn), + }); + req.socket = socket; + this.emit("connect", req, socket, Buffer.from([])); + return response; + } else { + return new Response(null, { status: 405 }); + } + } + // Slice off the origin so that we only have pathname + search req.url = request.url?.slice(request.url.indexOf("/", 8)); - req.method = request.method; req.upgrade = request.headers.get("connection")?.toLowerCase().includes("upgrade") && request.headers.get("upgrade"); diff --git a/tests/specs/node/http_server_connect_event/__test__.jsonc b/tests/specs/node/http_server_connect_event/__test__.jsonc new file mode 100644 index 00000000000000..f816bad869762e --- /dev/null +++ b/tests/specs/node/http_server_connect_event/__test__.jsonc @@ -0,0 +1,4 @@ +{ + "args": "run -A main.ts", + "output": "main.out" +} diff --git a/tests/specs/node/http_server_connect_event/main.out b/tests/specs/node/http_server_connect_event/main.out new file mode 100644 index 00000000000000..c03940f03a4223 --- /dev/null +++ b/tests/specs/node/http_server_connect_event/main.out @@ -0,0 +1,3 @@ +CONNECT event received: CONNECT 127.0.0.1:[WILDCARD] +Client received status: HTTP/1.1 200 OK +Tunnel data received successfully diff --git a/tests/specs/node/http_server_connect_event/main.ts b/tests/specs/node/http_server_connect_event/main.ts new file mode 100644 index 00000000000000..7e056d1bab9120 --- /dev/null +++ b/tests/specs/node/http_server_connect_event/main.ts @@ -0,0 +1,66 @@ +import * as http from "node:http"; +import * as net from "node:net"; + +// Test that http.Server emits the "connect" event for CONNECT requests. +// This is essential for HTTP proxy servers (e.g., proxy-chain used by Crawlee). + +// Start a simple TCP echo server to act as the "target" +const target = net.createServer((socket) => { + socket.write("hello from target"); + socket.end(); +}); + +target.listen(0, () => { + const targetPort = (target.address() as net.AddressInfo).port; + + const server = http.createServer((_req, res) => { + res.writeHead(200); + res.end("ok"); + }); + + server.on("connect", (req, clientSocket, _head) => { + console.log(`CONNECT event received: ${req.method} ${req.url}`); + + // Connect to the target + const [hostname, port] = req.url!.split(":"); + const targetSocket = net.connect(Number(port), hostname, () => { + clientSocket.write( + "HTTP/1.1 200 Connection Established\r\n\r\n", + ); + targetSocket.pipe(clientSocket); + clientSocket.pipe(targetSocket); + }); + + targetSocket.on("error", (err) => { + clientSocket.end(`HTTP/1.1 502 Bad Gateway\r\n\r\n`); + }); + }); + + server.listen(0, () => { + const proxyPort = (server.address() as net.AddressInfo).port; + + // Send a CONNECT request to the proxy + const client = net.connect(proxyPort, "127.0.0.1", () => { + client.write( + `CONNECT 127.0.0.1:${targetPort} HTTP/1.1\r\nHost: 127.0.0.1:${targetPort}\r\n\r\n`, + ); + }); + + let data = ""; + client.on("data", (chunk) => { + data += chunk.toString(); + }); + + client.on("end", () => { + // Should have received the 200 Connection Established + target data + const lines = data.split("\r\n"); + console.log(`Client received status: ${lines[0]}`); + if (data.includes("hello from target")) { + console.log("Tunnel data received successfully"); + } + client.end(); + server.close(); + target.close(); + }); + }); +}); From 8173cd587f2a0763a8161daee1361e4dbe6af2ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 12 Mar 2026 18:00:35 +0100 Subject: [PATCH 3/3] fix: extract head bytes for CONNECT event Add a dedicated CONNECT upgrade path that sends the 200 response via hyper, awaits the upgrade, and captures any trailing bytes (data pipelined after the CONNECT request headers). These bytes are passed as the `head` parameter to the "connect" event, matching Node.js behavior. The new UpgradeStream `ConsumeResponse` write state absorbs the redundant HTTP response the application writes through the socket (since hyper already sent 200), then switches to raw network mode. Co-Authored-By: Claude Opus 4.6 --- ext/http/00_serve.ts | 39 ++++++++++++++ ext/http/http_next.rs | 108 +++++++++++++++++++++++++++++++++++++ ext/http/lib.rs | 4 ++ ext/node/polyfills/http.ts | 24 ++++++--- 4 files changed, 167 insertions(+), 8 deletions(-) diff --git a/ext/http/00_serve.ts b/ext/http/00_serve.ts index 278c2ea4325b0a..3fc91eab181c11 100644 --- a/ext/http/00_serve.ts +++ b/ext/http/00_serve.ts @@ -29,6 +29,8 @@ import { op_http_set_response_trailers, op_http_try_wait, op_http_upgrade_raw, + op_http_upgrade_raw_connect, + op_http_upgrade_raw_get_head, op_http_upgrade_websocket_next, op_http_wait, } from "ext:core/ops"; @@ -151,6 +153,16 @@ function upgradeHttpRaw(req) { throw new TypeError("'upgradeHttpRaw' may only be used with Deno.serve"); } +function upgradeHttpRawConnect(req) { + const inner = toInnerRequest(req); + if (inner?._wantsUpgrade) { + return inner._wantsUpgrade("upgradeConnect"); + } + throw new TypeError( + "'upgradeHttpRawConnect' may only be used with Deno.serve", + ); +} + function addTrailers(resp, headerList) { const inner = toInnerResponse(resp); op_http_set_response_trailers(inner.external, headerList); @@ -226,6 +238,31 @@ class InnerRequest { return { response: UPGRADE_RESPONSE_SENTINEL, conn }; } + if (upgradeType == "upgradeConnect") { + const external = this.#external; + const remoteAddr = this.remoteAddr; + const localAddr = this.#context.listener.addr; + + this.url(); + this.headerList; + this.close(); + + this.#upgraded = true; + + return (async () => { + const upgradeRid = await op_http_upgrade_raw_connect(external); + const head = op_http_upgrade_raw_get_head(upgradeRid); + + const conn = new UpgradedConn( + upgradeRid, + remoteAddr, + localAddr, + ); + + return { response: UPGRADE_RESPONSE_SENTINEL, conn, head }; + })(); + } + if (upgradeType == "upgradeWebSocket") { const external = this.#external; @@ -1101,6 +1138,7 @@ function serveHttpOn(context, addr, callback) { internals.addTrailers = addTrailers; internals.upgradeHttpRaw = upgradeHttpRaw; +internals.upgradeHttpRawConnect = upgradeHttpRawConnect; internals.serveHttpOnListener = serveHttpOnListener; internals.serveHttpOnConnection = serveHttpOnConnection; @@ -1178,4 +1216,5 @@ export { serveHttpOnConnection, serveHttpOnListener, upgradeHttpRaw, + upgradeHttpRawConnect, }; diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 1f6bb0850af8e5..02140d32d465a8 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -218,6 +218,48 @@ pub fn op_http_upgrade_raw( Ok(state.resource_table.add(UpgradeStream::new(read, write))) } +/// Upgrade a CONNECT request by sending a 200 response, awaiting the +/// upgrade, and returning the stream resource with head bytes captured. +#[op2] +#[smi] +pub async fn op_http_upgrade_raw_connect( + state: Rc>, + external: *const c_void, +) -> Result { + let (http, upgrade) = { + // SAFETY: external is deleted before calling this op. + let http = + unsafe { take_external!(external, "op_http_upgrade_raw_connect") }; + let upgrade = http.upgrade()?; + (http, upgrade) + }; + + // Send a 200 response to complete the CONNECT handshake. + http.response_parts().status = StatusCode::OK; + http.complete(); + + let upgraded = upgrade.await?; + let (stream, head_bytes) = extract_network_stream(upgraded); + let (read_half, write_half) = stream.into_split(); + + let resource = + UpgradeStream::new_connected(read_half, write_half, head_bytes); + Ok(state.borrow_mut().resource_table.add(resource)) +} + +/// Return the head bytes captured during a CONNECT upgrade. +/// The bytes are returned exactly once; subsequent calls return empty. +#[op2] +#[buffer] +pub fn op_http_upgrade_raw_get_head( + state: &mut OpState, + #[smi] rid: ResourceId, +) -> Result, HttpNextError> { + let resource = state.resource_table.get::(rid)?; + let bytes = resource.head_bytes.borrow_mut().take(); + Ok(bytes.map(|b| b.to_vec()).unwrap_or_default()) +} + #[op2] #[smi] pub async fn op_http_upgrade_websocket_next( @@ -1367,6 +1409,10 @@ enum UpgradeStreamWriteState { OnUpgrade, AsyncMut>, ), + /// Used after a CONNECT upgrade where the 200 response was already sent + /// by hyper. Consumes and discards the HTTP response the application + /// writes (since it's redundant), then switches to Network mode. + ConsumeResponse(BytesMut, NetworkStreamWriteHalf), Network(NetworkStreamWriteHalf), Failed, } @@ -1375,6 +1421,9 @@ struct UpgradeStream { read: Rc>>, write: AsyncRefCell, cancel_handle: CancelHandle, + /// Head bytes extracted during a CONNECT upgrade, available via + /// `op_http_upgrade_raw_get_head`. + head_bytes: RefCell>, } impl UpgradeStream { @@ -1386,6 +1435,24 @@ impl UpgradeStream { read, write: AsyncRefCell::new(write), cancel_handle: CancelHandle::new(), + head_bytes: RefCell::new(None), + } + } + + pub fn new_connected( + read_half: NetworkStreamReadHalf, + write_half: NetworkStreamWriteHalf, + head_bytes: Bytes, + ) -> Self { + let read = Rc::new(AsyncRefCell::new(Some((read_half, Bytes::new())))); + Self { + read, + write: AsyncRefCell::new(UpgradeStreamWriteState::ConsumeResponse( + BytesMut::with_capacity(128), + write_half, + )), + cancel_handle: CancelHandle::new(), + head_bytes: RefCell::new(Some(head_bytes)), } } @@ -1478,6 +1545,44 @@ impl UpgradeStream { Err(e) => Err(std::io::Error::other(e)), } } + UpgradeStreamWriteState::ConsumeResponse(mut bytes, mut stream) => { + bytes.extend_from_slice(buf); + + let mut headers = [httparse::EMPTY_HEADER; 16]; + let mut response = httparse::Response::new(&mut headers); + match response.parse(&bytes) { + Ok(httparse::Status::Partial) => { + *wr = UpgradeStreamWriteState::ConsumeResponse(bytes, stream); + Ok(buf.len()) + } + Ok(httparse::Status::Complete(n)) => { + // Response consumed. Forward any trailing bytes after + // the response headers to the network. + let trailing = &bytes[n..]; + if !trailing.is_empty() { + let mut written = 0; + while written < trailing.len() { + written += + Pin::new(&mut stream).write(&trailing[written..]).await?; + } + } + let consumed_from_buf = n - (bytes.len() - buf.len()); + *wr = UpgradeStreamWriteState::Network(stream); + Ok(consumed_from_buf) + } + Err(_) => { + // Not an HTTP response — treat as raw data. + // Write everything accumulated so far to the network. + let all = bytes.freeze(); + let mut written = 0; + while written < all.len() { + written += Pin::new(&mut stream).write(&all[written..]).await?; + } + *wr = UpgradeStreamWriteState::Network(stream); + Ok(buf.len()) + } + } + } UpgradeStreamWriteState::Network(mut stream) => { let r = Pin::new(&mut stream).write(buf).await; *wr = UpgradeStreamWriteState::Network(stream); @@ -1503,6 +1608,9 @@ impl UpgradeStream { UpgradeStreamWriteState::Parsing(..) => { self.write(if buf1.is_empty() { buf2 } else { buf1 }).await } + UpgradeStreamWriteState::ConsumeResponse(..) => { + self.write(if buf1.is_empty() { buf2 } else { buf1 }).await + } UpgradeStreamWriteState::Network(stream) => { let bufs = [std::io::IoSlice::new(buf1), std::io::IoSlice::new(buf2)]; stream.write_vectored(&bufs).await diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 045ed93c373a08..8a073918e28dde 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -171,6 +171,8 @@ deno_core::extension!( http_next::op_http_set_response_trailers, http_next::op_http_upgrade_websocket_next, http_next::op_http_upgrade_raw, + http_next::op_http_upgrade_raw_connect, + http_next::op_http_upgrade_raw_get_head, http_next::op_raw_write_vectored, http_next::op_can_write_vectored, http_next::op_http_try_wait, @@ -221,6 +223,8 @@ deno_core::extension!( http_next::op_http_set_response_trailers, http_next::op_http_upgrade_websocket_next, http_next::op_http_upgrade_raw, + http_next::op_http_upgrade_raw_connect, + http_next::op_http_upgrade_raw_get_head, http_next::op_raw_write_vectored, http_next::op_can_write_vectored, http_next::op_http_try_wait, diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 57b6aa9b9113d9..631780dbd649fc 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -70,7 +70,11 @@ import { } from "ext:deno_node/internal/errors.ts"; import { getTimerDuration } from "ext:deno_node/internal/timers.mjs"; import { getIPFamily } from "ext:deno_node/internal/net.ts"; -import { serveHttpOnListener, upgradeHttpRaw } from "ext:deno_http/00_serve.ts"; +import { + serveHttpOnListener, + upgradeHttpRaw, + upgradeHttpRawConnect, +} from "ext:deno_http/00_serve.ts"; import { listen as listenDeno } from "ext:deno_net/01_net.js"; import { headersEntries } from "ext:deno_fetch/20_headers.js"; import { Response } from "ext:deno_fetch/23_response.js"; @@ -2283,13 +2287,17 @@ export class ServerImpl extends EventEmitter { req[kRawHeaders] = request.headers; if (this.listenerCount("connect") > 0) { - const { conn, response } = upgradeHttpRaw(request); - const socket = new Socket({ - handle: new TCP(constants.SERVER, conn), - }); - req.socket = socket; - this.emit("connect", req, socket, Buffer.from([])); - return response; + return (async () => { + const { conn, response, head } = await upgradeHttpRawConnect( + request, + ); + const socket = new Socket({ + handle: new TCP(constants.SERVER, conn), + }); + req.socket = socket; + this.emit("connect", req, socket, Buffer.from(head)); + return response; + })(); } else { return new Response(null, { status: 405 }); }