Skip to content

Commit cacfd4c

Browse files
xinlian12 and Copilot authored
fix: isolate per-record failures in Kafka sink transformer and guard DLQ reporter (#49286)
* fix: isolate per-record ID-generation failures in Kafka sink transformer

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent eb81eb2 commit cacfd4c

6 files changed

Lines changed: 544 additions & 31 deletions

File tree

sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Fixed per-record error isolation in Kafka sink transformer to honor DLQ and tolerance settings, instead of failing the entire batch when a single record fails during transformation. - See [PR 49286](https://github.com/Azure/azure-sdk-for-java/pull/49286)
11+
* Guarded `ErrantRecordReporter.report()` in `CosmosWriterBase` against secondary failures so DLQ errors do not mask original write failures. - See [PR 49286](https://github.com/Azure/azure-sdk-for-java/pull/49286)
1012

1113
#### Other Changes
1214

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,11 @@ public void start(Map<String, String> props) {
5050
this.sinkTaskConfig.getClientMetadataCachesSnapshot());
5151
LOGGER.info("The taskId is " + this.sinkTaskConfig.getTaskId());
5252
this.throughputControlClientItem = this.getThroughputControlCosmosClient();
53-
this.sinkRecordTransformer = new SinkRecordTransformer(this.sinkTaskConfig);
53+
54+
this.sinkRecordTransformer = new SinkRecordTransformer(
55+
this.sinkTaskConfig,
56+
this.context.errantRecordReporter(),
57+
this.sinkTaskConfig.getWriteConfig().getToleranceOnErrorLevel());
5458

5559
if (this.sinkTaskConfig.getWriteConfig().isBulkEnabled()) {
5660
this.cosmosWriter =
@@ -129,7 +133,7 @@ record -> this.sinkTaskConfig
129133
List<SinkRecord> transformedRecords = sinkRecordTransformer.transform(containerName, entry.getValue());
130134
this.cosmosWriter.write(container, transformedRecords);
131135

132-
totalWrittenRecordsPerContainer.merge(containerName, (long) entry.getValue().size(), Long::sum);
136+
totalWrittenRecordsPerContainer.merge(containerName, (long) transformedRecords.size(), Long::sum);
133137
}
134138

135139
logWrittenRecordCount();

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosWriterBase.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,10 @@ protected boolean shouldRetry(Throwable exception, int attemptedCount, int maxRe
8383
}
8484

8585
protected void sendToDlqIfConfigured(SinkOperation sinkOperationContext) {
86-
if (this.errantRecordReporter != null) {
87-
errantRecordReporter.report(sinkOperationContext.getSinkRecord(), sinkOperationContext.getException());
88-
}
86+
DlqReportHelper.reportToDlqIfConfigured(
87+
this.errantRecordReporter,
88+
sinkOperationContext.getSinkRecord(),
89+
sinkOperationContext.getException(),
90+
LOGGER);
8991
}
9092
}
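sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/DlqReportHelper.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change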
@@ -0,0 +1,52 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.kafka.connect.implementation.sink;
5+
6+
import org.apache.kafka.connect.sink.ErrantRecordReporter;
7+
import org.apache.kafka.connect.sink.SinkRecord;
8+
import org.slf4j.Logger;
9+
10+
/**
11+
* Shared helper for DLQ (Dead Letter Queue) reporting.
12+
*
13+
* <p>Both {@link CosmosWriterBase} and {@link SinkRecordTransformer} need fire-and-forget
14+
* DLQ reporting that guards against reporter failures. This helper centralises that logic.
15+
*/
16+
final class DlqReportHelper {
17+
18+
private DlqReportHelper() {
19+
}
20+
21+
/**
22+
* Reports a failed record to the DLQ if a reporter is configured.
23+
*
24+
* <p>Per Kafka Connect best practices, DLQ reporting is a side-effect for observability —
25+
* reporter failures are swallowed so they do not mask the original processing error.
26+
*
27+
* @param reporter the errant record reporter, may be {@code null}
28+
* @param record the sink record that failed processing
29+
* @param error the original processing error
30+
* @param logger the caller's logger for error reporting
31+
*/
32+
static void reportToDlqIfConfigured(
33+
ErrantRecordReporter reporter,
34+
SinkRecord record,
35+
Throwable error,
36+
Logger logger) {
37+
38+
if (reporter == null) {
39+
return;
40+
}
41+
try {
42+
reporter.report(record, error);
43+
} catch (Exception reportException) {
44+
logger.error(
45+
"Failed to report errant record to DLQ for topic {}, partition {}, offset {}.",
46+
record.topic(),
47+
record.kafkaPartition(),
48+
record.kafkaOffset(),
49+
reportException);
50+
}
51+
}
52+
}

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java

Lines changed: 67 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.ProvidedInValueStrategy;
1212
import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.TemplateStrategy;
1313
import org.apache.kafka.connect.data.Struct;
14+
import org.apache.kafka.connect.sink.ErrantRecordReporter;
1415
import org.apache.kafka.connect.sink.SinkRecord;
1516
import org.slf4j.Logger;
1617
import org.slf4j.LoggerFactory;
@@ -24,9 +25,24 @@ public class SinkRecordTransformer {
2425
private static final Logger LOGGER = LoggerFactory.getLogger(SinkRecordTransformer.class);
2526

2627
private final IdStrategy idStrategy;
28+
private final ErrantRecordReporter errantRecordReporter;
29+
private final ToleranceOnErrorLevel toleranceOnErrorLevel;
30+
31+
public SinkRecordTransformer(
32+
CosmosSinkTaskConfig sinkTaskConfig,
33+
ErrantRecordReporter errantRecordReporter,
34+
ToleranceOnErrorLevel toleranceOnErrorLevel) {
35+
this(createIdStrategy(sinkTaskConfig), errantRecordReporter, toleranceOnErrorLevel);
36+
}
2737

28-
public SinkRecordTransformer(CosmosSinkTaskConfig sinkTaskConfig) {
29-
this.idStrategy = this.createIdStrategy(sinkTaskConfig);
38+
// Package-private constructor for unit testing without requiring CosmosSinkTaskConfig.
39+
SinkRecordTransformer(
40+
IdStrategy idStrategy,
41+
ErrantRecordReporter errantRecordReporter,
42+
ToleranceOnErrorLevel toleranceOnErrorLevel) {
43+
this.idStrategy = idStrategy;
44+
this.errantRecordReporter = errantRecordReporter;
45+
this.toleranceOnErrorLevel = toleranceOnErrorLevel;
3046
}
3147

3248
@SuppressWarnings("unchecked")
@@ -44,30 +60,55 @@ public List<SinkRecord> transform(String containerName, List<SinkRecord> sinkRec
4460
record.value() == null ? null : record.value().getClass().getName(),
4561
record.value() == null ? null : record.valueSchema());
4662

47-
Object recordValue;
48-
if (record.value() instanceof Struct) {
49-
recordValue = StructToJsonMap.toJsonMap((Struct) record.value());
50-
} else if (record.value() instanceof Map) {
51-
recordValue = StructToJsonMap.handleMap((Map<String, Object>) record.value());
52-
} else {
53-
recordValue = record.value();
63+
try {
64+
Object recordValue;
65+
if (record.value() instanceof Struct) {
66+
recordValue = StructToJsonMap.toJsonMap((Struct) record.value());
67+
} else if (record.value() instanceof Map) {
68+
recordValue = StructToJsonMap.handleMap((Map<String, Object>) record.value());
69+
} else {
70+
recordValue = record.value();
71+
}
72+
73+
maybeInsertId(recordValue, record);
74+
75+
final SinkRecord updatedRecord = new SinkRecord(record.topic(),
76+
record.kafkaPartition(),
77+
record.keySchema(),
78+
record.key(),
79+
record.valueSchema(),
80+
recordValue,
81+
record.kafkaOffset(),
82+
record.timestamp(),
83+
record.timestampType(),
84+
record.headers());
85+
86+
toBeWrittenRecordList.add(updatedRecord);
87+
} catch (RuntimeException e) {
88+
// Report to DLQ if configured (fire-and-forget, guarded against reporter failures).
89+
DlqReportHelper.reportToDlqIfConfigured(this.errantRecordReporter, record, e, LOGGER);
90+
91+
// Use tolerance level to decide continue-vs-throw.
92+
if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) {
93+
LOGGER.warn(
94+
"Skipping record from topic {}, partition {}, offset {}, container {} due to transform error.",
95+
record.topic(),
96+
record.kafkaPartition(),
97+
record.kafkaOffset(),
98+
containerName,
99+
e);
100+
} else {
101+
LOGGER.error(
102+
"Failing task due to transform error for record from topic {}, partition {}, offset {}, "
103+
+ "container {}.",
104+
record.topic(),
105+
record.kafkaPartition(),
106+
record.kafkaOffset(),
107+
containerName,
108+
e);
109+
throw e;
110+
}
54111
}
55-
56-
maybeInsertId(recordValue, record);
57-
58-
// Create an updated record with from the current record and the updated record value
59-
final SinkRecord updatedRecord = new SinkRecord(record.topic(),
60-
record.kafkaPartition(),
61-
record.keySchema(),
62-
record.key(),
63-
record.valueSchema(),
64-
recordValue,
65-
record.kafkaOffset(),
66-
record.timestamp(),
67-
record.timestampType(),
68-
record.headers());
69-
70-
toBeWrittenRecordList.add(updatedRecord);
71112
}
72113

73114
return toBeWrittenRecordList;
@@ -82,7 +123,7 @@ private void maybeInsertId(Object recordValue, SinkRecord sinkRecord) {
82123
recordMap.put("id", this.idStrategy.generateId(sinkRecord));
83124
}
84125

85-
private IdStrategy createIdStrategy(CosmosSinkTaskConfig sinkTaskConfig) {
126+
private static IdStrategy createIdStrategy(CosmosSinkTaskConfig sinkTaskConfig) {
86127
IdStrategy idStrategyClass;
87128
switch (sinkTaskConfig.getIdStrategy()) {
88129
case FULL_KEY_STRATEGY:

0 commit comments

Comments
 (0)