diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java index 1781d1fa970aa..4792052aa9e5f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.ByteUtils; @@ -24,9 +23,7 @@ import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.state.AggregationWithHeaders; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Map; import java.util.Objects; @@ -75,23 +72,27 @@ private byte[] serialize(final String topic, final AGG plainAggregation, final H final byte[] rawAggregation = aggregationSerializer.serialize(topic, headers, plainAggregation); + // Since we can't control the result of the internal serializer, we make sure that the result + // is not null as well. + // Serializing non-null values to null can be useful when working with Optional-like values + // where the Optional.empty case is serialized to null. 
+ // See the discussion here: https://github.com/apache/kafka/pull/7679 if (rawAggregation == null) { return null; } - final byte[] rawHeaders = HeadersSerializer.serialize(headers); + final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = HeadersSerializer.prepareSerialization(headers); - try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final DataOutputStream out = new DataOutputStream(baos)) { + final int payloadSize = preSerializedHeaders.requiredBufferSizeForHeaders + rawAggregation.length; - ByteUtils.writeVarint(rawHeaders.length, out); - out.write(rawHeaders); - out.write(rawAggregation); + // Format: [headersSize(varint)][headersBytes][value] + final ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(preSerializedHeaders.requiredBufferSizeForHeaders) + payloadSize); + ByteUtils.writeVarint(preSerializedHeaders.requiredBufferSizeForHeaders, buffer); - return baos.toByteArray(); - } catch (final IOException e) { - throw new SerializationException("Failed to serialize AggregationWithHeaders on topic: " + topic, e); - } + // empty (byte[0]) for null/empty headers, or [count][header1][header2]... 
for non-empty + return HeadersSerializer.serialize(preSerializedHeaders, buffer) + .put(rawAggregation) + .array(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java index f8c664c782047..6608a55f67b2f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java @@ -16,14 +16,11 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.utils.ByteUtils; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; /** @@ -48,8 +45,59 @@ */ class HeadersSerializer { + static final class PreSerializedHeaders { + final int requiredBufferSizeForHeaders; + final byte[][] rawHeaderKeys; + final byte[][] rawHeaderValues; + + PreSerializedHeaders( + final int requiredBufferSizeForHeaders, + final byte[][] rawHeaderKeys, + final byte[][] rawHeaderValues + ) { + this.requiredBufferSizeForHeaders = requiredBufferSizeForHeaders; + this.rawHeaderKeys = rawHeaderKeys; + this.rawHeaderValues = rawHeaderValues; + } + } + + public static PreSerializedHeaders prepareSerialization(final Headers headers) { + final Header[] headersArray = (headers == null) ? 
new Header[0] : headers.toArray(); + + if (headersArray.length == 0) { + return new PreSerializedHeaders(0, null, null); + } + + // we first compute the size for the buffer we need, + // so we can allocate the whole buffer at once later + + // cache to avoid translating String header-keys to byte[] twice + final byte[][] serializerHeaderKeys = new byte[headersArray.length][]; + final byte[][] serializedHeaderValues = new byte[headersArray.length][]; + + // start with varint encoding of header count + int requiredBufferSizeForHeaders = ByteUtils.sizeOfVarint(headersArray.length); + + int i = 0; + for (final Header header : headersArray) { + serializerHeaderKeys[i] = header.key().getBytes(StandardCharsets.UTF_8); + requiredBufferSizeForHeaders += ByteUtils.sizeOfVarint(serializerHeaderKeys[i].length) + serializerHeaderKeys[i].length; + + serializedHeaderValues[i] = header.value(); + if (serializedHeaderValues[i] == null) { + ++requiredBufferSizeForHeaders; + } else { + requiredBufferSizeForHeaders += ByteUtils.sizeOfVarint(serializedHeaderValues[i].length) + serializedHeaderValues[i].length; + } + + ++i; + } + + return new PreSerializedHeaders(requiredBufferSizeForHeaders, serializerHeaderKeys, serializedHeaderValues); + } + /** - * Serializes headers into a byte array using varint encoding per KIP-1271. + * Serializes headers into a ByteBuffer using varint encoding per KIP-1271. *
* The output format is [count][header1][header2]... without a size prefix.
* The size prefix is added by the outer serializer that uses this.
@@ -57,41 +105,31 @@ class HeadersSerializer {
* For null or empty headers, returns an empty byte array (0 bytes)
* instead of encoding headerCount=0 (1 byte).
*
- * @param headers the headers to serialize (can be null)
- * @return the serialized byte array (empty array if headers are null or empty)
+ * @param preSerializedHeaders the pre-computed sizes and raw key/value bytes produced by {@code prepareSerialization(Headers)}
+ * @param buffer the buffer to write the serialized header into (it's expected that the buffer position is set correctly)
+ * @return the modified {@code buffer} containing the serialized headers (nothing is written for null or empty headers),
+ * with its position advanced accordingly
*/
- public static byte[] serialize(final Headers headers) {
- final Header[] headersArray = (headers == null) ? new Header[0] : headers.toArray();
-
- if (headersArray.length == 0) {
- return new byte[0];
+ public static ByteBuffer serialize(final PreSerializedHeaders preSerializedHeaders, final ByteBuffer buffer) {
+ if (preSerializedHeaders.requiredBufferSizeForHeaders == 0) {
+ return buffer;
}
- try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final DataOutputStream out = new DataOutputStream(baos)) {
-
- ByteUtils.writeVarint(headersArray.length, out);
+ final int numberOfHeaders = preSerializedHeaders.rawHeaderKeys.length;
- for (final Header header : headersArray) {
- final byte[] keyBytes = header.key().getBytes(StandardCharsets.UTF_8);
- final byte[] valueBytes = header.value();
+ ByteUtils.writeVarint(numberOfHeaders, buffer);
+ for (int i = 0; i < numberOfHeaders; ++i) {
+ ByteUtils.writeVarint(preSerializedHeaders.rawHeaderKeys[i].length, buffer);
+ buffer.put(preSerializedHeaders.rawHeaderKeys[i]);
- ByteUtils.writeVarint(keyBytes.length, out);
- out.write(keyBytes);
-
- // Write value length and value bytes (varint + raw bytes)
- // null is represented as -1, encoded as varint
- if (valueBytes == null) {
- ByteUtils.writeVarint(-1, out);
- } else {
- ByteUtils.writeVarint(valueBytes.length, out);
- out.write(valueBytes);
- }
+ if (preSerializedHeaders.rawHeaderValues[i] != null) {
+ ByteUtils.writeVarint(preSerializedHeaders.rawHeaderValues[i].length, buffer);
+ buffer.put(preSerializedHeaders.rawHeaderValues[i]);
+ } else {
+ buffer.put((byte) 0x01); // hardcoded varint encoding for `-1`
}
-
- return baos.toByteArray();
- } catch (final IOException e) {
- throw new SerializationException("Failed to serialize headers", e);
}
+
+ return buffer;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
index 118bc019b5f28..058edad52c8b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
@@ -17,14 +17,9 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.utils.ByteUtils;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
public final class RecordConverters {
@@ -33,7 +28,7 @@ public final class RecordConverters {
private static final RecordConverter RAW_TO_TIMESTAMED_INSTANCE = record -> {
final byte[] rawValue = record.value();
final long timestamp = record.timestamp();
- final byte[] recordValue = rawValue == null ? null :
+ final byte[] recordValueWithTimestamp = rawValue == null ? null :
ByteBuffer.allocate(8 + rawValue.length)
.putLong(timestamp)
.put(rawValue)
@@ -45,9 +40,9 @@ public final class RecordConverters {
timestamp,
record.timestampType(),
record.serializedKeySize(),
- record.serializedValueSize(),
+ recordValueWithTimestamp != null ? recordValueWithTimestamp.length : 0,
record.key(),
- recordValue,
+ recordValueWithTimestamp,
record.headers(),
record.leaderEpoch()
);
@@ -57,7 +52,7 @@ public final class RecordConverters {
final byte[] rawValue = record.value();
// Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
- final byte[] recordValue = reconstructFromRaw(
+ final byte[] recordValueWithTimestampAndHeaders = reconstructFromRaw(
rawValue,
record.timestamp(),
record.headers()
@@ -70,9 +65,9 @@ public final class RecordConverters {
record.timestamp(),
record.timestampType(),
record.serializedKeySize(),
- record.serializedValueSize(),
+ recordValueWithTimestampAndHeaders != null ? recordValueWithTimestampAndHeaders.length : 0,
record.key(),
- recordValue,
+ recordValueWithTimestampAndHeaders,
record.headers(),
record.leaderEpoch()
);
@@ -86,7 +81,7 @@ public static RecordConverter rawValueToHeadersValue() {
final byte[] rawValue = record.value();
// Format: [headersSize(varint)][headersBytes][aggregation] (no timestamp)
- final byte[] recordValue = reconstructSessionFromRaw(
+ final byte[] recordValueWithHeaders = reconstructSessionFromRaw(
rawValue,
record.headers()
);
@@ -98,9 +93,9 @@ public static RecordConverter rawValueToHeadersValue() {
record.timestamp(),
record.timestampType(),
record.serializedKeySize(),
- record.serializedValueSize(),
+ recordValueWithHeaders != null ? recordValueWithHeaders.length : 0,
record.key(),
- recordValue,
+ recordValueWithHeaders,
record.headers(),
record.leaderEpoch()
);
@@ -133,19 +128,18 @@ static byte[] reconstructSessionFromRaw(final byte[] rawValue, final Headers hea
if (rawValue == null) {
return null;
}
- final byte[] rawHeaders = HeadersSerializer.serialize(headers);
- try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final DataOutputStream out = new DataOutputStream(baos)) {
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = HeadersSerializer.prepareSerialization(headers);
- ByteUtils.writeVarint(rawHeaders.length, out);
- out.write(rawHeaders);
- out.write(rawValue);
+ final int payloadSize = preSerializedHeaders.requiredBufferSizeForHeaders + rawValue.length;
- return baos.toByteArray();
- } catch (final IOException e) {
- throw new SerializationException("Failed to reconstruct AggregationWithHeaders", e);
- }
+ // Format: [headersSize(varint)][headersBytes][value]
+ final ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(preSerializedHeaders.requiredBufferSizeForHeaders) + payloadSize);
+ ByteUtils.writeVarint(preSerializedHeaders.requiredBufferSizeForHeaders, buffer);
+
+ return HeadersSerializer.serialize(preSerializedHeaders, buffer)
+ .put(rawValue)
+ .array();
}
/**
@@ -161,23 +155,18 @@ static byte[] reconstructFromRaw(final byte[] rawValue, final long timestamp, fi
if (rawValue == null) {
return null;
}
- final byte[] rawTimestamp;
- try (LongSerializer timestampSerializer = new LongSerializer()) {
- rawTimestamp = timestampSerializer.serialize("", timestamp);
- }
- final byte[] rawHeaders = HeadersSerializer.serialize(headers);
- try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final DataOutputStream out = new DataOutputStream(baos)) {
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = HeadersSerializer.prepareSerialization(headers);
- ByteUtils.writeVarint(rawHeaders.length, out);
- out.write(rawHeaders);
- out.write(rawTimestamp);
- out.write(rawValue);
+ final int payloadSize = preSerializedHeaders.requiredBufferSizeForHeaders + 8 + rawValue.length;
- return baos.toByteArray();
- } catch (final IOException e) {
- throw new SerializationException("Failed to reconstruct ValueTimestampHeaders", e);
- }
+ // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
+ final ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(preSerializedHeaders.requiredBufferSizeForHeaders) + payloadSize);
+ ByteUtils.writeVarint(preSerializedHeaders.requiredBufferSizeForHeaders, buffer);
+
+ return HeadersSerializer.serialize(preSerializedHeaders, buffer)
+ .putLong(timestamp)
+ .put(rawValue)
+ .array();
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
index 1da1ad1f65f6e..f4e6fe07a22d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
@@ -16,18 +16,14 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
@@ -51,18 +47,15 @@
*/
class ValueTimestampHeadersSerializer