From c0762b772d0183188c21ac3f1815683aed0e4867 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sun, 15 Mar 2026 19:00:07 -0700 Subject: [PATCH] MINOR: code cleanup Remove unnecessary overrides and unused code. --- .../MeteredTimestampedKeyValueStore.java | 16 -------- .../MeteredTimestampedKeyValueStoreTest.java | 15 ------- .../test/InternalMockProcessorContext.java | 40 ------------------- 3 files changed, 71 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java index 4a4533cd804cc..bbc7214a84230 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java @@ -98,22 +98,6 @@ protected Serde> prepareValueSerdeForStore(final Serde getWithBinary(final K key) { - try { - return maybeMeasureLatency( - () -> { - final byte[] rawValue = wrapped().get(serializeKey(key)); - return new RawAndDeserializedValue<>(rawValue, deserializeValue(rawValue)); - }, - time, - getSensor - ); - } catch (final ProcessorStateException e) { - final String message = String.format(e.getMessage(), key); - throw new ProcessorStateException(message, e); - } - } - public boolean putIfDifferentValues( final K key, final ValueAndTimestamp newValue, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index 7583d452d33ec..602be7a9e6f60 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -40,7 +40,6 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; -import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.RawAndDeserializedValue; import org.apache.kafka.test.KeyValueIteratorStub; import org.junit.jupiter.api.Test; @@ -61,8 +60,6 @@ import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -233,18 +230,6 @@ public void shouldWriteBytesToInnerStoreAndRecordPutMetric() { assertTrue((Double) metric.metricValue() > 0); } - @Test - public void shouldGetWithBinary() { - setUp(); - when(inner.get(KEY_BYTES)).thenReturn(VALUE_AND_TIMESTAMP_BYTES); - - init(); - - final RawAndDeserializedValue valueWithBinary = metered.getWithBinary(KEY); - assertEquals(VALUE_AND_TIMESTAMP, valueWithBinary.value); - assertArrayEquals(VALUE_AND_TIMESTAMP_BYTES, valueWithBinary.rawValue); - } - @Test public void shouldNotPutIfSameValuesAndGreaterTimestamp() { setUp(); diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 2fa2cee1f6c59..0f4012f779ad5 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -397,14 +397,6 @@ public void setTime(final long timestamp) { this.timestamp = timestamp; } - @Override - public long timestamp() { - if (recordContext == null) { - return timestamp; - } - return recordContext.timestamp(); - } - @Override public long currentSystemTimeMs() { return time.milliseconds(); @@ -415,38 +407,6 @@ public long currentStreamTimeMs() { throw new UnsupportedOperationException("this method is not supported in InternalMockProcessorContext"); } - @Override - public String topic() { - if (recordContext == null) { - return null; - } - return recordContext.topic(); - } - - @Override - public int partition() { - if (recordContext == null) { - return -1; - } - return recordContext.partition(); - } - - @Override - public long offset() { - if (recordContext == null) { - return -1L; - } - return recordContext.offset(); - } - - @Override - public Headers headers() { - if (recordContext == null) { - return new RecordHeaders(); - } - return recordContext.headers(); - } - @Override public TaskType taskType() { return taskType;