Skip to content

Commit 7095380

Browse files
igormqIgor Macedo QuintanilhaIgor Macedo Quintanilha
authored
fix: context propagation fifo (#1530)
* fix: context propagation fifo * proposal 2 * fix: use startObservation for parent observation to preserve custom conventions When a parent observation was present, the observation was created using Observation.createNotStarted() with just the convention name, bypassing the customObservationConvention and ObservationDocumentation setup. Refactor startObservation to accept an optional parent observation and extract the observe-and-send logic into a private method, ensuring both code paths use the same convention/documentation pipeline. --------- Co-authored-by: Igor Macedo Quintanilha <igormq@poli.ufrj.br> Co-authored-by: Igor Macedo Quintanilha <igormq@Mac.mynet>
1 parent d01e69e commit 7095380

File tree

3 files changed

+311
-20
lines changed

3 files changed

+311
-20
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/AbstractMessagingTemplate.java

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -301,23 +301,31 @@ public <T> CompletableFuture<SendResult<T>> sendAsync(@Nullable String endpointN
301301
public <T> CompletableFuture<SendResult<T>> sendAsync(@Nullable String endpointName, Message<T> message) {
302302
String endpointToUse = getEndpointName(endpointName);
303303
logger.trace("Sending message {} to endpoint {}", MessageHeaderUtils.getId(message), endpointName);
304-
return preProcessMessageForSendAsync(endpointToUse, message)
305-
.thenCompose(messageToUse -> observeAndSendAsync(messageToUse, endpointToUse)
306-
.exceptionallyCompose(
307-
t -> CompletableFuture.failedFuture(new MessagingOperationFailedException(
308-
"Message send operation failed for message %s to endpoint %s"
309-
.formatted(MessageHeaderUtils.getId(message), endpointToUse),
310-
endpointToUse, message, t)))
311-
.whenComplete((v, t) -> logSendMessageResult(endpointToUse, message, t)));
312-
}
313-
314-
private <T> CompletableFuture<SendResult<T>> observeAndSendAsync(Message<T> message, String endpointToUse) {
315-
AbstractTemplateObservation.Context context = this.observationSpecifics.createContext(message, endpointToUse);
316-
Observation observation = startObservation(context);
317-
Map<String, Object> carrier = Objects.requireNonNull(context.getCarrier(), "No carrier found in context.");
318-
Message<T> messageWithObservationHeader = MessageHeaderUtils.addHeadersIfAbsent(message, carrier);
319-
return doSendAsync(endpointToUse, convertMessageToSend(messageWithObservationHeader),
320-
messageWithObservationHeader)
304+
305+
// Capture parent observation on the calling thread to propagate trace context across async boundary
306+
var parentObservation = this.observationRegistry.getCurrentObservation();
307+
308+
return preProcessMessageForSendAsync(endpointToUse, message).thenCompose(
309+
preprocessedMessage -> observeAndSend(preprocessedMessage, message, endpointToUse, parentObservation));
310+
}
311+
312+
private <T> CompletableFuture<SendResult<T>> observeAndSend(Message<T> preprocessedMessage,
313+
Message<T> originalMessage, String endpointToUse, @Nullable Observation parentObservation) {
314+
var context = this.observationSpecifics.createContext(preprocessedMessage, endpointToUse);
315+
Observation observation = startObservation(context, parentObservation);
316+
var carrier = Objects.requireNonNull(context.getCarrier(), "No carrier found in context.");
317+
var messageWithObservationHeaders = MessageHeaderUtils.addHeadersIfAbsent(preprocessedMessage, carrier);
318+
return doSendAndCompleteObservation(messageWithObservationHeaders, endpointToUse, context, observation)
319+
.exceptionallyCompose(t -> CompletableFuture.failedFuture(new MessagingOperationFailedException(
320+
"Message send operation failed for message %s to endpoint %s"
321+
.formatted(MessageHeaderUtils.getId(originalMessage), endpointToUse),
322+
endpointToUse, originalMessage, t)))
323+
.whenComplete((v, t) -> logSendMessageResult(endpointToUse, originalMessage, t));
324+
}
325+
326+
private <T> CompletableFuture<SendResult<T>> doSendAndCompleteObservation(Message<T> message, String endpointToUse,
327+
AbstractTemplateObservation.Context context, Observation observation) {
328+
return doSendAsync(endpointToUse, convertMessageToSend(message), message)
321329
.whenComplete((sendResult, t) -> completeObservation(sendResult, context, t, observation));
322330
}
323331

@@ -335,13 +343,18 @@ private void completeObservation(@Nullable SendResult<?> sendResult, AbstractTem
335343
}
336344

337345
@SuppressWarnings("unchecked")
338-
private <Context extends Observation.Context> Observation startObservation(Context observationContext) {
346+
private <Context extends Observation.Context> Observation startObservation(Context observationContext,
347+
@Nullable Observation parentObservation) {
339348
ObservationConvention<Context> defaultConvention = (ObservationConvention<Context>) observationSpecifics
340349
.getDefaultConvention();
341350
ObservationConvention<Context> customConvention = (ObservationConvention<Context>) this.customObservationConvention;
342351
ObservationDocumentation documentation = observationSpecifics.getDocumentation();
343-
return documentation.start(customConvention, defaultConvention, () -> observationContext,
344-
this.observationRegistry);
352+
Observation observation = documentation.observation(customConvention, defaultConvention,
353+
() -> observationContext, this.observationRegistry);
354+
if (parentObservation != null) {
355+
observation.parentObservation(parentObservation);
356+
}
357+
return observation.start();
345358
}
346359

347360
protected abstract <T> Message<T> preProcessMessageForSend(String endpointToUse, Message<T> message);
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.integration;
17+
18+
import io.awspring.cloud.sqs.operations.SqsTemplate;
19+
import io.micrometer.observation.Observation;
20+
import io.micrometer.observation.ObservationRegistry;
21+
import io.micrometer.observation.tck.TestObservationRegistry;
22+
import io.micrometer.tracing.CurrentTraceContext;
23+
import io.micrometer.tracing.Span;
24+
import io.micrometer.tracing.TraceContext;
25+
import io.micrometer.tracing.Tracer;
26+
import io.micrometer.tracing.handler.DefaultTracingObservationHandler;
27+
import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler;
28+
import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler;
29+
import io.micrometer.tracing.propagation.Propagator;
30+
import io.micrometer.tracing.test.simple.SimpleTraceContext;
31+
import io.micrometer.tracing.test.simple.SimpleTracer;
32+
import org.junit.jupiter.api.AfterEach;
33+
import org.junit.jupiter.api.BeforeAll;
34+
import org.junit.jupiter.api.Test;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
import org.springframework.beans.factory.annotation.Autowired;
38+
import org.springframework.boot.test.context.SpringBootTest;
39+
import org.springframework.context.annotation.Bean;
40+
import org.springframework.context.annotation.Configuration;
41+
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
42+
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
43+
44+
import java.time.Duration;
45+
import java.util.List;
46+
import java.util.Map;
47+
import java.util.UUID;
48+
49+
import static org.assertj.core.api.Assertions.assertThat;
50+
51+
/**
52+
* Integration tests for trace context propagation in FIFO queues with SqsTemplate.
53+
* <p>
54+
* Verifies that trace headers (traceparent) are correctly propagated from sender to receiver when using
55+
* {@code sendAsync()} with FIFO queues, including scenarios where queue attributes must be resolved asynchronously on
56+
* the first call and when they are cached on subsequent calls.
57+
*
58+
* @author Igor Quintanilha
59+
*/
60+
@SpringBootTest
61+
public class SqsTemplateFifoTracingIntegrationTest extends BaseSqsIntegrationTest {
62+
private static final Logger logger = LoggerFactory.getLogger(SqsTemplateFifoTracingIntegrationTest.class);
63+
64+
private static final String FIFO_QUEUE_NAME = "trace-context-test-queue.fifo";
65+
private static final String FIFO_CACHE_HIT_QUEUE_NAME = "trace-context-test-queue-cache-hit.fifo";
66+
67+
@Autowired
68+
private SqsTemplate sqsTemplate;
69+
70+
@Autowired
71+
private TestObservationRegistry observationRegistry;
72+
73+
@Autowired
74+
private CurrentTraceContext currentTraceContext;
75+
76+
@BeforeAll
77+
static void beforeTests() {
78+
var client = createAsyncClient();
79+
createFifoQueue(client, FIFO_QUEUE_NAME, Map.of(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "false")).join();
80+
createFifoQueue(client, FIFO_CACHE_HIT_QUEUE_NAME, Map.of(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true")).join();
81+
82+
}
83+
84+
@AfterEach
85+
void cleanupAfterEach() {
86+
observationRegistry.clear();
87+
}
88+
89+
@Test
90+
void sendAsync_toFifoQueue_shouldPropagateObservationScopeOnFirstCall() {
91+
var parentObservation = Observation.start("parent-observation", observationRegistry);
92+
var payload = new TestEvent(UUID.randomUUID().toString());
93+
String expectedTraceId;
94+
95+
try (var ignored = parentObservation.openScope()) {
96+
expectedTraceId = currentTraceContext.context().traceId();
97+
sqsTemplate.sendAsync(FIFO_QUEUE_NAME, payload).join();
98+
}
99+
finally {
100+
parentObservation.stop();
101+
}
102+
103+
logger.info("expectedTraceId={}", expectedTraceId);
104+
105+
var receivedMessage = sqsTemplate
106+
.receive(from -> from.queue(FIFO_QUEUE_NAME).pollTimeout(Duration.ofSeconds(5)), TestEvent.class)
107+
.orElseThrow(() -> new AssertionError("Expected message was not received"));
108+
109+
assertThat(receivedMessage.getPayload()).isEqualTo(payload);
110+
var traceparent = (String) receivedMessage.getHeaders().get("traceparent");
111+
assertThat(traceparent).as("traceparent header should be present").isNotNull();
112+
assertThat(traceparent).as("traceparent should contain the traceId").contains(expectedTraceId);
113+
}
114+
115+
@Test
116+
void sendAsync_toFifoQueue_shouldCreateObservationOnCallingThreadAfterCacheHit() {
117+
// Given - Warm up: send a message to populate the queue attribute cache
118+
var warmupPayload = new TestEvent(UUID.randomUUID().toString());
119+
sqsTemplate.sendAsync(FIFO_CACHE_HIT_QUEUE_NAME, warmupPayload).join();
120+
121+
// Drain the warmup message
122+
sqsTemplate.receive(from -> from.queue(FIFO_CACHE_HIT_QUEUE_NAME).pollTimeout(Duration.ofSeconds(5)), TestEvent.class);
123+
124+
// Given - Start a NEW observation for the actual test
125+
var observation = Observation.start("test-send-second", observationRegistry);
126+
String expectedTraceId;
127+
128+
var payload = new TestEvent(UUID.randomUUID().toString());
129+
try (var ignored = observation.openScope()) {
130+
expectedTraceId = currentTraceContext.context().traceId();
131+
// When - Second call (cache hit - queue attributes already resolved)
132+
sqsTemplate.sendAsync(FIFO_CACHE_HIT_QUEUE_NAME, payload).join();
133+
}
134+
finally {
135+
observation.stop();
136+
}
137+
138+
logger.info("expectedTraceId={}", expectedTraceId);
139+
140+
var receivedMessage = sqsTemplate
141+
.receive(from -> from.queue(FIFO_CACHE_HIT_QUEUE_NAME).pollTimeout(Duration.ofSeconds(5)), TestEvent.class)
142+
.orElseThrow(() -> new AssertionError("Expected message was not received"));
143+
144+
assertThat(receivedMessage.getPayload()).isEqualTo(payload);
145+
var traceparent = (String) receivedMessage.getHeaders().get("traceparent");
146+
assertThat(traceparent).as("traceparent header should be present").isNotNull();
147+
assertThat(traceparent).as("traceparent should contain the traceId").contains(expectedTraceId);
148+
}
149+
150+
@Configuration
151+
static class TestConfiguration {
152+
153+
@Bean
154+
public SqsAsyncClient sqsAsyncClient() {
155+
return createAsyncClient();
156+
}
157+
158+
@Bean
159+
public Tracer tracer() {
160+
return new SimpleTracer();
161+
}
162+
163+
@Bean
164+
public CurrentTraceContext currentTraceContext(Tracer tracer) {
165+
return ((SimpleTracer) tracer).currentTraceContext();
166+
}
167+
168+
@Bean
169+
public Propagator propagator(Tracer tracer) {
170+
return new SimplePropagator(tracer);
171+
}
172+
173+
@Bean
174+
public ObservationRegistry observationRegistry(Tracer tracer, Propagator propagator) {
175+
TestObservationRegistry registry = TestObservationRegistry.create();
176+
registry.observationConfig().observationHandler(new DefaultTracingObservationHandler(tracer));
177+
registry.observationConfig()
178+
.observationHandler(new PropagatingSenderTracingObservationHandler<>(tracer, propagator));
179+
registry.observationConfig()
180+
.observationHandler(new PropagatingReceiverTracingObservationHandler<>(tracer, propagator));
181+
return registry;
182+
}
183+
184+
@Bean
185+
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObservationRegistry observationRegistry) {
186+
return SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient)
187+
.configure(options -> options.observationRegistry(observationRegistry)).build();
188+
}
189+
}
190+
191+
/**
192+
* Simple W3C Trace Context propagator for testing. In production, you would use a library like
193+
* micrometer-tracing-bridge-brave or micrometer-tracing-bridge-otel which provide full-featured propagators.
194+
*/
195+
static class SimplePropagator implements Propagator {
196+
197+
private final Tracer tracer;
198+
199+
SimplePropagator(Tracer tracer) {
200+
this.tracer = tracer;
201+
}
202+
203+
@Override
204+
public List<String> fields() {
205+
return List.of("traceparent", "tracestate");
206+
}
207+
208+
@Override
209+
public <C> void inject(TraceContext context, C carrier, Setter<C> setter) {
210+
// W3C Trace Context format: version-traceId-spanId-flags
211+
var traceparent = String.format("00-%s-%s-01", context.traceId(), context.spanId());
212+
setter.set(carrier, "traceparent", traceparent);
213+
}
214+
215+
@Override
216+
public <C> Span.Builder extract(C carrier, Getter<C> getter) {
217+
var traceparent = getter.get(carrier, "traceparent");
218+
if (traceparent == null || traceparent.isEmpty()) {
219+
return tracer.spanBuilder().setNoParent();
220+
}
221+
// Parse W3C format: 00-traceId-spanId-01
222+
String[] parts = traceparent.split("-");
223+
if (parts.length < 4) {
224+
return tracer.spanBuilder().setNoParent();
225+
}
226+
// Use tracer to create span builder with extracted context
227+
Span.Builder builder = tracer.spanBuilder();
228+
var traceContext = new SimpleTraceContext();
229+
traceContext.setTraceId(parts[1]);
230+
traceContext.setParentId(parts[2]);
231+
traceContext.setSpanId(parts[3]);
232+
builder.setParent(traceContext);
233+
return builder;
234+
}
235+
}
236+
237+
record TestEvent(String data) {
238+
}
239+
}

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/SqsTemplateObservationTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.awspring.cloud.sqs.listener.SqsHeaders;
2424
import io.awspring.cloud.sqs.support.observation.SqsTemplateObservation;
2525
import io.micrometer.common.KeyValues;
26+
import io.micrometer.observation.Observation;
2627
import io.micrometer.observation.tck.TestObservationRegistry;
2728
import io.micrometer.observation.tck.TestObservationRegistryAssert;
2829
import java.util.UUID;
@@ -165,6 +166,44 @@ void shouldCaptureErrorsInObservation() {
165166
.hasSingleObservationThat().hasError().assertThatError().isInstanceOf(RuntimeException.class);
166167
}
167168

169+
@Test
170+
void shouldApplyCustomConventionWhenParentObservationIsPresent() {
171+
// given
172+
SqsTemplateObservation.Convention customConvention = mock(SqsTemplateObservation.Convention.class);
173+
given(customConvention.supportsContext(any())).willReturn(true);
174+
given(customConvention.getName()).willReturn("spring.aws.sqs.template");
175+
176+
String lowCardinalityCustomKeyName = "custom.lowCardinality.key";
177+
String lowCardinalityCustomValue = "custom-lowCardinality-value";
178+
String highCardinalityCustomKeyName = "custom.highCardinality.key";
179+
String highCardinalityCustomValue = "custom-highCardinality-value";
180+
given(customConvention.getLowCardinalityKeyValues(any()))
181+
.willReturn(KeyValues.of(lowCardinalityCustomKeyName, lowCardinalityCustomValue));
182+
given(customConvention.getHighCardinalityKeyValues(any()))
183+
.willReturn(KeyValues.of(highCardinalityCustomKeyName, highCardinalityCustomValue));
184+
185+
TestObservationRegistry customRegistry = TestObservationRegistry.create();
186+
187+
SqsTemplate templateWithCustomConvention = SqsTemplate.builder().sqsAsyncClient(mockSqsAsyncClient)
188+
.configure(
189+
options -> options.observationRegistry(customRegistry).observationConvention(customConvention))
190+
.build();
191+
192+
// when - send within a parent observation scope
193+
Observation parentObservation = Observation.start("parent-observation", customRegistry);
194+
try (var ignored = parentObservation.openScope()) {
195+
templateWithCustomConvention.send(queueName, "test-payload");
196+
}
197+
finally {
198+
parentObservation.stop();
199+
}
200+
201+
// then - custom convention should be applied even with parent observation
202+
TestObservationRegistryAssert.then(customRegistry).hasNumberOfObservationsEqualTo(2)
203+
.hasAnObservationWithAKeyValue(lowCardinalityCustomKeyName, lowCardinalityCustomValue)
204+
.hasAnObservationWithAKeyValue(highCardinalityCustomKeyName, highCardinalityCustomValue);
205+
}
206+
168207
@Test
169208
void shouldSupportCustomKeyValuesInActiveSending() {
170209
// given

0 commit comments

Comments
 (0)