diff --git a/ext/http/00_serve.ts b/ext/http/00_serve.ts index 278c2ea4325b0a..3fc91eab181c11 100644 --- a/ext/http/00_serve.ts +++ b/ext/http/00_serve.ts @@ -29,6 +29,8 @@ import { op_http_set_response_trailers, op_http_try_wait, op_http_upgrade_raw, + op_http_upgrade_raw_connect, + op_http_upgrade_raw_get_head, op_http_upgrade_websocket_next, op_http_wait, } from "ext:core/ops"; @@ -151,6 +153,16 @@ function upgradeHttpRaw(req) { throw new TypeError("'upgradeHttpRaw' may only be used with Deno.serve"); } +function upgradeHttpRawConnect(req) { + const inner = toInnerRequest(req); + if (inner?._wantsUpgrade) { + return inner._wantsUpgrade("upgradeConnect"); + } + throw new TypeError( + "'upgradeHttpRawConnect' may only be used with Deno.serve", + ); +} + function addTrailers(resp, headerList) { const inner = toInnerResponse(resp); op_http_set_response_trailers(inner.external, headerList); @@ -226,6 +238,31 @@ class InnerRequest { return { response: UPGRADE_RESPONSE_SENTINEL, conn }; } + if (upgradeType == "upgradeConnect") { + const external = this.#external; + const remoteAddr = this.remoteAddr; + const localAddr = this.#context.listener.addr; + + this.url(); + this.headerList; + this.close(); + + this.#upgraded = true; + + return (async () => { + const upgradeRid = await op_http_upgrade_raw_connect(external); + const head = op_http_upgrade_raw_get_head(upgradeRid); + + const conn = new UpgradedConn( + upgradeRid, + remoteAddr, + localAddr, + ); + + return { response: UPGRADE_RESPONSE_SENTINEL, conn, head }; + })(); + } + if (upgradeType == "upgradeWebSocket") { const external = this.#external; @@ -1101,6 +1138,7 @@ function serveHttpOn(context, addr, callback) { internals.addTrailers = addTrailers; internals.upgradeHttpRaw = upgradeHttpRaw; +internals.upgradeHttpRawConnect = upgradeHttpRawConnect; internals.serveHttpOnListener = serveHttpOnListener; internals.serveHttpOnConnection = serveHttpOnConnection; @@ -1178,4 +1216,5 @@ export { serveHttpOnConnection, serveHttpOnListener, upgradeHttpRaw, + upgradeHttpRawConnect, }; diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 1ca0df461bc434..4a4aaf5153544c 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -218,6 +218,48 @@ pub fn op_http_upgrade_raw( Ok(state.resource_table.add(UpgradeStream::new(read, write))) } +/// Upgrade a CONNECT request by sending a 200 response, awaiting the +/// upgrade, and returning the stream resource with head bytes captured. +#[op2] +#[smi] +pub async fn op_http_upgrade_raw_connect( + state: Rc>, + external: *const c_void, +) -> Result { + let (http, upgrade) = { + // SAFETY: external is deleted before calling this op. + let http = + unsafe { take_external!(external, "op_http_upgrade_raw_connect") }; + let upgrade = http.upgrade()?; + (http, upgrade) + }; + + // Send a 200 response to complete the CONNECT handshake. + http.response_parts().status = StatusCode::OK; + http.complete(); + + let upgraded = upgrade.await?; + let (stream, head_bytes) = extract_network_stream(upgraded); + let (read_half, write_half) = stream.into_split(); + + let resource = + UpgradeStream::new_connected(read_half, write_half, head_bytes); + Ok(state.borrow_mut().resource_table.add(resource)) +} + +/// Return the head bytes captured during a CONNECT upgrade. +/// The bytes are returned exactly once; subsequent calls return empty. +#[op2] +#[buffer] +pub fn op_http_upgrade_raw_get_head( + state: &mut OpState, + #[smi] rid: ResourceId, +) -> Result, HttpNextError> { + let resource = state.resource_table.get::(rid)?; + let bytes = resource.head_bytes.borrow_mut().take(); + Ok(bytes.map(|b| b.to_vec()).unwrap_or_default()) +} + #[op2] #[smi] pub async fn op_http_upgrade_websocket_next( @@ -1367,6 +1409,10 @@ enum UpgradeStreamWriteState { OnUpgrade, AsyncMut>, ), + /// Used after a CONNECT upgrade where the 200 response was already sent + /// by hyper. Consumes and discards the HTTP response the application + /// writes (since it's redundant), then switches to Network mode. + ConsumeResponse(BytesMut, NetworkStreamWriteHalf), Network(NetworkStreamWriteHalf), /// The upgrade was rejected with a non-101 status code. /// The response has been sent and the stream is now closed for writing. @@ -1378,6 +1424,9 @@ struct UpgradeStream { read: Rc>>, write: AsyncRefCell, cancel_handle: CancelHandle, + /// Head bytes extracted during a CONNECT upgrade, available via + /// `op_http_upgrade_raw_get_head`. + head_bytes: RefCell>, /// Set to true when the upgrade was rejected with a non-101 status. /// When rejected, reads return EOF and writes are silently ignored. rejected: std::cell::Cell, @@ -1392,6 +1441,25 @@ impl UpgradeStream { read, write: AsyncRefCell::new(write), cancel_handle: CancelHandle::new(), + head_bytes: RefCell::new(None), + rejected: std::cell::Cell::new(false), + } + } + + pub fn new_connected( + read_half: NetworkStreamReadHalf, + write_half: NetworkStreamWriteHalf, + head_bytes: Bytes, + ) -> Self { + let read = Rc::new(AsyncRefCell::new(Some((read_half, Bytes::new())))); + Self { + read, + write: AsyncRefCell::new(UpgradeStreamWriteState::ConsumeResponse( + BytesMut::with_capacity(128), + write_half, + )), + cancel_handle: CancelHandle::new(), + head_bytes: RefCell::new(Some(head_bytes)), rejected: std::cell::Cell::new(false), } } @@ -1459,14 +1527,16 @@ impl UpgradeStream { Ok(buf.len()) } Ok(httparse::Status::Complete(n)) => { - let status_code = response.code.unwrap_or(200); + let status_code = response.code.unwrap_or(0); - if status_code == StatusCode::SWITCHING_PROTOCOLS.as_u16() { - // Upgrade accepted - proceed with upgrade - http.otel_info_set_status( - StatusCode::SWITCHING_PROTOCOLS.as_u16(), - ); - http.response_parts().status = StatusCode::SWITCHING_PROTOCOLS; + // Accept 101 (WebSocket upgrade) and 200 (CONNECT tunnel) + if status_code == StatusCode::SWITCHING_PROTOCOLS.as_u16() + || status_code == StatusCode::OK.as_u16() + { + let status = StatusCode::from_u16(status_code) + .unwrap_or(StatusCode::SWITCHING_PROTOCOLS); + http.otel_info_set_status(status.as_u16()); + http.response_parts().status = status; for header in response.headers { http.response_parts().headers.append( @@ -1526,6 +1596,44 @@ impl UpgradeStream { Err(e) => Err(std::io::Error::other(e)), } } + UpgradeStreamWriteState::ConsumeResponse(mut bytes, mut stream) => { + bytes.extend_from_slice(buf); + + let mut headers = [httparse::EMPTY_HEADER; 16]; + let mut response = httparse::Response::new(&mut headers); + match response.parse(&bytes) { + Ok(httparse::Status::Partial) => { + *wr = UpgradeStreamWriteState::ConsumeResponse(bytes, stream); + Ok(buf.len()) + } + Ok(httparse::Status::Complete(n)) => { + // Response consumed. Forward any trailing bytes after + // the response headers to the network. + let trailing = &bytes[n..]; + if !trailing.is_empty() { + let mut written = 0; + while written < trailing.len() { + written += + Pin::new(&mut stream).write(&trailing[written..]).await?; + } + } + let consumed_from_buf = n - (bytes.len() - buf.len()); + *wr = UpgradeStreamWriteState::Network(stream); + Ok(consumed_from_buf) + } + Err(_) => { + // Not an HTTP response — treat as raw data. + // Write everything accumulated so far to the network. + let all = bytes.freeze(); + let mut written = 0; + while written < all.len() { + written += Pin::new(&mut stream).write(&all[written..]).await?; + } + *wr = UpgradeStreamWriteState::Network(stream); + Ok(buf.len()) + } + } + } UpgradeStreamWriteState::Network(mut stream) => { let r = Pin::new(&mut stream).write(buf).await; *wr = UpgradeStreamWriteState::Network(stream); @@ -1556,6 +1664,9 @@ impl UpgradeStream { drop(wr); self.write(if buf1.is_empty() { buf2 } else { buf1 }).await } + UpgradeStreamWriteState::ConsumeResponse(..) => { + self.write(if buf1.is_empty() { buf2 } else { buf1 }).await + } UpgradeStreamWriteState::Network(stream) => { let bufs = [std::io::IoSlice::new(buf1), std::io::IoSlice::new(buf2)]; stream.write_vectored(&bufs).await diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 045ed93c373a08..8a073918e28dde 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -171,6 +171,8 @@ deno_core::extension!( http_next::op_http_set_response_trailers, http_next::op_http_upgrade_websocket_next, http_next::op_http_upgrade_raw, + http_next::op_http_upgrade_raw_connect, + http_next::op_http_upgrade_raw_get_head, http_next::op_raw_write_vectored, http_next::op_can_write_vectored, http_next::op_http_try_wait, @@ -221,6 +223,8 @@ deno_core::extension!( http_next::op_http_set_response_trailers, http_next::op_http_upgrade_websocket_next, http_next::op_http_upgrade_raw, + http_next::op_http_upgrade_raw_connect, + http_next::op_http_upgrade_raw_get_head, http_next::op_raw_write_vectored, http_next::op_can_write_vectored, http_next::op_http_try_wait, diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 75ea3ae35d9ff6..16951d26be674f 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -70,7 +70,11 @@ import { } from "ext:deno_node/internal/errors.ts"; import { getTimerDuration } from "ext:deno_node/internal/timers.mjs"; import { getIPFamily } from "ext:deno_node/internal/net.ts"; -import { serveHttpOnListener, upgradeHttpRaw } from "ext:deno_http/00_serve.ts"; +import { + serveHttpOnListener, + upgradeHttpRaw, + upgradeHttpRawConnect, +} from "ext:deno_http/00_serve.ts"; import { op_http_serve_address_override } from "ext:core/ops"; import { listen as listenDeno } from "ext:deno_net/01_net.js"; import { headersEntries } from "ext:deno_fetch/20_headers.js"; @@ -2295,9 +2299,33 @@ export class ServerImpl extends EventEmitter { }); const req = new IncomingMessageForServer(socket); + req.method = request.method; + + if (request.method === "CONNECT") { + // For CONNECT, the URL should be in authority form (host:port). + // Deno's server adds an "http://" prefix, so strip it. + req.url = request.url.replace(/^https?:\/\//, ""); + req[kRawHeaders] = request.headers; + + if (this.listenerCount("connect") > 0) { + return (async () => { + const { conn, response, head } = await upgradeHttpRawConnect( + request, + ); + const socket = new Socket({ + handle: new TCP(constants.SERVER, conn), + }); + req.socket = socket; + this.emit("connect", req, socket, Buffer.from(head)); + return response; + })(); + } else { + return new Response(null, { status: 405 }); + } + } + // Slice off the origin so that we only have pathname + search req.url = request.url?.slice(request.url.indexOf("/", 8)); - req.method = request.method; req.upgrade = request.headers.get("connection")?.toLowerCase().includes("upgrade") && request.headers.get("upgrade"); diff --git a/tests/specs/node/http_server_connect_event/__test__.jsonc b/tests/specs/node/http_server_connect_event/__test__.jsonc new file mode 100644 index 00000000000000..f816bad869762e --- /dev/null +++ b/tests/specs/node/http_server_connect_event/__test__.jsonc @@ -0,0 +1,4 @@ +{ + "args": "run -A main.ts", + "output": "main.out" +} diff --git a/tests/specs/node/http_server_connect_event/main.out b/tests/specs/node/http_server_connect_event/main.out new file mode 100644 index 00000000000000..c03940f03a4223 --- /dev/null +++ b/tests/specs/node/http_server_connect_event/main.out @@ -0,0 +1,3 @@ +CONNECT event received: CONNECT 127.0.0.1:[WILDCARD] +Client received status: HTTP/1.1 200 OK +Tunnel data received successfully diff --git a/tests/specs/node/http_server_connect_event/main.ts b/tests/specs/node/http_server_connect_event/main.ts new file mode 100644 index 00000000000000..7e056d1bab9120 --- /dev/null +++ b/tests/specs/node/http_server_connect_event/main.ts @@ -0,0 +1,66 @@ +import * as http from "node:http"; +import * as net from "node:net"; + +// Test that http.Server emits the "connect" event for CONNECT requests. +// This is essential for HTTP proxy servers (e.g., proxy-chain used by Crawlee). + +// Start a simple TCP echo server to act as the "target" +const target = net.createServer((socket) => { + socket.write("hello from target"); + socket.end(); +}); + +target.listen(0, () => { + const targetPort = (target.address() as net.AddressInfo).port; + + const server = http.createServer((_req, res) => { + res.writeHead(200); + res.end("ok"); + }); + + server.on("connect", (req, clientSocket, _head) => { + console.log(`CONNECT event received: ${req.method} ${req.url}`); + + // Connect to the target + const [hostname, port] = req.url!.split(":"); + const targetSocket = net.connect(Number(port), hostname, () => { + clientSocket.write( + "HTTP/1.1 200 Connection Established\r\n\r\n", + ); + targetSocket.pipe(clientSocket); + clientSocket.pipe(targetSocket); + }); + + targetSocket.on("error", (err) => { + clientSocket.end(`HTTP/1.1 502 Bad Gateway\r\n\r\n`); + }); + }); + + server.listen(0, () => { + const proxyPort = (server.address() as net.AddressInfo).port; + + // Send a CONNECT request to the proxy + const client = net.connect(proxyPort, "127.0.0.1", () => { + client.write( + `CONNECT 127.0.0.1:${targetPort} HTTP/1.1\r\nHost: 127.0.0.1:${targetPort}\r\n\r\n`, + ); + }); + + let data = ""; + client.on("data", (chunk) => { + data += chunk.toString(); + }); + + client.on("end", () => { + // Should have received the 200 Connection Established + target data + const lines = data.split("\r\n"); + console.log(`Client received status: ${lines[0]}`); + if (data.includes("hello from target")) { + console.log("Tunnel data received successfully"); + } + client.end(); + server.close(); + target.close(); + }); + }); +});