Skip to content

Commit 7cd809b

Browse files
committed
KAFKA-16505: add source raw bytes in processorContext
1 parent cebec91 commit 7cd809b

File tree

12 files changed

+253
-11
lines changed

12 files changed

+253
-11
lines changed

streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,4 +147,39 @@ public interface ErrorHandlerContext {
147147
* @return The timestamp.
148148
*/
149149
long timestamp();
150+
151+
/**
152+
* Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
153+
*
154+
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
155+
* punctuation callback}, or while processing a record that was forwarded by a punctuation
156+
* callback, it will return null.
157+
*
158+
* <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent
159+
* to the repartition topic.
160+
*
161+
* <p> Always returns null if this method is invoked within a
162+
* ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)
163+
*
164+
* @return the raw bytes of the key of the source message
165+
*/
166+
byte[] sourceRawKey();
167+
168+
169+
/**
170+
* Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
171+
*
172+
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
173+
* punctuation callback}, or while processing a record that was forwarded by a punctuation
174+
* callback, it will return null.
175+
*
176+
* <p> If this method is invoked in a sub-topology due to a repartition, the returned value would be the one sent
177+
* to the repartition topic.
178+
*
179+
* <p> Always returns null if this method is invoked within a
180+
* ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)
181+
*
182+
* @return the raw bytes of the value of the source message
183+
*/
184+
byte[] sourceRawValue();
150185
}

streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
3333
private final Headers headers;
3434
private final String processorNodeId;
3535
private final TaskId taskId;
36+
private final byte[] sourceRawKey;
37+
private final byte[] sourceRawValue;
3638

