11use crate :: context;
2- use crate :: execution :: ExecutionHandle ;
2+ use crate :: group :: GroupBuilder ;
33use crate :: observation:: ObservationBuilder ;
4+ use crate :: observation_handle:: ObservationPayloadHandle ;
45use axum:: body:: Body ;
56use axum:: extract:: Request ;
67use axum:: response:: Response ;
@@ -28,30 +29,21 @@ use tower::Layer;
2829use tower:: Service ;
2930
3031/// State shared between the streaming body and the observation emitter.
31- /// This is used to collect data as it streams and emit the observation when
32- /// complete. The observation is emitted in the Drop implementation when the
33- /// body is finished .
32+ /// This is used to collect data as it streams and add the body payload when
33+ /// complete. The body payload is added in the Drop implementation via the
34+ /// ObservationPayloadHandle .
3435struct StreamingObserverState {
3536 buffer : BytesMut ,
3637 content_type : String ,
37- log_level : LogLevel ,
38- status_code : u16 ,
39- execution : ExecutionHandle ,
38+ payload_handle : ObservationPayloadHandle ,
4039}
4140
4241impl StreamingObserverState {
43- fn new (
44- content_type : String ,
45- log_level : LogLevel ,
46- status_code : u16 ,
47- execution : ExecutionHandle ,
48- ) -> Self {
42+ fn new ( content_type : String , payload_handle : ObservationPayloadHandle ) -> Self {
4943 Self {
5044 buffer : BytesMut :: new ( ) ,
5145 content_type,
52- log_level,
53- status_code,
54- execution,
46+ payload_handle,
5547 }
5648 }
5749
@@ -64,7 +56,7 @@ impl Drop for StreamingObserverState {
6456 fn drop ( & mut self ) {
6557 let bytes = self . buffer . clone ( ) . freeze ( ) ;
6658 tracing:: debug!(
67- "StreamingObserverBody: emitting observation with {} bytes on drop" ,
59+ "StreamingObserverBody: adding body payload with {} bytes on drop" ,
6860 bytes. len( )
6961 ) ;
7062 let payload = Payload {
@@ -73,19 +65,14 @@ impl Drop for StreamingObserverState {
7365 size : bytes. len ( ) ,
7466 } ;
7567
76- ObservationBuilder :: new ( "http/response/body" )
77- . label ( "http/response" )
78- . label ( "http/response/body" )
79- . metadata ( "status" , & self . status_code . to_string ( ) )
80- . log_level ( self . log_level )
81- . execution ( & self . execution )
82- . payload ( payload) ;
68+ self . payload_handle . raw_payload ( "body" , payload) ;
8369 }
8470}
8571
8672pin_project ! {
8773 /// A body wrapper that streams data through while collecting it for observation.
88- /// When the stream completes, it emits the observation with the collected body.
74+ /// When the stream completes, it adds the body as a named payload to the
75+ /// existing response observation.
8976 pub struct StreamingObserverBody {
9077 #[ pin]
9178 inner: Body ,
@@ -94,20 +81,12 @@ pin_project! {
9481}
9582
9683impl StreamingObserverBody {
97- fn new (
98- inner : Body ,
99- content_type : String ,
100- log_level : LogLevel ,
101- status_code : u16 ,
102- execution : ExecutionHandle ,
103- ) -> Self {
84+ fn new ( inner : Body , content_type : String , payload_handle : ObservationPayloadHandle ) -> Self {
10485 Self {
10586 inner,
10687 state : Arc :: new ( Mutex :: new ( StreamingObserverState :: new (
10788 content_type,
108- log_level,
109- status_code,
110- execution,
89+ payload_handle,
11190 ) ) ) ,
11291 }
11392 }
@@ -247,27 +226,33 @@ where
247226 } ;
248227
249228 let ( parts, body) = req. into_parts ( ) ;
250- ObservationBuilder :: new ( "http/request/headers" )
251- . label ( "http/request" )
252- . label ( "http/request/headers ")
229+
230+ // Create a single group for the HTTP exchange
231+ let http_group = GroupBuilder :: new ( "http_request ")
253232 . metadata ( "method" , parts. method . to_string ( ) )
254233 . metadata ( "uri" , parts. uri . to_string ( ) )
255- . serde ( & json ! ( filter_headers(
256- & parts. headers,
257- & config. excluded_headers
258- ) ) ) ;
234+ . build_with_execution ( & execution)
235+ . into_handle ( ) ;
259236
237+ // Collect request body
260238 let request_body_bytes = body
261239 . collect ( )
262240 . await
263241 . map ( |collected| collected. to_bytes ( ) )
264242 . unwrap_or_else ( |_| Bytes :: new ( ) ) ;
265- ObservationBuilder :: new ( "http/request/body" )
266- . label ( "http/request" )
267- . label ( "http/request/body" )
243+
244+ // Single request observation with named payloads: "headers" + "body"
245+ let headers_json = json ! ( filter_headers( & parts. headers, & config. excluded_headers) ) ;
246+ let headers_payload = Payload :: json (
247+ serde_json:: to_string ( & headers_json) . unwrap_or_default ( ) ,
248+ ) ;
249+ ObservationBuilder :: new ( "http/request" )
250+ . group ( & http_group)
268251 . metadata ( "method" , parts. method . to_string ( ) )
269252 . metadata ( "uri" , parts. uri . to_string ( ) )
270- . payload ( bytes_to_payload ( & request_body_bytes, & parts. headers ) ) ;
253+ . execution ( & execution)
254+ . named_raw_payload ( "headers" , headers_payload)
255+ . raw_payload ( "body" , bytes_to_payload ( & request_body_bytes, & parts. headers ) ) ;
271256
272257 let response = inner
273258 . call ( Request :: from_parts ( parts, Body :: from ( request_body_bytes) ) )
@@ -280,31 +265,30 @@ where
280265 500 ..=599 => LogLevel :: Error ,
281266 _ => LogLevel :: Info ,
282267 } ;
283- ObservationBuilder :: new ( "http/response/headers" )
284- . label ( "http/response" )
285- . label ( "http/response/headers" )
268+
269+ // Single response observation with named payload "headers" sent immediately,
270+ // "body" added later when streaming completes via the payload handle
271+ let resp_headers_json = json ! ( filter_headers( & parts. headers, & config. excluded_headers) ) ;
272+ let resp_headers_payload = Payload :: json (
273+ serde_json:: to_string ( & resp_headers_json) . unwrap_or_default ( ) ,
274+ ) ;
275+ let payload_handle = ObservationBuilder :: new ( "http/response" )
276+ . group ( & http_group)
286277 . metadata ( "status" , & parts. status . as_u16 ( ) . to_string ( ) )
287278 . log_level ( log_level)
288- . serde ( & json ! ( filter_headers(
289- & parts. headers,
290- & config. excluded_headers
291- ) ) ) ;
279+ . execution ( & execution)
280+ . named_raw_payload ( "headers" , resp_headers_payload) ;
292281
293282 // Wrap the response body in a streaming observer that captures data as it flows
294- // through and emits the observation when the stream completes
283+ // through and adds the body payload when the stream completes
295284 let content_type = parts
296285 . headers
297286 . get ( CONTENT_TYPE )
298287 . and_then ( |v| v. to_str ( ) . ok ( ) )
299288 . unwrap_or ( "application/octet-stream" )
300289 . to_string ( ) ;
301- let streaming_body = StreamingObserverBody :: new (
302- body,
303- content_type,
304- log_level,
305- parts. status . as_u16 ( ) ,
306- execution,
307- ) ;
290+ let streaming_body =
291+ StreamingObserverBody :: new ( body, content_type, payload_handle) ;
308292
309293 Ok ( Response :: from_parts ( parts, Body :: new ( streaming_body) ) )
310294 } )
0 commit comments