diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index 154517d3b94f4..633e04865eb47 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -35,7 +35,6 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockRecordCollector; import org.apache.kafka.test.TestUtils; @@ -44,6 +43,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -53,6 +53,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -62,6 +63,10 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasEntry; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -72,8 +77,9 @@ public class ChangeLoggingKeyValueBytesStoreTest { private final MockRecordCollector collector = new MockRecordCollector(); - private final InMemoryKeyValueStore inner = new InMemoryKeyValueStore("kv"); - private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore(inner); + @Mock + private InMemoryKeyValueStore innerMock; + private ChangeLoggingKeyValueBytesStore store; private InternalMockProcessorContext context; private final StreamsConfig streamsConfig = streamsConfigMock(); private final Bytes hi = Bytes.wrap("hi".getBytes()); @@ -89,9 +95,86 @@ public class ChangeLoggingKeyValueBytesStoreTest { public void before() { context = mockContext(); context.setTime(0); + store = new ChangeLoggingKeyValueBytesStore(innerMock); store.init(context, store); } + private void mockPosition() { + when(innerMock.getPosition()).thenReturn(Position.emptyPosition()); + } + + private void mockGet(final Map mockMap) { + when(innerMock.get(any(Bytes.class))).thenAnswer(invocation -> mockMap.get(invocation.getArgument(0))); + } + + private void mockPut(final Map mockMap) { + doAnswer(invocation -> { + mockMap.put(invocation.getArgument(0), invocation.getArgument(1)); + StoreQueryUtils.updatePosition(innerMock.getPosition(), context); + return null; + }).when(innerMock).put(any(Bytes.class), any(byte[].class)); + } + + private void mockPutAll(final Map mockMap) { + doAnswer(invocation -> { + final List> entries = invocation.getArgument(0); + for (final KeyValue entry : entries) { + mockMap.put(entry.key, entry.value); + } + return null; + }).when(innerMock).putAll(anyList()); + } + private void mockDelete(final Map mockMap) { + doAnswer(invocation -> { + final Bytes key = invocation.getArgument(0); + final byte[] oldValue = mockMap.get(key); + mockMap.remove(key); + return oldValue; + }).when(innerMock).delete(any(Bytes.class)); + } + private void mockPutIfAbsent(final Map mockMap) { + doAnswer(invocation -> { + final Bytes key = invocation.getArgument(0); + final byte[] value = invocation.getArgument(1); + return mockMap.putIfAbsent(key, value); + }).when(innerMock).putIfAbsent(any(Bytes.class), any(byte[].class)); + } + private void mockPrefixScan(final Map mockMap) { + when(innerMock.prefixScan(anyString(), any())).thenAnswer(invocation -> { + final String prefix = invocation.getArgument(0); + final List> matchingRecords = new ArrayList<>(); + for (final Map.Entry entry : mockMap.entrySet()) { + if (entry.getKey().toString().startsWith(prefix)) { + matchingRecords.add(KeyValue.pair(entry.getKey(), entry.getValue())); + } + } + return new KeyValueIterator() { + private final Iterator> iterator = matchingRecords.iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public KeyValue next() { + return iterator.next(); + } + + @Override + public void close() { + // No resources to clean up in this mock + } + + @Override + public Bytes peekNextKey() { + return null; + } + }; + }); + } + + private InternalMockProcessorContext mockContext() { return new InternalMockProcessorContext<>( TestUtils.tempDirectory(), @@ -112,17 +195,21 @@ public void after() { @Test public void shouldDelegateInit() { - final InternalMockProcessorContext context = mockContext(); - final KeyValueStore innerMock = mock(InMemoryKeyValueStore.class); + final InternalMockProcessorContext mockContext = mockContext(); final StateStore outer = new ChangeLoggingKeyValueBytesStore(innerMock); - outer.init(context, outer); - verify(innerMock).init(context, outer); + outer.init(mockContext, outer); + verify(innerMock).init(mockContext, outer); } @Test public void shouldWriteKeyValueBytesToInnerStoreOnPut() { + final Map mockMap = new HashMap<>(); + mockPut(mockMap); + mockGet(mockMap); + mockPosition(); + store.put(hi, there); - assertThat(inner.get(hi), equalTo(there)); + assertThat(innerMock.get(hi), equalTo(there)); assertThat(collector.collected().size(), equalTo(1)); assertThat(collector.collected().get(0).key(), equalTo(hi)); assertThat(collector.collected().get(0).value(), equalTo(there)); @@ -130,10 +217,15 @@ public void shouldWriteKeyValueBytesToInnerStoreOnPut() { @Test public void shouldWriteAllKeyValueToInnerStoreOnPutAll() { + final Map mockMap = new HashMap<>(); + mockPutAll(mockMap); + mockGet(mockMap); + mockPosition(); + store.putAll(Arrays.asList(KeyValue.pair(hi, there), KeyValue.pair(hello, world))); - assertThat(inner.get(hi), equalTo(there)); - assertThat(inner.get(hello), equalTo(world)); + assertThat(innerMock.get(hi), equalTo(there)); + assertThat(innerMock.get(hello), equalTo(world)); assertThat(collector.collected().size(), equalTo(2)); assertThat(collector.collected().get(0).key(), equalTo(hi)); @@ -144,20 +236,37 @@ public void shouldWriteAllKeyValueToInnerStoreOnPutAll() { @Test public void shouldPropagateDelete() { + final Map mockMap = new HashMap<>(); + mockPut(mockMap); + mockGet(mockMap); + mockDelete(mockMap); + mockPosition(); + store.put(hi, there); store.delete(hi); - assertThat(inner.approximateNumEntries(), equalTo(0L)); - assertThat(inner.get(hi), nullValue()); + + assertThat(innerMock.approximateNumEntries(), equalTo(0L)); + assertThat(innerMock.get(hi), nullValue()); } @Test public void shouldReturnOldValueOnDelete() { + final Map mockMap = new HashMap<>(); + mockPut(mockMap); + mockDelete(mockMap); + mockPosition(); + store.put(hi, there); assertThat(store.delete(hi), equalTo(there)); } @Test public void shouldLogKeyNullOnDelete() { + final Map mockMap = new HashMap<>(); + mockPut(mockMap); + mockDelete(mockMap); + mockPosition(); + store.put(hi, there); assertThat(store.delete(hi), equalTo(there)); @@ -170,19 +279,34 @@ public void shouldLogKeyNullOnDelete() { @Test public void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() { + final Map mockMap = new HashMap<>(); + mockPutIfAbsent(mockMap); + mockGet(mockMap); + mockPosition(); + store.putIfAbsent(hi, there); - assertThat(inner.get(hi), equalTo(there)); + assertThat(innerMock.get(hi), equalTo(there)); } @Test public void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() { + final Map mockMap = new HashMap<>(); + mockPut(mockMap); + mockPutIfAbsent(mockMap); + mockGet(mockMap); + mockPosition(); + store.put(hi, there); store.putIfAbsent(hi, world); - assertThat(inner.get(hi), equalTo(there)); + assertThat(innerMock.get(hi), equalTo(there)); } @Test public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() { + final Map mockMap = new HashMap<>(); + mockPutIfAbsent(mockMap); + mockPosition(); + store.putIfAbsent(hi, there); assertThat(collector.collected().size(), equalTo(1)); @@ -192,6 +316,11 @@ public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() { @Test public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() { + final Map mockMap = new HashMap<>(); + mockPut(mockMap); + mockPutIfAbsent(mockMap); + mockPosition(); + store.put(hi, there); store.putIfAbsent(hi, world); @@ -202,23 +331,42 @@ public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() { @Test public void shouldReturnCurrentValueOnPutIfAbsent() { + final Map mockMap = new HashMap<>(); + mockPut(mockMap); + mockPutIfAbsent(mockMap); + mockPosition(); + store.put(hi, there); assertThat(store.putIfAbsent(hi, world), equalTo(there)); } @Test public void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() { + final Map mockMap = new HashMap<>(); + mockPutIfAbsent(mockMap); + mockPosition(); + assertThat(store.putIfAbsent(hi, there), is(nullValue())); } @Test public void shouldReturnValueOnGetWhenExists() { + final Map mockMap = new HashMap<>(); + mockPut(mockMap); + mockGet(mockMap); + mockPosition(); + store.put(hello, world); assertThat(store.get(hello), equalTo(world)); } @Test public void shouldGetRecordsWithPrefixKey() { + final Map mockMap = new HashMap<>(); + mockPut(mockMap); + mockPrefixScan(mockMap); + mockPosition(); + store.put(hi, there); store.put(Bytes.increment(hi), world); @@ -242,11 +390,18 @@ public void shouldGetRecordsWithPrefixKey() { @Test public void shouldReturnNullOnGetWhenDoesntExist() { + final Map mockMap = new HashMap<>(); + mockGet(mockMap); + assertThat(store.get(hello), is(nullValue())); } @Test public void shouldLogPositionOnPut() { + final Map mockMap = new HashMap<>(); + mockPut(mockMap); + mockPosition(); + context.setRecordContext(new ProcessorRecordContext(-1, INPUT_OFFSET, INPUT_PARTITION, INPUT_TOPIC_NAME, new RecordHeaders())); context.setTime(1L); store.put(hi, there); @@ -264,13 +419,13 @@ public void shouldLogPositionOnPut() { } private StreamsConfig streamsConfigMock() { - final StreamsConfig streamsConfig = mock(StreamsConfig.class); + final StreamsConfig mockedStreamsConfig = mock(StreamsConfig.class); final Map myValues = new HashMap<>(); myValues.put(InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED, true); - when(streamsConfig.originals()).thenReturn(myValues); - when(streamsConfig.values()).thenReturn(Collections.emptyMap()); - when(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("add-id"); - return streamsConfig; + when(mockedStreamsConfig.originals()).thenReturn(myValues); + when(mockedStreamsConfig.values()).thenReturn(Collections.emptyMap()); + when(mockedStreamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("add-id"); + return mockedStreamsConfig; } }