Skip to content

Commit fce52fa

Browse files
authored
fix(ext/node): support HTTP over Windows named pipes in node:http (#32414)
Add Windows Named Pipe support to Deno's Node.js HTTP compatibility layer, enabling libraries like `dockerode`/`docker-modem` that use `http.request({socketPath: "//./pipe/docker_engine"})` to work on Windows. Add `WindowsPipe` variant to `NetworkStream` with proper handling across HTTP server, request properties, and Node HTTP client ops. Implement EBUSY (ERROR_PIPE_BUSY / os error 231) retry logic in `Pipe.connect()` for Windows, emulating libuv's `WaitNamedPipeW` behavior with polling retries.
1 parent 148e8a0 commit fce52fa

File tree

10 files changed

+351
-71
lines changed

10 files changed

+351
-71
lines changed

ext/http/http_next.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1007,6 +1007,10 @@ where
10071007
NetworkStream::Tunnel(conn) => {
10081008
serve_http(conn, connection_properties, lifetime, tx, options)
10091009
}
1010+
#[cfg(windows)]
1011+
NetworkStream::WindowsPipe(conn) => {
1012+
serve_http(conn, connection_properties, lifetime, tx, options)
1013+
}
10101014
}
10111015
}
10121016

ext/http/request_properties.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor {
172172
))]
173173
NetworkStreamAddress::Vsock(vsock) => Some(vsock.port()),
174174
NetworkStreamAddress::Tunnel(ref addr) => Some(addr.port() as _),
175+
#[cfg(windows)]
176+
NetworkStreamAddress::WindowsPipe(_) => None,
175177
};
176178
let peer_address = match peer_address {
177179
NetworkStreamAddress::Ip(addr) => Rc::from(addr.ip().to_string()),
@@ -186,6 +188,8 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor {
186188
Rc::from(format!("vsock:{}", addr.cid()))
187189
}
188190
NetworkStreamAddress::Tunnel(ref addr) => Rc::from(addr.hostname()),
191+
#[cfg(windows)]
192+
NetworkStreamAddress::WindowsPipe(_) => Rc::from("pipe"),
189193
};
190194
let local_port = listen_properties.local_port;
191195
let stream_type = listen_properties.stream_type;
@@ -231,6 +235,8 @@ fn listener_properties(
231235
))]
232236
NetworkStreamAddress::Vsock(vsock) => Some(vsock.port()),
233237
NetworkStreamAddress::Tunnel(addr) => Some(addr.port() as _),
238+
#[cfg(windows)]
239+
NetworkStreamAddress::WindowsPipe(_) => None,
234240
};
235241
Ok(HttpListenProperties {
236242
scheme,
@@ -290,6 +296,8 @@ fn req_host_from_addr(
290296
format!("{}:{}", addr.hostname(), addr.port())
291297
}
292298
}
299+
#[cfg(windows)]
300+
NetworkStreamAddress::WindowsPipe(_) => "localhost".to_owned(),
293301
}
294302
}
295303

@@ -305,6 +313,8 @@ fn req_scheme_from_stream_type(stream_type: NetworkStreamType) -> &'static str {
305313
target_os = "macos"
306314
))]
307315
NetworkStreamType::Vsock => "http+vsock://",
316+
#[cfg(windows)]
317+
NetworkStreamType::WindowsPipe => "http://",
308318
}
309319
}
310320

@@ -335,6 +345,8 @@ fn req_host<'a>(
335345
target_os = "macos"
336346
))]
337347
NetworkStreamType::Vsock => {}
348+
#[cfg(windows)]
349+
NetworkStreamType::WindowsPipe => {}
338350
}
339351
return Some(Cow::Borrowed(auth.as_str()));
340352
}

ext/net/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub mod resolve_addr;
1313
pub mod tcp;
1414
pub mod tunnel;
1515
#[cfg(windows)]
16-
mod win_pipe;
16+
pub mod win_pipe;
1717

1818
use std::sync::Arc;
1919

