Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,13 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.AggregationWithHeaders;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -75,23 +71,23 @@ private byte[] serialize(final String topic, final AGG plainAggregation, final H

final byte[] rawAggregation = aggregationSerializer.serialize(topic, headers, plainAggregation);

// Since we can't control the result of the internal serializer, we make sure that the result
// is not null as well.
// Serializing non-null values to null can be useful when working with Optional-like values
// where the Optional.empty case is serialized to null.
// See the discussion here: https://github.com/apache/kafka/pull/7679
if (rawAggregation == null) {
return null;
}

final byte[] rawHeaders = HeadersSerializer.serialize(headers);
// empty (byte[0]) for null/empty headers, or [count][header1][header2]... for non-empty
final ByteBuffer rawHeaders = HeadersSerializer.serialize(headers);

try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {

ByteUtils.writeVarint(rawHeaders.length, out);
out.write(rawHeaders);
out.write(rawAggregation);

return baos.toByteArray();
} catch (final IOException e) {
throw new SerializationException("Failed to serialize AggregationWithHeaders on topic: " + topic, e);
}
// Format: [headersSize(varint)][headersBytes][value]
return Utils.prepareByteBufferWithSizePrefix(rawHeaders.limit(), rawHeaders.limit() + rawAggregation.length)
.put(rawHeaders)
.put(rawAggregation)
.array();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.ByteUtils;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
Expand All @@ -49,49 +46,65 @@
class HeadersSerializer {

/**
* Serializes headers into a byte array using varint encoding per KIP-1271.
* Serializes headers into a ByteBuffer using varint encoding per KIP-1271.
* <p>
* The output format is [count][header1][header2]... without a size prefix.
* The size prefix is added by the outer serializer that uses this.
* <p>
* For null or empty headers, returns an empty byte array (0 bytes)
* instead of encoding headerCount=0 (1 byte).
* <p>
* The returned ByteBuffer may have larger capacity than actual payload size.
* Its position will be set to zero, and its limit marks the payload end.
*
* @param headers the headers to serialize (can be null)
* @return the serialized byte array (empty array if headers are null or empty)
*/
public static byte[] serialize(final Headers headers) {
public static ByteBuffer serialize(final Headers headers) {
final Header[] headersArray = (headers == null) ? new Header[0] : headers.toArray();

if (headersArray.length == 0) {
return new byte[0];
return ByteBuffer.allocate(0);
}

try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {
// we first estimate an upper bound for the buffer we need,
// so we can allocate the whole buffer at once

ByteUtils.writeVarint(headersArray.length, out);
// start with 5 bytes for varint encoding of header count
int estimatedBufferSize = 5;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could use sizeOfVarint to get exact size?

int exactBufferSize = ByteUtils.sizeOfVarint(headersArray.length);
    
    for (final Header header : headersArray) {
        final String headerKey = header.key();
        
        int headerKeySize = Utils.utf8Length(headerKey);
        exactBufferSize += ByteUtils.sizeOfVarint(headerKeySize) + headerKeySize;

        final byte[] value = header.value();
        if (value == null) {
            exactBufferSize += ByteUtils.sizeOfVarint(-1);
        } else {
            exactBufferSize += ByteUtils.sizeOfVarint(value.length) + value.length;
        }
    }

for (final Header header : headersArray) {
// adding 5 bytes for varint encoding of header-key length
estimatedBufferSize += 5 + header.key().length();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should use key.getBytes(StandardCharsets.UTF_8).length.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to reuse the byte array from key.getBytes(StandardCharsets.UTF_8)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it not be the same?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. UTF-16 vs UTF-8...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — the serialized format writes UTF-8 bytes, while String.length() counts UTF-16 code units.


for (final Header header : headersArray) {
final byte[] keyBytes = header.key().getBytes(StandardCharsets.UTF_8);
final byte[] valueBytes = header.value();
final byte[] value = header.value();
if (value == null) {
++estimatedBufferSize;
} else {
// adding 5 bytes for varint encoding of header-value length
estimatedBufferSize += 5 + value.length;
}
}

ByteUtils.writeVarint(keyBytes.length, out);
out.write(keyBytes);
final ByteBuffer result = ByteBuffer.allocate(estimatedBufferSize);
ByteUtils.writeVarint(headersArray.length, result);

// Write value length and value bytes (varint + raw bytes)
// null is represented as -1, encoded as varint
if (valueBytes == null) {
ByteUtils.writeVarint(-1, out);
} else {
ByteUtils.writeVarint(valueBytes.length, out);
out.write(valueBytes);
}
}
for (final Header header : headersArray) {
final String headerKey = header.key();
ByteUtils.writeVarint(headerKey.length(), result);
result.put(headerKey.getBytes(StandardCharsets.UTF_8));

return baos.toByteArray();
} catch (final IOException e) {
throw new SerializationException("Failed to serialize headers", e);
final byte[] headerValue = header.value();
if (headerValue != null) {
ByteUtils.writeVarint(headerValue.length, result);
result.put(headerValue);
} else {
result.put((byte) 0x01); // hardcoded varint encoding for `-1`
}
}

result.limit(result.position());
result.position(0);

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.utils.ByteUtils;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public final class RecordConverters {
Expand All @@ -33,7 +27,7 @@ public final class RecordConverters {
private static final RecordConverter RAW_TO_TIMESTAMED_INSTANCE = record -> {
final byte[] rawValue = record.value();
final long timestamp = record.timestamp();
final byte[] recordValue = rawValue == null ? null :
final byte[] recordValueWithTimestamp = rawValue == null ? null :
ByteBuffer.allocate(8 + rawValue.length)
.putLong(timestamp)
.put(rawValue)
Expand All @@ -45,9 +39,9 @@ public final class RecordConverters {
timestamp,
record.timestampType(),
record.serializedKeySize(),
record.serializedValueSize(),
recordValueWithTimestamp != null ? recordValueWithTimestamp.length : 0,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stumbled over this by chance -- it's a long-standing minor bug... Fixing on the side.

record.key(),
recordValue,
recordValueWithTimestamp,
record.headers(),
record.leaderEpoch()
);
Expand All @@ -57,7 +51,7 @@ public final class RecordConverters {
final byte[] rawValue = record.value();

// Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
final byte[] recordValue = reconstructFromRaw(
final byte[] recordValueWithTimestampAndHeaders = reconstructFromRaw(
rawValue,
record.timestamp(),
record.headers()
Expand All @@ -70,9 +64,9 @@ public final class RecordConverters {
record.timestamp(),
record.timestampType(),
record.serializedKeySize(),
record.serializedValueSize(),
recordValueWithTimestampAndHeaders != null ? recordValueWithTimestampAndHeaders.length : 0,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same...

record.key(),
recordValue,
recordValueWithTimestampAndHeaders,
record.headers(),
record.leaderEpoch()
);
Expand All @@ -86,7 +80,7 @@ public static RecordConverter rawValueToHeadersValue() {
final byte[] rawValue = record.value();

// Format: [headersSize(varint)][headersBytes][aggregation] (no timestamp)
final byte[] recordValue = reconstructSessionFromRaw(
final byte[] recordValueWithHeaders = reconstructSessionFromRaw(
rawValue,
record.headers()
);
Expand All @@ -98,9 +92,9 @@ public static RecordConverter rawValueToHeadersValue() {
record.timestamp(),
record.timestampType(),
record.serializedKeySize(),
record.serializedValueSize(),
recordValueWithHeaders != null ? recordValueWithHeaders.length : 0,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And one more

record.key(),
recordValue,
recordValueWithHeaders,
record.headers(),
record.leaderEpoch()
);
Expand Down Expand Up @@ -133,19 +127,12 @@ static byte[] reconstructSessionFromRaw(final byte[] rawValue, final Headers hea
if (rawValue == null) {
return null;
}
final byte[] rawHeaders = HeadersSerializer.serialize(headers);

try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {

ByteUtils.writeVarint(rawHeaders.length, out);
out.write(rawHeaders);
out.write(rawValue);

return baos.toByteArray();
} catch (final IOException e) {
throw new SerializationException("Failed to reconstruct AggregationWithHeaders", e);
}
final ByteBuffer rawHeaders = HeadersSerializer.serialize(headers);
return Utils.prepareByteBufferWithSizePrefix(rawHeaders.limit(), rawHeaders.limit() + rawValue.length)
.put(rawHeaders)
.put(rawValue)
.array();
}

/**
Expand All @@ -161,23 +148,12 @@ static byte[] reconstructFromRaw(final byte[] rawValue, final long timestamp, fi
if (rawValue == null) {
return null;
}
final byte[] rawTimestamp;
try (LongSerializer timestampSerializer = new LongSerializer()) {
rawTimestamp = timestampSerializer.serialize("", timestamp);
}
final byte[] rawHeaders = HeadersSerializer.serialize(headers);

try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {

ByteUtils.writeVarint(rawHeaders.length, out);
out.write(rawHeaders);
out.write(rawTimestamp);
out.write(rawValue);

return baos.toByteArray();
} catch (final IOException e) {
throw new SerializationException("Failed to reconstruct ValueTimestampHeaders", e);
}
final ByteBuffer rawHeaders = HeadersSerializer.serialize(headers);
return Utils.prepareByteBufferWithSizePrefix(rawHeaders.limit(), rawHeaders.limit() + 8 + rawValue.length)
.put(rawHeaders)
.putLong(timestamp)
.put(rawValue)
.array();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,23 @@
import java.nio.ByteBuffer;

public class Utils {
/**
* Create a new ByteBuffer to store {@code bufferSize} bytes, after a var-length encoded prefix.
*/
static ByteBuffer prepareByteBufferWithSizePrefix(final int prefix, final int bufferSize) {
final ByteBuffer varLengthBuffer = ByteBuffer.allocate(5); // 5 bytes for max varint encoding
ByteUtils.writeVarint(prefix, varLengthBuffer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you leverage ByteUtils.sizeOfVarint to avoid creating the temporary buffer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. I missed that we have sizeOfVarint -- I was actually considering adding such a helper -- this simplifies things quite a bit.


// allocate buffer with _exact_ size
final ByteBuffer buffer = ByteBuffer.allocate(varLengthBuffer.position() + bufferSize);

varLengthBuffer.limit(varLengthBuffer.position());
varLengthBuffer.position(0);
buffer.put(varLengthBuffer);

return buffer;
}

/**
* Extract raw plain value from serialized ValueTimestampHeaders.
* This strips both the headers and timestamp portions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,13 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.ValueTimestampHeaders;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;

Expand All @@ -51,18 +46,15 @@
*/
class ValueTimestampHeadersSerializer<V> implements WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
public final Serializer<V> valueSerializer;
private final LongSerializer timestampSerializer;

ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
Objects.requireNonNull(valueSerializer);
this.valueSerializer = valueSerializer;
this.timestampSerializer = new LongSerializer();
}

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
valueSerializer.configure(configs, isKey);
timestampSerializer.configure(configs, isKey);
}

@Override
Expand All @@ -89,30 +81,20 @@ private byte[] serialize(final String topic, final V plainValue, final long time
return null;
}

final byte[] rawTimestamp = timestampSerializer.serialize(topic, timestamp);

// empty (byte[0]) for null/empty headers, or [count][header1][header2]... for non-empty
final byte[] rawHeaders = HeadersSerializer.serialize(headers);
final ByteBuffer rawHeaders = HeadersSerializer.serialize(headers);

// Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {

ByteUtils.writeVarint(rawHeaders.length, out); // headersSize (it may be 0 due to null/empty headers)
out.write(rawHeaders); // empty (byte[0]) for null/empty headers, or [count][header1][header2]... for non-empty
out.write(rawTimestamp); // [timestamp(8)]
out.write(rawValue); // [value]

return baos.toByteArray();
} catch (final IOException e) {
throw new SerializationException("Failed to serialize ValueTimestampHeaders", e);
}
return Utils.prepareByteBufferWithSizePrefix(rawHeaders.limit(), rawHeaders.limit() + 8 + rawValue.length)
.put(rawHeaders)
.putLong(timestamp)
.put(rawValue)
.array();
}

@Override
public void close() {
valueSerializer.close();
timestampSerializer.close();
}

@Override
Expand Down
Loading
Loading