Skip to content

KAFKA-16505: Add source raw bytes in processorContext #17960

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,10 @@ private static void assertProcessingExceptionHandlerInputs(final ErrorHandlerCon
assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) record.value()));
assertEquals("TOPIC_NAME", context.topic());
assertEquals("KSTREAM-PROCESSOR-0000000003", context.processorNodeId());
assertTrue(Arrays.equals("ID123-2-ERR".getBytes(), context.sourceRawKey())
|| Arrays.equals("ID123-5-ERR".getBytes(), context.sourceRawKey()));
assertTrue(Arrays.equals("ID123-A2".getBytes(), context.sourceRawValue())
|| Arrays.equals("ID123-A5".getBytes(), context.sourceRawValue()));
assertEquals(TIMESTAMP.toEpochMilli(), context.timestamp());
assertTrue(exception.getMessage().contains("Exception should be handled by processing exception handler"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,38 @@ public interface ErrorHandlerContext {
* @return The timestamp.
*/
long timestamp();

/**
 * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
 *
 * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
 * punctuation callback}, or while processing a record that was forwarded by a punctuation
 * callback, it will return {@code null}.
 *
 * <p> If this method is invoked in a sub-topology due to a repartition, the returned key is the one sent
 * to the repartition topic.
 *
 * <p> Always returns {@code null} if this method is invoked within a
 * {@code ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)} callback.
 *
 * @return the raw bytes of the key of the source message, or {@code null} if not available
 */
byte[] sourceRawKey();

/**
 * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
 *
 * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
 * punctuation callback}, or while processing a record that was forwarded by a punctuation
 * callback, it will return {@code null}.
 *
 * <p> If this method is invoked in a sub-topology due to a repartition, the returned value is the one sent
 * to the repartition topic.
 *
 * <p> Always returns {@code null} if this method is invoked within a
 * {@code ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)} callback.
 *
 * @return the raw bytes of the value of the source message, or {@code null} if not available
 */
byte[] sourceRawValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
private final Headers headers;
private final String processorNodeId;
private final TaskId taskId;
private final byte[] sourceRawKey;
private final byte[] sourceRawValue;

private final long timestamp;
private final ProcessorContext processorContext;
Expand All @@ -44,7 +46,9 @@ public DefaultErrorHandlerContext(final ProcessorContext processorContext,
final Headers headers,
final String processorNodeId,
final TaskId taskId,
final long timestamp) {
final long timestamp,
final byte[] sourceRawKey,
final byte[] sourceRawValue) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
Expand All @@ -53,6 +57,8 @@ public DefaultErrorHandlerContext(final ProcessorContext processorContext,
this.taskId = taskId;
this.processorContext = processorContext;
this.timestamp = timestamp;
this.sourceRawKey = sourceRawKey;
this.sourceRawValue = sourceRawValue;
}

@Override
Expand Down Expand Up @@ -90,6 +96,14 @@ public long timestamp() {
return timestamp;
}

/**
 * Returns the raw key bytes that were handed to this context at construction time.
 *
 * @return the stored raw key bytes; {@code null} if none were supplied
 */
@Override
public byte[] sourceRawKey() {
    return sourceRawKey;
}

/**
 * Returns the raw value bytes that were handed to this context at construction time.
 *
 * @return the stored raw value bytes; {@code null} if none were supplied
 */
@Override
public byte[] sourceRawValue() {
    return sourceRawValue;
}

@Override
public String toString() {
// we do exclude headers on purpose, to not accidentally log user data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,37 @@ public interface RecordContext {
*/
Headers headers();

/**
 * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
 *
 * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
 * punctuation callback}, or while processing a record that was forwarded by a punctuation
 * callback, it will return {@code null}.
 *
 * <p> If this method is invoked in a sub-topology due to a repartition, the returned key is the one sent
 * to the repartition topic.
 *
 * <p> Always returns {@code null} if this method is invoked within a
 * {@code ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)} callback.
 *
 * @return the raw bytes of the key of the source message, or {@code null} if not available
 */
byte[] sourceRawKey();

/**
 * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
 *
 * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
 * punctuation callback}, or while processing a record that was forwarded by a punctuation
 * callback, it will return {@code null}.
 *
 * <p> If this method is invoked in a sub-topology due to a repartition, the returned value is the one sent
 * to the repartition topic.
 *
 * <p> Always returns {@code null} if this method is invoked within a
 * {@code ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)} callback.
 *
 * @return the raw bytes of the value of the source message, or {@code null} if not available
 */
byte[] sourceRawValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,10 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {
recordContext.offset(),
recordContext.partition(),
recordContext.topic(),
record.headers());
record.headers(),
recordContext.sourceRawKey(),
recordContext.sourceRawValue()
);
}

if (childName == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ public void process(final Record<KIn, VIn> record) {
internalProcessorContext.recordContext().headers(),
internalProcessorContext.currentNode().name(),
internalProcessorContext.taskId(),
internalProcessorContext.recordContext().timestamp()
internalProcessorContext.recordContext().timestamp(),
internalProcessorContext.recordContext().sourceRawKey(),
internalProcessorContext.recordContext().sourceRawValue()
);

final ProcessingExceptionHandler.ProcessingHandlerResponse response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
private final String topic;
private final int partition;
private final Headers headers;
private byte[] sourceRawKey;
private byte[] sourceRawValue;

public ProcessorRecordContext(final long timestamp,
final long offset,
Expand All @@ -48,6 +50,24 @@ public ProcessorRecordContext(final long timestamp,
this.topic = topic;
this.partition = partition;
this.headers = Objects.requireNonNull(headers);
this.sourceRawKey = null;
this.sourceRawValue = null;
}

/**
 * Creates a record context that also carries the raw (non-deserialized) key and value
 * of the source record.
 *
 * @param timestamp      the record timestamp
 * @param offset         the record offset
 * @param partition      the record partition
 * @param topic          the record topic
 * @param headers        the record headers; must not be {@code null}
 * @param sourceRawKey   the raw key bytes, stored by reference; may be {@code null}
 * @param sourceRawValue the raw value bytes, stored by reference; may be {@code null}
 * @throws NullPointerException if {@code headers} is {@code null}
 */
public ProcessorRecordContext(final long timestamp,
                              final long offset,
                              final int partition,
                              final String topic,
                              final Headers headers,
                              final byte[] sourceRawKey,
                              final byte[] sourceRawValue) {
    // reject missing headers up front, before any state is assigned
    this.headers = Objects.requireNonNull(headers);

    // record coordinates
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
    this.timestamp = timestamp;

    // raw bytes of the source record
    this.sourceRawValue = sourceRawValue;
    this.sourceRawKey = sourceRawKey;
}

@Override
Expand Down Expand Up @@ -75,6 +95,16 @@ public Headers headers() {
return headers;
}

/**
 * Returns the raw key bytes currently held by this context.
 *
 * @return the raw key bytes, or {@code null} if none were supplied or they were
 *         released via {@link #freeRawRecord()}
 */
@Override
public byte[] sourceRawKey() {
    return this.sourceRawKey;
}

/**
 * Returns the raw value bytes currently held by this context.
 *
 * @return the raw value bytes, or {@code null} if none were supplied or they were
 *         released via {@link #freeRawRecord()}
 */
@Override
public byte[] sourceRawValue() {
    return this.sourceRawValue;
}

public long residentMemorySizeEstimate() {
long size = 0;
size += Long.BYTES; // value.context.timestamp
Expand Down Expand Up @@ -176,6 +206,11 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
return new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
}

/**
 * Drops this context's references to the raw source key and value by setting both
 * fields to {@code null}. After this call {@link #sourceRawKey()} and
 * {@link #sourceRawValue()} return {@code null}.
 */
public void freeRawRecord() {
    sourceRawKey = null;
    sourceRawValue = null;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ public <K, V> void send(final String topic,

final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);

// As many records could be in-flight,
// freeing raw records in the context to reduce memory pressure
freeRawInputRecordFromContext(context);

streamsProducer.send(serializedRecord, (metadata, exception) -> {
try {
// if there's already an exception record, skip logging offsets or new exceptions
Expand Down Expand Up @@ -311,6 +315,12 @@ public <K, V> void send(final String topic,
});
}

/**
 * Releases the raw source key/value held by the record context of the given processor context.
 * Does nothing when the processor context or its record context is {@code null}.
 *
 * @param context the processor context whose record context should drop its raw bytes; may be {@code null}
 */
private static void freeRawInputRecordFromContext(final InternalProcessorContext<Void, Void> context) {
    if (context == null || context.recordContext() == null) {
        // nothing to release
        return;
    }
    context.recordContext().freeRawRecord();
}

private <K, V> void handleException(final ProductionExceptionHandler.SerializationExceptionOrigin origin,
final String topic,
final K key,
Expand Down Expand Up @@ -388,7 +398,9 @@ private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorCo
recordContext.headers(),
processorNodeId,
taskId,
recordContext.timestamp()
recordContext.timestamp(),
context.recordContext().sourceRawKey(),
context.recordContext().sourceRawValue()
) :
new DefaultErrorHandlerContext(
context,
Expand All @@ -398,7 +410,9 @@ private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorCo
new RecordHeaders(),
processorNodeId,
taskId,
-1L
-1L,
null,
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ public static void handleDeserializationFailure(final DeserializationExceptionHa
rawRecord.headers(),
sourceNodeName,
processorContext.taskId(),
rawRecord.timestamp());
rawRecord.timestamp(),
rawRecord.key(),
rawRecord.value()
);

final DeserializationHandlerResponse response;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private void updateHead() {
lastCorruptedRecord = raw;
continue;
}
headRecord = new StampedRecord(deserialized, timestamp);
headRecord = new StampedRecord(deserialized, timestamp, raw.key(), raw.value());
headRecordSizeInBytes = consumerRecordSizeInBytes(raw);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,22 @@

public class StampedRecord extends Stamped<ConsumerRecord<?, ?>> {

private final byte[] rawKey;
private final byte[] rawValue;

/**
 * Creates a stamped record that carries no raw key/value bytes; {@link #rawKey()} and
 * {@link #rawValue()} will return {@code null}.
 *
 * @param record    the consumer record to wrap
 * @param timestamp the timestamp to associate with the record
 */
public StampedRecord(final ConsumerRecord<?, ?> record, final long timestamp) {
    this(record, timestamp, null, null);
}

/**
 * Creates a stamped record that additionally keeps the raw (non-deserialized) key and value.
 *
 * @param record    the consumer record to wrap
 * @param timestamp the timestamp to associate with the record
 * @param rawKey    the raw key bytes, stored by reference; may be {@code null}
 * @param rawValue  the raw value bytes, stored by reference; may be {@code null}
 */
public StampedRecord(final ConsumerRecord<?, ?> record,
                     final long timestamp,
                     final byte[] rawKey,
                     final byte[] rawValue) {
    super(record, timestamp);
    // keep the caller's arrays as-is; no defensive copy is made
    this.rawValue = rawValue;
    this.rawKey = rawKey;
}

public String topic() {
Expand Down Expand Up @@ -55,8 +69,26 @@ public Headers headers() {
return value.headers();
}

/**
 * Returns the raw key bytes given at construction time.
 *
 * @return the raw key bytes, or {@code null} if none were supplied
 */
public byte[] rawKey() {
    return this.rawKey;
}

/**
 * Returns the raw value bytes given at construction time.
 *
 * @return the raw value bytes, or {@code null} if none were supplied
 */
public byte[] rawValue() {
    return this.rawValue;
}

/**
 * Renders the wrapped record followed by this record's timestamp.
 * The raw key/value bytes are not part of the output.
 */
@Override
public String toString() {
    final String recordText = value.toString();
    return recordText + ", timestamp = " + timestamp;
}

/**
 * Delegates entirely to the superclass implementation; the raw key/value bytes
 * declared by this class are not compared here.
 */
@Override
public boolean equals(final Object other) {
return super.equals(other);
}

/**
 * Delegates entirely to the superclass implementation, mirroring {@link #equals(Object)};
 * the raw key/value bytes declared by this class do not contribute to the hash.
 */
@Override
public int hashCode() {
return super.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,9 @@ private void doProcess(final long wallClockTime) {
record.offset(),
record.partition(),
record.topic(),
record.headers()
record.headers(),
record.rawKey(),
record.rawValue()
);
updateProcessorContext(currNode, wallClockTime, recordContext);

Expand Down Expand Up @@ -935,7 +937,9 @@ record = null;
recordContext.headers(),
node.name(),
id(),
recordContext.timestamp()
recordContext.timestamp(),
recordContext.sourceRawKey(),
recordContext.sourceRawValue()
);

final ProcessingExceptionHandler.ProcessingHandlerResponse response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class ProcessorNodeTest {
private static final String NAME = "name";
private static final String KEY = "key";
private static final String VALUE = "value";
// Encode with an explicit charset so the fixtures do not depend on the platform default.
private static final byte[] RAW_KEY = KEY.getBytes(java.nio.charset.StandardCharsets.UTF_8);
private static final byte[] RAW_VALUE = VALUE.getBytes(java.nio.charset.StandardCharsets.UTF_8);

@Test
public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
Expand Down Expand Up @@ -331,7 +333,9 @@ private InternalProcessorContext<Object, Object> mockInternalProcessorContext()
OFFSET,
PARTITION,
TOPIC,
new RecordHeaders()));
new RecordHeaders(),
RAW_KEY,
RAW_VALUE));
when(internalProcessorContext.currentNode()).thenReturn(new ProcessorNode<>(NAME));

return internalProcessorContext;
Expand Down Expand Up @@ -359,6 +363,9 @@ public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHa
assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId());
assertEquals(internalProcessorContext.taskId(), context.taskId());
assertEquals(internalProcessorContext.recordContext().timestamp(), context.timestamp());
assertEquals(internalProcessorContext.recordContext().sourceRawKey(), context.sourceRawKey());
assertEquals(internalProcessorContext.recordContext().sourceRawValue(), context.sourceRawValue());

assertEquals(KEY, record.key());
assertEquals(VALUE, record.value());
assertInstanceOf(RuntimeException.class, exception);
Expand Down
Loading