ext/net/raw.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,17 @@ network_stream!(
468468
crate::tunnel::TunnelStreamResource,
469469
crate::tunnel::OwnedReadHalf,
470470
crate::tunnel::OwnedWriteHalf,
471+
],
472+
#[cfg(windows)]
473+
[
474+
WindowsPipe,
475+
windowsPipe,
476+
crate::win_pipe::WindowsPipeStream,
477+
crate::win_pipe::WindowsPipeListener,
478+
crate::win_pipe::WindowsPipeAddr,
479+
crate::win_pipe::NamedPipe,
480+
tokio::io::ReadHalf<crate::win_pipe::WindowsPipeStream>,
481+
tokio::io::WriteHalf<crate::win_pipe::WindowsPipeStream>,
471482
]
472483
);
473484

@@ -478,6 +489,8 @@ pub enum NetworkStreamAddress {
478489
#[cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))]
479490
Vsock(tokio_vsock::VsockAddr),
480491
Tunnel(crate::tunnel::TunnelAddr),
492+
#[cfg(windows)]
493+
WindowsPipe(crate::win_pipe::WindowsPipeAddr),
481494
}
482495

483496
impl From<std::net::SocketAddr> for NetworkStreamAddress {
@@ -506,6 +519,13 @@ impl From<crate::tunnel::TunnelAddr> for NetworkStreamAddress {
506519
}
507520
}
508521

522+
#[cfg(windows)]
523+
impl From<crate::win_pipe::WindowsPipeAddr> for NetworkStreamAddress {
524+
fn from(value: crate::win_pipe::WindowsPipeAddr) -> Self {
525+
NetworkStreamAddress::WindowsPipe(value)
526+
}
527+
}
528+
509529
#[derive(Debug, thiserror::Error, deno_error::JsError)]
510530
pub enum TakeNetworkStreamError {
511531
#[class("Busy")]
@@ -525,6 +545,10 @@ pub enum TakeNetworkStreamError {
525545
#[class("Busy")]
526546
#[error("Tunnel socket is currently in use")]
527547
TunnelBusy,
548+
#[cfg(windows)]
549+
#[class("Busy")]
550+
#[error("Windows named pipe is currently in use")]
551+
WindowsPipeBusy,
528552
#[class(generic)]
529553
#[error(transparent)]
530554
ReuniteTcp(#[from] tokio::net::tcp::ReuniteError),
@@ -606,6 +630,20 @@ pub fn take_network_stream_resource(
606630
return Ok(NetworkStream::Tunnel(stream));
607631
}
608632

633+
#[cfg(windows)]
634+
if let Ok(resource_rc) =
635+
resource_table.take::<crate::win_pipe::NamedPipe>(stream_rid)
636+
{
637+
let resource = Rc::try_unwrap(resource_rc)
638+
.map_err(|_| TakeNetworkStreamError::WindowsPipeBusy)?;
639+
let client = resource.into_client().map_err(|_| {
640+
TakeNetworkStreamError::Resource(ResourceError::BadResourceId)
641+
})?;
642+
return Ok(NetworkStream::WindowsPipe(
643+
crate::win_pipe::WindowsPipeStream::new(client),
644+
));
645+
}
646+
609647
Err(TakeNetworkStreamError::Resource(
610648
ResourceError::BadResourceId,
611649
))

ext/net/win_pipe.rs

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,22 @@
33
use std::borrow::Cow;
44
use std::ffi::OsStr;
55
use std::io;
6+
use std::pin::Pin;
67
use std::rc::Rc;
8+
use std::task::Context;
9+
use std::task::Poll;
710

811
use deno_core::AsyncRefCell;
912
use deno_core::AsyncResult;
1013
use deno_core::CancelHandle;
1114
use deno_core::CancelTryFuture;
1215
use deno_core::RcRef;
1316
use deno_core::Resource;
17+
use tokio::io::AsyncRead;
1418
use tokio::io::AsyncReadExt;
19+
use tokio::io::AsyncWrite;
1520
use tokio::io::AsyncWriteExt;
21+
use tokio::io::ReadBuf;
1622
use tokio::io::ReadHalf;
1723
use tokio::io::WriteHalf;
1824
use tokio::net::windows::named_pipe;
@@ -120,6 +126,28 @@ impl NamedPipe {
120126
)),
121127
}
122128
}
129+
130+
/// Cancel all pending read/write operations on this pipe.
131+
/// This triggers the `CancelHandle`, causing any in-flight async ops
132+
/// (e.g., reads started by `readStart()`) to complete with a cancellation
133+
/// error, releasing their `Rc` references to this resource.
134+
pub fn cancel_pending_ops(&self) {
135+
self.cancel.cancel();
136+
}
137+
138+
/// Consume this `NamedPipe` and reunite the split read/write halves back
139+
/// into a `NamedPipeClient`. Only works for client pipes.
140+
pub fn into_client(self) -> io::Result<named_pipe::NamedPipeClient> {
141+
let read_half = self.read_half.into_inner();
142+
let write_half = self.write_half.into_inner();
143+
match (read_half, write_half) {
144+
(NamedPipeRead::Client(r), NamedPipeWrite::Client(w)) => Ok(r.unsplit(w)),
145+
_ => Err(io::Error::new(
146+
io::ErrorKind::InvalidInput,
147+
"cannot extract client from non-client pipe",
148+
)),
149+
}
150+
}
123151
}
124152

