Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.AggregationWithHeaders;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -75,23 +72,27 @@ private byte[] serialize(final String topic, final AGG plainAggregation, final H

final byte[] rawAggregation = aggregationSerializer.serialize(topic, headers, plainAggregation);

// Since we can't control the result of the internal serializer, we make sure that the result
// is not null as well.
// Serializing non-null values to null can be useful when working with Optional-like values
// where the Optional.empty case is serialized to null.
// See the discussion here: https://github.com/apache/kafka/pull/7679
if (rawAggregation == null) {
return null;
}

final byte[] rawHeaders = HeadersSerializer.serialize(headers);
final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = HeadersSerializer.prepareSerialization(headers);

try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {
final int payloadSize = preSerializedHeaders.requiredBufferSizeForHeaders + rawAggregation.length;

ByteUtils.writeVarint(rawHeaders.length, out);
out.write(rawHeaders);
out.write(rawAggregation);
// Format: [headersSize(varint)][headersBytes][value]
final ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(preSerializedHeaders.requiredBufferSizeForHeaders) + payloadSize);
ByteUtils.writeVarint(preSerializedHeaders.requiredBufferSizeForHeaders, buffer);

return baos.toByteArray();
} catch (final IOException e) {
throw new SerializationException("Failed to serialize AggregationWithHeaders on topic: " + topic, e);
}
// empty (byte[0]) for null/empty headers, or [count][header1][header2]... for non-empty
return HeadersSerializer.serialize(preSerializedHeaders, buffer)
.put(rawAggregation)
.array();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.ByteUtils;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
Expand All @@ -48,50 +45,91 @@
*/
class HeadersSerializer {

static final class PreSerializedHeaders {
final int requiredBufferSizeForHeaders;
final byte[][] rawHeaderKeys;
final byte[][] rawHeaderValues;

PreSerializedHeaders(
final int requiredBufferSizeForHeaders,
final byte[][] rawHeaderKeys,
final byte[][] rawHeaderValues
) {
this.requiredBufferSizeForHeaders = requiredBufferSizeForHeaders;
this.rawHeaderKeys = rawHeaderKeys;
this.rawHeaderValues = rawHeaderValues;
}
}

public static PreSerializedHeaders prepareSerialization(final Headers headers) {
final Header[] headersArray = (headers == null) ? new Header[0] : headers.toArray();

if (headersArray.length == 0) {
return new PreSerializedHeaders(0, null, null);
}

// we first compute the size for the buffer we need,
// so we can allocate the whole buffer at once later

// cache to avoid translating String header-keys to byte[] twice
final byte[][] serializerHeaderKeys = new byte[headersArray.length][];
final byte[][] serializedHeaderValues = new byte[headersArray.length][];

// start with varint encoding of header count
int requiredBufferSizeForHeaders = ByteUtils.sizeOfVarint(headersArray.length);

int i = 0;
for (final Header header : headersArray) {
serializerHeaderKeys[i] = header.key().getBytes(StandardCharsets.UTF_8);
requiredBufferSizeForHeaders += ByteUtils.sizeOfVarint(serializerHeaderKeys[i].length) + serializerHeaderKeys[i].length;

serializedHeaderValues[i] = header.value();
if (serializedHeaderValues[i] == null) {
++requiredBufferSizeForHeaders;
} else {
requiredBufferSizeForHeaders += ByteUtils.sizeOfVarint(serializedHeaderValues[i].length) + serializedHeaderValues[i].length;
}

++i;
}

return new PreSerializedHeaders(requiredBufferSizeForHeaders, serializerHeaderKeys, serializedHeaderValues);
}

/**
* Serializes headers into a byte array using varint encoding per KIP-1271.
* Serializes headers into a ByteBuffer using varint encoding per KIP-1271.
* <p>
* The output format is [count][header1][header2]... without a size prefix.
* The size prefix is added by the outer serializer that uses this.
* <p>
* For null or empty headers, returns an empty byte array (0 bytes)
* instead of encoding headerCount=0 (1 byte).
*
* @param headers the headers to serialize (can be null)
* @return the serialized byte array (empty array if headers are null or empty)
* @param preSerializedHeaders the preSerializedHeaders
* @param buffer the buffer to write the serialized header into (it's expected that the buffer position is set correctly)
* @return the modified {@code buffer} containing the serializer headers (empty array if headers are null or empty),\
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is \ typo?

* with corresponding advanced position
*/
public static byte[] serialize(final Headers headers) {
final Header[] headersArray = (headers == null) ? new Header[0] : headers.toArray();

if (headersArray.length == 0) {
return new byte[0];
public static ByteBuffer serialize(final PreSerializedHeaders preSerializedHeaders, final ByteBuffer buffer) {
if (preSerializedHeaders.requiredBufferSizeForHeaders == 0) {
return buffer;
}

try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {

ByteUtils.writeVarint(headersArray.length, out);
final int numberOfHeaders = preSerializedHeaders.rawHeaderKeys.length;

for (final Header header : headersArray) {
final byte[] keyBytes = header.key().getBytes(StandardCharsets.UTF_8);
final byte[] valueBytes = header.value();
ByteUtils.writeVarint(numberOfHeaders, buffer);
for (int i = 0; i < numberOfHeaders; ++i) {
ByteUtils.writeVarint(preSerializedHeaders.rawHeaderKeys[i].length, buffer);
buffer.put(preSerializedHeaders.rawHeaderKeys[i]);

ByteUtils.writeVarint(keyBytes.length, out);
out.write(keyBytes);

// Write value length and value bytes (varint + raw bytes)
// null is represented as -1, encoded as varint
if (valueBytes == null) {
ByteUtils.writeVarint(-1, out);
} else {
ByteUtils.writeVarint(valueBytes.length, out);
out.write(valueBytes);
}
if (preSerializedHeaders.rawHeaderValues[i] != null) {
ByteUtils.writeVarint(preSerializedHeaders.rawHeaderValues[i].length, buffer);
buffer.put(preSerializedHeaders.rawHeaderValues[i]);
} else {
buffer.put((byte) 0x01); // hardcoded varint encoding for `-1`
}

return baos.toByteArray();
} catch (final IOException e) {
throw new SerializationException("Failed to serialize headers", e);
}

return buffer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,9 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.utils.ByteUtils;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public final class RecordConverters {
Expand All @@ -33,7 +28,7 @@ public final class RecordConverters {
private static final RecordConverter RAW_TO_TIMESTAMED_INSTANCE = record -> {
final byte[] rawValue = record.value();
final long timestamp = record.timestamp();
final byte[] recordValue = rawValue == null ? null :
final byte[] recordValueWithTimestamp = rawValue == null ? null :
ByteBuffer.allocate(8 + rawValue.length)
.putLong(timestamp)
.put(rawValue)
Expand All @@ -45,9 +40,9 @@ public final class RecordConverters {
timestamp,
record.timestampType(),
record.serializedKeySize(),
record.serializedValueSize(),
recordValueWithTimestamp != null ? recordValueWithTimestamp.length : 0,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stumbled over this by chance -- it's a long standing minor bug... Fixing on the side.

record.key(),
recordValue,
recordValueWithTimestamp,
record.headers(),
record.leaderEpoch()
);
Expand All @@ -57,7 +52,7 @@ public final class RecordConverters {
final byte[] rawValue = record.value();

// Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
final byte[] recordValue = reconstructFromRaw(
final byte[] recordValueWithTimestampAndHeaders = reconstructFromRaw(
rawValue,
record.timestamp(),
record.headers()
Expand All @@ -70,9 +65,9 @@ public final class RecordConverters {
record.timestamp(),
record.timestampType(),
record.serializedKeySize(),
record.serializedValueSize(),
recordValueWithTimestampAndHeaders != null ? recordValueWithTimestampAndHeaders.length : 0,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same...

record.key(),
recordValue,
recordValueWithTimestampAndHeaders,
record.headers(),
record.leaderEpoch()
);
Expand All @@ -86,7 +81,7 @@ public static RecordConverter rawValueToHeadersValue() {
final byte[] rawValue = record.value();

// Format: [headersSize(varint)][headersBytes][aggregation] (no timestamp)
final byte[] recordValue = reconstructSessionFromRaw(
final byte[] recordValueWithHeaders = reconstructSessionFromRaw(
rawValue,
record.headers()
);
Expand All @@ -98,9 +93,9 @@ public static RecordConverter rawValueToHeadersValue() {
record.timestamp(),
record.timestampType(),
record.serializedKeySize(),
record.serializedValueSize(),
recordValueWithHeaders != null ? recordValueWithHeaders.length : 0,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And one more

record.key(),
recordValue,
recordValueWithHeaders,
record.headers(),
record.leaderEpoch()
);
Expand Down Expand Up @@ -133,19 +128,18 @@ static byte[] reconstructSessionFromRaw(final byte[] rawValue, final Headers hea
if (rawValue == null) {
return null;
}
final byte[] rawHeaders = HeadersSerializer.serialize(headers);

try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {
final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = HeadersSerializer.prepareSerialization(headers);

ByteUtils.writeVarint(rawHeaders.length, out);
out.write(rawHeaders);
out.write(rawValue);
final int payloadSize = preSerializedHeaders.requiredBufferSizeForHeaders + rawValue.length;

return baos.toByteArray();
} catch (final IOException e) {
throw new SerializationException("Failed to reconstruct AggregationWithHeaders", e);
}
// Format: [headersSize(varint)][headersBytes][value]
final ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(preSerializedHeaders.requiredBufferSizeForHeaders) + payloadSize);
ByteUtils.writeVarint(preSerializedHeaders.requiredBufferSizeForHeaders, buffer);

return HeadersSerializer.serialize(preSerializedHeaders, buffer)
.put(rawValue)
.array();
}

/**
Expand All @@ -161,23 +155,18 @@ static byte[] reconstructFromRaw(final byte[] rawValue, final long timestamp, fi
if (rawValue == null) {
return null;
}
final byte[] rawTimestamp;
try (LongSerializer timestampSerializer = new LongSerializer()) {
rawTimestamp = timestampSerializer.serialize("", timestamp);
}
final byte[] rawHeaders = HeadersSerializer.serialize(headers);

try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {
final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = HeadersSerializer.prepareSerialization(headers);

ByteUtils.writeVarint(rawHeaders.length, out);
out.write(rawHeaders);
out.write(rawTimestamp);
out.write(rawValue);
final int payloadSize = preSerializedHeaders.requiredBufferSizeForHeaders + 8 + rawValue.length;

return baos.toByteArray();
} catch (final IOException e) {
throw new SerializationException("Failed to reconstruct ValueTimestampHeaders", e);
}
// Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
final ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(preSerializedHeaders.requiredBufferSizeForHeaders) + payloadSize);
ByteUtils.writeVarint(preSerializedHeaders.requiredBufferSizeForHeaders, buffer);

return HeadersSerializer.serialize(preSerializedHeaders, buffer)
.putLong(timestamp)
.put(rawValue)
.array();
}
}
Loading
Loading