Skip to content

KAFKA-10789: Streamlining Tests in ChangeLoggingKeyValueBytesStoreTest #18816

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: trunk
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.TestUtils;
Expand All @@ -44,6 +43,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
Expand All @@ -53,6 +53,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

Expand All @@ -62,6 +63,10 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -72,8 +77,9 @@
public class ChangeLoggingKeyValueBytesStoreTest {

private final MockRecordCollector collector = new MockRecordCollector();
private final InMemoryKeyValueStore inner = new InMemoryKeyValueStore("kv");
private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore(inner);
@Mock
private InMemoryKeyValueStore innerMock;
private ChangeLoggingKeyValueBytesStore store;
private InternalMockProcessorContext<?, ?> context;
private final StreamsConfig streamsConfig = streamsConfigMock();
private final Bytes hi = Bytes.wrap("hi".getBytes());
Expand All @@ -89,9 +95,86 @@ public class ChangeLoggingKeyValueBytesStoreTest {
public void before() {
context = mockContext();
context.setTime(0);
store = new ChangeLoggingKeyValueBytesStore(innerMock);
store.init(context, store);
}

private void mockPosition() {
when(innerMock.getPosition()).thenReturn(Position.emptyPosition());
}

private void mockGet(final Map<Bytes, byte[]> mockMap) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we using mockMap? It seems unnecessarily complex? -- It seems much more straightforward to just "expect" calls into the innerMock store?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing that out!

We're using mockMap here to simulate the internal state of the underlying store, since the tests in CachingInMemoryKeyValueStoreTest are being streamlined to rely on a mocked underlyingStore as per the KIP-614 review discussion.

Because innerMock is a mock and doesn’t retain state on its own, we use mockMap to mimic how the real store would behave across multiple interactions — especially for verifying behavior like reads after writes. This lets us preserve the semantics of the store while still keeping the actual store mocked, as requested.

That said, I'm open to simplifying it further if there's a cleaner way to preserve the same test coverage and behavior expectations.

https://issues.apache.org/jira/browse/KAFKA-10789

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we want to mock, but we use two different techniques and mix them...

First you use

    @Mock
    private InMemoryKeyValueStore innerMock;

For this case, to me, all call a test make into this mock, should be stubbed via corresponding when(...).thanAnswer(...) code setup and the mock itself is stateless.

If we make the mock stateful, we can just keep using new InMemoryKeyValueStore to begin with, that is just an Map<Bytes, bytes[]> internally, too.

\cc @cadonna who requested to rewrite this using mocks to clarify, in cases I misunderstand the purpose of the ticket.

when(innerMock.get(any(Bytes.class))).thenAnswer(invocation -> mockMap.get(invocation.getArgument(0)));
}

private void mockPut(final Map<Bytes, byte[]> mockMap) {
doAnswer(invocation -> {
mockMap.put(invocation.getArgument(0), invocation.getArgument(1));
StoreQueryUtils.updatePosition(innerMock.getPosition(), context);
return null;
}).when(innerMock).put(any(Bytes.class), any(byte[].class));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we use doAnswer(...).when(...) here?

In mockGet we use when(...).thanAnswer(...) what I find much easier to read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the question!

We use doAnswer(...).when(...) here because put is a void method. Mockito requires this syntax for mocking void methods — when(...).thenAnswer(...) only works for methods that return a value.

In contrast, get returns a value, so we can use when(...).thenAnswer(...) there, which I agree is a bit more readable.

    @Override
    public synchronized void put(final Bytes key, final byte[] value) {
        putInternal(key, value);
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. My bad. Thanks.

}

private void mockPutAll(final Map<Bytes, byte[]> mockMap) {
doAnswer(invocation -> {
final List<KeyValue<Bytes, byte[]>> entries = invocation.getArgument(0);
for (final KeyValue<Bytes, byte[]> entry : entries) {
mockMap.put(entry.key, entry.value);
}
return null;
}).when(innerMock).putAll(anyList());
}
private void mockDelete(final Map<Bytes, byte[]> mockMap) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems you forgot the add empty lines between methods. More of the same below.

doAnswer(invocation -> {
final Bytes key = invocation.getArgument(0);
final byte[] oldValue = mockMap.get(key);
mockMap.remove(key);
return oldValue;
}).when(innerMock).delete(any(Bytes.class));
}
private void mockPutIfAbsent(final Map<Bytes, byte[]> mockMap) {
doAnswer(invocation -> {
final Bytes key = invocation.getArgument(0);
final byte[] value = invocation.getArgument(1);
return mockMap.putIfAbsent(key, value);
}).when(innerMock).putIfAbsent(any(Bytes.class), any(byte[].class));
}
private void mockPrefixScan(final Map<Bytes, byte[]> mockMap) {
when(innerMock.prefixScan(anyString(), any())).thenAnswer(invocation -> {
final String prefix = invocation.getArgument(0);
final List<KeyValue<Bytes, byte[]>> matchingRecords = new ArrayList<>();
for (final Map.Entry<Bytes, byte[]> entry : mockMap.entrySet()) {
if (entry.getKey().toString().startsWith(prefix)) {
matchingRecords.add(KeyValue.pair(entry.getKey(), entry.getValue()));
}
}
return new KeyValueIterator<Bytes, byte[]>() {
private final Iterator<KeyValue<Bytes, byte[]>> iterator = matchingRecords.iterator();

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public KeyValue<Bytes, byte[]> next() {
return iterator.next();
}

@Override
public void close() {
// No resources to clean up in this mock
}

@Override
public Bytes peekNextKey() {
return null;
}
};
});
}


private InternalMockProcessorContext mockContext() {
return new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
Expand All @@ -112,28 +195,37 @@ public void after() {

@Test
public void shouldDelegateInit() {
final InternalMockProcessorContext context = mockContext();
final KeyValueStore<Bytes, byte[]> innerMock = mock(InMemoryKeyValueStore.class);
final InternalMockProcessorContext mockContext = mockContext();
final StateStore outer = new ChangeLoggingKeyValueBytesStore(innerMock);
outer.init(context, outer);
verify(innerMock).init(context, outer);
outer.init(mockContext, outer);
verify(innerMock).init(mockContext, outer);
}

@Test
public void shouldWriteKeyValueBytesToInnerStoreOnPut() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockGet(mockMap);
mockPosition();

store.put(hi, there);
assertThat(inner.get(hi), equalTo(there));
assertThat(innerMock.get(hi), equalTo(there));
assertThat(collector.collected().size(), equalTo(1));
assertThat(collector.collected().get(0).key(), equalTo(hi));
assertThat(collector.collected().get(0).value(), equalTo(there));
}

@Test
public void shouldWriteAllKeyValueToInnerStoreOnPutAll() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPutAll(mockMap);
mockGet(mockMap);
mockPosition();

store.putAll(Arrays.asList(KeyValue.pair(hi, there),
KeyValue.pair(hello, world)));
assertThat(inner.get(hi), equalTo(there));
assertThat(inner.get(hello), equalTo(world));
assertThat(innerMock.get(hi), equalTo(there));
assertThat(innerMock.get(hello), equalTo(world));

assertThat(collector.collected().size(), equalTo(2));
assertThat(collector.collected().get(0).key(), equalTo(hi));
Expand All @@ -144,20 +236,37 @@ public void shouldWriteAllKeyValueToInnerStoreOnPutAll() {

@Test
public void shouldPropagateDelete() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockGet(mockMap);
mockDelete(mockMap);
mockPosition();

store.put(hi, there);
store.delete(hi);
assertThat(inner.approximateNumEntries(), equalTo(0L));
assertThat(inner.get(hi), nullValue());

assertThat(innerMock.approximateNumEntries(), equalTo(0L));
assertThat(innerMock.get(hi), nullValue());
}

@Test
public void shouldReturnOldValueOnDelete() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockDelete(mockMap);
mockPosition();

store.put(hi, there);
assertThat(store.delete(hi), equalTo(there));
}

@Test
public void shouldLogKeyNullOnDelete() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockDelete(mockMap);
mockPosition();

store.put(hi, there);
assertThat(store.delete(hi), equalTo(there));

Expand All @@ -170,19 +279,34 @@ public void shouldLogKeyNullOnDelete() {

@Test
public void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPutIfAbsent(mockMap);
mockGet(mockMap);
mockPosition();

store.putIfAbsent(hi, there);
assertThat(inner.get(hi), equalTo(there));
assertThat(innerMock.get(hi), equalTo(there));
}

@Test
public void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockPutIfAbsent(mockMap);
mockGet(mockMap);
mockPosition();

store.put(hi, there);
store.putIfAbsent(hi, world);
assertThat(inner.get(hi), equalTo(there));
assertThat(innerMock.get(hi), equalTo(there));
}

