KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (4/N)#21454
KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (4/N)#21454frankvicky merged 7 commits intoapache:trunkfrom
Conversation
| assertEquals(hi, collector.collected().get(0).key()); | ||
| assertArrayEquals(there.value(), (byte[]) collector.collected().get(0).value()); | ||
| assertEquals(97L, collector.collected().get(0).timestamp()); | ||
| Headers headers0 = collector.collected().get(0).headers(); |
| assertEquals(hi, collector.collected().get(0).key()); | ||
| assertArrayEquals(there.value(), (byte[]) collector.collected().get(0).value()); | ||
| assertEquals(97L, collector.collected().get(0).timestamp()); | ||
| Headers headers0 = collector.collected().get(0).headers(); |
| assertEquals(hello, collector.collected().get(1).key()); | ||
| assertArrayEquals(world.value(), (byte[]) collector.collected().get(1).value()); | ||
| assertEquals(98L, collector.collected().get(1).timestamp()); | ||
| Headers headers1 = collector.collected().get(1).headers(); |
| assertArrayEquals(there.value(), (byte[]) collector.collected().get(0).value()); | ||
| assertEquals(97L, collector.collected().get(0).timestamp()); | ||
|
|
||
| Headers headers = collector.collected().get(0).headers(); |
| assertEquals(hi, collector.collected().get(0).key()); | ||
| assertArrayEquals(there.value(), (byte[]) collector.collected().get(0).value()); | ||
| assertEquals(97L, collector.collected().get(0).timestamp()); | ||
| Headers headers0 = collector.collected().get(0).headers(); |
...ams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
Show resolved
Hide resolved
| /** | ||
| * Extract raw value from serialized ValueTimestampHeaders. | ||
| */ | ||
| static byte[] rawValue(final byte[] rawValueTimestampHeaders) { |
There was a problem hiding this comment.
We have static <T> T value(...) above -- given that we need rawValue here, wondering if value(...) from above is actually needed or not at all?
There was a problem hiding this comment.
that method returns the deserialized value while raawValue returns a byte[]
There was a problem hiding this comment.
Yes, I am not questioning that we add rawValue in this PR, I am asking why did we add value(...) in a previous PR, and when would we use it?
But it's somewhat unrelated to this PR, and it's only internal code here. So we could remove value(...) if unused also at some point in the future. Was just wondering.
There was a problem hiding this comment.
We can remove it at some point if not needed: https://issues.apache.org/jira/browse/KAFKA-20193
| } | ||
| } | ||
|
|
||
| void log(final Bytes key, final byte[] value, final long timestamp, final Headers headers) { |
There was a problem hiding this comment.
If we just extend InternalProcessorContext.log(...) with a new Headers parameter and not add a new overload, and update all existing code accordingly, I believe we can remove the method and reuse the existing one from ChangeLoggingKeyValueBytesStore
There was a problem hiding this comment.
It's a very big change. It will chnage many tests.
...kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeadersTest.java
Show resolved
Hide resolved
...kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeadersTest.java
Show resolved
Hide resolved
| /** | ||
| * Extract raw value from serialized ValueTimestampHeaders. | ||
| */ | ||
| static byte[] rawValue(final byte[] rawValueTimestampHeaders) { |
There was a problem hiding this comment.
Yes, I am not questioning that we add rawValue in this PR, I am asking why did we add value(...) in a previous PR, and when would we use it?
But it's somewhat unrelated to this PR, and it's only internal code here. So we could remove value(...) if unused also at some point in the future. Was just wondering.
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
Outdated
Show resolved
Hide resolved
|
Failing unit test. Needs a fix. |
| valueTimestampHeaders == null | ||
| ? internalContext.recordContext().timestamp() | ||
| : timestamp(valueTimestampHeaders), | ||
| headers(valueTimestampHeaders) |
There was a problem hiding this comment.
Think we need to do the same as for ts, and use internalContext.recordContext().headers() if valueTimestampHeaders is null.
550d18e to
d81dddb
Compare
frankvicky
left a comment
There was a problem hiding this comment.
LGTM, thanks for the PR.
This PR implements the changelogging layer of the
TimestampedKeyValueStoreWithHeadersintroduced in KIP-1271.Reviewers: TengYao Chi frankvicky@apache.org, Matthias J. Sax
matthias@confluent.io