Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ public interface HeadersBytesStore {
* <p>
* Empty headers are represented as 0 bytes (headersSize=0, no headersBytes),
*
* @param plainValue the legacy timestamped value bytes
* @param valueAndTimestamp the legacy timestamped value bytes
* @return the value in header-embedded format with empty headers
*/
static byte[] convertToHeaderFormat(final byte[] plainValue) {
if (plainValue == null) {
static byte[] convertToHeaderFormat(final byte[] valueAndTimestamp) {
if (valueAndTimestamp == null) {
return null;
}

Expand All @@ -53,9 +53,9 @@ static byte[] convertToHeaderFormat(final byte[] plainValue) {
// headersBytes = [] (empty, 0 bytes)
// Result: [0x00][payload]
return ByteBuffer
.allocate(1 + plainValue.length)
.allocate(1 + valueAndTimestamp.length)
.put((byte) 0x00)
.put(plainValue)
.put(valueAndTimestamp)
.array();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.state;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
Expand Down Expand Up @@ -148,39 +150,75 @@ public String topic() {
*
* @param rawKey the key as raw bytes
* @return the key as typed object
* @deprecated Since 4.3. Use {@link #keyFrom(byte[], Headers)} instead.
*/
@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that make sense to replace this deprecated method with the newly introduced one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose that we should add @SupressWarning and replace them in follow-up PRs.

public K keyFrom(final byte[] rawKey) {
return keySerde.deserializer().deserialize(topic, rawKey);
return keyFrom(rawKey, new RecordHeaders());
}

/**
* Deserialize the key from raw bytes.
*
* @param rawKey the key as raw bytes
* @return the key as typed object
*/
public K keyFrom(final byte[] rawKey, final Headers headers) {
return keySerde.deserializer().deserialize(topic, headers, rawKey);
}

/**
* Deserialize the value from raw bytes.
*
* @param rawValue the value as raw bytes
* @return the value as typed object
* @deprecated Since 4.3. Use {@link #valueFrom(byte[], Headers)} instead.
*/
@Deprecated
public V valueFrom(final byte[] rawValue) {
return valueSerde.deserializer().deserialize(topic, rawValue);
return valueFrom(rawValue, new RecordHeaders());
}

/**
* Deserialize the value from raw bytes.
*
* @param rawValue the value as raw bytes
* @return the value as typed object
*/
public V valueFrom(final byte[] rawValue, final Headers headers) {
return valueSerde.deserializer().deserialize(topic, headers, rawValue);
}

/**
* Serialize the given key.
*
* @param key the key to be serialized
* @return the serialized key
* @deprecated Since 4.3. Use {@link #rawKey(Object, Headers)} instead.
*/
@Deprecated
public byte[] rawKey(final K key) {
return rawKey(key, new RecordHeaders());
}

/**
* Serialize the given key.
*
* @param key the key to be serialized
* @return the serialized key
*/
public byte[] rawKey(final K key, final Headers headers) {
try {
return keySerde.serializer().serialize(topic, key);
return keySerde.serializer().serialize(topic, headers, key);
} catch (final ClassCastException e) {
final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
throw new StreamsException(
String.format("A serializer (%s) is not compatible to the actual key type " +
"(key type: %s). Change the default Serdes in StreamConfig or " +
"provide correct Serdes via method parameters.",
keySerializer().getClass().getName(),
keyClass),
e);
String.format("A serializer (%s) is not compatible to the actual key type " +
"(key type: %s). Change the default Serdes in StreamConfig or " +
"provide correct Serdes via method parameters.",
keySerializer().getClass().getName(),
keyClass),
e);
}
}

Expand All @@ -189,11 +227,24 @@ public byte[] rawKey(final K key) {
*
* @param value the value to be serialized
* @return the serialized value
* @deprecated Since 4.3. Use {@link #rawValue(Object, Headers)} instead.
*/
@Deprecated
@SuppressWarnings("rawtypes")
public byte[] rawValue(final V value) {
return rawValue(value, new RecordHeaders());
}

/**
* Serialize the given value.
*
* @param value the value to be serialized
* @return the serialized value
*/
@SuppressWarnings("rawtypes")
public byte[] rawValue(final V value, final Headers headers) {
try {
return valueSerde.serializer().serialize(topic, value);
return valueSerde.serializer().serialize(topic, headers, value);
} catch (final ClassCastException e) {
final String valueClass;
final Class<? extends Serializer> serializerClass;
Expand All @@ -205,12 +256,12 @@ public byte[] rawValue(final V value) {
valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
}
throw new StreamsException(
String.format("A serializer (%s) is not compatible to the actual value type " +
"(value type: %s). Change the default Serdes in StreamConfig or " +
"provide correct Serdes via method parameters.",
serializerClass.getName(),
valueClass),
e);
String.format("A serializer (%s) is not compatible to the actual value type " +
"(value type: %s). Change the default Serdes in StreamConfig or " +
"provide correct Serdes via method parameters.",
serializerClass.getName(),
valueClass),
e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
* @param <K>
* @param <V>
*/
// TODO: replace with new method in follow-up PR of KIP-1271
@SuppressWarnings("deprecation")
public class MeteredKeyValueStore<K, V>
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V>
implements KeyValueStore<K, V>, MeteredStateStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;

// TODO: replace with new method in follow-up PR of KIP-1271
@SuppressWarnings("deprecation")
public class MeteredSessionStore<K, V>
extends WrappedStateStore<SessionStore<Bytes, byte[]>, Windowed<K>, V>
implements SessionStore<K, V>, MeteredStateStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
* @param <K>
* @param <V>
*/
// TODO: replace with new method in follow-up PR of KIP-1271
@SuppressWarnings("deprecation")
public class MeteredTimestampedKeyValueStore<K, V>
extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
implements TimestampedKeyValueStore<K, V> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
* @param <K> The key type
* @param <V> The (raw) value type
*/
// TODO: replace with new method in follow-up PR of KIP-1271
@SuppressWarnings("deprecation")
public class MeteredVersionedKeyValueStore<K, V>
extends WrappedStateStore<VersionedBytesStore, K, V>
implements VersionedKeyValueStore<K, V> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;

// TODO: replace with new method in follow-up PR of KIP-1271
@SuppressWarnings("deprecation")
public class MeteredWindowStore<K, V>
extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V>
implements WindowStore<K, V>, MeteredStateStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE;
import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;

// TODO: replace with new method in follow-up PR of KIP-1271
@SuppressWarnings("deprecation")
public class PrefixedWindowKeySchemas {

private static final int PREFIX_SIZE = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE;

// TODO: replace with new method in follow-up PR of KIP-1271
@SuppressWarnings("deprecation")
public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {

private static final Logger LOG = LoggerFactory.getLogger(WindowKeySchema.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
*/
package org.apache.kafka.streams.state;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.internals.ValueAndTimestampSerde;
Expand All @@ -27,8 +32,13 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings("unchecked")
public class StateSerdesTest {
Expand Down Expand Up @@ -137,4 +147,48 @@ public void shouldThrowIfIncompatibleSerdeForKey() throws ClassNotFoundException
"Change the default Serdes in StreamConfig or provide correct Serdes via method parameters."));
}

@Test
public void shouldSerializeAndDeserializeKeyWithHeaders() {
final Serde<String> spyKeySerde = spy(Serdes.String());
final Serializer<String> spySerializer = spy(spyKeySerde.serializer());
final Deserializer<String> spyDeserializer = spy(spyKeySerde.deserializer());
when(spyKeySerde.serializer()).thenReturn(spySerializer);
when(spyKeySerde.deserializer()).thenReturn(spyDeserializer);

final StateSerdes<String, String> stateSerdes =
new StateSerdes<>("test-topic", spyKeySerde, Serdes.String());

final Headers headers = new RecordHeaders();
final String key = "test-key";
final byte[] rawKey = stateSerdes.rawKey(key, headers);

verify(spySerializer).serialize(eq("test-topic"), eq(headers), eq("test-key"));

final String deserializedKey = stateSerdes.keyFrom(rawKey, headers);
verify(spyDeserializer).deserialize(eq("test-topic"), eq(headers), eq(rawKey));
assertEquals(key, deserializedKey);
}

@Test
public void shouldSerializeAndDeserializeValueWithHeaders() {
final Serde<String> spyValueSerde = spy(Serdes.String());
final Serializer<String> spySerializer = spy(spyValueSerde.serializer());
final Deserializer<String> spyDeserializer = spy(spyValueSerde.deserializer());
when(spyValueSerde.serializer()).thenReturn(spySerializer);
when(spyValueSerde.deserializer()).thenReturn(spyDeserializer);

final StateSerdes<String, String> stateSerdes =
new StateSerdes<>("test-topic", Serdes.String(), spyValueSerde);

final Headers headers = new RecordHeaders()
.add("header-key", "header-value".getBytes());
final String value = "test-value";
final byte[] serialized = stateSerdes.rawValue(value, headers);

verify(spySerializer).serialize(eq("test-topic"), eq(headers), eq("test-value"));

final String deserialized = stateSerdes.valueFrom(serialized, headers);
verify(spyDeserializer).deserialize(eq("test-topic"), eq(headers), eq(serialized));
assertEquals(value, deserialized);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
Expand Down Expand Up @@ -169,11 +170,11 @@ private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic)
final Deserializer<String> valueDeserializer = mock(Deserializer.class);
final Serializer<String> valueSerializer = mock(Serializer.class);
when(keySerde.serializer()).thenReturn(keySerializer);
when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes());
when(keySerializer.serialize(topic, new RecordHeaders(), KEY)).thenReturn(KEY.getBytes());
when(valueSerde.deserializer()).thenReturn(valueDeserializer);
when(valueDeserializer.deserialize(topic, VALUE_BYTES)).thenReturn(VALUE);
when(valueDeserializer.deserialize(topic, new RecordHeaders(), VALUE_BYTES)).thenReturn(VALUE);
when(valueSerde.serializer()).thenReturn(valueSerializer);
when(valueSerializer.serialize(topic, VALUE)).thenReturn(VALUE_BYTES);
when(valueSerializer.serialize(topic, new RecordHeaders(), VALUE)).thenReturn(VALUE_BYTES);
when(inner.get(KEY_BYTES)).thenReturn(VALUE_BYTES);
metered = new MeteredKeyValueStore<>(
inner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
Expand Down Expand Up @@ -173,11 +174,11 @@ private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic)
final Deserializer<String> valueDeserializer = mock(Deserializer.class);
final Serializer<String> valueSerializer = mock(Serializer.class);
when(keySerde.serializer()).thenReturn(keySerializer);
when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes());
when(keySerializer.serialize(topic, new RecordHeaders(), KEY)).thenReturn(KEY.getBytes());
when(valueSerde.deserializer()).thenReturn(valueDeserializer);
when(valueDeserializer.deserialize(topic, VALUE_BYTES)).thenReturn(VALUE);
when(valueDeserializer.deserialize(topic, new RecordHeaders(), VALUE_BYTES)).thenReturn(VALUE);
when(valueSerde.serializer()).thenReturn(valueSerializer);
when(valueSerializer.serialize(topic, VALUE)).thenReturn(VALUE_BYTES);
when(valueSerializer.serialize(topic, new RecordHeaders(), VALUE)).thenReturn(VALUE_BYTES);
when(innerStore.fetchSession(KEY_BYTES, START_TIMESTAMP, END_TIMESTAMP)).thenReturn(VALUE_BYTES);
store = new MeteredSessionStore<>(
innerStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
Expand Down Expand Up @@ -181,11 +182,11 @@ private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic)
final Deserializer<ValueAndTimestamp<String>> valueDeserializer = mock(Deserializer.class);
final Serializer<ValueAndTimestamp<String>> valueSerializer = mock(Serializer.class);
when(keySerde.serializer()).thenReturn(keySerializer);
when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes());
when(keySerializer.serialize(topic, new RecordHeaders(), KEY)).thenReturn(KEY.getBytes());
when(valueSerde.deserializer()).thenReturn(valueDeserializer);
when(valueDeserializer.deserialize(topic, VALUE_AND_TIMESTAMP_BYTES)).thenReturn(VALUE_AND_TIMESTAMP);
when(valueDeserializer.deserialize(topic, new RecordHeaders(), VALUE_AND_TIMESTAMP_BYTES)).thenReturn(VALUE_AND_TIMESTAMP);
when(valueSerde.serializer()).thenReturn(valueSerializer);
when(valueSerializer.serialize(topic, VALUE_AND_TIMESTAMP)).thenReturn(VALUE_AND_TIMESTAMP_BYTES);
when(valueSerializer.serialize(topic, new RecordHeaders(), VALUE_AND_TIMESTAMP)).thenReturn(VALUE_AND_TIMESTAMP_BYTES);
when(inner.get(KEY_BYTES)).thenReturn(VALUE_AND_TIMESTAMP_BYTES);
metered = new MeteredTimestampedKeyValueStore<>(
inner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
Expand Down Expand Up @@ -176,11 +177,11 @@ private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic)
@SuppressWarnings("unchecked")
final Serializer<ValueAndTimestamp<String>> valueSerializer = mock(Serializer.class);
when(keySerde.serializer()).thenReturn(keySerializer);
when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes());
when(keySerializer.serialize(topic, new RecordHeaders(), KEY)).thenReturn(KEY.getBytes());
when(valueSerde.deserializer()).thenReturn(valueDeserializer);
when(valueDeserializer.deserialize(topic, VALUE_AND_TIMESTAMP_BYTES)).thenReturn(VALUE_AND_TIMESTAMP);
when(valueDeserializer.deserialize(topic, new RecordHeaders(), VALUE_AND_TIMESTAMP_BYTES)).thenReturn(VALUE_AND_TIMESTAMP);
when(valueSerde.serializer()).thenReturn(valueSerializer);
when(valueSerializer.serialize(topic, VALUE_AND_TIMESTAMP)).thenReturn(VALUE_AND_TIMESTAMP_BYTES);
when(valueSerializer.serialize(topic, new RecordHeaders(), VALUE_AND_TIMESTAMP)).thenReturn(VALUE_AND_TIMESTAMP_BYTES);
when(innerStoreMock.fetch(KEY_BYTES, TIMESTAMP)).thenReturn(VALUE_AND_TIMESTAMP_BYTES);
store = new MeteredTimestampedWindowStore<>(
innerStoreMock,
Expand Down
Loading