Skip to content

Commit 5f13991

Browse files
fix(ext/node): support http over unix sockets (#29182)
Closes #20255 --------- Co-authored-by: Divy Srivastava <[email protected]>
1 parent 59ffc19 commit 5f13991

File tree

3 files changed

+106
-36
lines changed

3 files changed

+106
-36
lines changed

ext/node/ops/http.rs

+72-35
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ pub enum ConnError {
139139
#[class(generic)]
140140
#[error(transparent)]
141141
ReuniteTcp(#[from] tokio::net::tcp::ReuniteError),
142+
#[cfg(unix)]
143+
#[class(generic)]
144+
#[error(transparent)]
145+
ReuniteUnix(#[from] tokio::net::unix::ReuniteError),
142146
#[class(inherit)]
143147
#[error(transparent)]
144148
Canceled(#[from] deno_core::Canceled),
@@ -149,6 +153,9 @@ pub enum ConnError {
149153

150154
#[op2(async, stack_trace)]
151155
#[serde]
156+
// This is triggering a known false positive for explicit drop(state) calls.
157+
// See https://rust-lang.github.io/rust-clippy/master/index.html#await_holding_refcell_ref
158+
#[allow(clippy::await_holding_refcell_ref)]
152159
pub async fn op_node_http_request_with_conn<P>(
153160
state: Rc<RefCell<OpState>>,
154161
#[serde] method: ByteString,
@@ -162,43 +169,73 @@ pub async fn op_node_http_request_with_conn<P>(
162169
where
163170
P: crate::NodePermissions + 'static,
164171
{
165-
let (_handle, mut sender) = if encrypted {
166-
let resource_rc = state
167-
.borrow_mut()
168-
.resource_table
169-
.take::<TlsStreamResource>(conn_rid)
170-
.map_err(ConnError::Resource)?;
171-
let resource =
172-
Rc::try_unwrap(resource_rc).map_err(|_e| ConnError::TlsStreamBusy)?;
173-
let (read_half, write_half) = resource.into_inner();
174-
let tcp_stream = read_half.unsplit(write_half);
175-
let io = TokioIo::new(tcp_stream);
176-
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
177-
(
178-
tokio::task::spawn(async move { conn.with_upgrades().await }),
179-
sender,
180-
)
181-
} else {
182-
let resource_rc = state
183-
.borrow_mut()
172+
let (_handle, mut sender) = {
173+
let mut state = state.borrow_mut();
174+
if encrypted {
175+
let resource_rc = state
176+
.resource_table
177+
.take::<TlsStreamResource>(conn_rid)
178+
.map_err(ConnError::Resource)?;
179+
let resource =
180+
Rc::try_unwrap(resource_rc).map_err(|_e| ConnError::TlsStreamBusy)?;
181+
let (read_half, write_half) = resource.into_inner();
182+
let tcp_stream = read_half.unsplit(write_half);
183+
let io = TokioIo::new(tcp_stream);
184+
drop(state);
185+
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
186+
(
187+
tokio::task::spawn(async move { conn.with_upgrades().await }),
188+
sender,
189+
)
190+
} else if let Ok(resource_rc) = state
184191
.resource_table
185192
.take::<TcpStreamResource>(conn_rid)
186-
.map_err(ConnError::Resource)?;
187-
let resource =
188-
Rc::try_unwrap(resource_rc).map_err(|_| ConnError::TcpStreamBusy)?;
189-
let (read_half, write_half) = resource.into_inner();
190-
let tcp_stream = read_half.reunite(write_half)?;
191-
let io = TokioIo::new(tcp_stream);
192-
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
193-
194-
// Spawn a task to poll the connection, driving the HTTP state
195-
(
196-
tokio::task::spawn(async move {
197-
conn.with_upgrades().await?;
198-
Ok::<_, _>(())
199-
}),
200-
sender,
201-
)
193+
.map_err(ConnError::Resource)
194+
{
195+
let resource =
196+
Rc::try_unwrap(resource_rc).map_err(|_| ConnError::TcpStreamBusy)?;
197+
let (read_half, write_half) = resource.into_inner();
198+
let tcp_stream = read_half.reunite(write_half)?;
199+
let io = TokioIo::new(tcp_stream);
200+
drop(state);
201+
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
202+
203+
// Spawn a task to poll the connection, driving the HTTP state
204+
(
205+
tokio::task::spawn(async move {
206+
conn.with_upgrades().await?;
207+
Ok::<_, _>(())
208+
}),
209+
sender,
210+
)
211+
} else {
212+
#[cfg(unix)]
213+
{
214+
let resource_rc = state
215+
.resource_table
216+
.take::<deno_net::io::UnixStreamResource>(conn_rid)
217+
.map_err(ConnError::Resource)?;
218+
let resource =
219+
Rc::try_unwrap(resource_rc).map_err(|_| ConnError::TcpStreamBusy)?;
220+
let (read_half, write_half) = resource.into_inner();
221+
let tcp_stream = read_half.reunite(write_half)?;
222+
let io = TokioIo::new(tcp_stream);
223+
drop(state);
224+
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
225+
226+
// Spawn a task to poll the connection, driving the HTTP state
227+
(
228+
tokio::task::spawn(async move {
229+
conn.with_upgrades().await?;
230+
Ok::<_, _>(())
231+
}),
232+
sender,
233+
)
234+
}
235+
236+
#[cfg(not(unix))]
237+
return Err(ConnError::Resource(ResourceError::BadResourceId));
238+
}
202239
};
203240

204241
// Create the request.

ext/node/polyfills/http.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,12 @@ class ClientRequest extends OutgoingMessage {
515515
span.setAttribute("url.query", parsedUrl.search.slice(1));
516516
}
517517

518-
let baseConnRid = handle[kStreamBaseField][internalRidSymbol];
518+
let baseConnRid;
519+
try {
520+
baseConnRid = handle[kStreamBaseField][internalRidSymbol];
521+
} catch (err) {
522+
throw (this.socket.errored || err);
523+
}
519524
if (this._encrypted) {
520525
const hasCaCerts = this.agent?.options?.ca !== undefined;
521526
const caCerts = hasCaCerts

tests/unit_node/http_test.ts

+28
Original file line numberDiff line numberDiff line change
@@ -2080,3 +2080,31 @@ Deno.test("[node/http] rawHeaders are in flattened format", async () => {
20802080
await promise;
20812081
await new Promise((resolve) => server.close(resolve));
20822082
});
2083+
2084+
Deno.test("[node/http] client http over unix socket works", {
2085+
ignore: Deno.build.os == "windows",
2086+
}, async () => {
2087+
const { promise, resolve } = Promise.withResolvers<void>();
2088+
const socketPath = Deno.makeTempDirSync() + "/server.sock";
2089+
const server = Deno.serve({
2090+
transport: "unix",
2091+
path: socketPath,
2092+
onListen,
2093+
}, (_req) => new Response("ok"));
2094+
2095+
function onListen() {
2096+
const options = {
2097+
socketPath,
2098+
path: "/",
2099+
method: "GET",
2100+
};
2101+
http.request(options, async (res) => {
2102+
assertEquals(res.statusCode, 200);
2103+
assertEquals(await text(res), "ok");
2104+
resolve();
2105+
server.shutdown();
2106+
}).end();
2107+
}
2108+
await promise;
2109+
await server.finished;
2110+
});

0 commit comments

Comments
 (0)