From aaf001bdcc695a71570960d2a6b4765ccf2b93d5 Mon Sep 17 00:00:00 2001 From: Arshia Date: Fri, 17 Apr 2026 11:53:35 +0400 Subject: [PATCH 1/6] use __asyncify in fd_read, allowing signal processing and DL operations to happen while waiting for data --- lib/wasix/src/syscalls/mod.rs | 1 + lib/wasix/src/syscalls/wasi/fd_read.rs | 568 +++++++++++-------------- 2 files changed, 260 insertions(+), 309 deletions(-) diff --git a/lib/wasix/src/syscalls/mod.rs b/lib/wasix/src/syscalls/mod.rs index a3d58df119af..abc50c228193 100644 --- a/lib/wasix/src/syscalls/mod.rs +++ b/lib/wasix/src/syscalls/mod.rs @@ -340,6 +340,7 @@ where if let Poll::Ready(res) = Pin::new(&mut self.pinned_work).poll(cx) { return Poll::Ready(Ok(res)); } + WasiEnv::do_pending_link_operations(self.ctx, false); if let Some(signals) = self.ctx.data().thread.pop_signals_or_subscribe(cx.waker()) { if let Err(err) = WasiEnv::process_signals_internal(self.ctx, signals) { return Poll::Ready(Err(err)); diff --git a/lib/wasix/src/syscalls/wasi/fd_read.rs b/lib/wasix/src/syscalls/wasi/fd_read.rs index f5de216676dc..fe562cb63a8b 100644 --- a/lib/wasix/src/syscalls/wasi/fd_read.rs +++ b/lib/wasix/src/syscalls/wasi/fd_read.rs @@ -131,6 +131,45 @@ pub(crate) fn fd_read_internal_handler( Ok(ret) } +/// Extracts validated iov buffer specs from WASM memory as raw host pointers, +/// releasing the `MemoryView` borrow before returning so that `ctx` can be +/// passed exclusively to `__asyncify`. +/// +/// # Safety +/// The returned `*mut u8` base pointer points into WASM linear memory. +/// It remains valid as long as the memory is not grown. Callers must only use +/// the returned pointers while the calling thread is blocked inside +/// `__asyncify` / `block_on`, where WASM execution (and thus `memory.grow`) +/// cannot occur on this thread. 
+unsafe fn extract_iov_bufs( + ctx: &FunctionEnvMut<'_, WasiEnv>, + iovs: WasmPtr<__wasi_iovec_t, M>, + iovs_len: M::Offset, +) -> Result<(*mut u8, Vec<(usize, usize)>), Errno> { + let env = ctx.data(); + let memory = unsafe { env.memory_view(ctx) }; + let base = memory.data_ptr(); + let mem_size = memory.data_size() as usize; + + let iovs_arr = iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?; + let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?; + + let specs = iovs_arr + .iter() + .map(|iov| { + let buf_offset: usize = iov.buf.try_into().map_err(|_| Errno::Overflow)?; + let buf_len: usize = iov.buf_len.try_into().map_err(|_| Errno::Overflow)?; + if buf_offset.saturating_add(buf_len) > mem_size { + return Err(Errno::Fault); + } + Ok((buf_offset, buf_len)) + }) + .collect::, _>>()?; + + // `iovs_arr` and `memory` drop here, releasing the borrow on `ctx`. + Ok((base, specs)) +} + #[allow(clippy::await_holding_lock)] pub(crate) fn fd_read_internal( ctx: &mut FunctionEnvMut<'_, WasiEnv>, @@ -142,331 +181,242 @@ pub(crate) fn fd_read_internal( nread: WasmPtr, should_update_cursor: bool, ) -> WasiResult { - let env = ctx.data(); - let memory = unsafe { env.memory_view(&ctx) }; - let state = env.state(); let is_stdio = fd_entry.is_stdio; - let bytes_read = { - if !is_stdio && !fd_entry.inner.rights.contains(Rights::FD_READ) { - // TODO: figure out the error to return when lacking rights - return Ok(Err(Errno::Access)); - } + if !is_stdio && !fd_entry.inner.rights.contains(Rights::FD_READ) { + // TODO: figure out the error to return when lacking rights + return Ok(Err(Errno::Access)); + } - let inode = fd_entry.inode; - let fd_flags = fd_entry.inner.flags; - - let (bytes_read, can_update_cursor) = { - let mut guard = inode.write(); - match guard.deref_mut() { - Kind::File { handle, .. 
} => { - let Some(handle) = handle else { - tracing::warn!("fd_read: file handle is None"); - return Ok(Err(Errno::Badf)); - }; - let handle = handle.clone(); - - drop(guard); - - let res = __asyncify_light( - env, - if fd_flags.contains(Fdflags::NONBLOCK) { - Some(Duration::ZERO) - } else { - None - }, - async move { - let mut handle = match handle.write() { - Ok(a) => a, - Err(_) => return Err(Errno::Fault), - }; - if !is_stdio { - handle - .seek(std::io::SeekFrom::Start(offset as u64)) - .await - .map_err(map_io_err)?; - } - - let mut total_read = 0usize; - - let iovs_arr = - iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?; - let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?; - for iovs in iovs_arr.iter() { - let mut buf = WasmPtr::::new(iovs.buf) - .slice(&memory, iovs.buf_len) - .map_err(mem_error_to_wasi)? - .access() - .map_err(mem_error_to_wasi)?; - let r = handle.read(buf.as_mut()).await.map_err(|err| { - let err = From::::from(err); - match err { - Errno::Again => { - if is_stdio { - Errno::Badf - } else { - Errno::Again - } - } - a => a, - } - }); - let local_read = match r { - Ok(s) => s, - Err(_) if total_read > 0 => break, - Err(err) => return Err(err), - }; - total_read += local_read; - if local_read != buf.len() { - break; - } - } - Ok(total_read) - }, - ); - let read = wasi_try_ok_ok!(res?.map_err(|err| match err { - Errno::Timedout => Errno::Again, - a => a, - })); - (read, true) + let inode = fd_entry.inode; + let fd_flags = fd_entry.inner.flags; + let nonblocking = fd_flags.contains(Fdflags::NONBLOCK); + let asyncify_timeout = if nonblocking { + Some(Duration::ZERO) + } else { + None + }; + + // Extract iov buffer specs (as raw pointers + lengths) from WASM memory + // before releasing the memory borrow. This allows `ctx` to be passed + // exclusively into `__asyncify`, which provides signal and DL-op handling. 
+ // + // Safety: we only use these pointers inside `__asyncify`, which drives the + // future synchronously via `block_on`, parking this thread. While parked, + // WASM cannot execute `memory.grow`, so the base pointer is stable. + let (mem_base, iov_specs) = + wasi_try_ok_ok!(unsafe { extract_iov_bufs::(ctx, iovs, iovs_len) }); + + let (bytes_read, can_update_cursor) = { + let mut guard = inode.write(); + match guard.deref_mut() { + Kind::File { handle, .. } => { + let Some(handle) = handle else { + tracing::warn!("fd_read: file handle is None"); + return Ok(Err(Errno::Badf)); + }; + let handle = handle.clone(); + drop(guard); + + let res = __asyncify(ctx, asyncify_timeout, async move { + let mut handle = handle.write().map_err(|_| Errno::Fault)?; + if !is_stdio { + handle + .seek(std::io::SeekFrom::Start(offset as u64)) + .await + .map_err(map_io_err)?; + } + + let mut total_read = 0usize; + for (buf_offset, buf_len) in iov_specs { + let buf = unsafe { + std::slice::from_raw_parts_mut(mem_base.add(buf_offset), buf_len) + }; + let local_read = + handle.read(buf).await.map_err( + |err| match From::::from(err) { + Errno::Again if is_stdio => Errno::Badf, + e => e, + }, + ); + let local_read = match local_read { + Ok(n) => n, + Err(_) if total_read > 0 => break, + Err(err) => return Err(err), + }; + total_read += local_read; + if local_read < buf_len { + break; + } + } + Ok(total_read) + }); + let read = wasi_try_ok_ok!(res?.map_err(|err| match err { + Errno::Timedout => Errno::Again, + a => a, + })); + (read, true) + } + Kind::Socket { socket } => { + let socket = socket.clone(); + drop(guard); + + let timeout = socket + .opt_time(TimeType::ReadTimeout) + .ok() + .flatten() + .unwrap_or(Duration::from_secs(30)); + let tasks = ctx.data().tasks().clone(); + + let res = __asyncify(ctx, asyncify_timeout, async move { + let mut total_read = 0usize; + for (buf_offset, buf_len) in iov_specs { + let buf = unsafe { + std::slice::from_raw_parts_mut( + 
mem_base.add(buf_offset) as *mut std::mem::MaybeUninit, + buf_len, + ) + }; + let local_read = socket + .recv(tasks.deref(), buf, Some(timeout), nonblocking, false) + .await?; + total_read += local_read; + // A zero-byte return signals connection closed (EOF); + // a short read is normal for stream sockets and does NOT + // indicate end-of-stream. + if local_read == 0 { + break; + } + } + Ok(total_read) + }); + let res = res?.map_err(|err| match err { + Errno::Timedout => Errno::Again, + a => a, + }); + match res { + Err(Errno::Connaborted) | Err(Errno::Connreset) => (0, false), + res => { + let bytes_read = wasi_try_ok_ok!(res); + (bytes_read, false) + } } - Kind::Socket { socket } => { - let socket = socket.clone(); - - drop(guard); - - let nonblocking = fd_flags.contains(Fdflags::NONBLOCK); - let timeout = socket - .opt_time(TimeType::ReadTimeout) - .ok() - .flatten() - .unwrap_or(Duration::from_secs(30)); - - let tasks = env.tasks().clone(); - let res = __asyncify_light( - env, - if fd_flags.contains(Fdflags::NONBLOCK) { - Some(Duration::ZERO) + } + Kind::PipeTx { .. } => return Ok(Err(Errno::Badf)), + Kind::PipeRx { rx } => { + let mut rx = rx.clone(); + drop(guard); + + let res = __asyncify(ctx, asyncify_timeout, async move { + let mut total_read = 0usize; + for (buf_offset, buf_len) in iov_specs { + let buf = unsafe { + std::slice::from_raw_parts_mut(mem_base.add(buf_offset), buf_len) + }; + let local_read = if nonblocking { + rx.try_read(buf).ok_or(Errno::Again)? } else { - None - }, - async move { - let mut total_read = 0usize; - - let iovs_arr = - iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?; - let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?; - for iovs in iovs_arr.iter() { - let mut buf = WasmPtr::::new(iovs.buf) - .slice(&memory, iovs.buf_len) - .map_err(mem_error_to_wasi)? 
- .access() - .map_err(mem_error_to_wasi)?; - - let local_read = socket - .recv( - tasks.deref(), - buf.as_mut_uninit(), - Some(timeout), - nonblocking, - false, - ) - .await?; - total_read += local_read; - if total_read != buf.len() { - break; - } - } - Ok(total_read) - }, - ); - let res = res?.map_err(|err| match err { - Errno::Timedout => Errno::Again, - a => a, - }); - match res { - Err(Errno::Connaborted) | Err(Errno::Connreset) => (0, false), - res => { - let bytes_read = wasi_try_ok_ok!(res); - (bytes_read, false) + virtual_fs::AsyncReadExt::read(&mut rx, buf).await? + }; + total_read += local_read; + if local_read < buf_len { + break; } } - } - Kind::PipeTx { .. } => return Ok(Err(Errno::Badf)), - Kind::PipeRx { rx } => { - let mut rx = rx.clone(); - drop(guard); - - let nonblocking = fd_flags.contains(Fdflags::NONBLOCK); - - let res = __asyncify_light( - env, - if fd_flags.contains(Fdflags::NONBLOCK) { - Some(Duration::ZERO) + Ok(total_read) + }); + let bytes_read = wasi_try_ok_ok!(res?.map_err(|err| match err { + Errno::Timedout => Errno::Again, + a => a, + })); + (bytes_read, false) + } + Kind::DuplexPipe { pipe } => { + let mut pipe = pipe.clone(); + drop(guard); + + let res = __asyncify(ctx, asyncify_timeout, async move { + let mut total_read = 0usize; + for (buf_offset, buf_len) in iov_specs { + let buf = unsafe { + std::slice::from_raw_parts_mut(mem_base.add(buf_offset), buf_len) + }; + let local_read = if nonblocking { + pipe.try_read(buf).ok_or(Errno::Again)? } else { - None - }, - async move { - let mut total_read = 0usize; - - let iovs_arr = - iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?; - let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?; - for iovs in iovs_arr.iter() { - let mut buf = WasmPtr::::new(iovs.buf) - .slice(&memory, iovs.buf_len) - .map_err(mem_error_to_wasi)? 
- .access() - .map_err(mem_error_to_wasi)?; - - let local_read = match nonblocking { - true => match rx.try_read(buf.as_mut()) { - Some(amt) => amt, - None => { - return Err(Errno::Again); - } - }, - false => { - virtual_fs::AsyncReadExt::read(&mut rx, buf.as_mut()) - .await? - } - }; - total_read += local_read; - if local_read != buf.len() { - break; - } - } - Ok(total_read) - }, - ); - - let bytes_read = wasi_try_ok_ok!(res?.map_err(|err| match err { - Errno::Timedout => Errno::Again, - a => a, - })); - - (bytes_read, false) + virtual_fs::AsyncReadExt::read(&mut pipe, buf).await? + }; + total_read += local_read; + if local_read < buf_len { + break; + } + } + Ok(total_read) + }); + let bytes_read = wasi_try_ok_ok!(res?.map_err(|err| match err { + Errno::Timedout => Errno::Again, + a => a, + })); + (bytes_read, false) + } + Kind::Dir { .. } | Kind::Root { .. } => { + // TODO: verify + return Ok(Err(Errno::Isdir)); + } + Kind::EventNotifications { inner } => { + struct NotifyPoller { + inner: Arc, + non_blocking: bool, } - Kind::DuplexPipe { pipe } => { - let mut pipe = pipe.clone(); - drop(guard); - - let nonblocking = fd_flags.contains(Fdflags::NONBLOCK); - - let res = __asyncify_light( - env, - if fd_flags.contains(Fdflags::NONBLOCK) { - Some(Duration::ZERO) + impl Future for NotifyPoller { + type Output = Result; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.non_blocking { + Poll::Ready(self.inner.try_read().ok_or(Errno::Again)) } else { - None - }, - async move { - let mut total_read = 0usize; - - let iovs_arr = - iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?; - let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?; - for iovs in iovs_arr.iter() { - let mut buf = WasmPtr::::new(iovs.buf) - .slice(&memory, iovs.buf_len) - .map_err(mem_error_to_wasi)? 
- .access() - .map_err(mem_error_to_wasi)?; - - let local_read = match nonblocking { - true => match pipe.try_read(buf.as_mut()) { - Some(amt) => amt, - None => { - return Err(Errno::Again); - } - }, - false => { - virtual_fs::AsyncReadExt::read(&mut pipe, buf.as_mut()) - .await? - } - }; - total_read += local_read; - if local_read != buf.len() { - break; - } - } - Ok(total_read) - }, - ); - - let bytes_read = wasi_try_ok_ok!(res?.map_err(|err| match err { - Errno::Timedout => Errno::Again, - a => a, - })); - - (bytes_read, false) - } - Kind::Dir { .. } | Kind::Root { .. } => { - // TODO: verify - return Ok(Err(Errno::Isdir)); - } - Kind::EventNotifications { inner } => { - // Create a poller - struct NotifyPoller { - inner: Arc, - non_blocking: bool, - } - let poller = NotifyPoller { - inner: inner.clone(), - non_blocking: fd_flags.contains(Fdflags::NONBLOCK), - }; - - drop(guard); - - // The poller will register itself for notifications and wait for the - // counter to drop - impl Future for NotifyPoller { - type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.non_blocking { - Poll::Ready(self.inner.try_read().ok_or(Errno::Again)) - } else { - self.inner.read(cx.waker()).map(Ok) - } + self.inner.read(cx.waker()).map(Ok) } } - - // Yield until the notifications are triggered - let tasks_inner = env.tasks().clone(); - - let res = __asyncify_light(env, None, poller)?.map_err(|err| match err { - Errno::Timedout => Errno::Again, - a => a, - }); - let val = wasi_try_ok_ok!(res); - - let mut memory = unsafe { env.memory_view(ctx) }; - let reader = val.to_ne_bytes(); - let iovs_arr = wasi_try_mem_ok_ok!(iovs.slice(&memory, iovs_len)); - let ret = wasi_try_ok_ok!(read_bytes(&reader[..], &memory, iovs_arr)); - (ret, false) } - Kind::Symlink { .. } | Kind::Epoll { .. 
} => { - return Ok(Err(Errno::Notsup)); - } - Kind::Buffer { buffer } => { - let memory = unsafe { env.memory_view(ctx) }; - let iovs_arr = wasi_try_mem_ok_ok!(iovs.slice(&memory, iovs_len)); - let read = wasi_try_ok_ok!(read_bytes(&buffer[offset..], &memory, iovs_arr)); - (read, true) - } - } - }; - if !is_stdio && should_update_cursor && can_update_cursor { - fd_entry - .inner - .offset - .fetch_add(bytes_read as u64, Ordering::AcqRel); + let poller = NotifyPoller { + inner: inner.clone(), + non_blocking: nonblocking, + }; + drop(guard); + + let res = __asyncify(ctx, None, poller)?.map_err(|err| match err { + Errno::Timedout => Errno::Again, + a => a, + }); + let val = wasi_try_ok_ok!(res); + + let env = ctx.data(); + let memory = unsafe { env.memory_view(ctx) }; + let reader = val.to_ne_bytes(); + let iovs_arr = wasi_try_mem_ok_ok!(iovs.slice(&memory, iovs_len)); + let ret = wasi_try_ok_ok!(read_bytes(&reader[..], &memory, iovs_arr)); + (ret, false) + } + Kind::Symlink { .. } | Kind::Epoll { .. 
} => { + return Ok(Err(Errno::Notsup)); + } + Kind::Buffer { buffer } => { + let env = ctx.data(); + let memory = unsafe { env.memory_view(ctx) }; + let iovs_arr = wasi_try_mem_ok_ok!(iovs.slice(&memory, iovs_len)); + let read = wasi_try_ok_ok!(read_bytes(&buffer[offset..], &memory, iovs_arr)); + (read, true) + } } - - bytes_read }; + if !is_stdio && should_update_cursor && can_update_cursor { + fd_entry + .inner + .offset + .fetch_add(bytes_read as u64, Ordering::AcqRel); + } + Ok(Ok(bytes_read)) } From 98db5043766415aa1856a9915a001e5ea1ba8c07 Mon Sep 17 00:00:00 2001 From: Arshia Date: Fri, 17 Apr 2026 17:50:30 +0400 Subject: [PATCH 2/6] Make clippy happy --- lib/wasix/src/syscalls/wasi/fd_read.rs | 88 ++++++++++++++++---------- 1 file changed, 56 insertions(+), 32 deletions(-) diff --git a/lib/wasix/src/syscalls/wasi/fd_read.rs b/lib/wasix/src/syscalls/wasi/fd_read.rs index fe562cb63a8b..2173989bd6d2 100644 --- a/lib/wasix/src/syscalls/wasi/fd_read.rs +++ b/lib/wasix/src/syscalls/wasi/fd_read.rs @@ -132,29 +132,66 @@ pub(crate) fn fd_read_internal_handler( } /// Extracts validated iov buffer specs from WASM memory as raw host pointers, -/// releasing the `MemoryView` borrow before returning so that `ctx` can be -/// passed exclusively to `__asyncify`. +/// Raw iov buffer specs extracted from WASM memory before entering `__asyncify`. +/// +/// `mem_base` points to the start of WASM linear memory. Each entry in `bufs` +/// is a `(offset_into_memory, length)` pair describing one iov slice. /// /// # Safety -/// The returned `*mut u8` base pointer points into WASM linear memory. -/// It remains valid as long as the memory is not grown. Callers must only use -/// the returned pointers while the calling thread is blocked inside -/// `__asyncify` / `block_on`, where WASM execution (and thus `memory.grow`) -/// cannot occur on this thread. +/// `mem_base` remains valid only while the calling thread is blocked inside +/// `__asyncify` / `block_on`. 
WASM cannot execute `memory.grow` while the +/// thread is parked there, so the base pointer is stable for the duration. +struct IovBufs { + mem_base: *mut u8, + bufs: Vec<(usize, usize)>, +} + +impl IovBufs { + /// Yields a mutable byte slice for each iov entry. + /// + /// # Safety + /// Must only be called while `mem_base` is still valid (i.e. inside + /// `__asyncify`). + unsafe fn iter_slices_mut(&self) -> impl Iterator { + self.bufs.iter().map(|&(offset, len)| unsafe { + std::slice::from_raw_parts_mut(self.mem_base.add(offset), len) + }) + } + + /// Yields a mutable `MaybeUninit` slice for each iov entry (for + /// socket `recv`, which writes via uninitialized bytes). + /// + /// # Safety + /// Must only be called while `mem_base` is still valid. + unsafe fn iter_uninit_slices_mut( + &self, + ) -> impl Iterator]> { + self.bufs.iter().map(|&(offset, len)| unsafe { + std::slice::from_raw_parts_mut( + self.mem_base.add(offset) as *mut std::mem::MaybeUninit, + len, + ) + }) + } +} + +/// Extracts validated iov buffer specs from WASM memory as an [`IovBufs`], +/// releasing the `MemoryView` borrow before returning so that `ctx` can be +/// passed exclusively to `__asyncify`. unsafe fn extract_iov_bufs( ctx: &FunctionEnvMut<'_, WasiEnv>, iovs: WasmPtr<__wasi_iovec_t, M>, iovs_len: M::Offset, -) -> Result<(*mut u8, Vec<(usize, usize)>), Errno> { +) -> Result { let env = ctx.data(); let memory = unsafe { env.memory_view(ctx) }; - let base = memory.data_ptr(); + let mem_base = memory.data_ptr(); let mem_size = memory.data_size() as usize; let iovs_arr = iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?; let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?; - let specs = iovs_arr + let bufs = iovs_arr .iter() .map(|iov| { let buf_offset: usize = iov.buf.try_into().map_err(|_| Errno::Overflow)?; @@ -167,7 +204,7 @@ unsafe fn extract_iov_bufs( .collect::, _>>()?; // `iovs_arr` and `memory` drop here, releasing the borrow on `ctx`. 
- Ok((base, specs)) + Ok(IovBufs { mem_base, bufs }) } #[allow(clippy::await_holding_lock)] @@ -204,8 +241,7 @@ pub(crate) fn fd_read_internal( // Safety: we only use these pointers inside `__asyncify`, which drives the // future synchronously via `block_on`, parking this thread. While parked, // WASM cannot execute `memory.grow`, so the base pointer is stable. - let (mem_base, iov_specs) = - wasi_try_ok_ok!(unsafe { extract_iov_bufs::(ctx, iovs, iovs_len) }); + let iov_bufs = wasi_try_ok_ok!(unsafe { extract_iov_bufs::(ctx, iovs, iovs_len) }); let (bytes_read, can_update_cursor) = { let mut guard = inode.write(); @@ -228,10 +264,8 @@ pub(crate) fn fd_read_internal( } let mut total_read = 0usize; - for (buf_offset, buf_len) in iov_specs { - let buf = unsafe { - std::slice::from_raw_parts_mut(mem_base.add(buf_offset), buf_len) - }; + for buf in unsafe { iov_bufs.iter_slices_mut() } { + let buf_len = buf.len(); let local_read = handle.read(buf).await.map_err( |err| match From::::from(err) { @@ -270,13 +304,7 @@ pub(crate) fn fd_read_internal( let res = __asyncify(ctx, asyncify_timeout, async move { let mut total_read = 0usize; - for (buf_offset, buf_len) in iov_specs { - let buf = unsafe { - std::slice::from_raw_parts_mut( - mem_base.add(buf_offset) as *mut std::mem::MaybeUninit, - buf_len, - ) - }; + for buf in unsafe { iov_bufs.iter_uninit_slices_mut() } { let local_read = socket .recv(tasks.deref(), buf, Some(timeout), nonblocking, false) .await?; @@ -309,10 +337,8 @@ pub(crate) fn fd_read_internal( let res = __asyncify(ctx, asyncify_timeout, async move { let mut total_read = 0usize; - for (buf_offset, buf_len) in iov_specs { - let buf = unsafe { - std::slice::from_raw_parts_mut(mem_base.add(buf_offset), buf_len) - }; + for buf in unsafe { iov_bufs.iter_slices_mut() } { + let buf_len = buf.len(); let local_read = if nonblocking { rx.try_read(buf).ok_or(Errno::Again)? 
} else { @@ -337,10 +363,8 @@ pub(crate) fn fd_read_internal( let res = __asyncify(ctx, asyncify_timeout, async move { let mut total_read = 0usize; - for (buf_offset, buf_len) in iov_specs { - let buf = unsafe { - std::slice::from_raw_parts_mut(mem_base.add(buf_offset), buf_len) - }; + for buf in unsafe { iov_bufs.iter_slices_mut() } { + let buf_len = buf.len(); let local_read = if nonblocking { pipe.try_read(buf).ok_or(Errno::Again)? } else { From 02f13df12d3748a3faa5bc44228d4771512571ec Mon Sep 17 00:00:00 2001 From: Arshia Date: Mon, 27 Apr 2026 13:59:22 +0400 Subject: [PATCH 3/6] Add test, address review comments, fix implementation Co-authored-by: Copilot --- lib/wasix/src/syscalls/mod.rs | 13 ++- lib/wasix/src/syscalls/wasi/fd_read.rs | 30 +++--- lib/wasix/tests/wasm_tests/fd_tests.rs | 29 +++++- .../fd_tests/stdin-dlopen-race/build.sh | 10 ++ .../fd_tests/stdin-dlopen-race/main.c | 71 ++++++++++++++ .../fd_tests/stdin-dlopen-race/side.c | 1 + lib/wasix/tests/wasm_tests/mod.rs | 96 +++++++++++++++++++ 7 files changed, 236 insertions(+), 14 deletions(-) create mode 100644 lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/build.sh create mode 100644 lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/main.c create mode 100644 lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/side.c diff --git a/lib/wasix/src/syscalls/mod.rs b/lib/wasix/src/syscalls/mod.rs index abc50c228193..41216138f23b 100644 --- a/lib/wasix/src/syscalls/mod.rs +++ b/lib/wasix/src/syscalls/mod.rs @@ -340,11 +340,18 @@ where if let Poll::Ready(res) = Pin::new(&mut self.pinned_work).poll(cx) { return Poll::Ready(Ok(res)); } - WasiEnv::do_pending_link_operations(self.ctx, false); + if let Err(err) = WasiEnv::do_pending_link_operations(self.ctx, false) { + return Poll::Ready(Err(err)); + } if let Some(signals) = self.ctx.data().thread.pop_signals_or_subscribe(cx.waker()) { + let only_sigwakeup = signals.iter().all(|sig| matches!(sig, Signal::Sigwakeup)); if let Err(err) = 
WasiEnv::process_signals_internal(self.ctx, signals) { return Poll::Ready(Err(err)); } + if only_sigwakeup { + self.ctx.data().thread.signals_subscribe(cx.waker()); + return Poll::Pending; + } return Poll::Ready(Ok(Err(Errno::Intr))); } Poll::Pending @@ -382,7 +389,9 @@ where return Poll::Ready(Ok(res)); } - WasiEnv::do_pending_link_operations(self.ctx, false); + if let Err(err) = WasiEnv::do_pending_link_operations(self.ctx, false) { + return Poll::Ready(Err(err)); + } let env = self.ctx.data(); if let Some(forced_exit) = env.thread.try_join() { diff --git a/lib/wasix/src/syscalls/wasi/fd_read.rs b/lib/wasix/src/syscalls/wasi/fd_read.rs index 2173989bd6d2..4290f5e63a04 100644 --- a/lib/wasix/src/syscalls/wasi/fd_read.rs +++ b/lib/wasix/src/syscalls/wasi/fd_read.rs @@ -307,12 +307,14 @@ pub(crate) fn fd_read_internal( for buf in unsafe { iov_bufs.iter_uninit_slices_mut() } { let local_read = socket .recv(tasks.deref(), buf, Some(timeout), nonblocking, false) - .await?; + .await; + let local_read = match local_read { + Ok(n) => n, + Err(Errno::Again) | Err(Errno::Timedout) if total_read > 0 => break, + Err(err) => return Err(err), + }; total_read += local_read; - // A zero-byte return signals connection closed (EOF); - // a short read is normal for stream sockets and does NOT - // indicate end-of-stream. - if local_read == 0 { + if local_read < buf.len() { break; } } @@ -338,14 +340,17 @@ pub(crate) fn fd_read_internal( let res = __asyncify(ctx, asyncify_timeout, async move { let mut total_read = 0usize; for buf in unsafe { iov_bufs.iter_slices_mut() } { - let buf_len = buf.len(); let local_read = if nonblocking { - rx.try_read(buf).ok_or(Errno::Again)? + match rx.try_read(buf) { + Some(n) => n, + None if total_read > 0 => break, + None => return Err(Errno::Again), + } } else { virtual_fs::AsyncReadExt::read(&mut rx, buf).await? 
}; total_read += local_read; - if local_read < buf_len { + if local_read < buf.len() { break; } } @@ -364,14 +369,17 @@ pub(crate) fn fd_read_internal( let res = __asyncify(ctx, asyncify_timeout, async move { let mut total_read = 0usize; for buf in unsafe { iov_bufs.iter_slices_mut() } { - let buf_len = buf.len(); let local_read = if nonblocking { - pipe.try_read(buf).ok_or(Errno::Again)? + match pipe.try_read(buf) { + Some(n) => n, + None if total_read > 0 => break, + None => return Err(Errno::Again), + } } else { virtual_fs::AsyncReadExt::read(&mut pipe, buf).await? }; total_read += local_read; - if local_read < buf_len { + if local_read < buf.len() { break; } } diff --git a/lib/wasix/tests/wasm_tests/fd_tests.rs b/lib/wasix/tests/wasm_tests/fd_tests.rs index 36dc20848001..22831c2c397a 100644 --- a/lib/wasix/tests/wasm_tests/fd_tests.rs +++ b/lib/wasix/tests/wasm_tests/fd_tests.rs @@ -1,7 +1,34 @@ -use super::{run_build_script, run_wasm}; +use super::{run_build_script, run_wasm, run_wasm_with_stdin}; +use wasmer_wasix::Pipe; #[test] fn test_fd_allocate() { let wasm = run_build_script(file!(), "fd-allocate").unwrap(); run_wasm(&wasm, wasm.parent().unwrap()).unwrap(); } + +/// Regression test for fd_read blocking DL operations. +/// +/// One WASM thread blocks inside `fd_read` on stdin (which never produces +/// data). While that thread is parked, the main WASM thread must be able to +/// call `dlopen` and load a shared library without deadlocking. Before the +/// fix, `fd_read` held a lock that prevented DL operations from proceeding. +#[test] +fn test_stdin_read_does_not_block_dlopen() { + let wasm = run_build_script(file!(), "stdin-dlopen-race").unwrap(); + + // Keep the write end alive so stdin remains blocked for the whole guest + // run; the guest exits explicitly after proving dlopen works. 
+ let (_pipe_tx, pipe_rx) = Pipe::channel(); + + let result = run_wasm_with_stdin(&wasm, wasm.parent().unwrap(), Box::new(pipe_rx)).unwrap(); + + let stdout = String::from_utf8_lossy(&result.stdout); + assert_eq!( + stdout.trim(), + "reader_ready\ndlopen_succeeded_after_reader_ready\nside_value=42\nsequence_ok", + "stderr: {}", + String::from_utf8_lossy(&result.stderr) + ); + assert_eq!(result.exit_code, Some(0)); +} diff --git a/lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/build.sh b/lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/build.sh new file mode 100644 index 000000000000..5ba07ea0b943 --- /dev/null +++ b/lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/build.sh @@ -0,0 +1,10 @@ +#!/bin/bash +set -ex + +export WASIXCC_PIC=yes + +# Compile the shared library +$CC -shared side.c -o libside.so + +# Compile the main executable +$CC main.c -o main diff --git a/lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/main.c b/lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/main.c new file mode 100644 index 000000000000..46aad17c6b21 --- /dev/null +++ b/lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/main.c @@ -0,0 +1,71 @@ +#include +#include +#include +#include +#include + +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +static int reader_ready = 0; + +typedef int (*side_value_t)(void); + +static void* reader_thread(void* arg) { + (void)arg; + + /* Signal to the main thread that we are about to enter fd_read. */ + pthread_mutex_lock(&mutex); + reader_ready = 1; + pthread_cond_signal(&cond); + pthread_mutex_unlock(&mutex); + + /* Block in fd_read on stdin. The host provides a pipe whose write end is + * never written to, so this call parks indefinitely. The process will be + * terminated by main() returning before this ever unblocks. 
*/ + char buf[64]; + read(STDIN_FILENO, buf, sizeof(buf)); + + return NULL; +} + +int main(void) { + pthread_t t; + pthread_create(&t, NULL, reader_thread, NULL); + + /* Wait until the reader thread has set reader_ready (it is about to call + * read(), if it hasn't already). */ + pthread_mutex_lock(&mutex); + while (!reader_ready) pthread_cond_wait(&cond, &mutex); + pthread_mutex_unlock(&mutex); + + printf("reader_ready\n"); + + /* Give the reader thread time to actually enter the blocking read before + * attempting dlopen, so this test exercises the intended race reliably. */ + sleep(1); + + /* Load a shared library while the other thread is blocked in fd_read. + * Before the fix this would deadlock because fd_read held a lock that + * the DL subsystem also needed. */ + void* handle = dlopen("libside.so", RTLD_NOW | RTLD_LOCAL); + if (!handle) { + fprintf(stderr, "dlopen failed: %s\n", dlerror()); + return 1; + } + + printf("dlopen_succeeded_after_reader_ready\n"); + + side_value_t side_value = (side_value_t)dlsym(handle, "side_value"); + if (!side_value) { + fprintf(stderr, "dlsym failed: %s\n", dlerror()); + dlclose(handle); + return 1; + } + + printf("side_value=%d\n", side_value()); + printf("sequence_ok\n"); + dlclose(handle); + + fflush(stdout); + _Exit(0); +} diff --git a/lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/side.c b/lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/side.c new file mode 100644 index 000000000000..2379f10d4023 --- /dev/null +++ b/lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/side.c @@ -0,0 +1 @@ +int side_value(void) { return 42; } diff --git a/lib/wasix/tests/wasm_tests/mod.rs b/lib/wasix/tests/wasm_tests/mod.rs index e118b9dbb733..26bf88638430 100644 --- a/lib/wasix/tests/wasm_tests/mod.rs +++ b/lib/wasix/tests/wasm_tests/mod.rs @@ -422,6 +422,102 @@ pub fn run_wasm_with_result( }) } +/// Run a compiled WASM file using WasiRunner with a custom stdin. 
+/// +/// Useful for testing syscalls that block on stdin (e.g. fd_read): pass the +/// receiving end of a `Pipe::channel()` and keep the sending end alive in the +/// caller so that stdin never returns EOF while the guest is running. +#[allow(unused)] +pub fn run_wasm_with_stdin( + wasm_path: &PathBuf, + dir: &Path, + stdin: Box, +) -> Result { + let wasm_bytes = std::fs::read(wasm_path)?; + let engine = create_engine_for_wasm(&wasm_bytes); + let module_data = HashedModuleData::new(wasm_bytes); + let hash = *module_data.hash(); + + let stdout_buffer = Arc::new(Mutex::new(Vec::new())); + let stderr_buffer = Arc::new(Mutex::new(Vec::new())); + let stdout_capture = Box::new(CaptureFile::new(stdout_buffer.clone())); + let stderr_capture = Box::new(CaptureFile::new(stderr_buffer.clone())); + + let rt = create_runtime(); + let result = rt.block_on(async { + let cache_dir = get_cache_dir(); + std::fs::create_dir_all(&cache_dir).ok(); + + let rt_handle = wasmer_wasix::runtime::task_manager::tokio::RuntimeOrHandle::Handle( + tokio::runtime::Handle::current(), + ); + let tokio_task_manager = + Arc::new(wasmer_wasix::runtime::task_manager::tokio::TokioTaskManager::new(rt_handle)); + let module_cache = wasmer_wasix::runtime::module_cache::SharedCache::default() + .with_fallback(wasmer_wasix::runtime::module_cache::FileSystemCache::new( + cache_dir, + tokio_task_manager, + )); + let arc_cache = Arc::new(module_cache); + + let module = wasmer_wasix::runtime::load_module( + &engine, + &arc_cache, + wasmer_wasix::runtime::ModuleInput::Hashed(Cow::Borrowed(&module_data)), + None, + ) + .await + .map_err(|e| anyhow::anyhow!("Failed to load module: {}", e))?; + + tokio::task::block_in_place(move || { + let mut runner = WasiRunner::new(); + runner + .with_mapped_directories([MappedDirectory { + guest: dir.to_string_lossy().to_string(), + host: dir.to_path_buf(), + }]) + .with_mapped_directories([MappedDirectory { + guest: "/lib".to_string(), + host: dir.to_path_buf(), + }]) + 
.with_current_dir(dir.to_string_lossy().to_string()) + .with_stdin(stdin) + .with_stdout(stdout_capture) + .with_stderr(stderr_capture); + runner.run_wasm( + RuntimeOrEngine::Engine(engine), + wasm_path.to_string_lossy().as_ref(), + module, + hash, + ) + }) + }); + + let stdout = stdout_buffer.lock().unwrap().clone(); + let stderr = stderr_buffer.lock().unwrap().clone(); + let exit_code = match &result { + Ok(_) => Some(0), + Err(e) => { + let error_msg = e.to_string(); + if let Some(code_str) = error_msg.split("ExitCode::").nth(1) { + if let Some(code) = code_str.split_whitespace().next() { + code.parse::().ok() + } else { + None + } + } else { + None + } + } + }; + + Ok(WasmRunResult { + stdout, + stderr, + exit_code, + }) +} + /// Run a compiled WASM file using WasiRunner #[allow(unused)] pub fn run_wasm(wasm_path: &PathBuf, dir: &Path) -> Result<(), anyhow::Error> { From a6cb4c3740564e3c83cbf392750da35e23d3d024 Mon Sep 17 00:00:00 2001 From: Arshia Date: Mon, 27 Apr 2026 17:28:39 +0400 Subject: [PATCH 4/6] WIP: address review comments Co-authored-by: Copilot --- .../fd_tests/stdin-dlopen-race/build.sh | 4 +- lib/wasix/tests/wasm_tests/mod.rs | 186 +++++------------- 2 files changed, 55 insertions(+), 135 deletions(-) diff --git a/lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/build.sh b/lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/build.sh index 5ba07ea0b943..2d874f937dde 100644 --- a/lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/build.sh +++ b/lib/wasix/tests/wasm_tests/fd_tests/stdin-dlopen-race/build.sh @@ -1,5 +1,5 @@ -#!/bin/bash -set -ex +#!/usr/bin/env bash +set -euo pipefail export WASIXCC_PIC=yes diff --git a/lib/wasix/tests/wasm_tests/mod.rs b/lib/wasix/tests/wasm_tests/mod.rs index 26bf88638430..7c0376cbc0a3 100644 --- a/lib/wasix/tests/wasm_tests/mod.rs +++ b/lib/wasix/tests/wasm_tests/mod.rs @@ -310,40 +310,37 @@ pub struct WasmRunResult { pub exit_code: Option, } -/// Run a compiled WASM file using WasiRunner and return 
output buffers and exit status -/// -/// This function uses the same caching mechanism as the Wasmer CLI: -/// - In-memory cache (SharedCache) for fast repeated loads within the same process -/// - Filesystem cache as a fallback for persistence across test runs -/// - Cache directory follows the same precedence as the CLI: -/// 1. WASMER_CACHE_DIR environment variable -/// 2. WASMER_DIR/cache/compiled -/// 3. ~/.wasmer/cache/compiled -/// 4. temp_dir/wasmer/cache/compiled (fallback) -/// -/// The caching significantly improves test performance by avoiding recompilation -/// of the same WASM modules across multiple test runs. -pub fn run_wasm_with_result( +fn parse_exit_code(result: &Result<(), anyhow::Error>) -> Option { + match result { + Ok(_) => Some(0), + Err(e) => { + let error_msg = e.to_string(); + error_msg + .split("ExitCode::") + .nth(1) + .and_then(|code_str| code_str.split_whitespace().next()) + .and_then(|code| code.parse::().ok()) + } + } +} + +fn run_wasm_with_overrides( wasm_path: &PathBuf, dir: &Path, + stdin: Option>, ) -> Result { - // Load the compiled WASM module let wasm_bytes = std::fs::read(wasm_path)?; let engine = create_engine_for_wasm(&wasm_bytes); let module_data = HashedModuleData::new(wasm_bytes); let hash = *module_data.hash(); - // Create buffers to capture stdout and stderr let stdout_buffer = Arc::new(Mutex::new(Vec::new())); let stderr_buffer = Arc::new(Mutex::new(Vec::new())); - let stdout_capture = Box::new(CaptureFile::new(stdout_buffer.clone())); let stderr_capture = Box::new(CaptureFile::new(stderr_buffer.clone())); let rt = create_runtime(); - let result = rt.block_on(async { - // Set up module cache with in-memory + filesystem fallback (same as CLI) let cache_dir = get_cache_dir(); std::fs::create_dir_all(&cache_dir).ok(); @@ -357,7 +354,6 @@ pub fn run_wasm_with_result( cache_dir, tokio_task_manager, )); - let arc_cache = Arc::new(module_cache); let module = wasmer_wasix::runtime::load_module( @@ -370,9 +366,8 @@ pub fn 
run_wasm_with_result( .map_err(|e| anyhow::anyhow!("Failed to load module: {}", e))?; tokio::task::block_in_place(move || { - // Run the WASM module using WasiRunner let mut runner = WasiRunner::new(); - runner + let runner = runner .with_mapped_directories([MappedDirectory { guest: dir.to_string_lossy().to_string(), host: dir.to_path_buf(), @@ -381,47 +376,54 @@ pub fn run_wasm_with_result( guest: "/lib".to_string(), host: dir.to_path_buf(), }]) - .with_current_dir(dir.to_string_lossy().to_string()) + .with_current_dir(dir.to_string_lossy().to_string()); + + if let Some(stdin) = stdin { + runner.with_stdin(stdin); + } + + runner .with_stdout(stdout_capture) - .with_stderr(stderr_capture); - runner.run_wasm( - RuntimeOrEngine::Engine(engine), - wasm_path.to_string_lossy().as_ref(), - module, - hash, - ) + .with_stderr(stderr_capture) + .run_wasm( + RuntimeOrEngine::Engine(engine), + wasm_path.to_string_lossy().as_ref(), + module, + hash, + ) }) }); - // Extract the captured output let stdout = stdout_buffer.lock().unwrap().clone(); let stderr = stderr_buffer.lock().unwrap().clone(); - // Extract exit code from result - let exit_code = match &result { - Ok(_) => Some(0), - Err(e) => { - // Try to extract exit code from error message - let error_msg = e.to_string(); - if let Some(code_str) = error_msg.split("ExitCode::").nth(1) { - if let Some(code) = code_str.split_whitespace().next() { - code.parse::().ok() - } else { - None - } - } else { - None - } - } - }; - Ok(WasmRunResult { stdout, stderr, - exit_code, + exit_code: parse_exit_code(&result), }) } +/// Run a compiled WASM file using WasiRunner and return output buffers and exit status +/// +/// This function uses the same caching mechanism as the Wasmer CLI: +/// - In-memory cache (SharedCache) for fast repeated loads within the same process +/// - Filesystem cache as a fallback for persistence across test runs +/// - Cache directory follows the same precedence as the CLI: +/// 1. 
WASMER_CACHE_DIR environment variable
+///    2. WASMER_DIR/cache/compiled
+///    3. ~/.wasmer/cache/compiled
+///    4. temp_dir/wasmer/cache/compiled (fallback)
+///
+/// The caching significantly improves test performance by avoiding recompilation
+/// of the same WASM modules across multiple test runs.
+pub fn run_wasm_with_result(
+    wasm_path: &PathBuf,
+    dir: &Path,
+) -> Result<WasmRunResult, anyhow::Error> {
+    run_wasm_with_overrides(wasm_path, dir, None)
+}
+
 /// Run a compiled WASM file using WasiRunner with a custom stdin.
 ///
 /// Useful for testing syscalls that block on stdin (e.g. fd_read): pass the
@@ -433,89 +435,7 @@ pub fn run_wasm_with_stdin(
     dir: &Path,
     stdin: Box,
 ) -> Result {
-    let wasm_bytes = std::fs::read(wasm_path)?;
-    let engine = create_engine_for_wasm(&wasm_bytes);
-    let module_data = HashedModuleData::new(wasm_bytes);
-    let hash = *module_data.hash();
-
-    let stdout_buffer = Arc::new(Mutex::new(Vec::new()));
-    let stderr_buffer = Arc::new(Mutex::new(Vec::new()));
-    let stdout_capture = Box::new(CaptureFile::new(stdout_buffer.clone()));
-    let stderr_capture = Box::new(CaptureFile::new(stderr_buffer.clone()));
-
-    let rt = create_runtime();
-    let result = rt.block_on(async {
-        let cache_dir = get_cache_dir();
-        std::fs::create_dir_all(&cache_dir).ok();
-
-        let rt_handle = wasmer_wasix::runtime::task_manager::tokio::RuntimeOrHandle::Handle(
-            tokio::runtime::Handle::current(),
-        );
-        let tokio_task_manager =
-            Arc::new(wasmer_wasix::runtime::task_manager::tokio::TokioTaskManager::new(rt_handle));
-        let module_cache = wasmer_wasix::runtime::module_cache::SharedCache::default()
-            .with_fallback(wasmer_wasix::runtime::module_cache::FileSystemCache::new(
-                cache_dir,
-                tokio_task_manager,
-            ));
-        let arc_cache = Arc::new(module_cache);
-
-        let module = wasmer_wasix::runtime::load_module(
-            &engine,
-            &arc_cache,
-            wasmer_wasix::runtime::ModuleInput::Hashed(Cow::Borrowed(&module_data)),
-            None,
-        )
-        .await
-        .map_err(|e| anyhow::anyhow!("Failed to load module: {}", e))?;
-
-        
tokio::task::block_in_place(move || { - let mut runner = WasiRunner::new(); - runner - .with_mapped_directories([MappedDirectory { - guest: dir.to_string_lossy().to_string(), - host: dir.to_path_buf(), - }]) - .with_mapped_directories([MappedDirectory { - guest: "/lib".to_string(), - host: dir.to_path_buf(), - }]) - .with_current_dir(dir.to_string_lossy().to_string()) - .with_stdin(stdin) - .with_stdout(stdout_capture) - .with_stderr(stderr_capture); - runner.run_wasm( - RuntimeOrEngine::Engine(engine), - wasm_path.to_string_lossy().as_ref(), - module, - hash, - ) - }) - }); - - let stdout = stdout_buffer.lock().unwrap().clone(); - let stderr = stderr_buffer.lock().unwrap().clone(); - let exit_code = match &result { - Ok(_) => Some(0), - Err(e) => { - let error_msg = e.to_string(); - if let Some(code_str) = error_msg.split("ExitCode::").nth(1) { - if let Some(code) = code_str.split_whitespace().next() { - code.parse::().ok() - } else { - None - } - } else { - None - } - } - }; - - Ok(WasmRunResult { - stdout, - stderr, - exit_code, - }) + run_wasm_with_overrides(wasm_path, dir, Some(stdin)) } /// Run a compiled WASM file using WasiRunner From 37aea2a2df55b2810e2b6ec126a40be03c49c115 Mon Sep 17 00:00:00 2001 From: Arshia Date: Mon, 27 Apr 2026 17:37:00 +0400 Subject: [PATCH 5/6] WIP: address review comments --- lib/wasix/src/syscalls/wasi/fd_read.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/wasix/src/syscalls/wasi/fd_read.rs b/lib/wasix/src/syscalls/wasi/fd_read.rs index 4290f5e63a04..fe9d05cae8fa 100644 --- a/lib/wasix/src/syscalls/wasi/fd_read.rs +++ b/lib/wasix/src/syscalls/wasi/fd_read.rs @@ -131,8 +131,8 @@ pub(crate) fn fd_read_internal_handler( Ok(ret) } -/// Extracts validated iov buffer specs from WASM memory as raw host pointers, -/// Raw iov buffer specs extracted from WASM memory before entering `__asyncify`. 
+/// Stores validated iov buffer specifications from WASM memory as raw host +/// pointers for use while executing inside `__asyncify`. /// /// `mem_base` points to the start of WASM linear memory. Each entry in `bufs` /// is a `(offset_into_memory, length)` pair describing one iov slice. @@ -140,7 +140,10 @@ pub(crate) fn fd_read_internal_handler( /// # Safety /// `mem_base` remains valid only while the calling thread is blocked inside /// `__asyncify` / `block_on`. WASM cannot execute `memory.grow` while the -/// thread is parked there, so the base pointer is stable for the duration. +/// thread is parked there in a single-threaded application. In a multi- +/// threaded application, the memory base can't be changed at all since +/// replicating a new memory base address to all threads in a way that doesn't +/// break them is near-impossible. struct IovBufs { mem_base: *mut u8, bufs: Vec<(usize, usize)>, From a3513f52c73455c78fb97505891633f2624a248d Mon Sep 17 00:00:00 2001 From: Arshia Date: Mon, 27 Apr 2026 17:39:49 +0400 Subject: [PATCH 6/6] Add test for EINTR-during-read Co-authored-by: Copilot --- lib/wasix/tests/wasm_tests/fd_tests.rs | 21 +++++ .../fd_tests/stdin-signal-eintr/build.sh | 4 + .../fd_tests/stdin-signal-eintr/main.c | 94 +++++++++++++++++++ 3 files changed, 119 insertions(+) create mode 100644 lib/wasix/tests/wasm_tests/fd_tests/stdin-signal-eintr/build.sh create mode 100644 lib/wasix/tests/wasm_tests/fd_tests/stdin-signal-eintr/main.c diff --git a/lib/wasix/tests/wasm_tests/fd_tests.rs b/lib/wasix/tests/wasm_tests/fd_tests.rs index 22831c2c397a..e65660ebda0a 100644 --- a/lib/wasix/tests/wasm_tests/fd_tests.rs +++ b/lib/wasix/tests/wasm_tests/fd_tests.rs @@ -32,3 +32,24 @@ fn test_stdin_read_does_not_block_dlopen() { ); assert_eq!(result.exit_code, Some(0)); } + +#[test] +fn test_stdin_read_is_interrupted_by_signal() { + let wasm = run_build_script(file!(), "stdin-signal-eintr").unwrap(); + + // Keep stdin blocked for the duration of the test; 
the guest reader thread
+    // should wake because of a signal and return EINTR rather than waiting for
+    // input or EOF.
+    let (_pipe_tx, pipe_rx) = Pipe::channel();
+
+    let result = run_wasm_with_stdin(&wasm, wasm.parent().unwrap(), Box::new(pipe_rx)).unwrap();
+
+    let stdout = String::from_utf8_lossy(&result.stdout);
+    assert_eq!(
+        stdout.trim(),
+        "reader_ready\nsignal_sent\nhandler_called\nread_errno=EINTR\nsequence_ok",
+        "stderr: {}",
+        String::from_utf8_lossy(&result.stderr)
+    );
+    assert_eq!(result.exit_code, Some(0));
+}
diff --git a/lib/wasix/tests/wasm_tests/fd_tests/stdin-signal-eintr/build.sh b/lib/wasix/tests/wasm_tests/fd_tests/stdin-signal-eintr/build.sh
new file mode 100644
index 000000000000..ddb7e5a11f33
--- /dev/null
+++ b/lib/wasix/tests/wasm_tests/fd_tests/stdin-signal-eintr/build.sh
@@ -0,0 +1,4 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+$CC main.c -o main
\ No newline at end of file
diff --git a/lib/wasix/tests/wasm_tests/fd_tests/stdin-signal-eintr/main.c b/lib/wasix/tests/wasm_tests/fd_tests/stdin-signal-eintr/main.c
new file mode 100644
index 000000000000..6e650293b5a5
--- /dev/null
+++ b/lib/wasix/tests/wasm_tests/fd_tests/stdin-signal-eintr/main.c
@@ -0,0 +1,94 @@
+#include <errno.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+static int reader_ready = 0;
+static volatile sig_atomic_t handler_called = 0;
+static int read_errno = 0;
+
+static void signal_handler(int sig) {
+    (void)sig;
+    handler_called = 1;
+}
+
+static void* reader_thread(void* arg) {
+    (void)arg;
+
+    pthread_mutex_lock(&mutex);
+    reader_ready = 1;
+    pthread_cond_signal(&cond);
+    pthread_mutex_unlock(&mutex);
+
+    char buf[64];
+    ssize_t ret = read(STDIN_FILENO, buf, sizeof(buf));
+    if (ret >= 0) {
+        fprintf(stderr, "read unexpectedly succeeded: %zd\n", ret);
+        return (void*)1;
+    }
+
+    read_errno = errno;
+    return NULL;
+}
+
+int main(void) {
+    struct 
sigaction sa; + memset(&sa, 0, sizeof(sa)); + sa.sa_handler = signal_handler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + if (sigaction(SIGUSR1, &sa, NULL) != 0) { + perror("sigaction"); + return 1; + } + + pthread_t t; + if (pthread_create(&t, NULL, reader_thread, NULL) != 0) { + perror("pthread_create"); + return 1; + } + + pthread_mutex_lock(&mutex); + while (!reader_ready) pthread_cond_wait(&cond, &mutex); + pthread_mutex_unlock(&mutex); + + printf("reader_ready\n"); + + // Give the reader thread time to actually enter the blocking read. + sleep(1); + + if (pthread_kill(t, SIGUSR1) != 0) { + perror("pthread_kill"); + return 1; + } + + printf("signal_sent\n"); + + void* thread_ret = NULL; + if (pthread_join(t, &thread_ret) != 0) { + perror("pthread_join"); + return 1; + } + if (thread_ret != NULL) { + return 1; + } + + if (!handler_called) { + fprintf(stderr, "signal handler was not called\n"); + return 1; + } + printf("handler_called\n"); + + if (read_errno != EINTR) { + fprintf(stderr, "expected EINTR, got %d\n", read_errno); + return 1; + } + + printf("read_errno=EINTR\n"); + printf("sequence_ok\n"); + return 0; +} \ No newline at end of file