125153
impl Resource for NamedPipe {
@@ -134,3 +162,90 @@ impl Resource for NamedPipe {
134162
self.cancel.cancel();
135163
}
136164
}
165+
166+
/// A stub address type for Windows named pipes.
167+
/// Pipes don't have traditional network addresses.
168+
#[derive(Copy, Clone, PartialEq, Eq)]
169+
pub struct WindowsPipeAddr;
170+
171+
/// A wrapper around `NamedPipeClient` that implements the stream traits
172+
/// required by the network stream abstraction.
173+
pub struct WindowsPipeStream(named_pipe::NamedPipeClient);
174+
175+
impl WindowsPipeStream {
176+
pub fn new(client: named_pipe::NamedPipeClient) -> Self {
177+
Self(client)
178+
}
179+
180+
pub fn local_addr(&self) -> io::Result<WindowsPipeAddr> {
181+
Ok(WindowsPipeAddr)
182+
}
183+
184+
pub fn peer_addr(&self) -> io::Result<WindowsPipeAddr> {
185+
Ok(WindowsPipeAddr)
186+
}
187+
188+
pub fn into_split(
189+
self,
190+
) -> (tokio::io::ReadHalf<Self>, tokio::io::WriteHalf<Self>) {
191+
tokio::io::split(self)
192+
}
193+
}
194+
195+
impl AsyncRead for WindowsPipeStream {
196+
fn poll_read(
197+
self: Pin<&mut Self>,
198+
cx: &mut Context<'_>,
199+
buf: &mut ReadBuf<'_>,
200+
) -> Poll<io::Result<()>> {
201+
Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
202+
}
203+
}
204+
205+
impl AsyncWrite for WindowsPipeStream {
206+
fn poll_write(
207+
self: Pin<&mut Self>,
208+
cx: &mut Context<'_>,
209+
buf: &[u8],
210+
) -> Poll<io::Result<usize>> {
211+
Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
212+
}
213+
214+
fn poll_flush(
215+
self: Pin<&mut Self>,
216+
cx: &mut Context<'_>,
217+
) -> Poll<io::Result<()>> {
218+
Pin::new(&mut self.get_mut().0).poll_flush(cx)
219+
}
220+
221+
fn poll_shutdown(
222+
self: Pin<&mut Self>,
223+
cx: &mut Context<'_>,
224+
) -> Poll<io::Result<()>> {
225+
Pin::new(&mut self.get_mut().0).poll_shutdown(cx)
226+
}
227+
}
228+
229+
/// A stub listener type for Windows named pipes.
230+
/// Not used for HTTP client connections, but required by the
231+
/// network stream abstraction.
232+
pub struct WindowsPipeListener;
233+
234+
impl WindowsPipeListener {
235+
#[allow(clippy::unused_async)]
236+
pub async fn accept(
237+
&self,
238+
) -> io::Result<(WindowsPipeStream, WindowsPipeAddr)> {
239+
Err(io::Error::new(
240+
io::ErrorKind::Unsupported,
241+
"WindowsPipeListener::accept is not supported",
242+
))
243+
}
244+
245+
pub fn local_addr(&self) -> io::Result<WindowsPipeAddr> {
246+
Err(io::Error::new(
247+
io::ErrorKind::Unsupported,
248+
"WindowsPipeListener::local_addr is not supported",
249+
))
250+
}
251+
}

