-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-10789: Streamlining Tests in ChangeLoggingKeyValueBytesStoreTest #18816
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from 4 commits
181091c
6265440
f128915
d493559
05cc362
84c49c0
d205133
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,7 +35,6 @@ | |
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; | ||
import org.apache.kafka.streams.query.Position; | ||
import org.apache.kafka.streams.state.KeyValueIterator; | ||
import org.apache.kafka.streams.state.KeyValueStore; | ||
import org.apache.kafka.test.InternalMockProcessorContext; | ||
import org.apache.kafka.test.MockRecordCollector; | ||
import org.apache.kafka.test.TestUtils; | ||
|
@@ -44,6 +43,7 @@ | |
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.ExtendWith; | ||
import org.mockito.Mock; | ||
import org.mockito.junit.jupiter.MockitoExtension; | ||
import org.mockito.junit.jupiter.MockitoSettings; | ||
import org.mockito.quality.Strictness; | ||
|
@@ -53,6 +53,7 @@ | |
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
|
@@ -62,18 +63,23 @@ | |
import static org.hamcrest.CoreMatchers.nullValue; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.hasEntry; | ||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.ArgumentMatchers.anyList; | ||
import static org.mockito.ArgumentMatchers.anyString; | ||
import static org.mockito.Mockito.doAnswer; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.when; | ||
|
||
@SuppressWarnings("rawtypes") | ||
@ExtendWith(MockitoExtension.class) | ||
@MockitoSettings(strictness = Strictness.STRICT_STUBS) | ||
public class ChangeLoggingKeyValueBytesStoreTest { | ||
class ChangeLoggingKeyValueBytesStoreTest { | ||
|
||
private final MockRecordCollector collector = new MockRecordCollector(); | ||
private final InMemoryKeyValueStore inner = new InMemoryKeyValueStore("kv"); | ||
private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore(inner); | ||
@Mock | ||
private InMemoryKeyValueStore innerMock; | ||
private ChangeLoggingKeyValueBytesStore store; | ||
private InternalMockProcessorContext<?, ?> context; | ||
private final StreamsConfig streamsConfig = streamsConfigMock(); | ||
private final Bytes hi = Bytes.wrap("hi".getBytes()); | ||
|
@@ -89,8 +95,81 @@ public class ChangeLoggingKeyValueBytesStoreTest { | |
public void before() { | ||
context = mockContext(); | ||
context.setTime(0); | ||
store = new ChangeLoggingKeyValueBytesStore(innerMock); | ||
store.init(context, store); | ||
} | ||
private void mockPosition() { | ||
leaf-soba marked this conversation as resolved.
Show resolved
Hide resolved
|
||
when(innerMock.getPosition()).thenReturn(Position.emptyPosition()); | ||
} | ||
private void mockGet(final Map<Bytes, byte[]> mockMap) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing that out! We're using mockMap here to simulate the internal state of the underlying store, since the tests in CachingInMemoryKeyValueStoreTest are being streamlined to rely on a mocked underlyingStore as per the KIP-614 review discussion. Because innerMock is a mock and doesn’t retain state on its own, we use mockMap to mimic how the real store would behave across multiple interactions — especially for verifying behavior like reads after writes. This lets us preserve the semantics of the store while still keeping the actual store mocked, as requested. That said, I'm open to simplifying it further if there's a cleaner way to preserve the same test coverage and behavior expectations. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we want to mock, but we use two different techniques and mix them... First you use
For this case, to me, all call a test make into this mock, should be stubbed via corresponding If we make the mock stateful, we can just keep using \cc @cadonna who requested to rewrite this using mocks to clarify, in cases I misunderstand the purpose of the ticket. |
||
when(innerMock.get(any(Bytes.class))).thenAnswer(invocation -> mockMap.get(invocation.getArgument(0))); | ||
} | ||
private void mockPut(final Map<Bytes, byte[]> mockMap) { | ||
doAnswer(invocation -> { | ||
mockMap.put(invocation.getArgument(0), invocation.getArgument(1)); | ||
StoreQueryUtils.updatePosition(innerMock.getPosition(), context); | ||
return null; | ||
}).when(innerMock).put(any(Bytes.class), any(byte[].class)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we use In There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the question! We use doAnswer(...).when(...) here because put is a void method. Mockito requires this syntax for mocking void methods — when(...).thenAnswer(...) only works for methods that return a value. In contrast, get returns a value, so we can use when(...).thenAnswer(...) there, which I agree is a bit more readable.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah. My bad. Thanks. |
||
} | ||
private void mockPutAll(final Map<Bytes, byte[]> mockMap) { | ||
doAnswer(invocation -> { | ||
final List<KeyValue<Bytes, byte[]>> entries = invocation.getArgument(0); | ||
for (final KeyValue<Bytes, byte[]> entry : entries) { | ||
mockMap.put(entry.key, entry.value); | ||
} | ||
return null; | ||
}).when(innerMock).putAll(anyList()); | ||
} | ||
private void mockDelete(final Map<Bytes, byte[]> mockMap) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems you forgot the add empty lines between methods. More of the same below. |
||
doAnswer(invocation -> { | ||
final Bytes key = invocation.getArgument(0); | ||
final byte[] oldValue = mockMap.get(key); | ||
mockMap.remove(key); | ||
return oldValue; | ||
}).when(innerMock).delete(any(Bytes.class)); | ||
} | ||
private void mockPutIfAbsent(final Map<Bytes, byte[]> mockMap) { | ||
doAnswer(invocation -> { | ||
final Bytes key = invocation.getArgument(0); | ||
final byte[] value = invocation.getArgument(1); | ||
return mockMap.putIfAbsent(key, value); | ||
}).when(innerMock).putIfAbsent(any(Bytes.class), any(byte[].class)); | ||
} | ||
private void mockPrefixScan(final Map<Bytes, byte[]> mockMap) { | ||
when(innerMock.prefixScan(anyString(), any())).thenAnswer(invocation -> { | ||
final String prefix = invocation.getArgument(0); | ||
final List<KeyValue<Bytes, byte[]>> matchingRecords = new ArrayList<>(); | ||
for (final Map.Entry<Bytes, byte[]> entry : mockMap.entrySet()) { | ||
if (entry.getKey().toString().startsWith(prefix)) { | ||
matchingRecords.add(KeyValue.pair(entry.getKey(), entry.getValue())); | ||
} | ||
} | ||
return new KeyValueIterator<Bytes, byte[]>() { | ||
private final Iterator<KeyValue<Bytes, byte[]>> iterator = matchingRecords.iterator(); | ||
|
||
@Override | ||
public boolean hasNext() { | ||
return iterator.hasNext(); | ||
} | ||
|
||
@Override | ||
public KeyValue<Bytes, byte[]> next() { | ||
return iterator.next(); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
// No resources to clean up in this mock | ||
} | ||
|
||
@Override | ||
public Bytes peekNextKey() { | ||
return null; | ||
} | ||
}; | ||
}); | ||
} | ||
|
||
|
||
private InternalMockProcessorContext mockContext() { | ||
return new InternalMockProcessorContext<>( | ||
|
@@ -111,29 +190,38 @@ public void after() { | |
} | ||
|
||
@Test | ||
public void shouldDelegateInit() { | ||
final InternalMockProcessorContext context = mockContext(); | ||
final KeyValueStore<Bytes, byte[]> innerMock = mock(InMemoryKeyValueStore.class); | ||
void shouldDelegateInit() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question as above for the class: I thought test methods must be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In JUnit 5, test classes and methods can have default (package-private) visibility—they don’t need to be public to be recognized and executed. However, I noticed that most of the existing Kafka unit tests still use public, so I’ll follow the current convention and revert the change for consistency. |
||
final InternalMockProcessorContext mockContext = mockContext(); | ||
final StateStore outer = new ChangeLoggingKeyValueBytesStore(innerMock); | ||
outer.init(context, outer); | ||
verify(innerMock).init(context, outer); | ||
outer.init(mockContext, outer); | ||
verify(innerMock).init(mockContext, outer); | ||
} | ||
|
||
@Test | ||
public void shouldWriteKeyValueBytesToInnerStoreOnPut() { | ||
void shouldWriteKeyValueBytesToInnerStoreOnPut() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockPut(mockMap); | ||
mockGet(mockMap); | ||
mockPosition(); | ||
|
||
store.put(hi, there); | ||
assertThat(inner.get(hi), equalTo(there)); | ||
assertThat(innerMock.get(hi), equalTo(there)); | ||
assertThat(collector.collected().size(), equalTo(1)); | ||
assertThat(collector.collected().get(0).key(), equalTo(hi)); | ||
assertThat(collector.collected().get(0).value(), equalTo(there)); | ||
} | ||
|
||
@Test | ||
public void shouldWriteAllKeyValueToInnerStoreOnPutAll() { | ||
void shouldWriteAllKeyValueToInnerStoreOnPutAll() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockPutAll(mockMap); | ||
mockGet(mockMap); | ||
mockPosition(); | ||
|
||
store.putAll(Arrays.asList(KeyValue.pair(hi, there), | ||
KeyValue.pair(hello, world))); | ||
assertThat(inner.get(hi), equalTo(there)); | ||
assertThat(inner.get(hello), equalTo(world)); | ||
assertThat(innerMock.get(hi), equalTo(there)); | ||
assertThat(innerMock.get(hello), equalTo(world)); | ||
|
||
assertThat(collector.collected().size(), equalTo(2)); | ||
assertThat(collector.collected().get(0).key(), equalTo(hi)); | ||
|
@@ -143,21 +231,38 @@ public void shouldWriteAllKeyValueToInnerStoreOnPutAll() { | |
} | ||
|
||
@Test | ||
public void shouldPropagateDelete() { | ||
void shouldPropagateDelete() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockPut(mockMap); | ||
mockGet(mockMap); | ||
mockDelete(mockMap); | ||
mockPosition(); | ||
|
||
store.put(hi, there); | ||
store.delete(hi); | ||
assertThat(inner.approximateNumEntries(), equalTo(0L)); | ||
assertThat(inner.get(hi), nullValue()); | ||
|
||
assertThat(innerMock.approximateNumEntries(), equalTo(0L)); | ||
assertThat(innerMock.get(hi), nullValue()); | ||
} | ||
|
||
@Test | ||
public void shouldReturnOldValueOnDelete() { | ||
void shouldReturnOldValueOnDelete() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockPut(mockMap); | ||
mockDelete(mockMap); | ||
mockPosition(); | ||
|
||
store.put(hi, there); | ||
assertThat(store.delete(hi), equalTo(there)); | ||
} | ||
|
||
@Test | ||
public void shouldLogKeyNullOnDelete() { | ||
void shouldLogKeyNullOnDelete() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockPut(mockMap); | ||
mockDelete(mockMap); | ||
mockPosition(); | ||
|
||
store.put(hi, there); | ||
assertThat(store.delete(hi), equalTo(there)); | ||
|
||
|
@@ -169,20 +274,35 @@ public void shouldLogKeyNullOnDelete() { | |
} | ||
|
||
@Test | ||
public void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() { | ||
void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockPutIfAbsent(mockMap); | ||
mockGet(mockMap); | ||
mockPosition(); | ||
|
||
store.putIfAbsent(hi, there); | ||
assertThat(inner.get(hi), equalTo(there)); | ||
assertThat(innerMock.get(hi), equalTo(there)); | ||
} | ||
|
||
@Test | ||
public void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() { | ||
void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockPut(mockMap); | ||
mockPutIfAbsent(mockMap); | ||
mockGet(mockMap); | ||
mockPosition(); | ||
|
||
store.put(hi, there); | ||
store.putIfAbsent(hi, world); | ||
assertThat(inner.get(hi), equalTo(there)); | ||
assertThat(innerMock.get(hi), equalTo(there)); | ||
} | ||
|
||
@Test | ||
public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() { | ||
void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockPutIfAbsent(mockMap); | ||
mockPosition(); | ||
|
||
store.putIfAbsent(hi, there); | ||
|
||
assertThat(collector.collected().size(), equalTo(1)); | ||
|
@@ -191,7 +311,12 @@ public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() { | |
} | ||
|
||
@Test | ||
public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() { | ||
void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockPut(mockMap); | ||
mockPutIfAbsent(mockMap); | ||
mockPosition(); | ||
|
||
store.put(hi, there); | ||
store.putIfAbsent(hi, world); | ||
|
||
|
@@ -201,24 +326,43 @@ public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() { | |
} | ||
|
||
@Test | ||
public void shouldReturnCurrentValueOnPutIfAbsent() { | ||
void shouldReturnCurrentValueOnPutIfAbsent() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockPut(mockMap); | ||
mockPutIfAbsent(mockMap); | ||
mockPosition(); | ||
|
||
store.put(hi, there); | ||
assertThat(store.putIfAbsent(hi, world), equalTo(there)); | ||
} | ||
|
||
@Test | ||
public void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() { | ||
void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockPutIfAbsent(mockMap); | ||
mockPosition(); | ||
|
||
assertThat(store.putIfAbsent(hi, there), is(nullValue())); | ||
} | ||
|
||
@Test | ||
public void shouldReturnValueOnGetWhenExists() { | ||
void shouldReturnValueOnGetWhenExists() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockPut(mockMap); | ||
mockGet(mockMap); | ||
mockPosition(); | ||
|
||
store.put(hello, world); | ||
assertThat(store.get(hello), equalTo(world)); | ||
} | ||
|
||
@Test | ||
public void shouldGetRecordsWithPrefixKey() { | ||
void shouldGetRecordsWithPrefixKey() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockPut(mockMap); | ||
mockPrefixScan(mockMap); | ||
mockPosition(); | ||
|
||
store.put(hi, there); | ||
store.put(Bytes.increment(hi), world); | ||
|
||
|
@@ -241,12 +385,19 @@ public void shouldGetRecordsWithPrefixKey() { | |
} | ||
|
||
@Test | ||
public void shouldReturnNullOnGetWhenDoesntExist() { | ||
void shouldReturnNullOnGetWhenDoesntExist() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockGet(mockMap); | ||
|
||
assertThat(store.get(hello), is(nullValue())); | ||
} | ||
|
||
@Test | ||
public void shouldLogPositionOnPut() { | ||
void shouldLogPositionOnPut() { | ||
final Map<Bytes, byte[]> mockMap = new HashMap<>(); | ||
mockPut(mockMap); | ||
mockPosition(); | ||
|
||
context.setRecordContext(new ProcessorRecordContext(-1, INPUT_OFFSET, INPUT_PARTITION, INPUT_TOPIC_NAME, new RecordHeaders())); | ||
context.setTime(1L); | ||
store.put(hi, there); | ||
|
@@ -264,13 +415,13 @@ public void shouldLogPositionOnPut() { | |
} | ||
|
||
private StreamsConfig streamsConfigMock() { | ||
final StreamsConfig streamsConfig = mock(StreamsConfig.class); | ||
final StreamsConfig mockedStreamsConfig = mock(StreamsConfig.class); | ||
|
||
final Map<String, Object> myValues = new HashMap<>(); | ||
myValues.put(InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED, true); | ||
when(streamsConfig.originals()).thenReturn(myValues); | ||
when(streamsConfig.values()).thenReturn(Collections.emptyMap()); | ||
when(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("add-id"); | ||
return streamsConfig; | ||
when(mockedStreamsConfig.originals()).thenReturn(myValues); | ||
when(mockedStreamsConfig.values()).thenReturn(Collections.emptyMap()); | ||
when(mockedStreamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("add-id"); | ||
return mockedStreamsConfig; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can a test class be package private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In JUnit 5, test classes and methods can have default (package-private) visibility—they don’t need to be public to be recognized and executed. However, I noticed that most of the existing Kafka unit tests still use public, so I’ll follow the current convention and revert the change for consistency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks to the info. Interesting. Did not know that. -- Guess we use
public
historically, and I also don't see much value to change it to package private -- sounds like a not of unnecessary noise on PRs, w/o any clear benefit.