@Test
public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPutIfAbsent(mockMap);
mockPosition();

store.putIfAbsent(hi, there);

assertThat(collector.collected().size(), equalTo(1));
Expand All @@ -192,6 +316,11 @@ public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() {

@Test
public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockPutIfAbsent(mockMap);
mockPosition();

store.put(hi, there);
store.putIfAbsent(hi, world);

Expand All @@ -202,23 +331,42 @@ public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() {

@Test
public void shouldReturnCurrentValueOnPutIfAbsent() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockPutIfAbsent(mockMap);
mockPosition();

store.put(hi, there);
assertThat(store.putIfAbsent(hi, world), equalTo(there));
}

@Test
public void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPutIfAbsent(mockMap);
mockPosition();

assertThat(store.putIfAbsent(hi, there), is(nullValue()));
}

@Test
public void shouldReturnValueOnGetWhenExists() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockGet(mockMap);
mockPosition();

store.put(hello, world);
assertThat(store.get(hello), equalTo(world));
}

@Test
public void shouldGetRecordsWithPrefixKey() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockPrefixScan(mockMap);
mockPosition();

store.put(hi, there);
store.put(Bytes.increment(hi), world);

Expand All @@ -242,11 +390,18 @@ public void shouldGetRecordsWithPrefixKey() {

@Test
public void shouldReturnNullOnGetWhenDoesntExist() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockGet(mockMap);

assertThat(store.get(hello), is(nullValue()));
}

@Test
public void shouldLogPositionOnPut() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockPosition();

context.setRecordContext(new ProcessorRecordContext(-1, INPUT_OFFSET, INPUT_PARTITION, INPUT_TOPIC_NAME, new RecordHeaders()));
context.setTime(1L);
store.put(hi, there);
Expand All @@ -264,13 +419,13 @@ public void shouldLogPositionOnPut() {
}

private StreamsConfig streamsConfigMock() {
final StreamsConfig streamsConfig = mock(StreamsConfig.class);
final StreamsConfig mockedStreamsConfig = mock(StreamsConfig.class);

final Map<String, Object> myValues = new HashMap<>();
myValues.put(InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED, true);
when(streamsConfig.originals()).thenReturn(myValues);
when(streamsConfig.values()).thenReturn(Collections.emptyMap());
when(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("add-id");
return streamsConfig;
when(mockedStreamsConfig.originals()).thenReturn(myValues);
when(mockedStreamsConfig.values()).thenReturn(Collections.emptyMap());
when(mockedStreamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("add-id");
return mockedStreamsConfig;
}
}