ext/node/ops/http.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,62 @@ pub async fn op_node_http_request_with_conn(
176176
.any(|part| part.trim_ascii() == b"upgrade")
177177
});
178178

179+
// Take the network stream resource for HTTP communication.
180+
// On Windows, NamedPipe resources may have pending read operations
181+
// (from readStart() in stream_wrap.ts) that hold extra Rc references,
182+
// preventing Rc::try_unwrap(). We handle this by cancelling pending ops
183+
// and yielding to let them complete before extracting the pipe.
184+
#[cfg(windows)]
185+
let stream = {
186+
let is_pipe = state
187+
.borrow()
188+
.resource_table
189+
.get::<deno_net::win_pipe::NamedPipe>(conn_rid)
190+
.is_ok();
191+
if is_pipe {
192+
// Take the NamedPipe from the resource table
193+
let pipe_rc = state
194+
.borrow_mut()
195+
.resource_table
196+
.take::<deno_net::win_pipe::NamedPipe>(conn_rid)
197+
.map_err(ConnError::Resource)?;
198+
199+
// Cancel pending read/write operations. This triggers the CancelHandle,
200+
// causing in-flight ops to complete with a cancellation error and
201+
// release their Rc references.
202+
pipe_rc.cancel_pending_ops();
203+
204+
// Yield to the event loop so cancelled ops can be polled, see the
205+
// cancellation, and drop their Rc references to the NamedPipe.
206+
//
207+
// Invariant: a single yield is sufficient because:
208+
// 1. cancel_pending_ops() triggers the CancelHandle, which causes
209+
// all in-flight read/write futures to resolve on their next poll.
210+
// 2. The `if (!this.#reading) return;` guard in stream_wrap.ts's
211+
// #read() (after its own PromiseResolve yield) ensures the JS
212+
// read loop bails out before starting a new op_read.
213+
// 3. yield_now() gives the executor one turn to poll those cancelled
214+
// futures and drop their Rc references.
215+
tokio::task::yield_now().await;
216+
217+
// Now we should be the sole Rc owner
218+
let resource = Rc::try_unwrap(pipe_rc)
219+
.map_err(|_| ConnError::Resource(ResourceError::BadResourceId))?;
220+
let client = resource
221+
.into_client()
222+
.map_err(|_| ConnError::Resource(ResourceError::BadResourceId))?;
223+
NetworkStream::WindowsPipe(deno_net::win_pipe::WindowsPipeStream::new(
224+
client,
225+
))
226+
} else {
227+
take_network_stream_resource(
228+
&mut state.borrow_mut().resource_table,
229+
conn_rid,
230+
)
231+
.map_err(|_| ConnError::Resource(ResourceError::BadResourceId))?
232+
}
233+
};
234+
#[cfg(not(windows))]
179235
let stream = take_network_stream_resource(
180236
&mut state.borrow_mut().resource_table,
181237
conn_rid,
@@ -466,6 +522,10 @@ pub async fn op_node_http_response_reclaim_conn(
466522
NetworkStream::Tunnel(_) => {
467523
return Ok(None);
468524
}
525+
#[cfg(windows)]
526+
NetworkStream::WindowsPipe(_) => {
527+
return Ok(None);
528+
}
469529
};
470530

471531
Ok(Some(rid))

0 commit comments

Comments
 (0)