Skip to content

Commit

Permalink
wasmtime-wasi: Fix spurious wake-ups in blocking_ of InputStream
Browse files Browse the repository at this point in the history
…and `OutputStream` (#10113)

* fix InputStream blocking_read spurious wake up

* add MAX_BLOCKING_ATTEMPTS and also wrap for `write_ready`

* avoid loop when reading zero size
  • Loading branch information
Heap-Hop authored Feb 3, 2025
1 parent a0338af commit 473675c
Showing 1 changed file with 40 additions and 4 deletions.
44 changes: 40 additions & 4 deletions crates/wasi-io/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ use alloc::boxed::Box;
use anyhow::Result;
use bytes::Bytes;

/// `Pollable::ready()` for `InputStream` and `OutputStream` may return
/// prematurely due to `io::ErrorKind::WouldBlock`.
///
/// To ensure that `blocking_` functions return a valid non-empty result,
/// we use a loop with a maximum iteration limit.
///
/// This constant defines the maximum number of loop attempts allowed.
const MAX_BLOCKING_ATTEMPTS: u8 = 10;

/// Host trait for implementing the `wasi:io/streams.input-stream` resource: A
/// bytestream which can be read from.
#[async_trait::async_trait]
Expand All @@ -24,8 +33,24 @@ pub trait InputStream: Pollable {
/// Similar to `read`, except that it blocks until at least one byte can be
/// read.
async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
self.ready().await;
self.read(size)
if size == 0 {
self.ready().await;
return self.read(size);
}

let mut i = 0;
loop {
// This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
self.ready().await;
let data = self.read(size)?;
if !data.is_empty() {
return Ok(data);
}
if i >= MAX_BLOCKING_ATTEMPTS {
return Err(StreamError::trap("max blocking attempts exceeded"));
}
i += 1;
}
}

/// Same as the `read` method except that bytes are skipped.
Expand Down Expand Up @@ -239,8 +264,19 @@ pub trait OutputStream: Pollable {
/// Simultaneously waits for this stream to be writable and then returns how
/// much may be written or the last error that happened.
async fn write_ready(&mut self) -> StreamResult<usize> {
self.ready().await;
self.check_write()
let mut i = 0;
loop {
// This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
self.ready().await;
let n = self.check_write()?;
if n > 0 {
return Ok(n);
}
if i >= MAX_BLOCKING_ATTEMPTS {
return Err(StreamError::trap("max blocking attempts exceeded"));
}
i += 1;
}
}

/// Cancel any asynchronous work and wait for it to wrap up.
Expand Down

0 comments on commit 473675c

Please sign in to comment.