From 21d72a13ad1dfc827bb0443410a27a30b67ee448 Mon Sep 17 00:00:00 2001 From: Victor Adossi Date: Tue, 4 Feb 2025 01:25:41 +0900 Subject: [PATCH] fix: error impl on streams/futures Signed-off-by: Victor Adossi --- Cargo.lock | 15 +- Cargo.toml | 9 +- crates/cranelift/src/compiler/component.rs | 79 +-- crates/environ/src/component.rs | 10 +- crates/environ/src/component/dfg.rs | 32 +- crates/environ/src/component/info.rs | 14 + .../environ/src/component/translate/inline.rs | 44 +- crates/environ/src/component/types.rs | 3 + crates/misc/component-async-tests/src/lib.rs | 9 + crates/test-programs/Cargo.toml | 1 - .../bin/async_error_context_stream_callee.rs | 2 +- .../bin/async_error_context_stream_caller.rs | 3 + .../test-programs/src/bin/async_http_echo.rs | 2 +- .../src/bin/async_http_middleware.rs | 4 +- .../src/bin/async_transmit_callee.rs | 6 +- .../src/bin/async_transmit_caller.rs | 6 +- .../src/bin/async_unit_stream_caller.rs | 2 +- .../src/runtime/component/concurrent.rs | 25 +- .../concurrent/futures_and_streams.rs | 454 ++++++++++++++---- .../src/runtime/vm/component/libcalls.rs | 14 +- 20 files changed, 557 insertions(+), 177 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 01413bff2..04bc5589b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5262,8 +5262,7 @@ dependencies = [ [[package]] name = "wit-bindgen" version = "0.38.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b550e454e4cce8984398539a94a0226511e1f295b14afdc8f08b4e2e2ff9de3a" +source = "git+https://github.com/vados-cosmonic/wit-bindgen?rev=97b7b12fdc36542aee2350f34c178439bd562bbe#97b7b12fdc36542aee2350f34c178439bd562bbe" dependencies = [ "wit-bindgen-rt 0.38.0", "wit-bindgen-rust-macro", @@ -5272,8 +5271,7 @@ dependencies = [ [[package]] name = "wit-bindgen-core" version = "0.38.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70e2f98d49960a416074c5d72889f810ed3032a32ffef5e4760094426fefbfe8" +source = "git+https://github.com/vados-cosmonic/wit-bindgen?rev=97b7b12fdc36542aee2350f34c178439bd562bbe#97b7b12fdc36542aee2350f34c178439bd562bbe" dependencies = [ "anyhow", "heck 0.5.0", @@ -5292,8 +5290,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rt" version = "0.38.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed6f8d372a2d4a1227f2556e051cc24b2a5f15768d53451c84ff91e2527139e3" +source = "git+https://github.com/vados-cosmonic/wit-bindgen?rev=97b7b12fdc36542aee2350f34c178439bd562bbe#97b7b12fdc36542aee2350f34c178439bd562bbe" dependencies = [ "bitflags 2.6.0", "futures", @@ -5303,8 +5300,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rust" version = "0.38.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cc49091f84e4f2ace078bbc86082b57e667b9e789baece4b1184e0963382b6e" +source = "git+https://github.com/vados-cosmonic/wit-bindgen?rev=97b7b12fdc36542aee2350f34c178439bd562bbe#97b7b12fdc36542aee2350f34c178439bd562bbe" dependencies = [ "anyhow", "heck 0.5.0", @@ -5319,8 +5315,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rust-macro" version = "0.38.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3545a699dc9d72298b2064ce71b771fc10fc6b757d29306b1e54a4283a75abba" +source = "git+https://github.com/vados-cosmonic/wit-bindgen?rev=97b7b12fdc36542aee2350f34c178439bd562bbe#97b7b12fdc36542aee2350f34c178439bd562bbe" dependencies = [ "anyhow", "prettyplease", diff --git a/Cargo.toml b/Cargo.toml index daaf3eb46..f1e0867ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -299,10 +299,10 @@ system-interface = { version = "0.27.1", features = ["cap_std_impls"] } io-lifetimes = { version = "2.0.3", default-features = false } io-extras = "0.18.1" rustix = "0.38.43" -# wit-bindgen: -wit-bindgen = { version = "0.38.0", default-features = false } -wit-bindgen-rt = { version = "0.38.0", default-features = false } -wit-bindgen-rust-macro = { version = "0.38.0", default-features = false } +# TODO: switch back to released wit-bindgen +wit-bindgen = { git = "https://github.com/vados-cosmonic/wit-bindgen", rev = "97b7b12fdc36542aee2350f34c178439bd562bbe", default-features = false} +wit-bindgen-rt = { git = "https://github.com/vados-cosmonic/wit-bindgen", rev = "97b7b12fdc36542aee2350f34c178439bd562bbe", default-features = false} +wit-bindgen-rust-macro = { git = "https://github.com/vados-cosmonic/wit-bindgen", rev = "97b7b12fdc36542aee2350f34c178439bd562bbe", default-features = false} # wasm-tools family: wasmparser = { version = "0.224.0", default-features = false, features = ['simd'] } @@ -583,4 +583,3 @@ wasm-mutate = { git = "https://github.com/bytecodealliance/wasm-tools" } wit-parser = { git = "https://github.com/bytecodealliance/wasm-tools" } wit-component = { git = "https://github.com/bytecodealliance/wasm-tools" } wasm-wave = { git = "https://github.com/bytecodealliance/wasm-tools" } - diff --git a/crates/cranelift/src/compiler/component.rs b/crates/cranelift/src/compiler/component.rs index 352fb11fe..579f0a2d2 100644 --- a/crates/cranelift/src/compiler/component.rs +++ b/crates/cranelift/src/compiler/component.rs @@ -120,17 +120,22 @@ impl<'a> TrampolineCompiler<'a> { Trampoline::TaskYield { async_ } => self.translate_task_yield_call(*async_), Trampoline::SubtaskDrop { instance } => self.translate_subtask_drop_call(*instance), Trampoline::StreamNew { ty } => self.translate_future_or_stream_call( - ty.as_u32(), + &[ty.as_u32()], None, host::stream_new, ir::types::I64, ), - Trampoline::StreamRead { ty, options } => { + Trampoline::StreamRead { + ty, + err_ctx_ty, + options, + } => { + let tys = &[ty.as_u32(), err_ctx_ty.as_u32()]; if let Some(info) = self.flat_stream_element_info(*ty) { - self.translate_flat_stream_call(*ty, options, host::flat_stream_read, &info) + self.translate_flat_stream_call(tys, options, host::flat_stream_read, &info) } else { self.translate_future_or_stream_call( - ty.as_u32(), + tys, Some(options), host::stream_read, ir::types::I64, @@ -138,11 +143,12 @@ impl<'a> TrampolineCompiler<'a> { } } Trampoline::StreamWrite { ty, options } => { + let tys = &[ty.as_u32()]; if let Some(info) = self.flat_stream_element_info(*ty) { - self.translate_flat_stream_call(*ty, options, host::flat_stream_write, &info) + self.translate_flat_stream_call(tys, options, host::flat_stream_write, &info) } else { self.translate_future_or_stream_call( - ty.as_u32(), + tys, Some(options), host::stream_write, ir::types::I64, @@ -156,31 +162,36 @@ impl<'a> TrampolineCompiler<'a> { self.translate_cancel_call(ty.as_u32(), *async_, host::stream_cancel_write) } Trampoline::StreamCloseReadable { ty } => self.translate_future_or_stream_call( - ty.as_u32(), + &[ty.as_u32()], None, host::stream_close_readable, ir::types::I8, ), - Trampoline::StreamCloseWritable { ty } => self.translate_future_or_stream_call( - ty.as_u32(), - None, - host::stream_close_writable, - ir::types::I8, - ), + Trampoline::StreamCloseWritable { ty, err_ctx_ty } => self + .translate_future_or_stream_call( + &[ty.as_u32(), err_ctx_ty.as_u32()], + None, + host::stream_close_writable, + ir::types::I8, + ), Trampoline::FutureNew { ty } => self.translate_future_or_stream_call( - ty.as_u32(), + &[ty.as_u32()], None, host::future_new, ir::types::I64, ), - Trampoline::FutureRead { ty, options } => self.translate_future_or_stream_call( - ty.as_u32(), + Trampoline::FutureRead { + ty, + err_ctx_ty, + options, + } => self.translate_future_or_stream_call( + &[ty.as_u32(), err_ctx_ty.as_u32()], Some(&options), host::future_read, ir::types::I64, ), Trampoline::FutureWrite { ty, options } => self.translate_future_or_stream_call( - ty.as_u32(), + &[ty.as_u32()], Some(options), host::future_write, ir::types::I64, @@ -192,17 +203,18 @@ impl<'a> TrampolineCompiler<'a> { self.translate_cancel_call(ty.as_u32(), *async_, host::future_cancel_write) } Trampoline::FutureCloseReadable { ty } => self.translate_future_or_stream_call( - ty.as_u32(), + &[ty.as_u32()], None, host::future_close_readable, ir::types::I8, ), - Trampoline::FutureCloseWritable { ty } => self.translate_future_or_stream_call( - ty.as_u32(), - None, - host::future_close_writable, - ir::types::I8, - ), + Trampoline::FutureCloseWritable { ty, err_ctx_ty } => self + .translate_future_or_stream_call( + &[ty.as_u32(), err_ctx_ty.as_u32()], + None, + host::future_close_writable, + ir::types::I8, + ), Trampoline::ErrorContextNew { ty, options } => self.translate_error_context_call( *ty, options, @@ -1127,7 +1139,7 @@ impl<'a> TrampolineCompiler<'a> { fn translate_future_or_stream_call( &mut self, - ty: u32, + tys: &[u32], options: Option<&CanonicalOptions>, get_libcall: fn( &dyn TargetIsa, @@ -1168,7 +1180,9 @@ impl<'a> TrampolineCompiler<'a> { ); } - callee_args.push(self.builder.ins().iconst(ir::types::I32, i64::from(ty))); + for ty in tys { + callee_args.push(self.builder.ins().iconst(ir::types::I32, i64::from(*ty))); + } callee_args.extend(args[2..].iter().copied()); @@ -1177,7 +1191,7 @@ impl<'a> TrampolineCompiler<'a> { fn translate_flat_stream_call( &mut self, - ty: TypeStreamTableIndex, + tys: &[u32], options: &CanonicalOptions, get_libcall: fn( &dyn TargetIsa, @@ -1205,16 +1219,19 @@ impl<'a> TrampolineCompiler<'a> { ), None => self.builder.ins().iconst(pointer_type, 0), }, - self.builder - .ins() - .iconst(ir::types::I32, i64::from(ty.as_u32())), + ]; + for ty in tys { + callee_args.push(self.builder.ins().iconst(ir::types::I32, i64::from(*ty))); + } + + callee_args.extend([ self.builder .ins() .iconst(ir::types::I32, i64::from(info.size32)), self.builder .ins() .iconst(ir::types::I32, i64::from(info.align32)), - ]; + ]); callee_args.extend(args[2..].iter().copied()); diff --git a/crates/environ/src/component.rs b/crates/environ/src/component.rs index 4db10e288..f19d2a69f 100644 --- a/crates/environ/src/component.rs +++ b/crates/environ/src/component.rs @@ -108,13 +108,13 @@ macro_rules! foreach_builtin_component_function { #[cfg(feature = "component-model-async")] future_write(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, ty: u32, future: u32, address: u32) -> u64; #[cfg(feature = "component-model-async")] - future_read(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, ty: u32, future: u32, address: u32) -> u64; + future_read(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, ty: u32, err_ctx_ty: u32, future: u32, address: u32) -> u64; #[cfg(feature = "component-model-async")] future_cancel_write(vmctx: vmctx, ty: u32, async_: u8, writer: u32) -> u64; #[cfg(feature = "component-model-async")] future_cancel_read(vmctx: vmctx, ty: u32, async_: u8, reader: u32) -> u64; #[cfg(feature = "component-model-async")] - future_close_writable(vmctx: vmctx, ty: u32, writer: u32, error: u32) -> bool; + future_close_writable(vmctx: vmctx, ty: u32, err_ctx_ty: u32, writer: u32, error: u32) -> bool; #[cfg(feature = "component-model-async")] future_close_readable(vmctx: vmctx, ty: u32, reader: u32) -> bool; #[cfg(feature = "component-model-async")] @@ -122,19 +122,19 @@ macro_rules! foreach_builtin_component_function { #[cfg(feature = "component-model-async")] stream_write(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, ty: u32, stream: u32, address: u32, count: u32) -> u64; #[cfg(feature = "component-model-async")] - stream_read(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, ty: u32, stream: u32, address: u32, count: u32) -> u64; + stream_read(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, ty: u32, err_ctx_ty: u32, stream: u32, address: u32, count: u32) -> u64; #[cfg(feature = "component-model-async")] stream_cancel_write(vmctx: vmctx, ty: u32, async_: u8, writer: u32) -> u64; #[cfg(feature = "component-model-async")] stream_cancel_read(vmctx: vmctx, ty: u32, async_: u8, reader: u32) -> u64; #[cfg(feature = "component-model-async")] - stream_close_writable(vmctx: vmctx, ty: u32, writer: u32, error: u32) -> bool; + stream_close_writable(vmctx: vmctx, ty: u32, err_ctx_ty: u32, writer: u32, error: u32) -> bool; #[cfg(feature = "component-model-async")] stream_close_readable(vmctx: vmctx, ty: u32, reader: u32) -> bool; #[cfg(feature = "component-model-async")] flat_stream_write(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, ty: u32, payload_size: u32, payload_align: u32, stream: u32, address: u32, count: u32) -> u64; #[cfg(feature = "component-model-async")] - flat_stream_read(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, ty: u32, payload_size: u32, payload_align: u32, stream: u32, address: u32, count: u32) -> u64; + flat_stream_read(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, ty: u32, err_ctx_ty: u32, payload_size: u32, payload_align: u32, stream: u32, address: u32, count: u32) -> u64; #[cfg(feature = "component-model-async")] error_context_new(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, ty: u32, debug_msg_address: u32, debug_msg_len: u32) -> u64; #[cfg(feature = "component-model-async")] diff --git a/crates/environ/src/component/dfg.rs b/crates/environ/src/component/dfg.rs index 1a06b5815..1a20f7a85 100644 --- a/crates/environ/src/component/dfg.rs +++ b/crates/environ/src/component/dfg.rs @@ -310,6 +310,7 @@ pub enum Trampoline { }, StreamRead { ty: TypeStreamTableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, options: CanonicalOptions, }, StreamWrite { @@ -329,12 +330,14 @@ pub enum Trampoline { }, StreamCloseWritable { ty: TypeStreamTableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, }, FutureNew { ty: TypeFutureTableIndex, }, FutureRead { ty: TypeFutureTableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, options: CanonicalOptions, }, FutureWrite { @@ -354,6 +357,7 @@ pub enum Trampoline { }, FutureCloseWritable { ty: TypeFutureTableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, }, ErrorContextNew { ty: TypeComponentLocalErrorContextTableIndex, @@ -797,8 +801,13 @@ impl LinearizeDfg<'_> { instance: *instance, }, Trampoline::StreamNew { ty } => info::Trampoline::StreamNew { ty: *ty }, - Trampoline::StreamRead { ty, options } => info::Trampoline::StreamRead { + Trampoline::StreamRead { + ty, + err_ctx_ty, + options, + } => info::Trampoline::StreamRead { ty: *ty, + err_ctx_ty: *err_ctx_ty, options: self.options(options), }, Trampoline::StreamWrite { ty, options } => info::Trampoline::StreamWrite { @@ -816,12 +825,20 @@ impl LinearizeDfg<'_> { Trampoline::StreamCloseReadable { ty } => { info::Trampoline::StreamCloseReadable { ty: *ty } } - Trampoline::StreamCloseWritable { ty } => { - info::Trampoline::StreamCloseWritable { ty: *ty } + Trampoline::StreamCloseWritable { ty, err_ctx_ty } => { + info::Trampoline::StreamCloseWritable { + ty: *ty, + err_ctx_ty: *err_ctx_ty, + } } Trampoline::FutureNew { ty } => info::Trampoline::FutureNew { ty: *ty }, - Trampoline::FutureRead { ty, options } => info::Trampoline::FutureRead { + Trampoline::FutureRead { + ty, + err_ctx_ty, + options, + } => info::Trampoline::FutureRead { ty: *ty, + err_ctx_ty: *err_ctx_ty, options: self.options(options), }, Trampoline::FutureWrite { ty, options } => info::Trampoline::FutureWrite { @@ -839,8 +856,11 @@ impl LinearizeDfg<'_> { Trampoline::FutureCloseReadable { ty } => { info::Trampoline::FutureCloseReadable { ty: *ty } } - Trampoline::FutureCloseWritable { ty } => { - info::Trampoline::FutureCloseWritable { ty: *ty } + Trampoline::FutureCloseWritable { ty, err_ctx_ty } => { + info::Trampoline::FutureCloseWritable { + ty: *ty, + err_ctx_ty: *err_ctx_ty, + } } Trampoline::ErrorContextNew { ty, options } => info::Trampoline::ErrorContextNew { ty: *ty, diff --git a/crates/environ/src/component/info.rs b/crates/environ/src/component/info.rs index 2fec36924..6bc076605 100644 --- a/crates/environ/src/component/info.rs +++ b/crates/environ/src/component/info.rs @@ -741,6 +741,10 @@ pub enum Trampoline { StreamRead { /// The table index for the specific `stream` type and caller instance. ty: TypeStreamTableIndex, + + /// The table index for the `error-context` type in the caller instance. + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, + /// Any options (e.g. string encoding) to use when storing values to /// memory. options: CanonicalOptions, @@ -787,6 +791,9 @@ pub enum Trampoline { StreamCloseWritable { /// The table index for the specific `stream` type and caller instance. ty: TypeStreamTableIndex, + + /// The table index for the `error-context` type in the caller instance. + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, }, /// A `future.new` intrinsic to create a new `future` handle of the @@ -800,6 +807,10 @@ pub enum Trampoline { FutureRead { /// The table index for the specific `future` type and caller instance. ty: TypeFutureTableIndex, + + /// The table index for the `error-context` type in the caller instance. + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, + /// Any options (e.g. string encoding) to use when storing values to /// memory. options: CanonicalOptions, @@ -846,6 +857,9 @@ pub enum Trampoline { FutureCloseWritable { /// The table index for the specific `future` type and caller instance. ty: TypeFutureTableIndex, + + /// The table index for the `error-context` type in the caller instance. + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, }, /// A `error-context.new` intrinsic to create a new `error-context` with a diff --git a/crates/environ/src/component/translate/inline.rs b/crates/environ/src/component/translate/inline.rs index 9b25cc023..7cc50d532 100644 --- a/crates/environ/src/component/translate/inline.rs +++ b/crates/environ/src/component/translate/inline.rs @@ -759,12 +759,17 @@ impl<'a> Inliner<'a> { else { unreachable!() }; + let err_ctx_ty = types.error_context_table_type()?; let options = self.adapter_options(frame, types, options); let options = self.canonical_options(options); - let index = self - .result - .trampolines - .push((*func, dfg::Trampoline::StreamRead { ty, options })); + let index = self.result.trampolines.push(( + *func, + dfg::Trampoline::StreamRead { + ty, + err_ctx_ty, + options, + }, + )); frame.funcs.push(dfg::CoreDef::Trampoline(index)); } StreamWrite { ty, func, options } => { @@ -829,10 +834,11 @@ impl<'a> Inliner<'a> { else { unreachable!() }; - let index = self - .result - .trampolines - .push((*func, dfg::Trampoline::StreamCloseWritable { ty })); + let err_ctx_ty = types.error_context_table_type()?; + let index = self.result.trampolines.push(( + *func, + dfg::Trampoline::StreamCloseWritable { ty, err_ctx_ty }, + )); frame.funcs.push(dfg::CoreDef::Trampoline(index)); } FutureNew { ty, func } => { @@ -853,12 +859,17 @@ impl<'a> Inliner<'a> { else { unreachable!() }; + let err_ctx_ty = types.error_context_table_type()?; let options = self.adapter_options(frame, types, options); let options = self.canonical_options(options); - let index = self - .result - .trampolines - .push((*func, dfg::Trampoline::FutureRead { ty, options })); + let index = self.result.trampolines.push(( + *func, + dfg::Trampoline::FutureRead { + ty, + err_ctx_ty, + options, + }, + )); frame.funcs.push(dfg::CoreDef::Trampoline(index)); } FutureWrite { ty, func, options } => { @@ -923,10 +934,11 @@ impl<'a> Inliner<'a> { else { unreachable!() }; - let index = self - .result - .trampolines - .push((*func, dfg::Trampoline::FutureCloseWritable { ty })); + let err_ctx_ty = types.error_context_table_type()?; + let index = self.result.trampolines.push(( + *func, + dfg::Trampoline::FutureCloseWritable { ty, err_ctx_ty }, + )); frame.funcs.push(dfg::CoreDef::Trampoline(index)); } ErrorContextNew { func, options } => { diff --git a/crates/environ/src/component/types.rs b/crates/environ/src/component/types.rs index f70b6f3af..60e6c2f76 100644 --- a/crates/environ/src/component/types.rs +++ b/crates/environ/src/component/types.rs @@ -91,13 +91,16 @@ indices! { pub struct TypeListIndex(u32); /// Index pointing to a future type in the component model. pub struct TypeFutureIndex(u32); + /// Index pointing to a future table within a component. /// /// This is analogous to `TypeResourceTableIndex` in that it tracks /// ownership of futures within each (sub)component instance. pub struct TypeFutureTableIndex(u32); + /// Index pointing to a stream type in the component model. pub struct TypeStreamIndex(u32); + /// Index pointing to a stream table within a component. /// /// This is analogous to `TypeResourceTableIndex` in that it tracks diff --git a/crates/misc/component-async-tests/src/lib.rs b/crates/misc/component-async-tests/src/lib.rs index ecc16f8d4..0427a4022 100644 --- a/crates/misc/component-async-tests/src/lib.rs +++ b/crates/misc/component-async-tests/src/lib.rs @@ -1945,4 +1945,13 @@ mod test { &fs::read(test_programs_artifacts::ASYNC_ERROR_CONTEXT_STREAM_CALLEE_COMPONENT).await?; test_run(&compose(caller, callee).await?).await } + + // #[tokio::test] + // async fn async_future_end_err() -> Result<()> { + // let caller = + // &fs::read(test_programs_artifacts::ASYNC_ERROR_CONTEXT_FUTURE_CALLER_COMPONENT).await?; + // let callee = + // &fs::read(test_programs_artifacts::ASYNC_ERROR_CONTEXT_FUTURE_CALLEE_COMPONENT).await?; + // test_run(&compose(caller, callee).await?).await + // } } diff --git a/crates/test-programs/Cargo.toml b/crates/test-programs/Cargo.toml index dbd29a4ca..4fbf971c6 100644 --- a/crates/test-programs/Cargo.toml +++ b/crates/test-programs/Cargo.toml @@ -25,4 +25,3 @@ base64 = "0.21.0" wasip2 = { version = "0.14.0", package = 'wasi' } once_cell = "1.19.0" flate2 = "1.0.28" - diff --git a/crates/test-programs/src/bin/async_error_context_stream_callee.rs b/crates/test-programs/src/bin/async_error_context_stream_callee.rs index e816489cc..15ec6ade8 100644 --- a/crates/test-programs/src/bin/async_error_context_stream_callee.rs +++ b/crates/test-programs/src/bin/async_error_context_stream_callee.rs @@ -15,7 +15,7 @@ mod bindings { } use bindings::wit_stream; use wit_bindgen_rt::async_support::futures::SinkExt; -use wit_bindgen_rt::async_support::{self, StreamReader}; +use wit_bindgen_rt::async_support::{self, error_context_new, StreamReader}; struct Component; diff --git a/crates/test-programs/src/bin/async_error_context_stream_caller.rs b/crates/test-programs/src/bin/async_error_context_stream_caller.rs index 16c930540..f9e16360e 100644 --- a/crates/test-programs/src/bin/async_error_context_stream_caller.rs +++ b/crates/test-programs/src/bin/async_error_context_stream_caller.rs @@ -29,6 +29,9 @@ impl Guest for Component { let Some(_) = stream.next().await else { panic!("unexpected send #1"); }; + let None = stream.next().await else { + panic!("unexpected extra object"); + }; } } diff --git a/crates/test-programs/src/bin/async_http_echo.rs b/crates/test-programs/src/bin/async_http_echo.rs index 90394a65e..bcf395058 100644 --- a/crates/test-programs/src/bin/async_http_echo.rs +++ b/crates/test-programs/src/bin/async_http_echo.rs @@ -45,7 +45,7 @@ impl Handler for Component { async_support::spawn(async move { let mut body_rx = body.stream().unwrap(); - while let Some(chunk) = body_rx.next().await { + while let Some(Ok(chunk)) = body_rx.next().await { pipe_tx.send(chunk).await.unwrap(); } diff --git a/crates/test-programs/src/bin/async_http_middleware.rs b/crates/test-programs/src/bin/async_http_middleware.rs index f65de7cbd..549b39070 100644 --- a/crates/test-programs/src/bin/async_http_middleware.rs +++ b/crates/test-programs/src/bin/async_http_middleware.rs @@ -75,7 +75,7 @@ impl Handler for Component { let mut decoder = DeflateDecoder::new(Vec::new()); - while let Some(chunk) = body_rx.next().await { + while let Some(Ok(chunk)) = body_rx.next().await { decoder.write_all(&chunk).unwrap(); pipe_tx.send(mem::take(decoder.get_mut())).await.unwrap(); } @@ -128,7 +128,7 @@ impl Handler for Component { let mut encoder = DeflateEncoder::new(Vec::new(), Compression::fast()); - while let Some(chunk) = body_rx.next().await { + while let Some(Ok(chunk)) = body_rx.next().await { encoder.write_all(&chunk).unwrap(); pipe_tx.send(mem::take(encoder.get_mut())).await.unwrap(); } diff --git a/crates/test-programs/src/bin/async_transmit_callee.rs b/crates/test-programs/src/bin/async_transmit_callee.rs index b1345e53b..8824da59d 100644 --- a/crates/test-programs/src/bin/async_transmit_callee.rs +++ b/crates/test-programs/src/bin/async_transmit_callee.rs @@ -44,16 +44,16 @@ impl Guest for Component { let mut caller_future_rx1 = Some(caller_future_rx1); let mut callee_future_tx1 = Some(callee_future_tx1); - while let Some(messages) = control_rx.next().await { + while let Some(Ok(messages)) = control_rx.next().await { for message in messages { match message { Control::ReadStream(value) => { - assert_eq!(caller_stream_rx.next().await, Some(vec![value])); + assert_eq!(caller_stream_rx.next().await, Some(Ok(vec![value]))); } Control::ReadFuture(value) => { assert_eq!( caller_future_rx1.take().unwrap().into_future().await, - Some(value) + Some(Ok(value)) ); } Control::WriteStream(value) => { diff --git a/crates/test-programs/src/bin/async_transmit_caller.rs b/crates/test-programs/src/bin/async_transmit_caller.rs index 2612ba057..bdc75bfde 100644 --- a/crates/test-programs/src/bin/async_transmit_caller.rs +++ b/crates/test-programs/src/bin/async_transmit_caller.rs @@ -92,7 +92,7 @@ impl Guest for Component { .send(vec![Control::WriteStream("a".into())]) .await .unwrap(); - assert_eq!(callee_stream_rx.next().await, Some(vec!["a".into()])); + assert_eq!(callee_stream_rx.next().await, Some(Ok(vec!["a".into()]))); // Start reading a value from the stream, but cancel the read before telling the peer to write. { @@ -107,7 +107,7 @@ impl Guest for Component { .send(vec![Control::WriteStream("b".into())]) .await .unwrap(); - assert_eq!(callee_stream_rx.next().await, Some(vec!["b".into()])); + assert_eq!(callee_stream_rx.next().await, Some(Ok(vec!["b".into()]))); // Start reading a value from the future, but cancel the read before telling the peer to write. { @@ -124,7 +124,7 @@ impl Guest for Component { .send(vec![Control::WriteFuture("b".into())]) .await .unwrap(); - assert_eq!(callee_future_rx1.into_future().await, Some("b".into())); + assert_eq!(callee_future_rx1.into_future().await, Some(Ok("b".into()))); // Start writing a value to the stream, but drop the stream without telling the peer to read. let send = caller_stream_tx.send(vec!["d".into()]); diff --git a/crates/test-programs/src/bin/async_unit_stream_caller.rs b/crates/test-programs/src/bin/async_unit_stream_caller.rs index 878ea225b..22027e13d 100644 --- a/crates/test-programs/src/bin/async_unit_stream_caller.rs +++ b/crates/test-programs/src/bin/async_unit_stream_caller.rs @@ -29,7 +29,7 @@ impl Guest for Component { let mut rx = unit_stream::run(count).await; let mut received = 0; - while let Some(chunk) = rx.next().await { + while let Some(Ok(chunk)) = rx.next().await { received += chunk.len(); } diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index 95b516a1f..feb4453fc 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -279,6 +279,7 @@ pub unsafe trait VMComponentAsyncStore { realloc: *mut VMFuncRef, string_encoding: u8, ty: TypeFutureTableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, future: u32, address: u32, ) -> Result; @@ -306,6 +307,7 @@ pub unsafe trait VMComponentAsyncStore { &mut self, instance: &mut ComponentInstance, ty: TypeFutureTableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, writer: u32, error: u32, ) -> Result<()>; @@ -346,6 +348,7 @@ pub unsafe trait VMComponentAsyncStore { realloc: *mut VMFuncRef, string_encoding: u8, ty: TypeStreamTableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, stream: u32, address: u32, count: u32, @@ -374,6 +377,7 @@ pub unsafe trait VMComponentAsyncStore { &mut self, instance: &mut ComponentInstance, ty: TypeStreamTableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, writer: u32, error: u32, ) -> Result<()>; @@ -409,6 +413,7 @@ pub unsafe trait VMComponentAsyncStore { memory: *mut VMMemoryDefinition, realloc: *mut VMFuncRef, ty: TypeStreamTableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, payload_size: u32, payload_align: u32, stream: u32, @@ -727,6 +732,7 @@ unsafe impl VMComponentAsyncStore for StoreInner { realloc: *mut VMFuncRef, string_encoding: u8, ty: TypeFutureTableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, future: u32, address: u32, ) -> Result { @@ -737,6 +743,7 @@ unsafe impl VMComponentAsyncStore for StoreInner { realloc, string_encoding, TableIndex::Future(ty), + err_ctx_ty, None, future, address, @@ -780,6 +787,7 @@ unsafe impl VMComponentAsyncStore for StoreInner { &mut self, instance: &mut ComponentInstance, ty: TypeFutureTableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, writer: u32, error: u32, ) -> Result<()> { @@ -787,6 +795,7 @@ unsafe impl VMComponentAsyncStore for StoreInner { StoreContextMut(self), instance, TableIndex::Future(ty), + err_ctx_ty, writer, error, ) @@ -846,6 +855,7 @@ unsafe impl VMComponentAsyncStore for StoreInner { realloc: *mut VMFuncRef, string_encoding: u8, ty: TypeStreamTableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, stream: u32, address: u32, count: u32, @@ -857,6 +867,7 @@ unsafe impl VMComponentAsyncStore for StoreInner { realloc, string_encoding, TableIndex::Stream(ty), + err_ctx_ty, None, stream, address, @@ -900,6 +911,7 @@ unsafe impl VMComponentAsyncStore for StoreInner { &mut self, instance: &mut ComponentInstance, ty: TypeStreamTableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, writer: u32, error: u32, ) -> Result<()> { @@ -907,6 +919,7 @@ unsafe impl VMComponentAsyncStore for StoreInner { StoreContextMut(self), instance, TableIndex::Stream(ty), + err_ctx_ty, writer, error, ) @@ -961,6 +974,7 @@ unsafe impl VMComponentAsyncStore for StoreInner { memory: *mut VMMemoryDefinition, realloc: *mut VMFuncRef, ty: TypeStreamTableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, payload_size: u32, payload_align: u32, stream: u32, @@ -974,6 +988,7 @@ unsafe impl VMComponentAsyncStore for StoreInner { realloc, StringEncoding::Utf8 as u8, TableIndex::Stream(ty), + err_ctx_ty, Some(FlatAbi { size: payload_size, align: payload_align, @@ -2286,7 +2301,10 @@ fn task_check( (*instance).component_waitable_tables()[caller_instance].get_mut_by_rep(call.rep()) }; let Some((handle, _)) = entry else { - bail!("handle not found for waitable rep {}", call.rep()); + bail!( + "(waiting for task) handle not found for waitable rep {}", + call.rep() + ); }; let options = unsafe { @@ -2321,7 +2339,10 @@ fn task_check( .get_mut_by_rep(call.rep()) }; let Some((handle, _)) = entry else { - bail!("handle not found for waitable rep {}", call.rep()); + bail!( + "(polling task) handle not found for waitable rep {}", + call.rep() + ); }; let options = unsafe { diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index e81e535db..394a7fb64 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -16,7 +16,7 @@ use { }, AsContextMut, StoreContextMut, ValRaw, }, - anyhow::{anyhow, bail, Context, Result}, + anyhow::{anyhow, bail, ensure, Context, Result}, futures::{ channel::oneshot, future::{self, FutureExt}, @@ -47,6 +47,29 @@ pub(super) enum TableIndex { Future(TypeFutureTableIndex), } +pub(crate) enum HostReadResult { + /// Values sent during the stream + Values(Option>), + /// When host streams end, they may have an attached error-context + #[allow(unused)] + EndOfStream(Option), +} + +/// Action to take after writing +enum PostWrite { + Continue, + Close(Option), +} + +impl HostReadResult { + fn into_values(self) -> Option> { + match self { + HostReadResult::Values(maybe_vec) => maybe_vec, + HostReadResult::EndOfStream(_) => None, + } + } +} + fn payload(ty: TableIndex, types: &Arc) -> Option { match ty { TableIndex::Future(ty) => types[types[ty].ty].payload, @@ -178,7 +201,7 @@ fn accept( transmit.write = WriteState::HostReady { accept: Box::new(accept::(values, offset, transmit_id, tx)), - close: false, + post_write: PostWrite::Continue, }; } @@ -198,15 +221,24 @@ fn accept( } } +/// Write a waitable value from the host +/// +/// # Arguments +/// +/// * `store` - the engine store +/// * `transmit_rep` - Global representation of the transmit object that will be modified +/// * `values` - List of values that should be written +/// * `post_write` - Whether the transmit should be closed after write, possibly with an error context +/// fn host_write>( mut store: S, - rep: u32, + transmit_rep: u32, values: Vec, - mut close: bool, + mut post_write: PostWrite, ) -> Result> { let mut store = store.as_context_mut(); let (tx, rx) = oneshot::channel(); - let transmit_id = TableId::::new(rep); + let transmit_id = TableId::::new(transmit_rep); let mut offset = 0; loop { @@ -214,7 +246,8 @@ fn host_write(values, offset, transmit_id, tx)), - close, + post_write, }; - close = false; + post_write = PostWrite::Continue; } ReadState::GuestReady { @@ -241,6 +274,7 @@ fn host_write unsafe { let types = (*instance.as_ptr()).component_types(); let lower = &mut LowerContext::new( @@ -264,11 +298,7 @@ fn host_write Event::FutureRead, TableIndex::Stream(_) => Event::StreamRead, @@ -301,8 +331,8 @@ fn host_write {} } - if close { - host_close_writer(store, rep)?; + if let PostWrite::Close(err_ctx) = post_write { + host_close_writer(store, transmit_rep, err_ctx)?; } break Ok(rx); @@ -312,17 +342,18 @@ fn host_write>( mut store: S, rep: u32, -) -> Result>>> { +) -> Result>> { let mut store = store.as_context_mut(); - let (tx, rx) = oneshot::channel(); + let (tx, rx) = oneshot::channel::>(); let transmit_id = TableId::::new(rep); let transmit = store .concurrent_state() .table .get_mut(transmit_id) .with_context(|| rep.to_string())?; - let new_state = if let WriteState::Closed = &transmit.write { - WriteState::Closed + + let new_state = if let WriteState::Closed(maybe_err_ctx) = &transmit.write { + WriteState::Closed(*maybe_err_ctx) } else { WriteState::Open }; @@ -340,7 +371,7 @@ pub fn host_read { - _ = tx.send( + _ = tx.send(HostReadResult::Values( ty.map(|ty| { if address % usize::try_from(T::ALIGN32)? != 0 { bail!("write pointer not aligned"); @@ -356,7 +387,7 @@ pub fn host_read { @@ -364,10 +395,15 @@ pub fn host_read>() .map_err(|_| anyhow!("transmit type mismatch"))?; let count = values.len(); - _ = tx.send(Some(values)); + _ = tx.send(HostReadResult::Values(Some(values))); count } - Writer::None => 0, + // In this case, the very first writer that comes along + // was a close stream (with error) + Writer::End(err_ctx) => { + _ = tx.send(HostReadResult::EndOfStream(err_ctx)); + 1 + } }) }), }; @@ -382,18 +418,19 @@ pub fn host_read unsafe { let types = (*instance.as_ptr()).component_types(); let lift = &mut LiftContext::new(store.0, &options, types, instance.as_ptr()); - _ = tx.send( + _ = tx.send(HostReadResult::Values( payload(ty, types) .map(|ty| { let list = &WasmList::new(address, count, lift, ty)?; T::load_list(lift, list) }) .transpose()?, - ); + )); log::trace!( "remove write child of {}: {}", @@ -405,8 +442,9 @@ pub fn host_read { + WriteState::HostReady { accept, post_write } => { accept(Reader::Host { accept: Box::new(move |any| { - _ = tx.send(Some( + _ = tx.send(HostReadResult::Values(Some( *any.downcast() .map_err(|_| anyhow!("transmit type mismatch"))?, - )); + ))); Ok(()) }), })?; - if close { - store.concurrent_state().table.get_mut(transmit_id)?.write = WriteState::Closed; + if let PostWrite::Close(err_ctx) = post_write { + store.concurrent_state().table.get_mut(transmit_id)?.write = + WriteState::Closed(err_ctx); } } - WriteState::Closed => { + WriteState::Closed(_) => { host_close_reader(store, rep)?; } } @@ -467,7 +506,7 @@ fn host_cancel_write>(mut store: S, rep: u32) -> Re transmit.write = WriteState::Open; } - WriteState::Open | WriteState::Closed => { + WriteState::Open | WriteState::Closed(_) => { bail!("stream or future write canceled when no write is pending") } } @@ -506,41 +545,105 @@ fn host_cancel_read>(mut store: S, rep: u32) -> Res Ok(0) } -fn host_close_writer>(mut store: S, rep: u32) -> Result<()> { +/// Close the writer end of a Future or Stream +/// +/// # Arguments +/// +/// * `store` - the store for the component +/// * `transmit_rep` - A component-global representation of the transmit state for the writer that should be closed +/// * `err_ctx` - An optional component-global representation of an error context to use as the final value of the writer (`0` if none) +/// +fn host_close_writer>( + mut store: S, + transmit_rep: u32, + err_ctx: Option, +) -> Result<()> { let mut store = store.as_context_mut(); - let transmit_id = TableId::::new(rep); + let transmit_id = TableId::::new(transmit_rep); let transmit = store.concurrent_state().table.get_mut(transmit_id)?; + // If there was a write in progress, update it with the changed information match &mut transmit.write { - WriteState::GuestReady { close, .. } => { - *close = true; + // For guest-level streams that were waiting to write, we must update to close on the *next* read. + WriteState::GuestReady { post_write, .. } => { + *post_write = PostWrite::Close(err_ctx); } - WriteState::HostReady { close, .. } => { - *close = true; + // For host-level streams that were waiting for a write, we must update to close on the *next* read. + WriteState::HostReady { post_write, .. } => { + *post_write = PostWrite::Close(err_ctx); } + // If the write state was simply opened (and a read has not been attempted), we can immediately close v @ WriteState::Open => { - *v = WriteState::Closed; + *v = WriteState::Closed(err_ctx); } - WriteState::Closed => unreachable!(), + // It should be impossible to double-close a writable + WriteState::Closed(_) => unreachable!("write state is already closed"), } + // If the existing read state is closed, then there's nothing to read + // and we can keep it that way. + // + // If the read state was any other state, then we must set the new state to open + // to indicate that there *is* data to be read let new_state = if let ReadState::Closed = &transmit.read { ReadState::Closed } else { ReadState::Open }; + // Swap in the new read state match mem::replace(&mut transmit.read, new_state) { + // If the guest was ready to read, then we cannot close the reader (or writer) + // we must deliver the event, and update the state associated with the handle to + // represent that a read must be performed ReadState::GuestReady { ty, + err_ctx_ty, instance, handle, caller, .. } => unsafe { + // Lift the global err_ctx that we're receiving into an error context + // reference that the reader(caller) will + let reader_state_tbl = (*instance.as_ptr()) + .component_error_context_tables() + .get_mut(err_ctx_ty) + .context("retrieving component-local error context during host writer close")?; + + let push_param = match err_ctx { + None => CLOSED, + Some(err_ctx) => { + let rep = err_ctx.as_u32(); + // Get or insert the global error context into this guest's component-local error context tracking + let (local_err_ctx, _) = match reader_state_tbl.get_mut_by_rep(rep) { + Some(r) => { + // If the error already existed, since we're about to read it, increase + // the local component-wide reference count + (*r.1).0 += 1; + r + } + None => { + // If the error context was not already tracked locally, start tracking + reader_state_tbl.insert(rep, LocalErrorContextRefCount(1))?; + reader_state_tbl.get_mut_by_rep(rep).context( + "retrieving inserted local error context during guest read", + )? + } + }; + + // NOTE: we do not have to manage the global error context ref count here, because + // it was preemptively increased, and the guest that is ready to consume this + // will account for the extra global context ref count. + + CLOSED | local_err_ctx as usize + } + }; + + // Ensure the final read of the guest is queued, with appropriate closure indicator push_event( store, transmit_id.rep(), @@ -548,48 +651,63 @@ fn host_close_writer>(mut store: S, rep: u32) -> Re TableIndex::Future(_) => Event::FutureRead, TableIndex::Stream(_) => Event::StreamRead, }, - CLOSED, + push_param, caller, ); *get_mut_by_index(&mut *instance.as_ptr(), ty, handle)?.1 = StreamFutureState::Read; }, + // If the host was ready to read, and the writer end is being closed (host->host write?) + // signal to the reader that we've reached the end of the stream, and close the reader immediately ReadState::HostReady { accept } => { - accept(Writer::None)?; - - host_close_reader(store, rep)?; + accept(Writer::End(err_ctx))?; + host_close_reader(store, transmit_rep)?; } + // If the read state is open, then there are no registered readers of the stream/future ReadState::Open => {} + // If the read state was already closed, then we can remove the transmit state completely + // (both writer and reader have been closed) ReadState::Closed => { - log::trace!("host_close_writer delete {}", transmit_id.rep()); + log::trace!("host_close_writer delete {transmit_rep}"); store.concurrent_state().table.delete(transmit_id)?; } } Ok(()) } -fn host_close_reader>(mut store: S, rep: u32) -> Result<()> { +/// Close the reader end of a Future or Stream +/// +/// # Arguments +/// +/// * `store` - the store for the component +/// * `transmit_rep` - A global-component-level representation of the transmit state for the reader that should be closed +/// +fn host_close_reader>(mut store: S, transmit_rep: u32) -> Result<()> { let mut store = store.as_context_mut(); - let transmit_id = TableId::::new(rep); + let transmit_id = TableId::::new(transmit_rep); let transmit = store.concurrent_state().table.get_mut(transmit_id)?; transmit.read = ReadState::Closed; - let new_state = if let WriteState::Closed = &transmit.write { - WriteState::Closed + // If the write end is already closed, it should stay closed, + // otherwise, it should be opened. + let new_state = if let WriteState::Closed(err_ctx) = &transmit.write { + WriteState::Closed(*err_ctx) } else { WriteState::Open }; match mem::replace(&mut transmit.write, new_state) { + // If a guest is waiting to write, ensure that the next write + // reflects the closed state of the stream WriteState::GuestReady { ty, instance, handle, - close, + post_write, caller, .. } => unsafe { @@ -604,7 +722,7 @@ fn host_close_reader>(mut store: S, rep: u32) -> Re caller, ); - if close { + if let PostWrite::Close(_) = post_write { store.concurrent_state().table.delete(transmit_id)?; } else { *get_mut_by_index(&mut *instance.as_ptr(), ty, handle)?.1 = @@ -612,18 +730,20 @@ fn host_close_reader>(mut store: S, rep: u32) -> Re } }, - WriteState::HostReady { accept, close } => { + // If the reader is closed, we can ignore the waiting write from host + WriteState::HostReady { + accept, post_write, .. + } => { accept(Reader::None)?; - - if close { + if let PostWrite::Close(_) = post_write { store.concurrent_state().table.delete(transmit_id)?; } } WriteState::Open => {} - WriteState::Closed => { - log::trace!("host_close_reader delete {}", transmit_id.rep()); + WriteState::Closed(_) => { + log::trace!("host_close_reader delete {transmit_rep}"); store.concurrent_state().table.delete(transmit_id)?; } } @@ -649,7 +769,7 @@ impl FutureWriter { T: func::Lower + Send + Sync + 'static, { Ok(Promise(Box::pin( - host_write(store, self.rep, vec![value], true)?.map(drop), + host_write(store, self.rep, vec![value], PostWrite::Close(None))?.map(drop), ))) } @@ -657,8 +777,36 @@ impl FutureWriter { /// /// If this object is dropped without calling either this method or `write`, /// any read on the readable end will remain pending forever. + /// + /// # Arguments + /// + /// * `store` - the store associated with the component instance + /// * `err_ctx` - the handle of an error context that should be reported with the stream closure (`0` if none) + /// pub fn close>(self, store: S) -> Result<()> { - host_close_writer(store, self.rep) + self.close_with_error(store, 0) + } + + /// Close this object without writing a value. + /// + /// If this object is dropped without calling either this method or `write`, + /// any read on the readable end will remain pending forever. + /// + /// # Arguments + /// + /// * `store` - the store associated with the component instance + /// * `err_ctx` - the handle of an error context that should be reported with the stream closure (`0` if none) + /// + pub fn close_with_error>( + self, + store: S, + err_ctx: u32, + ) -> Result<()> { + host_close_writer( + store, + self.rep, + (err_ctx != 0).then(|| TypeComponentGlobalErrorContextTableIndex::from_u32(err_ctx)), + ) } } @@ -683,7 +831,7 @@ impl FutureReader { { Ok(Promise(Box::pin(host_read(store, self.rep)?.map(|v| { v.ok() - .and_then(|v| v.map(|v| v.into_iter().next().unwrap())) + .and_then(|v| v.into_values().map(|v| v.into_iter().next().unwrap())) })))) } @@ -841,7 +989,7 @@ impl StreamWriter { T: func::Lower + Send + Sync + 'static, { Ok(Promise(Box::pin( - host_write(store, self.rep, values, false)?.map(move |_| self), + host_write(store, self.rep, values, PostWrite::Continue)?.map(move |_| self), ))) } @@ -849,8 +997,36 @@ impl StreamWriter { /// /// If this object is dropped without calling this method, any read on the /// readable end will remain pending forever. + /// + /// # Arguments + /// + /// * `store` - the store associated with the component instance + /// * `err_ctx` - the handle of an error context that should be reported with the stream closure (`0` if none) + /// pub fn close>(self, store: S) -> Result<()> { - host_close_writer(store, self.rep) + self.close_with_error(store, 0) + } + + /// Close this object with a final error + /// + /// If this object is dropped without calling this method, any read on the + /// readable end will remain pending forever. + /// + /// # Arguments + /// + /// * `store` - the store associated with the component instance + /// * `err_ctx` - the handle of an error context that should be reported with the stream closure (`0` if none) + /// + pub fn close_with_error>( + self, + store: S, + err_ctx: u32, + ) -> Result<()> { + host_close_writer( + store, + self.rep, + (err_ctx != 0).then(|| TypeComponentGlobalErrorContextTableIndex::from_u32(err_ctx)), + ) } } @@ -876,9 +1052,9 @@ impl StreamReader { where T: func::Lift + Sync + Send + 'static, { - Ok(Promise(Box::pin( - host_read(store, self.rep)?.map(move |v| v.ok().and_then(|v| v.map(|v| (self, v)))), - ))) + Ok(Promise(Box::pin(host_read(store, self.rep)?.map( + move |v| v.ok().and_then(|v| v.into_values().map(|v| (self, v))), + )))) } /// Convert this `StreamReader` into a [`Val`]. @@ -1146,19 +1322,26 @@ enum WriteState { instance: SendSyncPtr, handle: u32, caller: TableId, - close: bool, + post_write: PostWrite, }, HostReady { accept: Box Result + Send + Sync>, - close: bool, + post_write: PostWrite, }, - Closed, + Closed(Option), } +/// Read state of a transmit channel +/// +/// Channels generally start as open, and once they are read for data by either +/// a guest or host, we transition into `GuestReady` or `HostReady` respectively. +/// +/// Once a transmit channel is closed, it should *stay* closed. enum ReadState { Open, GuestReady { ty: TableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, flat_abi: Option, options: Options, address: usize, @@ -1174,6 +1357,7 @@ enum ReadState { } enum Writer<'a> { + /// Writes that are queued from guests Guest { lift: &'a mut LiftContext<'a>, ty: Option, @@ -1183,7 +1367,7 @@ enum Writer<'a> { Host { values: Box, }, - None, + End(Option), } struct RawLowerContext<'a> { @@ -1396,6 +1580,7 @@ pub(super) fn guest_write( instance: _, handle: read_handle, caller: read_caller, + .. } => { assert_eq!(flat_abi, read_flat_abi); @@ -1479,7 +1664,7 @@ pub(super) fn guest_write( instance: SendSyncPtr::new(NonNull::new(instance).unwrap()), handle, caller, - close: false, + post_write: PostWrite::Continue, }; BLOCKED @@ -1504,6 +1689,7 @@ pub(super) fn guest_read( realloc: *mut VMFuncRef, string_encoding: u8, ty: TableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, flat_abi: Option, handle: u32, address: u32, @@ -1529,8 +1715,8 @@ pub(super) fn guest_read( *state = StreamFutureState::Busy; let transmit_id = TableId::::new(rep); let transmit = cx.concurrent_state().table.get_mut(transmit_id)?; - let new_state = if let WriteState::Closed = &transmit.write { - WriteState::Closed + let new_state = if let WriteState::Closed(err_ctx) = &transmit.write { + WriteState::Closed(*err_ctx) } else { WriteState::Open }; @@ -1545,7 +1731,7 @@ pub(super) fn guest_read( instance: _, handle: write_handle, caller: write_caller, - close, + post_write, } => { assert_eq!(flat_abi, write_flat_abi); @@ -1575,8 +1761,9 @@ pub(super) fn guest_read( .table .remove_child(transmit_id, write_caller)?; - if close { - cx.concurrent_state().table.get_mut(transmit_id)?.write = WriteState::Closed; + if let PostWrite::Close(err_ctx) = post_write { + cx.concurrent_state().table.get_mut(transmit_id)?.write = + WriteState::Closed(err_ctx); } else { unsafe { *get_mut_by_index(&mut *instance, write_ty, write_handle)?.1 = @@ -1598,7 +1785,7 @@ pub(super) fn guest_read( count } - WriteState::HostReady { accept, close } => { + WriteState::HostReady { accept, post_write } => { let count = accept(Reader::Guest { lower: RawLowerContext { store: cx.0.traitobj().as_ptr(), @@ -1611,8 +1798,9 @@ pub(super) fn guest_read( count, })?; - if close { - cx.concurrent_state().table.get_mut(transmit_id)?.write = WriteState::Closed; + if let PostWrite::Close(err_ctx) = post_write { + cx.concurrent_state().table.get_mut(transmit_id)?.write = + WriteState::Closed(err_ctx); } count @@ -1643,12 +1831,60 @@ pub(super) fn guest_read( instance: SendSyncPtr::new(NonNull::new(instance).unwrap()), handle, caller, + err_ctx_ty, }; BLOCKED } - WriteState::Closed => CLOSED, + WriteState::Closed(err_ctx) => { + match err_ctx { + // If no error context is provided, closed can be sent + None => CLOSED, + // If an error context was present, we must ensure it's created and bitwise OR w/ CLOSED + Some(err_ctx) => { + // Lower the global error context that was saved into write state into a component-local + // error context handle + let state_tbl = unsafe { + (*instance) + .component_error_context_tables() + .get_mut(err_ctx_ty) + .context( + "retrieving local error context table during closed read w/ error", + ) + }?; + + // Get or insert the global error context into this guest's component-local error context tracking + let (local_err_ctx, _) = match state_tbl.get_mut_by_rep(err_ctx.as_u32()) { + Some(r) => { + // If the error already existed, since we're about to read it, increase + // the local component-wide reference count + (*r.1).0 += 1; + r + } + None => { + let rep = err_ctx.as_u32(); + // If the error context was not already tracked locally, start tracking + state_tbl.insert(rep, LocalErrorContextRefCount(1))?; + state_tbl.get_mut_by_rep(rep).context( + "retrieving inserted local error context during guest read", + )? + } + }; + + // NOTE: During write closure when the error context was provided, we + // incremented the global count to ensure the error context would not be garbage collected, + // if dropped by the sending component. + // + // Since we did that preemptively, we do not need to increment the global ref count even + // after this increase in local ref count. + // + // If a reader (this reader) *never* comes along, when the relevant stream/future is closed, + // the writer state will indicate that the global count must be amended. + CLOSED | local_err_ctx as usize + } + } + } }; if result != BLOCKED { @@ -1713,16 +1949,13 @@ pub(super) fn guest_cancel_read( } pub(super) fn guest_close_writable( - cx: StoreContextMut, + mut cx: StoreContextMut, instance: &mut ComponentInstance, ty: TableIndex, + err_ctx_ty: TypeComponentLocalErrorContextTableIndex, writer: u32, - error: u32, + err_ctx_idx: u32, ) -> Result<()> { - if error != 0 { - bail!("todo: closing writable streams and futures with errors not yet implemented"); - } - let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) = state_table(instance, ty).remove_by_index(writer)? else { @@ -1735,7 +1968,50 @@ pub(super) fn guest_close_writable( } StreamFutureState::Busy => bail!("cannot drop busy stream or future"), } - host_close_writer(cx, rep) + + // Resolve the error context + let global_err_ctx = match err_ctx_idx { + // If no error context was provided, we can pass that along as-is + 0 => None, + + // If a non-zero error context was provided, first ensure it's valid, + // then lift the guest-local (component instance local) error context reference + // to the component-global level. + // + // This ensures that after closing the writer, when the eventual reader appears + // we can lower the component-global error context into a reader-local error context + err_ctx => { + // Look up the local component error context + let state_tbl = (*instance) + .component_error_context_tables() + .get_mut(err_ctx_ty) + .context("retrieving local error context during guest close writable")?; + // NOTE: the rep below is the component-global error context index + let (rep, _) = state_tbl.get_mut_by_index(err_ctx_idx)?; + + let global_err_ctx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep); + // Closing the writer with an error context means that a reader must later + // come along and discover the error context even once the writer goes away. + // + // Here we preemptively increase the ref count to ensure the error context + // won't be removed by the time the reader comes along + let GlobalErrorContextRefCount(global_count) = (*instance) + .component_global_error_context_ref_counts() + .get_mut(&global_err_ctx) + .context("retrieving global error context ref count during guest close writable")?; + *global_count += 1; + ensure!( + cx.concurrent_state() + .table + .get(TableId::::new(rep)) + .is_ok(), + "missing global error context state [{rep}] for local error context [{err_ctx}] during guest close writable" + ); + Some(global_err_ctx) + } + }; + + host_close_writer(cx, rep, global_err_ctx) } pub(super) fn guest_close_readable( diff --git a/crates/wasmtime/src/runtime/vm/component/libcalls.rs b/crates/wasmtime/src/runtime/vm/component/libcalls.rs index 8a383e0ca..edd1291b8 100644 --- a/crates/wasmtime/src/runtime/vm/component/libcalls.rs +++ b/crates/wasmtime/src/runtime/vm/component/libcalls.rs @@ -7,7 +7,9 @@ use core::cell::Cell; use core::convert::Infallible; use core::ptr::NonNull; use core::slice; -use wasmtime_environ::component::TypeResourceTableIndex; +use wasmtime_environ::component::{ + TypeComponentLocalErrorContextTableIndex, TypeResourceTableIndex, +}; const UTF16_TAG: usize = 1 << 31; @@ -858,6 +860,7 @@ unsafe fn future_read( realloc: *mut u8, string_encoding: u8, ty: u32, + err_ctx_ty: u32, future: u32, address: u32, ) -> Result { @@ -868,6 +871,7 @@ unsafe fn future_read( realloc.cast::(), string_encoding, wasmtime_environ::component::TypeFutureTableIndex::from_u32(ty), + TypeComponentLocalErrorContextTableIndex::from_u32(err_ctx_ty), future, address, ) @@ -916,6 +920,7 @@ unsafe fn future_cancel_read( unsafe fn future_close_writable( vmctx: NonNull, ty: u32, + err_ctx_ty: u32, writer: u32, error: u32, ) -> Result<()> { @@ -925,6 +930,7 @@ unsafe fn future_close_writable( .future_close_writable( instance, wasmtime_environ::component::TypeFutureTableIndex::from_u32(ty), + TypeComponentLocalErrorContextTableIndex::from_u32(err_ctx_ty), writer, error, ) @@ -990,6 +996,7 @@ unsafe fn stream_read( realloc: *mut u8, string_encoding: u8, ty: u32, + err_ctx_ty: u32, stream: u32, address: u32, count: u32, @@ -1001,6 +1008,7 @@ unsafe fn stream_read( realloc.cast::(), string_encoding, wasmtime_environ::component::TypeStreamTableIndex::from_u32(ty), + TypeComponentLocalErrorContextTableIndex::from_u32(err_ctx_ty), stream, address, count, @@ -1050,6 +1058,7 @@ unsafe fn stream_cancel_read( unsafe fn stream_close_writable( vmctx: NonNull, ty: u32, + err_ctx_ty: u32, writer: u32, error: u32, ) -> Result<()> { @@ -1059,6 +1068,7 @@ unsafe fn stream_close_writable( .stream_close_writable( instance, wasmtime_environ::component::TypeStreamTableIndex::from_u32(ty), + TypeComponentLocalErrorContextTableIndex::from_u32(err_ctx_ty), writer, error, ) @@ -1117,6 +1127,7 @@ unsafe fn flat_stream_read( memory: *mut u8, realloc: *mut u8, ty: u32, + err_ctx_ty: u32, payload_size: u32, payload_align: u32, stream: u32, @@ -1131,6 +1142,7 @@ unsafe fn flat_stream_read( memory.cast::(), realloc.cast::(), wasmtime_environ::component::TypeStreamTableIndex::from_u32(ty), + TypeComponentLocalErrorContextTableIndex::from_u32(err_ctx_ty), payload_size, payload_align, stream,