Skip to content

Commit 4783f6a

Browse files
authored
KAFKA-20120: Create HeadersSerializer and HeadersDeserializer (apache#21401)
This PR adds `HeadersSerializer` and `HeadersDeserializer` as infrastructure of KIP-1271. Reviewers: Alieh Saeedi <asaeedi@confluent.io>, Matthias J. Sax <matthias@confluent.io>
1 parent 9d5bbf5 commit 4783f6a

4 files changed

Lines changed: 463 additions & 0 deletions

File tree

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
import org.apache.kafka.common.header.Headers;
20+
import org.apache.kafka.common.header.internals.RecordHeaders;
21+
import org.apache.kafka.common.serialization.Deserializer;
22+
import org.apache.kafka.common.utils.ByteUtils;
23+
24+
import java.nio.ByteBuffer;
25+
import java.nio.charset.StandardCharsets;
26+
27+
/**
28+
* Deserializer for Kafka Headers.
29+
*
30+
* Deserialization format (per KIP-1271):
31+
* [NumHeaders(varint)][Header1][Header2]...
32+
*
33+
* Each header:
34+
* [KeyLength(varint)][KeyBytes(UTF-8)][ValueLength(varint)][ValueBytes]
35+
*
36+
* Note: ValueLength is -1 for null values (encoded as varint).
37+
* All integers are decoded from varints (signed varint encoding).
38+
*
39+
* This deserializer expects the headersBytes portion without a size prefix.
40+
* The size prefix is handled by the outer deserializer (e.g., ValueTimestampHeadersDeserializer).
41+
*
42+
* This is used by KIP-1271 to deserialize headers from state stores.
43+
*/
44+
public class HeadersDeserializer implements Deserializer<Headers> {
45+
46+
/**
47+
* Deserializes headers from a byte array using varint encoding per KIP-1271.
48+
* <p>
49+
* The input format is [count][header1][header2]... without a size prefix.
50+
*
51+
* @param topic topic associated with the data
52+
* @param data the serialized byte array (can be null)
53+
* @return the deserialized headers
54+
*/
55+
public Headers deserialize(final String topic, final byte[] data) {
56+
if (data == null || data.length == 0) {
57+
return new RecordHeaders();
58+
}
59+
60+
final ByteBuffer buffer = ByteBuffer.wrap(data);
61+
final int headersCount = ByteUtils.readVarint(buffer);
62+
63+
if (headersCount == 0) {
64+
return new RecordHeaders();
65+
}
66+
67+
final RecordHeaders headers = new RecordHeaders();
68+
69+
for (int i = 0; i < headersCount; i++) {
70+
final int keyLength = ByteUtils.readVarint(buffer);
71+
final byte[] keyBytes = new byte[keyLength];
72+
buffer.get(keyBytes);
73+
final String key = new String(keyBytes, StandardCharsets.UTF_8);
74+
75+
final int valueLength = ByteUtils.readVarint(buffer);
76+
final byte[] value;
77+
if (valueLength == -1) {
78+
value = null;
79+
} else {
80+
value = new byte[valueLength];
81+
buffer.get(value);
82+
}
83+
84+
headers.add(key, value);
85+
}
86+
87+
return headers;
88+
}
89+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
import org.apache.kafka.common.header.Header;
20+
import org.apache.kafka.common.header.Headers;
21+
import org.apache.kafka.common.serialization.Serializer;
22+
import org.apache.kafka.common.utils.ByteUtils;
23+
24+
import java.io.ByteArrayOutputStream;
25+
import java.io.DataOutputStream;
26+
import java.io.IOException;
27+
import java.nio.charset.StandardCharsets;
28+
29+
/**
30+
* Serializer for Kafka Headers.
31+
* <p>
32+
* Serialization format (per KIP-1271):
33+
* [NumHeaders(varint)][Header1][Header2]...
34+
* <p>
35+
* Each header:
36+
* [KeyLength(varint)][KeyBytes(UTF-8)][ValueLength(varint)][ValueBytes]
37+
* <p>
38+
* Note: ValueLength is -1 for null values (encoded as varint).
39+
* All integers are encoded as varints (signed varint encoding).
40+
* <p>
41+
* This serializer produces the headersBytes portion. The headersSize prefix
42+
* is added by the outer serializer (e.g., ValueTimestampHeadersSerializer).
43+
* <p>
44+
* This is used by KIP-1271 to serialize headers for storage in state stores.
45+
*/
46+
public class HeadersSerializer implements Serializer<Headers> {
47+
48+
/**
49+
* Serializes headers into a byte array using varint encoding per KIP-1271.
50+
* <p>
51+
* The output format is [count][header1][header2]... without a size prefix.
52+
* The size prefix is added by the outer serializer that uses this.
53+
*
54+
* @param topic topic associated with data
55+
* @param headers the headers to serialize (can be null)
56+
* @return the serialized byte array
57+
*/
58+
@Override
59+
public byte[] serialize(final String topic, final Headers headers) {
60+
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
61+
final DataOutputStream out = new DataOutputStream(baos)) {
62+
63+
final Header[] headerArray = (headers == null) ? new Header[0] : headers.toArray();
64+
ByteUtils.writeVarint(headerArray.length, out);
65+
66+
for (final Header header : headerArray) {
67+
final byte[] keyBytes = header.key().getBytes(StandardCharsets.UTF_8);
68+
final byte[] valueBytes = header.value();
69+
70+
ByteUtils.writeVarint(keyBytes.length, out);
71+
out.write(keyBytes);
72+
73+
// Write value length and value bytes (varint + raw bytes)
74+
// null is represented as -1, encoded as varint
75+
if (valueBytes == null) {
76+
ByteUtils.writeVarint(-1, out);
77+
} else {
78+
ByteUtils.writeVarint(valueBytes.length, out);
79+
out.write(valueBytes);
80+
}
81+
}
82+
83+
return baos.toByteArray();
84+
} catch (IOException e) {
85+
throw new RuntimeException("Failed to serialize headers", e);
86+
}
87+
}
88+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
import org.apache.kafka.common.header.Header;
20+
import org.apache.kafka.common.header.Headers;
21+
import org.apache.kafka.common.header.internals.RecordHeaders;
22+
23+
import org.junit.jupiter.api.Test;
24+
25+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
26+
import static org.junit.jupiter.api.Assertions.assertEquals;
27+
import static org.junit.jupiter.api.Assertions.assertNotNull;
28+
import static org.junit.jupiter.api.Assertions.assertNull;
29+
30+
public class HeadersDeserializerTest {
31+
32+
private final HeadersSerializer serializer = new HeadersSerializer();
33+
private final HeadersDeserializer deserializer = new HeadersDeserializer();
34+
35+
@Test
36+
public void shouldDeserializeNullData() {
37+
final Headers headers = deserializer.deserialize("", null);
38+
39+
assertNotNull(headers);
40+
assertEquals(0, headers.toArray().length);
41+
}
42+
43+
@Test
44+
public void shouldDeserializeEmptyData() {
45+
final Headers headers = deserializer.deserialize("", new byte[0]);
46+
47+
assertNotNull(headers);
48+
assertEquals(0, headers.toArray().length);
49+
}
50+
51+
@Test
52+
public void shouldRoundTripEmptyHeaders() {
53+
final Headers original = new RecordHeaders();
54+
final byte[] serialized = serializer.serialize("", original);
55+
final Headers deserialized = deserializer.deserialize("", serialized);
56+
57+
assertNotNull(deserialized);
58+
assertEquals(0, deserialized.toArray().length);
59+
}
60+
61+
@Test
62+
public void shouldRoundTripSingleHeader() {
63+
final Headers original = new RecordHeaders()
64+
.add("key1", "value1".getBytes());
65+
final byte[] serialized = serializer.serialize("", original);
66+
final Headers deserialized = deserializer.deserialize("", serialized);
67+
68+
assertNotNull(deserialized);
69+
assertEquals(1, deserialized.toArray().length);
70+
71+
final Header header = deserialized.lastHeader("key1");
72+
assertNotNull(header);
73+
assertEquals("key1", header.key());
74+
assertArrayEquals("value1".getBytes(), header.value());
75+
}
76+
77+
@Test
78+
public void shouldRoundTripMultipleHeaders() {
79+
final Headers original = new RecordHeaders()
80+
.add("key0", "value0".getBytes())
81+
.add("key1", "value1".getBytes())
82+
.add("key2", "value2".getBytes());
83+
final byte[] serialized = serializer.serialize("", original);
84+
final Headers deserialized = deserializer.deserialize("", serialized);
85+
assertNotNull(deserialized);
86+
87+
final Header[] headerArray = deserialized.toArray();
88+
assertEquals(3, headerArray.length);
89+
for (int i = 0; i < headerArray.length; i++) {
90+
Header next = headerArray[i];
91+
assertEquals("key" + i, next.key());
92+
assertArrayEquals(("value" + i).getBytes(), next.value());
93+
}
94+
}
95+
96+
@Test
97+
public void shouldRoundTripHeaderWithNullValue() {
98+
final Headers original = new RecordHeaders()
99+
.add("key1", null);
100+
final byte[] serialized = serializer.serialize("", original);
101+
final Headers deserialized = deserializer.deserialize("", serialized);
102+
103+
assertNotNull(deserialized);
104+
assertEquals(1, deserialized.toArray().length);
105+
106+
final Header header = deserialized.lastHeader("key1");
107+
assertNotNull(header);
108+
assertEquals("key1", header.key());
109+
assertNull(header.value());
110+
}
111+
112+
@Test
113+
public void shouldRoundTripHeaderWithEmptyValue() {
114+
final Headers original = new RecordHeaders()
115+
.add("key1", new byte[0]);
116+
final byte[] serialized = serializer.serialize("", original);
117+
final Headers deserialized = deserializer.deserialize("", serialized);
118+
119+
assertNotNull(deserialized);
120+
assertEquals(1, deserialized.toArray().length);
121+
122+
final Header header = deserialized.lastHeader("key1");
123+
assertNotNull(header);
124+
assertEquals("key1", header.key());
125+
assertArrayEquals(new byte[0], header.value());
126+
}
127+
}

0 commit comments

Comments
 (0)