3739
private final long timestamp;
3840
private final ProcessorContext processorContext;
@@ -44,7 +46,9 @@ public DefaultErrorHandlerContext(final ProcessorContext processorContext,
4446
final Headers headers,
4547
final String processorNodeId,
4648
final TaskId taskId,
47-
final long timestamp) {
49+
final long timestamp,
50+
final byte[] sourceRawKey,
51+
final byte[] sourceRawValue) {
4852
this.topic = topic;
4953
this.partition = partition;
5054
this.offset = offset;
@@ -53,6 +57,8 @@ public DefaultErrorHandlerContext(final ProcessorContext processorContext,
5357
this.taskId = taskId;
5458
this.processorContext = processorContext;
5559
this.timestamp = timestamp;
60+
this.sourceRawKey = sourceRawKey;
61+
this.sourceRawValue = sourceRawValue;
5662
}
5763

5864
@Override
@@ -90,6 +96,14 @@ public long timestamp() {
9096
return timestamp;
9197
}
9298

99+
public byte[] sourceRawKey() {
100+
return sourceRawKey;
101+
}
102+
103+
public byte[] sourceRawValue() {
104+
return sourceRawValue;
105+
}
106+
93107
@Override
94108
public String toString() {
95109
// we do exclude headers on purpose, to not accidentally log user data

streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,39 @@ public interface RecordContext {
110110
*/
111111
Headers headers();
112112

113+
/**
114+
* Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
115+
*
116+
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
117+
* punctuation callback}, or while processing a record that was forwarded by a punctuation
118+
* callback, it will return null.
119+
*
120+
* <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent
121+
* to the repartition topic.
122+
*
123+
* <p> Always returns null if this method is invoked within a
124+
* ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)
125+
*
126+
* @return the raw bytes of the key of the source message
127+
*/
128+
byte[] sourceRawKey();
129+
130+
131+
/**
132+
* Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
133+
*
134+
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
135+
* punctuation callback}, or while processing a record that was forwarded by a punctuation
136+
* callback, it will return null.
137+
*
138+
* <p> If this method is invoked in a sub-topology due to a repartition, the returned value would be the one sent
139+
* to the repartition topic.
140+
*
141+
* <p> Always returns null if this method is invoked within a
142+
* ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)
143+
*
144+
* @return the raw bytes of the value of the source message
145+
*/
146+
byte[] sourceRawValue();
147+
113148
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,9 @@ public void process(final Record<KIn, VIn> record) {
215215
internalProcessorContext.headers(),
216216
internalProcessorContext.currentNode().name(),
217217
internalProcessorContext.taskId(),
218-
internalProcessorContext.timestamp());
218+
internalProcessorContext.timestamp(),
219+
internalProcessorContext.recordContext().sourceRawKey(),
220+
internalProcessorContext.recordContext().sourceRawValue());
219221

220222
final ProcessingExceptionHandler.ProcessingHandlerResponse response;
221223
try {

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
3737
private final String topic;
3838
private final int partition;
3939
private final Headers headers;
40+
private byte[] sourceRawKey;
41+
private byte[] sourceRawValue;
4042

4143
public ProcessorRecordContext(final long timestamp,
4244
final long offset,
@@ -48,6 +50,25 @@ public ProcessorRecordContext(final long timestamp,
4850
this.topic = topic;
4951
this.partition = partition;
5052
this.headers = Objects.requireNonNull(headers);
53+
this.sourceRawKey = null;
54+
this.sourceRawValue = null;
55+
}
56+
57+
58+
public ProcessorRecordContext(final long timestamp,
59+
final long offset,
60+
final int partition,
61+
final String topic,
62+
final Headers headers,
63+
final byte[] sourceRawKey,
64+
final byte[] sourceRawValue) {
65+
this.timestamp = timestamp;
66+
this.offset = offset;
67+
this.topic = topic;
68+
this.partition = partition;
69+
this.headers = Objects.requireNonNull(headers);
70+
this.sourceRawKey = sourceRawKey;
71+
this.sourceRawValue = sourceRawValue;
5172
}
5273

5374
@Override
@@ -75,6 +96,16 @@ public Headers headers() {
7596
return headers;
7697
}
7798

99+
@Override
100+
public byte[] sourceRawKey() {
101+
return sourceRawKey;
102+
}
103+
104+
@Override
105+
public byte[] sourceRawValue() {
106+
return sourceRawValue;
107+
}
108+
78109
public long residentMemorySizeEstimate() {
79110
long size = 0;
80111
size += Long.BYTES; // value.context.timestamp
@@ -176,6 +207,12 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
176207
return new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
177208
}
178209

210+
public void freeRawRecord() {
211+
this.sourceRawKey = null;
212+
this.sourceRawValue = null;
213+
}
214+
215+
179216
@Override
180217
public boolean equals(final Object o) {
181218
if (this == o) {

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,10 @@ public <K, V> void send(final String topic,
258258

259259
final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
260260

261+
// As many records could be in-flight,
262+
// freeing raw records in the context to reduce memory pressure
263+
freeContext(context);
264+
261265
streamsProducer.send(serializedRecord, (metadata, exception) -> {
262266
try {
263267
// if there's already an exception record, skip logging offsets or new exceptions
@@ -310,6 +314,12 @@ public <K, V> void send(final String topic,
310314
});
311315
}
312316

317+
private static void freeContext(final InternalProcessorContext<Void, Void> context) {
318+
if (context != null && context.recordContext() != null) {
319+
context.recordContext().freeRawRecord();
320+
}
321+
}
322+
313323
private <K, V> void handleException(final ProductionExceptionHandler.SerializationExceptionOrigin origin,
314324
final String topic,
315325
final K key,
@@ -387,7 +397,9 @@ private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorCo
387397
recordContext.headers(),
388398
processorNodeId,
389399
taskId,
390-
recordContext.timestamp()
400+
recordContext.timestamp(),
401+
context.recordContext().sourceRawKey(),
402+
context.recordContext().sourceRawValue()
391403
) :
392404
new DefaultErrorHandlerContext(
393405
context,
@@ -397,7 +409,9 @@ private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorCo
397409
new RecordHeaders(),
398410
processorNodeId,
399411
taskId,
400-
-1L
412+
-1L,
413+
null,
414+
null
401415
);
402416
}
403417

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ public static void handleDeserializationFailure(final DeserializationExceptionHa
9595
rawRecord.headers(),
9696
sourceNodeName,
9797
processorContext.taskId(),
98-
rawRecord.timestamp());
98+
rawRecord.timestamp(),
99+
rawRecord.key(),
100+
rawRecord.value());
99101

100102
final DeserializationHandlerResponse response;
101103
try {

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ private void updateHead() {
243243
lastCorruptedRecord = raw;
244244
continue;
245245
}
246-
headRecord = new StampedRecord(deserialized, timestamp);
246+
headRecord = new StampedRecord(deserialized, timestamp, raw.key(), raw.value());
247247
headRecordSizeInBytes = consumerRecordSizeInBytes(raw);
248248
}
249249

streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,22 @@
2323

2424
public class StampedRecord extends Stamped<ConsumerRecord<?, ?>> {
2525

26+
private final byte[] rawKey;
27+
private final byte[] rawValue;
28+
2629
public StampedRecord(final ConsumerRecord<?, ?> record, final long timestamp) {
2730
super(record, timestamp);
31+
this.rawKey = null;
32+
this.rawValue = null;
33+
}
34+
35+
public StampedRecord(final ConsumerRecord<?, ?> record,
36+
final long timestamp,
37+
final byte[] rawKey,
38+
final byte[] rawValue) {
39+
super(record, timestamp);
40+
this.rawKey = rawKey;
41+
this.rawValue = rawValue;
2842
}
2943

3044
public String topic() {
@@ -55,8 +69,26 @@ public Headers headers() {
5569
return value.headers();
5670
}
5771

72+
public byte[] rawKey() {
73+
return rawKey;
74+
}
75+
76+
public byte[] rawValue() {
77+
return rawValue;
78+
}
79+
5880
@Override
5981
public String toString() {
6082
return value.toString() + ", timestamp = " + timestamp;
6183
}
84+
85+
@Override
86+
public boolean equals(final Object other) {
87+
return super.equals(other);
88+
}
89+
90+
@Override
91+
public int hashCode() {
92+
return super.hashCode();
93+
}
6294
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -853,7 +853,9 @@ private void doProcess(final long wallClockTime) {
853853
record.offset(),
854854
record.partition(),
855855
record.topic(),
856-
record.headers()
856+
record.headers(),
857+
record.rawKey(),
858+
record.rawValue()
857859
);
858860
updateProcessorContext(currNode, wallClockTime, recordContext);
859861

@@ -935,11 +937,12 @@ record = null;
935937
recordContext.headers(),
936938
node.name(),
937939
id(),
938-
recordContext.timestamp()
940+
recordContext.timestamp(),
941+
recordContext.sourceRawKey(),
942+
recordContext.sourceRawValue()
939943
);
940944

941-
final ProcessingExceptionHandler.ProcessingHandlerResponse response;
942-
try {
945+
final ProcessingExceptionHandler.ProcessingHandlerResponse response; try {
943946
response = Objects.requireNonNull(
944947
processingExceptionHandler.handle(errorHandlerContext, null, processingException),
945948
"Invalid ProcessingExceptionHandler response."

0 commit comments

Comments
 (0)