@@ -4,7 +4,8 @@ use crate::observation::ObservationBuilder;
44use axum:: body:: Body ;
55use axum:: extract:: Request ;
66use axum:: response:: Response ;
7- use bytes:: { Bytes , BytesMut } ;
7+ use bytes:: Bytes ;
8+ use bytes:: BytesMut ;
89use http:: header:: HeaderMap ;
910use http:: header:: HeaderName ;
1011use http:: header:: AUTHORIZATION ;
@@ -27,8 +28,9 @@ use tower::Layer;
2728use tower:: Service ;
2829
2930/// State shared between the streaming body and the observation emitter.
30- /// This is used to collect data as it streams and emit the observation when complete.
31- /// The observation is emitted in the Drop implementation when the body is finished.
31+ /// This is used to collect data as it streams and emit the observation when
32+ /// complete. The observation is emitted in the Drop implementation when the
33+ /// body is finished.
3234struct StreamingObserverState {
3335 buffer : BytesMut ,
3436 content_type : String ,
@@ -71,14 +73,12 @@ impl Drop for StreamingObserverState {
7173 size : bytes. len ( ) ,
7274 } ;
7375
74- let mut response_body_builder = ObservationBuilder :: new ( "http/response/body" ) ;
75- response_body_builder
76+ ObservationBuilder :: new ( "http/response/body" )
7677 . label ( "http/response" )
7778 . label ( "http/response/body" )
7879 . metadata ( "status" , & self . status_code . to_string ( ) )
7980 . log_level ( self . log_level )
80- . payload ( payload)
81- . build_with_execution ( & self . execution ) ;
81+ . payload_with_execution ( payload, & self . execution ) ;
8282 }
8383}
8484
@@ -246,31 +246,27 @@ where
246246 } ;
247247
248248 let ( parts, body) = req. into_parts ( ) ;
249- let mut request_headers_builder = ObservationBuilder :: new ( "http/request/headers" ) ;
250- request_headers_builder
249+ ObservationBuilder :: new ( "http/request/headers" )
251250 . label ( "http/request" )
252251 . label ( "http/request/headers" )
253252 . metadata ( "method" , parts. method . to_string ( ) )
254253 . metadata ( "uri" , parts. uri . to_string ( ) )
255254 . serde ( & json ! ( filter_headers(
256255 & parts. headers,
257256 & config. excluded_headers
258- ) ) )
259- . build ( ) ;
257+ ) ) ) ;
260258
261259 let request_body_bytes = body
262260 . collect ( )
263261 . await
264262 . map ( |collected| collected. to_bytes ( ) )
265263 . unwrap_or_else ( |_| Bytes :: new ( ) ) ;
266- let mut request_body_builder = ObservationBuilder :: new ( "http/request/body" ) ;
267- request_body_builder
264+ ObservationBuilder :: new ( "http/request/body" )
268265 . label ( "http/request" )
269266 . label ( "http/request/body" )
270267 . metadata ( "method" , parts. method . to_string ( ) )
271268 . metadata ( "uri" , parts. uri . to_string ( ) )
272- . payload ( bytes_to_payload ( & request_body_bytes, & parts. headers ) )
273- . build ( ) ;
269+ . payload ( bytes_to_payload ( & request_body_bytes, & parts. headers ) ) ;
274270
275271 let response = inner
276272 . call ( Request :: from_parts ( parts, Body :: from ( request_body_bytes) ) )
@@ -283,28 +279,31 @@ where
283279 500 ..=599 => LogLevel :: Error ,
284280 _ => LogLevel :: Info ,
285281 } ;
286- let mut response_headers_builder = ObservationBuilder :: new ( "http/response/headers" ) ;
287- response_headers_builder
282+ ObservationBuilder :: new ( "http/response/headers" )
288283 . label ( "http/response" )
289284 . label ( "http/response/headers" )
290285 . metadata ( "status" , & parts. status . as_u16 ( ) . to_string ( ) )
291286 . log_level ( log_level)
292287 . serde ( & json ! ( filter_headers(
293288 & parts. headers,
294289 & config. excluded_headers
295- ) ) )
296- . build ( ) ;
290+ ) ) ) ;
297291
298- // Wrap the response body in a streaming observer that captures data as it flows through
299- // and emits the observation when the stream completes
292+ // Wrap the response body in a streaming observer that captures data as it flows
293+ // through and emits the observation when the stream completes
300294 let content_type = parts
301295 . headers
302296 . get ( CONTENT_TYPE )
303297 . and_then ( |v| v. to_str ( ) . ok ( ) )
304298 . unwrap_or ( "application/octet-stream" )
305299 . to_string ( ) ;
306- let streaming_body =
307- StreamingObserverBody :: new ( body, content_type, log_level, parts. status . as_u16 ( ) , execution) ;
300+ let streaming_body = StreamingObserverBody :: new (
301+ body,
302+ content_type,
303+ log_level,
304+ parts. status . as_u16 ( ) ,
305+ execution,
306+ ) ;
308307
309308 Ok ( Response :: from_parts ( parts, Body :: new ( streaming_body) ) )
310309 } )
0 commit comments