Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package io.quarkus.opentelemetry.deployment.instrumentation;

import static io.opentelemetry.api.trace.SpanKind.SERVER;
import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.quarkus.opentelemetry.deployment.common.SemconvResolver;
import io.quarkus.opentelemetry.deployment.common.exporter.InMemoryLogRecordExporterProvider;
import io.quarkus.opentelemetry.deployment.common.exporter.InMemoryMetricExporterProvider;
import io.quarkus.opentelemetry.deployment.common.exporter.TestSpanExporter;
import io.quarkus.opentelemetry.deployment.common.exporter.TestSpanExporterProvider;
import io.quarkus.test.QuarkusUnitTest;

/**
* Regression test for <a href="https://github.com/quarkusio/quarkus/issues/52239">#52239</a>.
*/
public class RestClientReadTimeoutOpenTelemetryTest {

static final AtomicReference<String> TRACE_ID_AFTER_SLEEP = new AtomicReference<>();
static final CountDownLatch SLOW_HANDLER_DONE = new CountDownLatch(1);

@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest().withApplicationRoot((jar) -> jar
.addPackage(TestSpanExporter.class.getPackage())
.addClasses(SemconvResolver.class)
.addAsResource(new StringAsset(TestSpanExporterProvider.class.getCanonicalName()),
"META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider")
.addAsResource(new StringAsset(InMemoryMetricExporterProvider.class.getCanonicalName()),
"META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.metrics.ConfigurableMetricExporterProvider")
.addAsResource(new StringAsset(InMemoryLogRecordExporterProvider.class.getCanonicalName()),
"META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.logs.ConfigurableLogRecordExporterProvider"))
.withConfigurationResource("application-default.properties")
.overrideConfigKey("quarkus.rest-client.slow-client.url", "${test.url}")
.overrideConfigKey("quarkus.rest-client.slow-client.read-timeout", "500");

@Inject
TestSpanExporter spanExporter;

@AfterEach
void tearDown() {
spanExporter.reset();
TRACE_ID_AFTER_SLEEP.set(null);
}

@Test
void readTimeoutDoesNotLoseServerOtelContext() throws InterruptedException {
given().get("/caller").then().statusCode(200);

// Wait for the slow handler to complete (it sleeps 1s, timeout is 0.5s)
SLOW_HANDLER_DONE.await(3, SECONDS);

// Wait for spans to be exported. We expect at least:
// 1. server span for GET /caller
// 2. server span for GET /slow (ended when connection was reset)
List<SpanData> spans = spanExporter.getFinishedSpanItemsAtLeast(2);

// Verify exactly one server span for /slow (no duplicates from double sendResponse)
long slowServerSpanCount = spans.stream()
.filter(s -> s.getKind() == SERVER && s.getName().contains("slow"))
.count();
assertEquals(1, slowServerSpanCount, "Expected exactly one server span for /slow, got: " + spans);

// Verify the server span for /slow has an error (connection closed)
SpanData slowServerSpan = spans.stream()
.filter(s -> s.getKind() == SERVER && s.getName().contains("slow"))
.findFirst()
.orElseThrow();
assertEquals(StatusCode.ERROR, slowServerSpan.getStatus().getStatusCode());

// Verify the traceId was still valid after the sleep (the core assertion for #52239)
String traceIdAfterSleep = TRACE_ID_AFTER_SLEEP.get();
assertNotEquals(null, traceIdAfterSleep, "Slow handler did not capture traceId after sleep");
assertNotEquals("00000000000000000000000000000000", traceIdAfterSleep,
"OTel context was lost: Span.current() returned invalid traceId after client disconnect");

// Verify the traceId matches the exported server span
assertEquals(slowServerSpan.getTraceId(), traceIdAfterSleep,
"TraceId after sleep should match the exported server span's traceId");
}

@Path("/slow")
public static class SlowResource {
@GET
public String slow() throws InterruptedException {
try {
// Sleep longer than the client read timeout (1s) to trigger a connection reset
Thread.sleep(1000);
// Capture the traceId AFTER the client has disconnected
TRACE_ID_AFTER_SLEEP.set(Span.current().getSpanContext().getTraceId());
return "slow response";
} finally {
SLOW_HANDLER_DONE.countDown();
}
}
}

@RegisterRestClient(configKey = "slow-client")
@Path("/slow")
public interface SlowClient {
@GET
String slow();
}

@Path("/caller")
public static class CallerResource {
@Inject
@RestClient
SlowClient slowClient;

@GET
public String call() {
try {
return slowClient.slow();
} catch (Exception e) {
return "timeout";
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,20 @@ default <R> void sendResponse(

Object request = spanOperation.getRequest();
Instrumenter<REQ, RESP> instrumenter = getSendResponseInstrumenter();
try (scope) {
instrumenter.end(spanOperation.getSpanContext(), (REQ) request, (RESP) response, failure);

if (failure != null && response == null) {
// Connection reset (e.g. client read timeout) can be triggered while a worker is still be running.
// Will end the span to record the failure but don't close the scope yet because it would erase the OTel context
// and mess with ongoing child spans
if (spanOperation.tryEndSpan()) {
instrumenter.end(spanOperation.getSpanContext(), (REQ) request, (RESP) response, failure);
}
Comment on lines +73 to +78
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will the scope be closed?

} else {
try (scope) {
if (spanOperation.tryEndSpan()) {
instrumenter.end(spanOperation.getSpanContext(), (REQ) request, (RESP) response, failure);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

import io.opentelemetry.context.Scope;
Expand Down Expand Up @@ -97,6 +98,7 @@ static class SpanOperation {
private final MultiMap headers;
private final io.opentelemetry.context.Context spanContext;
private final Scope scope;
private final AtomicBoolean spanEnded = new AtomicBoolean();

public SpanOperation(final Context context, final Object request, final MultiMap headers,
final io.opentelemetry.context.Context spanContext, final Scope scope) {
Expand All @@ -107,6 +109,10 @@ public SpanOperation(final Context context, final Object request, final MultiMap
this.scope = scope;
}

boolean tryEndSpan() {
return spanEnded.compareAndSet(false, true);
}

public Context getContext() {
return context;
}
Expand Down
Loading