Skip to content

Commit fe4863d

Browse files
committed
Fix MDC connector context not propagating to async querier thread
Since v10.8.5, logs from the async query path (TableQuerierProcessor, TableQuerier, etc.) lost the [connector-name|task-N] prefix because Kafka Connect's MDC is thread-local and was not propagated to the background executor thread introduced by the RecordQueue architecture. Capture MDC.getCopyOfContextMap() on the calling (poll) thread before submitting to the executor, then restore it on the executor thread before ConnectLogContext is created. This allows ConnectLogContext to find and extend the connector.context MDC entry as intended. MDC is cleared in a finally block to prevent leaking to pooled threads.
1 parent 1e5a279 commit fe4863d

File tree

3 files changed

+217
-2
lines changed

3 files changed

+217
-2
lines changed

src/main/java/io/confluent/connect/jdbc/util/RecordQueue.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.time.Duration;
1414
import java.util.ArrayList;
1515
import java.util.List;
16+
import java.util.Map;
1617
import java.util.Objects;
1718
import java.util.concurrent.ArrayBlockingQueue;
1819
import java.util.concurrent.BlockingQueue;
@@ -29,6 +30,8 @@
2930
import java.util.function.IntFunction;
3031
import java.util.function.Supplier;
3132

33+
import org.slf4j.MDC;
34+
3235
/**
3336
* A concurrent and blocking queue for source records, with the ability to asynchronously execute
3437
* record generators that send records to this queue.
@@ -318,12 +321,17 @@ public <T> CompletableFuture<T> submit(
318321
final RecordDestination<RecordT> cancellableDestination =
319322
this.withAdditionalRunningCondition(stillRun::get);
320323

324+
// Capture the caller's MDC (which includes Connect's connector.context)
325+
// so it can be restored on the executor thread
326+
Map<String, String> callerMdc = MDC.getCopyOfContextMap();
327+
321328
// Submit the function and use the cancellable destination
322329
Supplier<T> supplier = createLoggingSupplier(
323330
operationName,
324331
logContext,
325332
cancellableDestination,
326-
generatorProcessor
333+
generatorProcessor,
334+
callerMdc
327335
);
328336
return CompletableFuture.supplyAsync(supplier, executor);
329337
}
@@ -465,9 +473,15 @@ protected <T> Supplier<T> createLoggingSupplier(
465473
String operationName,
466474
String logContext,
467475
RecordDestination<RecordT> destination,
468-
Function<RecordDestination<RecordT>, T> generatorProcessor
476+
Function<RecordDestination<RecordT>, T> generatorProcessor,
477+
Map<String, String> callerMdc
469478
) {
470479
return () -> {
480+
// Restore the caller's MDC on this executor thread so that
481+
// ConnectLogContext can find and extend the connector.context
482+
if (callerMdc != null) {
483+
MDC.setContextMap(callerMdc);
484+
}
471485
try (ConnectLogContext context = new ConnectLogContext(logContext)) {
472486
try {
473487
log.debug("{}Starting {}", context.prefix(), operationName);
@@ -479,6 +493,9 @@ protected <T> Supplier<T> createLoggingSupplier(
479493
} finally {
480494
log.debug("{}Stopped {}", context.prefix(), operationName);
481495
}
496+
} finally {
497+
// Clean up MDC to prevent leaking context to pooled threads
498+
MDC.clear();
482499
}
483500
};
484501
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright [2024 - 2024] Confluent Inc.
3+
*/
4+
5+
package io.confluent.connect.jdbc.util;
6+
7+
import org.junit.After;
8+
import org.junit.Test;
9+
import org.slf4j.MDC;
10+
11+
import static org.junit.Assert.assertEquals;
12+
import static org.junit.Assert.assertNull;
13+
14+
public class ConnectLogContextTest {
15+
16+
private static final String CONNECTOR_MDC_CONTEXT = "connector.context";
17+
18+
@After
19+
public void tearDown() {
20+
MDC.clear();
21+
}
22+
23+
@Test
24+
public void shouldAppendSuffixWhenMdcHasConnectorContext() {
25+
MDC.put(CONNECTOR_MDC_CONTEXT, "[my-connector|task-0] ");
26+
27+
try (ConnectLogContext ctx = new ConnectLogContext("tableQuerierProcessor")) {
28+
// MDC should now have the appended context
29+
assertEquals(
30+
"[my-connector|task-0|tableQuerierProcessor] ",
31+
MDC.get(CONNECTOR_MDC_CONTEXT)
32+
);
33+
// prefix should be empty since MDC is used
34+
assertEquals("", ctx.prefix());
35+
}
36+
37+
// After close, MDC should be restored to original value
38+
assertEquals("[my-connector|task-0] ", MDC.get(CONNECTOR_MDC_CONTEXT));
39+
}
40+
41+
@Test
42+
public void shouldUsePrefixWhenMdcIsEmpty() {
43+
// No MDC set
44+
try (ConnectLogContext ctx = new ConnectLogContext("tableQuerierProcessor")) {
45+
// MDC should still be null
46+
assertNull(MDC.get(CONNECTOR_MDC_CONTEXT));
47+
// prefix should contain the log context
48+
assertEquals("tableQuerierProcessor ", ctx.prefix());
49+
}
50+
}
51+
52+
@Test
53+
public void shouldHandleNestedContexts() {
54+
MDC.put(CONNECTOR_MDC_CONTEXT, "[my-connector|task-0] ");
55+
56+
try (ConnectLogContext outer = new ConnectLogContext("outer")) {
57+
assertEquals("[my-connector|task-0|outer] ", MDC.get(CONNECTOR_MDC_CONTEXT));
58+
59+
try (ConnectLogContext inner = new ConnectLogContext("inner")) {
60+
assertEquals("[my-connector|task-0|outer|inner] ", MDC.get(CONNECTOR_MDC_CONTEXT));
61+
}
62+
63+
// After inner closes, should restore to outer's context
64+
assertEquals("[my-connector|task-0|outer] ", MDC.get(CONNECTOR_MDC_CONTEXT));
65+
}
66+
67+
// After outer closes, should restore to original
68+
assertEquals("[my-connector|task-0] ", MDC.get(CONNECTOR_MDC_CONTEXT));
69+
}
70+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright [2024 - 2024] Confluent Inc.
3+
*/
4+
5+
package io.confluent.connect.jdbc.util;
6+
7+
import org.apache.kafka.connect.source.SourceRecord;
8+
import org.junit.After;
9+
import org.junit.Before;
10+
import org.junit.Test;
11+
import org.slf4j.MDC;
12+
13+
import java.time.Duration;
14+
import java.util.concurrent.CompletableFuture;
15+
import java.util.concurrent.TimeUnit;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertNotNull;
20+
import static org.junit.Assert.assertNull;
21+
import static org.junit.Assert.assertTrue;
22+
23+
public class RecordQueueMdcTest {
24+
25+
private static final String CONNECTOR_MDC_CONTEXT = "connector.context";
26+
27+
private RecordQueue<SourceRecord> queue;
28+
29+
@Before
30+
public void setUp() {
31+
queue = RecordQueue.<SourceRecord>builder()
32+
.maxBatchSize(10)
33+
.maxExecutorThreads(1)
34+
.build();
35+
}
36+
37+
@After
38+
public void tearDown() throws InterruptedException {
39+
MDC.clear();
40+
if (queue != null) {
41+
queue.stop();
42+
queue.awaitTermination(Duration.ofSeconds(5));
43+
}
44+
}
45+
46+
@Test
47+
public void shouldPropagateConnectorContextToExecutorThread() throws Exception {
48+
// Simulate Kafka Connect setting MDC on the calling thread
49+
MDC.put(CONNECTOR_MDC_CONTEXT, "[my-connector|task-0] ");
50+
51+
AtomicReference<String> capturedMdc = new AtomicReference<>();
52+
53+
CompletableFuture<Boolean> future = queue.submit(
54+
"Test Operation",
55+
"testProcessor",
56+
destination -> {
57+
// Capture the MDC value on the executor thread
58+
capturedMdc.set(MDC.get(CONNECTOR_MDC_CONTEXT));
59+
return true;
60+
}
61+
);
62+
63+
assertTrue(future.get(10, TimeUnit.SECONDS));
64+
65+
// The executor thread should have seen the connector context with appended suffix
66+
assertNotNull("MDC should be propagated to executor thread", capturedMdc.get());
67+
assertEquals(
68+
"[my-connector|task-0|testProcessor] ",
69+
capturedMdc.get()
70+
);
71+
}
72+
73+
@Test
74+
public void shouldCleanUpMdcAfterExecution() throws Exception {
75+
MDC.put(CONNECTOR_MDC_CONTEXT, "[my-connector|task-0] ");
76+
77+
AtomicReference<String> mdcAfterExecution = new AtomicReference<>();
78+
79+
// First submit — sets MDC on executor thread
80+
CompletableFuture<Boolean> future1 = queue.submit(
81+
"First Operation",
82+
"firstProcessor",
83+
destination -> true
84+
);
85+
future1.get(10, TimeUnit.SECONDS);
86+
87+
// Clear caller's MDC to simulate a different calling context
88+
MDC.clear();
89+
90+
// Second submit — without MDC on the calling thread
91+
CompletableFuture<Boolean> future2 = queue.submit(
92+
"Second Operation",
93+
"secondProcessor",
94+
destination -> {
95+
mdcAfterExecution.set(MDC.get(CONNECTOR_MDC_CONTEXT));
96+
return true;
97+
}
98+
);
99+
future2.get(10, TimeUnit.SECONDS);
100+
101+
// The second execution should NOT have the first execution's MDC
102+
// (it should be null since the caller had no MDC set)
103+
assertNull(
104+
"MDC should be cleaned up between executions on pooled threads",
105+
mdcAfterExecution.get()
106+
);
107+
}
108+
109+
@Test
110+
public void shouldWorkWhenCallerHasNoMdc() throws Exception {
111+
// No MDC set on the calling thread
112+
AtomicReference<String> capturedMdc = new AtomicReference<>();
113+
114+
CompletableFuture<Boolean> future = queue.submit(
115+
"Test Operation",
116+
"testProcessor",
117+
destination -> {
118+
capturedMdc.set(MDC.get(CONNECTOR_MDC_CONTEXT));
119+
return true;
120+
}
121+
);
122+
123+
assertTrue(future.get(10, TimeUnit.SECONDS));
124+
125+
// Without MDC, ConnectLogContext falls back to prefix mode — no MDC set
126+
assertNull(capturedMdc.get());
127+
}
128+
}

0 commit comments

Comments
 (0)