@@ -15,11 +15,12 @@ use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
1515use aws_smithy_runtime_api:: client:: runtime_components:: RuntimeComponents ;
1616use aws_smithy_runtime_api:: shared:: IntoShared ;
1717use aws_smithy_types:: body:: SdkBody ;
18+ use bytes:: Bytes ;
1819use std:: path:: Path ;
1920use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
2021use std:: sync:: { Arc , Mutex , MutexGuard } ;
21- use std:: { fs, io} ;
2222use tokio:: task:: JoinHandle ;
23+ use { std:: fs, std:: io} ;
2324
2425/// Recording client
2526///
@@ -28,17 +29,13 @@ use tokio::task::JoinHandle;
2829/// # Example
2930///
3031/// ```rust,ignore
31- /// use aws_smithy_async::rt::sleep::default_async_sleep;
32- /// use aws_smithy_runtime::client::http::hyper_014::default_connector;
3332/// use aws_smithy_http_client::test_util::dvr::RecordingClient;
34- /// use aws_smithy_runtime_api::client::http::HttpConnectorSettingsBuilder;
3533/// use aws_sdk_s3::{Client, Config};
3634///
3735/// #[tokio::test]
3836/// async fn test_content_length_enforcement_is_not_applied_to_head_request() {
39- /// let settings = HttpConnectorSettingsBuilder::default().build();
40- /// let http_client = default_connector(&settings, default_async_sleep()).unwrap();
41- /// let http_client = RecordingClient::new(http_client);
37+ /// // Create a recording client that wraps a default HTTPS connector
38+ /// let http_client = RecordingClient::https();
4239///
4340/// // Since we need to send a real request for this,
4441/// // you'll need to use your real credentials.
@@ -125,14 +122,56 @@ impl RecordingClient {
125122 }
126123}
127124
125+ /// Record a request body, preferring a buffered approach for non-streaming bodies.
126+ ///
127+ /// For request bodies that have their content available as bytes, this buffers
128+ /// the body in memory to avoid timing issues with channel-based streaming.
129+ /// Streaming request bodies fall back to the channel-based recorder.
130+ fn record_request_body (
131+ body : & mut SdkBody ,
132+ event_id : ConnectionId ,
133+ direction : Direction ,
134+ event_bus : Arc < Mutex < Vec < Event > > > ,
135+ ) {
136+ if let Some ( bytes) = body. bytes ( ) {
137+ let data = Bytes :: copy_from_slice ( bytes) ;
138+ let mut events = event_bus. lock ( ) . unwrap ( ) ;
139+ events. push ( Event {
140+ connection_id : event_id,
141+ action : Action :: Data {
142+ data : BodyData :: from ( data. clone ( ) ) ,
143+ direction,
144+ } ,
145+ } ) ;
146+ events. push ( Event {
147+ connection_id : event_id,
148+ action : Action :: Eof {
149+ ok : true ,
150+ direction,
151+ } ,
152+ } ) ;
153+ * body = SdkBody :: from ( data) ;
154+ } else {
155+ record_body ( body, event_id, direction, event_bus) ;
156+ }
157+ }
158+
159+ /// Record body data using a channel-based streaming approach.
160+ ///
161+ /// This spawns a task that reads from the original body and forwards data
162+ /// through a channel to a replacement body. This is used for response bodies
163+ /// and streaming request bodies.
128164fn record_body (
129165 body : & mut SdkBody ,
130166 event_id : ConnectionId ,
131167 direction : Direction ,
132168 event_bus : Arc < Mutex < Vec < Event > > > ,
133169) -> JoinHandle < ( ) > {
134- let ( sender, output_body) = crate :: test_util:: body:: channel_body ( ) ;
170+ // Capture the content length so the replacement body reports the correct size_hint()
171+ let content_length = body. content_length ( ) ;
172+ let ( sender, output_body) = crate :: test_util:: body:: channel_body_with_size_hint ( content_length) ;
135173 let real_body = std:: mem:: replace ( body, output_body) ;
174+
136175 tokio:: spawn ( async move {
137176 let mut real_body = real_body;
138177 let mut sender = sender;
@@ -147,18 +186,17 @@ fn record_body(
147186 direction,
148187 } ,
149188 } ) ;
150- // This happens if the real connection is closed during recording.
151- // Need to think more carefully if this is the correct thing to log in this
152- // case.
189+ // Forward the data to the replacement body
153190 if sender. send_data ( data) . await . is_err ( ) {
154191 event_bus. lock ( ) . unwrap ( ) . push ( Event {
155192 connection_id : event_id,
156193 action : Action :: Eof {
157194 direction : direction. opposite ( ) ,
158195 ok : false ,
159196 } ,
160- } )
161- } ;
197+ } ) ;
198+ break ;
199+ }
162200 }
163201 None => {
164202 event_bus. lock ( ) . unwrap ( ) . push ( Event {
@@ -190,45 +228,47 @@ fn record_body(
190228impl HttpConnector for RecordingClient {
191229 fn call ( & self , mut request : HttpRequest ) -> HttpConnectorFuture {
192230 let event_id = self . next_id ( ) ;
231+
193232 // A request has three phases:
194- // 1. A "Request" phase. This is initial HTTP request, headers, & URI
195- // 2. A body phase. This may contain multiple data segments.
196- // 3. A finalization phase. An EOF of some sort is sent on the body to indicate that
197- // the channel should be closed.
233+ // 1. A "Request" phase: initial HTTP request (headers, method, URI)
234+ // 2. A body phase: may contain multiple data segments
235+ // 3. A finalization phase: EOF is sent to indicate the body is complete
198236
199- // Phase 1: the initial http request
237+ // Phase 1: Record the initial request
200238 self . data . lock ( ) . unwrap ( ) . push ( Event {
201239 connection_id : event_id,
202240 action : Action :: Request {
203241 request : Request :: from ( & request) ,
204242 } ,
205243 } ) ;
206244
207- // Phase 2: Swap out the real request body for one that will log all traffic that passes
208- // through it
209- // This will also handle phase three when the request body runs out of data .
210- record_body (
245+ // Phase 2 & 3: Record the request body
246+ // Use buffered recording for request bodies to avoid race conditions
247+ // with channel-based streaming that can cause HTTP/2 framing issues .
248+ record_request_body (
211249 request. body_mut ( ) ,
212250 event_id,
213251 Direction :: Request ,
214252 self . data . clone ( ) ,
215253 ) ;
254+
216255 let events = self . data . clone ( ) ;
217- // create a channel we'll use to stream the data while reading it
218256 let resp_fut = self . inner . call ( request) ;
257+
219258 let fut = async move {
220259 let resp = resp_fut. await ;
221260 match resp {
222261 Ok ( mut resp) => {
223- // push the initial response event
262+ // Record the initial response
224263 events. lock ( ) . unwrap ( ) . push ( Event {
225264 connection_id : event_id,
226265 action : Action :: Response {
227266 response : Ok ( Response :: from ( & resp) ) ,
228267 } ,
229268 } ) ;
230269
231- // instrument the body and record traffic
270+ // Record the response body using channel-based streaming
271+ // (this works fine for responses since we're consuming, not producing)
232272 record_body ( resp. body_mut ( ) , event_id, Direction :: Response , events) ;
233273 Ok ( resp)
234274 }
0 commit comments