diff --git a/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/instrumentation/RestClientReadTimeoutOpenTelemetryTest.java b/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/instrumentation/RestClientReadTimeoutOpenTelemetryTest.java new file mode 100644 index 0000000000000..202d3e5675db0 --- /dev/null +++ b/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/instrumentation/RestClientReadTimeoutOpenTelemetryTest.java @@ -0,0 +1,157 @@ +package io.quarkus.opentelemetry.deployment.instrumentation; + +import static io.opentelemetry.api.trace.SpanKind.SERVER; +import static io.restassured.RestAssured.given; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; +import org.eclipse.microprofile.rest.client.inject.RestClient; +import org.jboss.logging.Logger; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.LoggerFactory; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.quarkus.opentelemetry.deployment.common.SemconvResolver; +import io.quarkus.opentelemetry.deployment.common.exporter.InMemoryLogRecordExporterProvider; +import io.quarkus.opentelemetry.deployment.common.exporter.InMemoryMetricExporterProvider; +import 
io.quarkus.opentelemetry.deployment.common.exporter.TestSpanExporter; +import io.quarkus.opentelemetry.deployment.common.exporter.TestSpanExporterProvider; +import io.quarkus.opentelemetry.runtime.OpenTelemetryUtil; +import io.quarkus.test.QuarkusUnitTest; + +/** + * Regression test for #52239. + */ +public class RestClientReadTimeoutOpenTelemetryTest { + + static final AtomicReference TRACE_ID_AFTER_SLEEP = new AtomicReference<>(); + static final CountDownLatch SLOW_HANDLER_DONE = new CountDownLatch(1); + + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest().withApplicationRoot((jar) -> jar + .addPackage(TestSpanExporter.class.getPackage()) + .addClasses(SemconvResolver.class) + .addAsResource(new StringAsset(TestSpanExporterProvider.class.getCanonicalName()), + "META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider") + .addAsResource(new StringAsset(InMemoryMetricExporterProvider.class.getCanonicalName()), + "META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.metrics.ConfigurableMetricExporterProvider") + .addAsResource(new StringAsset(InMemoryLogRecordExporterProvider.class.getCanonicalName()), + "META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.logs.ConfigurableLogRecordExporterProvider")) + .withConfigurationResource("application-default.properties") + .overrideConfigKey("quarkus.log.console.format", + "%d{HH:mm:ss} %-5p traceId=%X{traceId}, spanId=%X{spanId} [%c{2.}] (%t) %s%e%n") + .overrideConfigKey("quarkus.log.category.\"io.quarkus.opentelemetry\".level", "DEBUG") + .overrideConfigKey("quarkus.rest-client.slow-client.url", "${test.url}") + .overrideConfigKey("quarkus.rest-client.slow-client.read-timeout", "3000"); + private static final org.slf4j.Logger log = LoggerFactory.getLogger(RestClientReadTimeoutOpenTelemetryTest.class); + + @Inject + TestSpanExporter spanExporter; + + @AfterEach + void tearDown() { + spanExporter.reset(); + TRACE_ID_AFTER_SLEEP.set(null); + } + + 
@Test + void readTimeoutDoesNotLoseServerOtelContext() throws InterruptedException { + given().get("/caller").then().statusCode(200); + + // Wait for the slow handler to complete (it sleeps 5s, client read timeout is 3s) + SLOW_HANDLER_DONE.await(3, SECONDS); + + // Wait for spans to be exported. We expect at least: + // 1. server span for GET /caller + // 2. server span for GET /slow (ended when connection was reset) + List spans = spanExporter.getFinishedSpanItemsAtLeast(2); + + // Verify exactly one server span for /slow (no duplicates from double sendResponse) + long slowServerSpanCount = spans.stream() + .filter(s -> s.getKind() == SERVER && s.getName().contains("slow")) + .count(); + assertEquals(1, slowServerSpanCount, "Expected exactly one server span for /slow, got: " + spans); + + // Verify the server span for /slow has an error (connection closed) + SpanData slowServerSpan = spans.stream() + .filter(s -> s.getKind() == SERVER && s.getName().contains("slow")) + .findFirst() + .orElseThrow(); + assertEquals(StatusCode.ERROR, slowServerSpan.getStatus().getStatusCode()); + + // Verify the traceId was still valid after the sleep (the core assertion for #52239) + String traceIdAfterSleep = TRACE_ID_AFTER_SLEEP.get(); + assertNotEquals(null, traceIdAfterSleep, "Slow handler did not capture traceId after sleep"); + assertNotEquals("00000000000000000000000000000000", traceIdAfterSleep, + "OTel context was lost: Span.current() returned invalid traceId after client disconnect"); + + // Verify the traceId matches the exported server span + assertEquals(slowServerSpan.getTraceId(), traceIdAfterSleep, + "TraceId after sleep should match the exported server span's traceId"); + } + + @Path("/caller") + public static class CallerResource { + private static final Logger logger = Logger.getLogger(CallerResource.class); + + @Inject + @RestClient + SlowClient slowClient; + + @GET + public String call() { + try { + logger.infov("Calling Slow"); + String slow = slowClient.slow(); + 
logger.infov("client received: {0}", slow); + return slow; + } catch (Exception e) { + return "timeout"; + } + } + } + + @RegisterRestClient(configKey = "slow-client") + @Path("/slow") + public interface SlowClient { + @GET + String slow(); + } + + @Path("/slow") + public static class SlowResource { + private static final Logger logger = Logger.getLogger(SlowResource.class); + + @GET + public String slow() throws InterruptedException { + logger.infov("Span before sleep: {0}", OpenTelemetryUtil.getSpanData(Context.current())); + + try { + // Sleep longer than the client read timeout (3s) to trigger a connection reset + Thread.sleep(5000); + // Capture the traceId AFTER the client has disconnected + TRACE_ID_AFTER_SLEEP.set(Span.current().getSpanContext().getTraceId()); + logger.infov("Span after sleep: {0}", OpenTelemetryUtil.getSpanData(Context.current())); + return "slow response"; + } finally { + SLOW_HANDLER_DONE.countDown(); + } + } + } +} diff --git a/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/logs/OtelLoggingTest.java b/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/logs/OtelLoggingTest.java index 7384b96e73c68..fe9e0148aa9a9 100644 --- a/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/logs/OtelLoggingTest.java +++ b/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/logs/OtelLoggingTest.java @@ -54,7 +54,8 @@ public class OtelLoggingTest { "META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider") .add(new StringAsset( "quarkus.otel.logs.enabled=true\n" + - "quarkus.otel.traces.enabled=true\n"), + "quarkus.otel.traces.enabled=true\n" + + "quarkus.log.category.\"io.quarkus.opentelemetry\".level=INFO\n"), "application.properties")); @Inject diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/MDCEnabledContextStorage.java 
b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/MDCEnabledContextStorage.java index 90a1269c5ee59..4035cdc8b9882 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/MDCEnabledContextStorage.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/MDCEnabledContextStorage.java @@ -29,12 +29,17 @@ public Scope attach(Context toAttach) { return new Scope() { @Override public void close() { - if (log.isDebugEnabled()) { - log.debugv("Closing Otel context: {0}", OpenTelemetryUtil.getSpanData(toAttach)); - } if (beforeAttach == null) { + if (log.isDebugEnabled()) { + log.debugv("Closing Otel context: {0}", OpenTelemetryUtil.getSpanData(toAttach)); + } OpenTelemetryUtil.clearMDCData(null); } else { + if (log.isDebugEnabled()) { + log.debugv("Closing Otel context: {0} and restoring previous OTel context: {1}", + OpenTelemetryUtil.getSpanData(toAttach), + OpenTelemetryUtil.getSpanData(beforeAttach)); + } OpenTelemetryUtil.setMDCData(beforeAttach, null); } scope.close(); diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java index 493a5b006b37e..756d74abf3440 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java @@ -80,10 +80,6 @@ public void close() { // compare otel contexts when closing scope final Context otelBefore = getOtelContext(vertxContext); - if (log.isDebugEnabled()) { - log.debugv("Closing Otel context: {0}", OpenTelemetryUtil.getSpanData(otelToAttach)); - } - if (otelBefore != otelToAttach && log.isDebugEnabled()) { // Different references can contain the same span data. 
// Duplicated contexts can be duplicated. @@ -99,9 +95,17 @@ public void close() { } if (otelBeforeAttach == null) { + if (log.isDebugEnabled()) { + log.debugv("Closing Otel context: {0}", OpenTelemetryUtil.getSpanData(otelToAttach)); + } OpenTelemetryUtil.clearMDCData(vertxContext); vertxContext.removeLocal(OTEL_CONTEXT); } else { + if (log.isDebugEnabled()) { + log.debugv("Closing Otel context: {0} and restoring previous OTel context: {1}", + OpenTelemetryUtil.getSpanData(otelToAttach), + OpenTelemetryUtil.getSpanData(otelBeforeAttach)); + } OpenTelemetryUtil.setMDCData(otelBeforeAttach, vertxContext); vertxContext.putLocal(OTEL_CONTEXT, otelBeforeAttach); } diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/instrumentation/vertx/HttpInstrumenterVertxTracer.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/instrumentation/vertx/HttpInstrumenterVertxTracer.java index cd4daa12980ae..993e5c1a555eb 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/instrumentation/vertx/HttpInstrumenterVertxTracer.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/instrumentation/vertx/HttpInstrumenterVertxTracer.java @@ -12,6 +12,8 @@ import java.util.List; import java.util.function.BiConsumer; +import org.jboss.logging.Logger; + import io.netty.handler.codec.http.HttpResponseStatus; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributesBuilder; @@ -51,6 +53,8 @@ import io.vertx.core.tracing.TracingPolicy; public class HttpInstrumenterVertxTracer implements InstrumenterVertxTracer { + private static final Logger logger = Logger.getLogger(HttpInstrumenterVertxTracer.class); + private final Instrumenter serverInstrumenter; private final Instrumenter clientInstrumenter; @@ -104,7 +108,9 @@ public void sendResponse( final OpenTelemetryVertxTracer.SpanOperation 
spanOperation, final Throwable failure, final TagExtractor tagExtractor) { - + if (logger.isDebugEnabled()) { + logger.debugv("http failure: {0}", failure); + } HttpServerRoute.update(spanOperation.getSpanContext(), SERVER_FILTER, RouteGetter.ROUTE_GETTER, ((HttpRequestSpan) spanOperation.getRequest()), (HttpResponse) response); InstrumenterVertxTracer.super.sendResponse(context, response, spanOperation, failure, tagExtractor); diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/instrumentation/vertx/InstrumenterVertxTracer.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/instrumentation/vertx/InstrumenterVertxTracer.java index d336d55c7b58e..b7c67db894753 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/instrumentation/vertx/InstrumenterVertxTracer.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/instrumentation/vertx/InstrumenterVertxTracer.java @@ -68,8 +68,22 @@ default void sendResponse( Object request = spanOperation.getRequest(); Instrumenter instrumenter = getSendResponseInstrumenter(); - try (scope) { - instrumenter.end(spanOperation.getSpanContext(), (REQ) request, (RESP) response, failure); + + if (failure != null && response == null) { + // Connection reset (e.g. client read timeout) can be triggered while a worker is still running. 
+ // When the worker completes, this method is called again, but the original scope.close() has already cleaned the + // OTel context from the DuplicatedContext. + // This will end the span to record the failure but won't close the scope yet, because that would erase the OTel context + // and interfere with ongoing child spans. + if (spanOperation.tryEndSpan()) { + instrumenter.end(spanOperation.getSpanContext(), (REQ) request, (RESP) response, failure); + } + } else { + try (scope) { + if (spanOperation.tryEndSpan()) { + instrumenter.end(spanOperation.getSpanContext(), (REQ) request, (RESP) response, failure); + } + } } } diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/instrumentation/vertx/OpenTelemetryVertxTracer.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/instrumentation/vertx/OpenTelemetryVertxTracer.java index 3f1ceb140bd87..a6f845f3efdd6 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/instrumentation/vertx/OpenTelemetryVertxTracer.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/instrumentation/vertx/OpenTelemetryVertxTracer.java @@ -3,6 +3,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.BiConsumer; import io.opentelemetry.context.Scope; @@ -97,6 +98,9 @@ static class SpanOperation { private final MultiMap headers; private final io.opentelemetry.context.Context spanContext; private final Scope scope; + private volatile int spanEnded; + private static final AtomicIntegerFieldUpdater spanEndedUpdater = AtomicIntegerFieldUpdater + .newUpdater(SpanOperation.class, "spanEnded"); public SpanOperation(final Context context, final Object request, final MultiMap headers, final io.opentelemetry.context.Context spanContext, final Scope scope) { @@ -107,6 
+111,10 @@ public SpanOperation(final Context context, final Object request, final MultiMap this.scope = scope; } + boolean tryEndSpan() { + return spanEndedUpdater.compareAndSet(this, 0, 1); + } + public Context getContext() { return context; } diff --git a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryWithSpanAtStartupTest.java b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryWithSpanAtStartupTest.java index d116b4a32cab7..2ef72ecbbd647 100644 --- a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryWithSpanAtStartupTest.java +++ b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryWithSpanAtStartupTest.java @@ -45,7 +45,7 @@ void testGeneratedSpansUsingRestClientReactive() { await().atMost(Duration.ofSeconds(5L)).pollDelay(Duration.ofMillis(50)).until(() -> { // make sure incoming spans are processed List> spans = getSpans(); - return spans.size() >= 1; + return spans.size() >= 2; }); List> spans = getSpans();