44//! `HttpConfig` so we don't need to thread context through each call.
55
66use std:: error:: Error ;
7+ use std:: future:: Future ;
8+ use std:: pin:: Pin ;
79use std:: sync:: Arc ;
810
911use bytes:: Bytes ;
@@ -12,6 +14,7 @@ use http_body_util::combinators::UnsyncBoxBody;
1214use http_body_util:: { BodyExt , StreamBody } ;
1315use wasmtime_wasi_http:: p2:: bindings:: http:: types:: ErrorCode as P2ErrorCode ;
1416use wasmtime_wasi_http:: p2:: body:: HyperIncomingBody ;
17+ use wasmtime_wasi_http:: p3:: bindings:: http:: types:: ErrorCode as P3ErrorCode ;
1518
1619use crate :: config:: HttpConfig ;
1720
@@ -55,6 +58,27 @@ impl ActHttpClient {
5558 worker : None ,
5659 } )
5760 }
61+
62+ /// Perform an outgoing request on the p3 WASI HTTP path. Returns the
63+ /// response plus a completion future matching the p3 hook signature.
64+ pub async fn send_p3 (
65+ & self ,
66+ request : http:: Request < UnsyncBoxBody < Bytes , P3ErrorCode > > ,
67+ ) -> Result <
68+ (
69+ http:: Response < UnsyncBoxBody < Bytes , P3ErrorCode > > ,
70+ Pin < Box < dyn Future < Output = Result < ( ) , P3ErrorCode > > + Send > > ,
71+ ) ,
72+ P3ErrorCode ,
73+ > {
74+ let reqwest_req = p3_to_reqwest ( request) ?;
75+ let resp = self
76+ . client
77+ . execute ( reqwest_req)
78+ . await
79+ . map_err ( reqwest_to_p3_error) ?;
80+ reqwest_response_to_p3 ( resp) . await
81+ }
5882}
5983
6084/// Convert an outgoing `hyper::Request` from the p2 WASI HTTP binding into
@@ -181,6 +205,115 @@ fn reqwest_to_p2_error(err: reqwest::Error) -> P2ErrorCode {
181205 P2ErrorCode :: HttpProtocolError
182206}
183207
208+ // ── p3 helpers ────────────────────────────────────────────────────────────
209+
210+ /// Convert an outgoing p3 request into a reqwest::Request. Streaming body,
211+ /// same approach as p2_to_reqwest — we wrap the UnsyncBoxBody as a Stream
212+ /// and feed it through reqwest::Body::wrap_stream, because UnsyncBoxBody
213+ /// is !Sync and wrap() requires Sync.
214+ fn p3_to_reqwest (
215+ request : http:: Request < UnsyncBoxBody < Bytes , P3ErrorCode > > ,
216+ ) -> Result < reqwest:: Request , P3ErrorCode > {
217+ use futures_util:: StreamExt ;
218+ use http_body_util:: BodyStream ;
219+
220+ let ( parts, body) = request. into_parts ( ) ;
221+ let scheme = parts
222+ . uri
223+ . scheme_str ( )
224+ . map ( str:: to_string)
225+ . unwrap_or_else ( || "https" . into ( ) ) ;
226+ let authority = parts
227+ . uri
228+ . authority ( )
229+ . map ( |a| a. to_string ( ) )
230+ . ok_or ( P3ErrorCode :: HttpRequestUriInvalid ) ?;
231+ let path_and_query = parts
232+ . uri
233+ . path_and_query ( )
234+ . map ( |p| p. as_str ( ) )
235+ . unwrap_or ( "/" ) ;
236+ let url_str = format ! ( "{scheme}://{authority}{path_and_query}" ) ;
237+ let url = reqwest:: Url :: parse ( & url_str) . map_err ( |_| P3ErrorCode :: HttpRequestUriInvalid ) ?;
238+ let method = reqwest:: Method :: from_bytes ( parts. method . as_str ( ) . as_bytes ( ) )
239+ . map_err ( |_| P3ErrorCode :: HttpProtocolError ) ?;
240+
241+ let data_stream = BodyStream :: new ( body) . filter_map ( |frame_res| async move {
242+ match frame_res {
243+ Ok ( frame) => frame. into_data ( ) . ok ( ) . map ( Ok :: < _ , std:: io:: Error > ) ,
244+ Err ( _) => Some ( Err ( std:: io:: Error :: other ( "wasi http p3 body stream error" ) ) ) ,
245+ }
246+ } ) ;
247+ let body = reqwest:: Body :: wrap_stream ( data_stream) ;
248+
249+ let mut builder = reqwest:: Client :: new ( ) . request ( method, url) . body ( body) ;
250+ for ( name, value) in parts. headers . iter ( ) {
251+ builder = builder. header ( name, value) ;
252+ }
253+ builder. build ( ) . map_err ( |_| P3ErrorCode :: HttpProtocolError )
254+ }
255+
256+ /// Error mapper for the p3 path. Same taxonomy as p2 but different ErrorCode
257+ /// enum.
258+ fn reqwest_to_p3_error ( err : reqwest:: Error ) -> P3ErrorCode {
259+ if err. is_timeout ( ) {
260+ return P3ErrorCode :: ConnectionTimeout ;
261+ }
262+ if err. is_connect ( ) {
263+ return P3ErrorCode :: ConnectionRefused ;
264+ }
265+ if err. is_redirect ( ) {
266+ return P3ErrorCode :: HttpRequestDenied ;
267+ }
268+ if err. is_decode ( ) {
269+ return P3ErrorCode :: HttpProtocolError ;
270+ }
271+ if err. is_request ( ) {
272+ return P3ErrorCode :: HttpRequestUriInvalid ;
273+ }
274+ if err. is_body ( ) {
275+ return P3ErrorCode :: HttpRequestBodySize ( None ) ;
276+ }
277+ P3ErrorCode :: HttpProtocolError
278+ }
279+
280+ /// Convert a reqwest response to the p3 shape the hook expects:
281+ /// http::Response<UnsyncBoxBody<Bytes, P3ErrorCode>> plus a
282+ /// Future<Output = Result<(), P3ErrorCode>> representing the body
283+ /// completion (reqwest handles this transparently; we return Ok(())
284+ /// immediately since body errors surface through the stream).
285+ async fn reqwest_response_to_p3 (
286+ resp : reqwest:: Response ,
287+ ) -> Result <
288+ (
289+ http:: Response < UnsyncBoxBody < Bytes , P3ErrorCode > > ,
290+ Pin < Box < dyn Future < Output = Result < ( ) , P3ErrorCode > > + Send > > ,
291+ ) ,
292+ P3ErrorCode ,
293+ > {
294+ let status = resp. status ( ) ;
295+ let version = resp. version ( ) ;
296+ let headers = resp. headers ( ) . clone ( ) ;
297+
298+ let byte_stream = resp
299+ . bytes_stream ( )
300+ . map_ok ( hyper:: body:: Frame :: data)
301+ . map_err ( reqwest_to_p3_error) ;
302+ let body: UnsyncBoxBody < Bytes , P3ErrorCode > =
303+ BodyExt :: boxed_unsync ( StreamBody :: new ( byte_stream) ) ;
304+
305+ let mut builder = http:: Response :: builder ( ) . status ( status) . version ( version) ;
306+ if let Some ( hdrs) = builder. headers_mut ( ) {
307+ hdrs. extend ( headers) ;
308+ }
309+ let resp = builder
310+ . body ( body)
311+ . map_err ( |_| P3ErrorCode :: HttpProtocolError ) ?;
312+ let io: Pin < Box < dyn Future < Output = Result < ( ) , P3ErrorCode > > + Send > > =
313+ Box :: pin ( async { Ok ( ( ) ) } ) ;
314+ Ok ( ( resp, io) )
315+ }
316+
184317#[ cfg( test) ]
185318mod tests {
186319 use super :: * ;
0 commit comments