KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (5/N) #21455
KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (5/N) #21455aliehsaeedii wants to merge 15 commits intoapache:trunkfrom
Conversation
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
| return returnHeadersStore ? | ||
| new RocksDBTimestampedStoreWithHeaders(name, metricsScope()) : | ||
| returnTimestampedStore ? | ||
| new RocksDBTimestampedStore(name, metricsScope()) : | ||
| new RocksDBStore(name, metricsScope()); |
There was a problem hiding this comment.
Given this condition is getting complex, should we use if statement?
For example
if (returnHeadersStore)
return new RocksDBTimestampedStoreWithHeaders(name, metricsScope());
if (returnTimestampedStore)
return new RocksDBTimestampedStore(name, metricsScope());
return new RocksDBStore(name, metricsScope());
| * {@link org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders} and | ||
| * {@link org.apache.kafka.streams.state.TimestampedKeyValueStore}. |
There was a problem hiding this comment.
Could you please use import instead of full qualified name?
| * {@link org.apache.kafka.streams.state.TimestampedKeyValueStore}. | ||
| * <p> | ||
| * If a user provides a supplier for {@code TimestampedKeyValueStore} (without headers) via | ||
| * {@link org.apache.kafka.streams.kstream.Materialized#as(KeyValueBytesStoreSupplier)} when building |
| import static org.hamcrest.MatcherAssert.assertThat; | ||
| import static org.hamcrest.Matchers.equalTo; | ||
| import static org.hamcrest.core.IsInstanceOf.instanceOf; |
There was a problem hiding this comment.
As previous PRs, we should avoid using hamcrest
| import static org.hamcrest.CoreMatchers.equalTo; | ||
| import static org.hamcrest.MatcherAssert.assertThat; |
There was a problem hiding this comment.
As previous PRs, we should avoid using hamcrest
streams/src/main/java/org/apache/kafka/streams/state/Stores.java
Outdated
Show resolved
Hide resolved
mjsax
left a comment
There was a problem hiding this comment.
Made a pass (skipped tests for now)
| final boolean returnHeadersStore) { | ||
| this.name = name; | ||
| this.returnTimestampedStore = returnTimestampedStore; | ||
| this.returnHeadersStore = returnHeadersStore; |
There was a problem hiding this comment.
Should we add a check for invalid combination of returnTimestampedStore and returnHeadersStore?
We only support 3 out of 4 combinations, right?
There was a problem hiding this comment.
Due to the first else, the 4th combination (both true) is served with the first if which sematically is not wrong as headers stores are both ts and are WithHeaders. No idea if considering 4th combination helps. Should we throw exception for now?
There was a problem hiding this comment.
Should we throw exception for now?
That was the idea.
| if (!enableCaching) { | ||
| return inner; | ||
| } | ||
| return new CachingKeyValueStore(inner, true); |
There was a problem hiding this comment.
I was just wondering about true parameter -- seems to be related to IQv2. Do we need to do something about this (I believe yes from a quick look into the code) -- can we write a proper test, that IQv2 is disabled, and does not crash anything if one tries to use it, and provides a good error message?
...ava/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
Outdated
Show resolved
Hide resolved
| /** | ||
| * Generic iterator adapter for range/all operations | ||
| */ | ||
| private static class TimestampedToHeadersIteratorAdapter<K> implements KeyValueIterator<K, byte[]> { |
There was a problem hiding this comment.
Why are we nesting this as private classes? For the ts/kv-adapter we have it as top level classes, and re-use them for other store types, too. Can't we re-use them for other store types for the header case?
There was a problem hiding this comment.
For the ts/kv-adapter we have it as top level classes
I see private static class KeyValueToTimestampedKeyValueAdapterIterator inside KeyValueToTimestampedKeyValueByteStoreAdapter
There was a problem hiding this comment.
There is two classes with very similar names... KeyValueToTimestampedKeyValueIteratorAdapter (is a standalone class), and there is also KeyValueToTimestampedKeyValueAdapterIterator which is the private one... (we would rename the private one as a pointed out on some other comment to get better names -- very easy to mix up)
...src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
Outdated
Show resolved
Hide resolved
|
|
||
| private final RocksDbIterator rocksDbIterator; | ||
|
|
||
| public TimestampedToHeadersRocksIteratorAdapter(final RocksDbIterator rocksDbIterator) { |
There was a problem hiding this comment.
This naming is much better than the one from KeyValueToTimestampedKeyValueByteStoreAdapter.KeyValueToTimestampedKeyValueAdapterIterator -- can we update the exiting name over there to align naming schema (best in a follow up PR)
| return returnTimestampedStore ? | ||
| new RocksDBTimestampedStore(name, metricsScope()) : | ||
| new RocksDBStore(name, metricsScope()); | ||
| if (returnHeadersStore) { |
There was a problem hiding this comment.
| if (returnHeadersStore) { | |
| if (returnTimestampedStore && returnHeadersStore) { |
There was a problem hiding this comment.
Maybe somewhat redundant, but this is how the flags are used... seems cleaner?
There was a problem hiding this comment.
Well, it's redundant because returnTimestampedStore == false and returnHeadersStore == true is not something we support. -- And returnHeadersStore == true implies that returnTimestampedStore must be true as well. -- It still feel "cleaner" to check both flags?
|
Build failed |
This PR adds required classes or modifies the existing ones to build
the
TimestampedKeyValueStoreWithHeadersintroduced inKIP-1271.
It include utests as well.