@@ -305,32 +305,22 @@ public <T> CompletableFuture<SendResult<T>> sendAsync(@Nullable String endpointN
305305 // Capture parent observation on the calling thread to propagate trace context across async boundary
306306 var parentObservation = this .observationRegistry .getCurrentObservation ();
307307
308- return preProcessMessageForSendAsync (endpointToUse , message ).thenCompose (preprocessedMessage -> {
309- // Create observation context with preprocessed message (now includes FIFO headers if applicable)
310- var context = this .observationSpecifics .createContext (preprocessedMessage , endpointToUse );
311-
312- // Start observation and link to parent to maintain trace continuity
313- Observation observation ;
314- if (parentObservation != null ) {
315- observation = Observation .createNotStarted (this .observationSpecifics .getDefaultConvention ().getName (),
316- () -> context , this .observationRegistry ).parentObservation (parentObservation ).start ();
317- }
318- else {
319- observation = startObservation (context );
320- }
321-
322- // Add trace headers to the message
323- var carrier = Objects .requireNonNull (context .getCarrier (), "No carrier found in context." );
324- var messageWithObservationHeaders = MessageHeaderUtils .addHeadersIfAbsent (preprocessedMessage , carrier );
308+ return preProcessMessageForSendAsync (endpointToUse , message ).thenCompose (
309+ preprocessedMessage -> observeAndSend (preprocessedMessage , message , endpointToUse , parentObservation ));
310+ }
325311
326- return doSendAndCompleteObservation (messageWithObservationHeaders , endpointToUse , context , observation )
327- .exceptionallyCompose (
328- t -> CompletableFuture .failedFuture (new MessagingOperationFailedException (
329- "Message send operation failed for message %s to endpoint %s"
330- .formatted (MessageHeaderUtils .getId (message ), endpointToUse ),
331- endpointToUse , message , t )))
332- .whenComplete ((v , t ) -> logSendMessageResult (endpointToUse , message , t ));
333- });
312+ private <T > CompletableFuture <SendResult <T >> observeAndSend (Message <T > preprocessedMessage ,
313+ Message <T > originalMessage , String endpointToUse , @ Nullable Observation parentObservation ) {
314+ var context = this .observationSpecifics .createContext (preprocessedMessage , endpointToUse );
315+ Observation observation = startObservation (context , parentObservation );
316+ var carrier = Objects .requireNonNull (context .getCarrier (), "No carrier found in context." );
317+ var messageWithObservationHeaders = MessageHeaderUtils .addHeadersIfAbsent (preprocessedMessage , carrier );
318+ return doSendAndCompleteObservation (messageWithObservationHeaders , endpointToUse , context , observation )
319+ .exceptionallyCompose (t -> CompletableFuture .failedFuture (new MessagingOperationFailedException (
320+ "Message send operation failed for message %s to endpoint %s"
321+ .formatted (MessageHeaderUtils .getId (originalMessage ), endpointToUse ),
322+ endpointToUse , originalMessage , t )))
323+ .whenComplete ((v , t ) -> logSendMessageResult (endpointToUse , originalMessage , t ));
334324 }
335325
336326 private <T > CompletableFuture <SendResult <T >> doSendAndCompleteObservation (Message <T > message , String endpointToUse ,
@@ -353,13 +343,18 @@ private void completeObservation(@Nullable SendResult<?> sendResult, AbstractTem
353343 }
354344
355345 @ SuppressWarnings ("unchecked" )
356- private <Context extends Observation .Context > Observation startObservation (Context observationContext ) {
346+ private <Context extends Observation .Context > Observation startObservation (Context observationContext ,
347+ @ Nullable Observation parentObservation ) {
357348 ObservationConvention <Context > defaultConvention = (ObservationConvention <Context >) observationSpecifics
358349 .getDefaultConvention ();
359350 ObservationConvention <Context > customConvention = (ObservationConvention <Context >) this .customObservationConvention ;
360351 ObservationDocumentation documentation = observationSpecifics .getDocumentation ();
361- return documentation .start (customConvention , defaultConvention , () -> observationContext ,
362- this .observationRegistry );
352+ Observation observation = documentation .observation (customConvention , defaultConvention ,
353+ () -> observationContext , this .observationRegistry );
354+ if (parentObservation != null ) {
355+ observation .parentObservation (parentObservation );
356+ }
357+ return observation .start ();
363358 }
364359
365360 protected abstract <T > Message <T > preProcessMessageForSend (String endpointToUse , Message <T > message );
0 commit comments