@@ -15,11 +15,12 @@ use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
1515use aws_smithy_runtime_api:: client:: runtime_components:: RuntimeComponents ;
1616use aws_smithy_runtime_api:: shared:: IntoShared ;
1717use aws_smithy_types:: body:: SdkBody ;
18+ use bytes:: Bytes ;
1819use std:: path:: Path ;
1920use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
2021use std:: sync:: { Arc , Mutex , MutexGuard } ;
21- use std:: { fs, io} ;
2222use tokio:: task:: JoinHandle ;
23+ use { std:: fs, std:: io} ;
2324
2425/// Recording client
2526///
@@ -28,17 +29,13 @@ use tokio::task::JoinHandle;
2829/// # Example
2930///
3031/// ```rust,ignore
31- /// use aws_smithy_async::rt::sleep::default_async_sleep;
32- /// use aws_smithy_runtime::client::http::hyper_014::default_connector;
3332/// use aws_smithy_http_client::test_util::dvr::RecordingClient;
34- /// use aws_smithy_runtime_api::client::http::HttpConnectorSettingsBuilder;
3533/// use aws_sdk_s3::{Client, Config};
3634///
3735/// #[tokio::test]
3836/// async fn test_content_length_enforcement_is_not_applied_to_head_request() {
39- /// let settings = HttpConnectorSettingsBuilder::default().build();
40- /// let http_client = default_connector(&settings, default_async_sleep()).unwrap();
41- /// let http_client = RecordingClient::new(http_client);
37+ /// // Create a recording client that wraps a default HTTPS connector
38+ /// let http_client = RecordingClient::https();
4239///
4340/// // Since we need to send a real request for this,
4441/// // you'll need to use your real credentials.
@@ -125,14 +122,57 @@ impl RecordingClient {
125122 }
126123}
127124
125+ /// Record a request body, preferring a buffered approach for non-streaming bodies.
126+ ///
127+ /// For request bodies that have their content available as bytes, this buffers
128+ /// the body in memory to avoid timing issues with channel-based streaming.
129+ /// Streaming request bodies fall back to the channel-based recorder.
130+ fn record_request_body (
131+ body : & mut SdkBody ,
132+ event_id : ConnectionId ,
133+ direction : Direction ,
134+ event_bus : Arc < Mutex < Vec < Event > > > ,
135+ ) {
136+ if let Some ( bytes) = body. bytes ( ) {
137+ let data = Bytes :: copy_from_slice ( bytes) ;
138+ let mut events = event_bus. lock ( ) . unwrap ( ) ;
139+ events. push ( Event {
140+ connection_id : event_id,
141+ action : Action :: Data {
142+ data : BodyData :: from ( data. clone ( ) ) ,
143+ direction,
144+ } ,
145+ } ) ;
146+ events. push ( Event {
147+ connection_id : event_id,
148+ action : Action :: Eof {
149+ ok : true ,
150+ direction,
151+ } ,
152+ } ) ;
153+ * body = SdkBody :: from ( data) ;
154+ } else {
155+ record_body ( body, event_id, direction, event_bus) ;
156+ }
157+ }
158+
159+ /// Record body data using a channel-based streaming approach.
160+ ///
161+ /// This spawns a task that reads from the original body and forwards data
162+ /// through a channel to a replacement body. This is used for response bodies
163+ /// and streaming request bodies.
128164fn record_body (
129165 body : & mut SdkBody ,
130166 event_id : ConnectionId ,
131167 direction : Direction ,
132168 event_bus : Arc < Mutex < Vec < Event > > > ,
133169) -> JoinHandle < ( ) > {
134- let ( sender, output_body) = crate :: test_util:: body:: channel_body ( ) ;
170+ // Capture the content length so the replacement body reports the correct size_hint()
171+ let content_length = body. content_length ( ) ;
172+ let ( sender, output_body) =
173+ crate :: test_util:: body:: channel_body_with_size_hint ( content_length) ;
135174 let real_body = std:: mem:: replace ( body, output_body) ;
175+
136176 tokio:: spawn ( async move {
137177 let mut real_body = real_body;
138178 let mut sender = sender;
@@ -147,18 +187,17 @@ fn record_body(
147187 direction,
148188 } ,
149189 } ) ;
150- // This happens if the real connection is closed during recording.
151- // Need to think more carefully if this is the correct thing to log in this
152- // case.
190+ // Forward the data to the replacement body
153191 if sender. send_data ( data) . await . is_err ( ) {
154192 event_bus. lock ( ) . unwrap ( ) . push ( Event {
155193 connection_id : event_id,
156194 action : Action :: Eof {
157195 direction : direction. opposite ( ) ,
158196 ok : false ,
159197 } ,
160- } )
161- } ;
198+ } ) ;
199+ break ;
200+ }
162201 }
163202 None => {
164203 event_bus. lock ( ) . unwrap ( ) . push ( Event {
@@ -190,45 +229,47 @@ fn record_body(
190229impl HttpConnector for RecordingClient {
191230 fn call ( & self , mut request : HttpRequest ) -> HttpConnectorFuture {
192231 let event_id = self . next_id ( ) ;
232+
193233 // A request has three phases:
194- // 1. A "Request" phase. This is initial HTTP request, headers, & URI
195- // 2. A body phase. This may contain multiple data segments.
196- // 3. A finalization phase. An EOF of some sort is sent on the body to indicate that
197- // the channel should be closed.
234+ // 1. A "Request" phase: initial HTTP request (headers, method, URI)
235+ // 2. A body phase: may contain multiple data segments
236+ // 3. A finalization phase: EOF is sent to indicate the body is complete
198237
199- // Phase 1: the initial http request
238+ // Phase 1: Record the initial request
200239 self . data . lock ( ) . unwrap ( ) . push ( Event {
201240 connection_id : event_id,
202241 action : Action :: Request {
203242 request : Request :: from ( & request) ,
204243 } ,
205244 } ) ;
206245
207- // Phase 2: Swap out the real request body for one that will log all traffic that passes
208- // through it
209- // This will also handle phase three when the request body runs out of data .
210- record_body (
246+ // Phase 2 & 3: Record the request body
247+ // Use buffered recording for request bodies to avoid race conditions
248+ // with channel-based streaming that can cause HTTP/2 framing issues .
249+ record_request_body (
211250 request. body_mut ( ) ,
212251 event_id,
213252 Direction :: Request ,
214253 self . data . clone ( ) ,
215254 ) ;
255+
216256 let events = self . data . clone ( ) ;
217- // create a channel we'll use to stream the data while reading it
218257 let resp_fut = self . inner . call ( request) ;
258+
219259 let fut = async move {
220260 let resp = resp_fut. await ;
221261 match resp {
222262 Ok ( mut resp) => {
223- // push the initial response event
263+ // Record the initial response
224264 events. lock ( ) . unwrap ( ) . push ( Event {
225265 connection_id : event_id,
226266 action : Action :: Response {
227267 response : Ok ( Response :: from ( & resp) ) ,
228268 } ,
229269 } ) ;
230270
231- // instrument the body and record traffic
271+ // Record the response body using channel-based streaming
272+ // (this works fine for responses since we're consuming, not producing)
232273 record_body ( resp. body_mut ( ) , event_id, Direction :: Response , events) ;
233274 Ok ( resp)
234275 }
0 commit comments