Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.ByteUtils;

import java.nio.ByteBuffer;
Expand All @@ -41,18 +40,17 @@
*
* This is used by KIP-1271 to deserialize headers from state stores.
*/
public class HeadersDeserializer implements Deserializer<Headers> {
public class HeadersDeserializer {

/**
* Deserializes headers from a byte array using varint encoding per KIP-1271.
* <p>
* The input format is [count][header1][header2]... without a size prefix.
*
* @param topic topic associated with the data
* @param data the serialized byte array (can be null)
* @return the deserialized headers
*/
public Headers deserialize(final String topic, final byte[] data) {
public static Headers deserialize(final byte[] data) {
if (data == null || data.length == 0) {
return new RecordHeaders();
}
Expand Down Expand Up @@ -86,10 +84,4 @@ public Headers deserialize(final String topic, final byte[] data) {

return headers;
}

public static Headers deserialize(final byte[] data) {
try (HeadersDeserializer deserializer = new HeadersDeserializer()) {
return deserializer.deserialize("", data);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ByteUtils;

import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -47,7 +46,7 @@
* <p>
* This is used by KIP-1271 to serialize headers for storage in state stores.
*/
public class HeadersSerializer implements Serializer<Headers> {
public class HeadersSerializer {

/**
* Serializes headers into a byte array using varint encoding per KIP-1271.
Expand All @@ -58,12 +57,10 @@ public class HeadersSerializer implements Serializer<Headers> {
* For null or empty headers, returns an empty byte array (0 bytes)
* instead of encoding headerCount=0 (1 byte).
*
* @param topic topic associated with data
* @param headers the headers to serialize (can be null)
* @return the serialized byte array (empty array if headers are null or empty)
*/
@Override
public byte[] serialize(final String topic, final Headers headers) {
public static byte[] serialize(final Headers headers) {
final Header[] headersArray = (headers == null) ? new Header[0] : headers.toArray();

if (headersArray.length == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,20 @@
*/
class ValueTimestampHeadersDeserializer<V> implements WrappingNullableDeserializer<ValueTimestampHeaders<V>, Void, V> {
private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
private static final HeadersDeserializer HEADERS_DESERIALIZER = new HeadersDeserializer();

public final Deserializer<V> valueDeserializer;
private final LongDeserializer timestampDeserializer;
private final HeadersDeserializer headersDeserializer;

ValueTimestampHeadersDeserializer(final Deserializer<V> valueDeserializer) {
Objects.requireNonNull(valueDeserializer);
this.valueDeserializer = valueDeserializer;
this.timestampDeserializer = new LongDeserializer();
this.headersDeserializer = new HeadersDeserializer();
}

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
valueDeserializer.configure(configs, isKey);
timestampDeserializer.configure(configs, isKey);
headersDeserializer.configure(configs, isKey);
}

@Override
Expand All @@ -79,7 +75,7 @@ public ValueTimestampHeaders<V> deserialize(final String topic, final byte[] val
final int headersSize = ByteUtils.readVarint(buffer);

final byte[] rawHeaders = readBytes(buffer, headersSize);
final Headers headers = headersDeserializer.deserialize(topic, rawHeaders);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me. Reviewing other PRs, I was all the time thinking about the topic that we dont need it at all.

final Headers headers = HeadersDeserializer.deserialize(rawHeaders);
final byte[] rawTimestamp = readBytes(buffer, Long.BYTES);
final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp);
final byte[] rawValue = readBytes(buffer, buffer.remaining());
Expand All @@ -92,7 +88,6 @@ public ValueTimestampHeaders<V> deserialize(final String topic, final byte[] val
public void close() {
valueDeserializer.close();
timestampDeserializer.close();
headersDeserializer.close();
}

@Override
Expand Down Expand Up @@ -162,6 +157,6 @@ static Headers headers(final byte[] rawValueTimestampHeaders) {
final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
final int headersSize = ByteUtils.readVarint(buffer);
final byte[] rawHeaders = readBytes(buffer, headersSize);
return HEADERS_DESERIALIZER.deserialize("", rawHeaders);
return HeadersDeserializer.deserialize(rawHeaders);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,17 @@
public class ValueTimestampHeadersSerializer<V> implements WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this class public while ValueTimestampHeadersDeserializer is package-private?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No idea -- but I think it can be package private. Actually same for HeadersSerializer and HeadersDeserializer. Let me make this change.

public final Serializer<V> valueSerializer;
private final LongSerializer timestampSerializer;
private final HeadersSerializer headersSerializer;

ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
Objects.requireNonNull(valueSerializer);
this.valueSerializer = valueSerializer;
this.timestampSerializer = new LongSerializer();
this.headersSerializer = new HeadersSerializer();
}

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
valueSerializer.configure(configs, isKey);
timestampSerializer.configure(configs, isKey);
headersSerializer.configure(configs, isKey);
}

@Override
Expand Down Expand Up @@ -95,7 +92,7 @@ private byte[] serialize(final String topic, final V plainValue, final long time
final byte[] rawTimestamp = timestampSerializer.serialize(topic, timestamp);

// empty (byte[0]) for null/empty headers, or [count][header1][header2]... for non-empty
final byte[] rawHeaders = headersSerializer.serialize(topic, headers);
final byte[] rawHeaders = HeadersSerializer.serialize(headers);

// Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
Expand All @@ -116,7 +113,6 @@ private byte[] serialize(final String topic, final V plainValue, final long time
public void close() {
valueSerializer.close();
timestampSerializer.close();
headersSerializer.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,17 @@

public class HeadersDeserializerTest {

private final HeadersSerializer serializer = new HeadersSerializer();
private final HeadersDeserializer deserializer = new HeadersDeserializer();

@Test
public void shouldDeserializeNullData() {
final Headers headers = deserializer.deserialize("", null);
final Headers headers = HeadersDeserializer.deserialize(null);

assertNotNull(headers);
assertEquals(0, headers.toArray().length);
}

@Test
public void shouldDeserializeEmptyData() {
final Headers headers = deserializer.deserialize("", new byte[0]);
final Headers headers = HeadersDeserializer.deserialize(new byte[0]);

assertNotNull(headers);
assertEquals(0, headers.toArray().length);
Expand All @@ -53,8 +50,8 @@ public void shouldDeserializeEmptyData() {
@Test
public void shouldRoundTripEmptyHeaders() {
final Headers original = new RecordHeaders();
final byte[] serialized = serializer.serialize("", original);
final Headers deserialized = deserializer.deserialize("", serialized);
final byte[] serialized = HeadersSerializer.serialize(original);
final Headers deserialized = HeadersDeserializer.deserialize(serialized);

assertNotNull(deserialized);
assertEquals(0, deserialized.toArray().length);
Expand All @@ -64,8 +61,8 @@ public void shouldRoundTripEmptyHeaders() {
public void shouldRoundTripSingleHeader() {
final Headers original = new RecordHeaders()
.add("key1", "value1".getBytes());
final byte[] serialized = serializer.serialize("", original);
final Headers deserialized = deserializer.deserialize("", serialized);
final byte[] serialized = HeadersSerializer.serialize(original);
final Headers deserialized = HeadersDeserializer.deserialize(serialized);

assertNotNull(deserialized);
assertEquals(1, deserialized.toArray().length);
Expand All @@ -82,8 +79,8 @@ public void shouldRoundTripMultipleHeaders() {
.add("key0", "value0".getBytes())
.add("key1", "value1".getBytes())
.add("key2", "value2".getBytes());
final byte[] serialized = serializer.serialize("", original);
final Headers deserialized = deserializer.deserialize("", serialized);
final byte[] serialized = HeadersSerializer.serialize(original);
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
assertNotNull(deserialized);

final Header[] headerArray = deserialized.toArray();
Expand All @@ -99,8 +96,8 @@ public void shouldRoundTripMultipleHeaders() {
public void shouldRoundTripHeaderWithNullValue() {
final Headers original = new RecordHeaders()
.add("key1", null);
final byte[] serialized = serializer.serialize("", original);
final Headers deserialized = deserializer.deserialize("", serialized);
final byte[] serialized = HeadersSerializer.serialize(original);
final Headers deserialized = HeadersDeserializer.deserialize(serialized);

assertNotNull(deserialized);
assertEquals(1, deserialized.toArray().length);
Expand All @@ -115,8 +112,8 @@ public void shouldRoundTripHeaderWithNullValue() {
public void shouldRoundTripHeaderWithEmptyValue() {
final Headers original = new RecordHeaders()
.add("key1", new byte[0]);
final byte[] serialized = serializer.serialize("", original);
final Headers deserialized = deserializer.deserialize("", serialized);
final byte[] serialized = HeadersSerializer.serialize(original);
final Headers deserialized = HeadersDeserializer.deserialize(serialized);

assertNotNull(deserialized);
assertEquals(1, deserialized.toArray().length);
Expand All @@ -135,8 +132,8 @@ public void shouldAllowDuplicateKeys() {
.add("key1", "value1".getBytes())
.add("key2", "value2".getBytes())
.add("key2", "value3".getBytes());
final byte[] serialized = serializer.serialize("", original);
final Headers deserialized = deserializer.deserialize("", serialized);
final byte[] serialized = HeadersSerializer.serialize(original);
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
assertNotNull(deserialized);

final Header[] headerArray = deserialized.toArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,9 @@

public class HeadersSerializerTest {

private final HeadersSerializer serializer = new HeadersSerializer();
private final HeadersDeserializer deserializer = new HeadersDeserializer();

@Test
public void shouldSerializeNullHeaders() {
final byte[] serialized = serializer.serialize("", null);
final byte[] serialized = HeadersSerializer.serialize(null);

assertNotNull(serialized);
assertEquals(0, serialized.length, "Null headers should serialize to empty byte array (0 bytes)");
Expand All @@ -44,7 +41,7 @@ public void shouldSerializeNullHeaders() {
@Test
public void shouldSerializeEmptyHeaders() {
final Headers headers = new RecordHeaders();
final byte[] serialized = serializer.serialize("", headers);
final byte[] serialized = HeadersSerializer.serialize(headers);

assertNotNull(serialized);
assertEquals(0, serialized.length, "Empty headers should serialize to empty byte array (0 bytes)");
Expand All @@ -54,12 +51,12 @@ public void shouldSerializeEmptyHeaders() {
public void shouldSerializeSingleHeader() {
final Headers headers = new RecordHeaders()
.add("key1", "value1".getBytes());
final byte[] serialized = serializer.serialize("", headers);
final byte[] serialized = HeadersSerializer.serialize(headers);

assertNotNull(serialized);
assertTrue(serialized.length > 0);

final Headers deserialized = deserializer.deserialize("", serialized);
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
assertNotNull(deserialized);
assertEquals(1, deserialized.toArray().length);

Expand All @@ -75,12 +72,12 @@ public void shouldSerializeMultipleHeaders() {
.add("key0", "value0".getBytes())
.add("key1", "value1".getBytes())
.add("key2", "value2".getBytes());
final byte[] serialized = serializer.serialize("", headers);
final byte[] serialized = HeadersSerializer.serialize(headers);

assertNotNull(serialized);
assertTrue(serialized.length > 0);

final Headers deserialized = deserializer.deserialize("", serialized);
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
assertNotNull(deserialized);
assertEquals(3, deserialized.toArray().length);

Expand All @@ -96,12 +93,12 @@ public void shouldSerializeMultipleHeaders() {
public void shouldSerializeHeaderWithNullValue() {
final Headers headers = new RecordHeaders()
.add("key1", null);
final byte[] serialized = serializer.serialize("", headers);
final byte[] serialized = HeadersSerializer.serialize(headers);

assertNotNull(serialized);
assertTrue(serialized.length > 0);

final Headers deserialized = deserializer.deserialize("", serialized);
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
assertNotNull(deserialized);
assertEquals(1, deserialized.toArray().length);

Expand All @@ -115,12 +112,12 @@ public void shouldSerializeHeaderWithNullValue() {
public void shouldSerializeHeadersWithEmptyValue() {
final Headers headers = new RecordHeaders()
.add("key1", new byte[0]);
final byte[] serialized = serializer.serialize("", headers);
final byte[] serialized = HeadersSerializer.serialize(headers);

assertNotNull(serialized);
assertTrue(serialized.length > 0);

final Headers deserialized = deserializer.deserialize("", serialized);
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
assertNotNull(deserialized);
assertEquals(1, deserialized.toArray().length);

Expand Down