Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -148,6 +149,16 @@ public void logChange(final String storeName,
throw new UnsupportedOperationException("this should not happen: logChange() not supported in global processor context.");
}

// Changelogging is not supported from the global processor context: global state stores
// are restored directly from their source topics rather than from changelog topics, so
// this header-aware overload (like the headerless one above) must never be reached.
@Override
public void logChange(final String storeName,
final Bytes key,
final byte[] value,
final long timestamp,
final Headers headers,
final Position position) {
throw new UnsupportedOperationException("this should not happen: logChange() not supported in global processor context.");
}

@Override
public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
throw new UnsupportedOperationException("this should not happen: transitionToActive() not supported in global processor context.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;
Expand Down Expand Up @@ -122,6 +123,13 @@ void logChange(final String storeName,
final long timestamp,
final Position position);

/**
 * Logs a state-store change, together with the record's {@link Headers}, to the store's
 * changelog topic. Header-aware overload of {@code logChange} above for stores whose
 * values carry per-record headers.
 *
 * @param storeName the name of the state store whose change is being logged
 * @param key       the serialized store key
 * @param value     the serialized store value ({@code null} for a tombstone)
 * @param timestamp the timestamp to attach to the changelog record
 * @param headers   the headers to attach to the changelog record; implementations may
 *                  mutate this instance (e.g. to append consistency metadata)
 * @param position  the store's current {@link Position}, used for consistency tracking
 */
void logChange(final String storeName,
final Bytes key,
final byte[] value,
final long timestamp,
final Headers headers,
final Position position);

String changelogFor(final String storeName);

void addProcessorMetadataKeyValue(final String key, final long value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,7 @@ public void logChange(final String storeName,
if (!consistencyEnabled) {
headers = null;
} else {
// Add the vector clock to the header part of every record
headers = new RecordHeaders();
headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
headers.add(new RecordHeader(ChangelogRecordDeserializationHelper.CHANGELOG_POSITION_HEADER_KEY,
PositionSerde.serialize(position).array()));
addVectorClockToHeaders(headers = new RecordHeaders(), position);
}

collector.send(
Expand All @@ -153,6 +149,40 @@ public void logChange(final String storeName,
null);
}

/**
 * Logs a state-store change, including the record's headers, to the store's changelog topic.
 * <p>
 * When consistency is enabled, the position vector clock is appended to the headers before
 * sending; note that this mutates the caller-provided {@code Headers} instance when one is
 * supplied.
 *
 * @param storeName the store whose registered changelog partition receives the record
 * @param key       the serialized store key
 * @param value     the serialized store value ({@code null} for a tombstone)
 * @param timestamp the timestamp to attach to the changelog record
 * @param headers   the headers to attach; may be {@code null}
 * @param position  the store's current position, serialized into the consistency header
 * @throws UnsupportedOperationException if invoked on a standby task
 */
@Override
public void logChange(final String storeName,
                      final Bytes key,
                      final byte[] value,
                      final long timestamp,
                      final Headers headers,
                      final Position position) {
    throwUnsupportedOperationExceptionIfStandby("logChange");

    final TopicPartition changelogPartition = stateManager().registeredChangelogPartitionFor(storeName);

    Headers headersToSend = headers;
    if (consistencyEnabled) {
        // The vector clock must always be attached when consistency is enabled; guard
        // against a null caller-supplied Headers (e.g. a tombstone logged by the
        // header-aware store) by creating a fresh container, mirroring the headerless
        // logChange overload above.
        if (headersToSend == null) {
            headersToSend = new RecordHeaders();
        }
        addVectorClockToHeaders(headersToSend, position);
    }

    collector.send(
        changelogPartition.topic(),
        key,
        value,
        headersToSend,
        changelogPartition.partition(),
        timestamp,
        BYTES_KEY_SERIALIZER,
        BYTEARRAY_VALUE_SERIALIZER,
        null,
        null);
}

// Appends the consistency "vector clock" to the given headers: the changelog-version
// marker header plus the serialized Position. Mutates the passed-in Headers instance.
// Parameters are final for consistency with the rest of this file.
private void addVectorClockToHeaders(final Headers headers, final Position position) {
    headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
    headers.add(new RecordHeader(ChangelogRecordDeserializationHelper.CHANGELOG_POSITION_HEADER_KEY,
        PositionSerde.serialize(position).array()));
}

/**
* @throws StreamsException if an attempt is made to access this state store from an unknown node
* @throws UnsupportedOperationException if the current streamTask type is standby
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.List;

import static org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
import static org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.rawValue;
import static org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.timestamp;

/**
* Change-logging wrapper for a timestamped key-value bytes store whose values also carry headers.
* <p>
* the header-aware serialized value format produced by {@link ValueTimestampHeadersSerializer}.
* <p>
* Semantics:
* - The inner store value format is:
* [ varint header_length ][ header_bytes ][ 8-byte timestamp ][ value_bytes ]
* - The changelog record value logged via {@code log(...)} remains just {@code value_bytes}
* (no timestamp, no headers), and the timestamp is logged separately.
*/
public class ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
extends ChangeLoggingKeyValueBytesStore {

ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders(final KeyValueStore<Bytes, byte[]> inner) {
super(inner);
}

@Override
public void put(final Bytes key,
final byte[] valueTimestampHeaders) {
wrapped().put(key, valueTimestampHeaders);
log(
key,
rawValue(valueTimestampHeaders),
valueTimestampHeaders == null
? internalContext.recordContext().timestamp()
: timestamp(valueTimestampHeaders),
headers(valueTimestampHeaders)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think we need to do the same as for ts, and use internalContext.recordContext().headers() if valueTimestampHeaders is null.

);
}

@Override
public byte[] putIfAbsent(final Bytes key,
final byte[] valueTimestampHeaders) {
final byte[] previous = wrapped().putIfAbsent(key, valueTimestampHeaders);
if (previous == null) {
// then it was absent
log(
key,
rawValue(valueTimestampHeaders),
valueTimestampHeaders == null
? internalContext.recordContext().timestamp()
: timestamp(valueTimestampHeaders),
headers(valueTimestampHeaders)
);
}
return previous;
}

@Override
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
wrapped().putAll(entries);
for (final KeyValue<Bytes, byte[]> entry : entries) {
final byte[] valueTimestampHeaders = entry.value;
log(
entry.key,
rawValue(valueTimestampHeaders),
valueTimestampHeaders == null
? internalContext.recordContext().timestamp()
: timestamp(valueTimestampHeaders),
headers(valueTimestampHeaders)
);
}
}

void log(final Bytes key, final byte[] value, final long timestamp, final Headers headers) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we just extend InternalProcessorContext.log(...) with a new Headers parameter and not add a new overload, and update all existing code accordingly, I believe we can remove the method and reuse the existing one from ChangeLoggingKeyValueBytesStore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a very big change. It will chnage many tests.

internalContext.logChange(name(), key, value, timestamp, headers, wrapped().getPosition());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,17 @@ static Headers headers(final byte[] rawValueTimestampHeaders) {
final byte[] rawHeaders = readBytes(buffer, headersSize);
return HEADERS_DESERIALIZER.deserialize("", rawHeaders);
}
/**
* Extract raw value from serialized ValueTimestampHeaders.
*/
static byte[] rawValue(final byte[] rawValueTimestampHeaders) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have static <T> T value(...) above -- given that we need rawValue here, wondering if value(...) from above is actually needed or not at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that method returns the deserialized value while raawValue returns a byte[]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I am not questioning that we add rawValue in this PR, I am asking why did we add value(...) in a previous PR, and when would we use it?

But it's somewhat unrelated to this PR, and it's only internal code here. So we could remove value(...) if unused also at some point in the future. Was just wondering.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove it at some point if not needed: https://issues.apache.org/jira/browse/KAFKA-20193

if (rawValueTimestampHeaders == null) {
return null;
}

final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
final int headersSize = ByteUtils.readVarint(buffer);
buffer.position(buffer.position() + headersSize + Long.BYTES);
return readBytes(buffer, buffer.remaining());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,15 @@ public void logChange(final String storeName,
final Position position) {
}

// Intentionally a no-op: this context implementation does not write to changelog topics.
// NOTE(review): mirrors the no-op headerless logChange overload above — presumably a
// stub/forwarding context; confirm against the enclosing class.
@Override
public void logChange(final String storeName,
final Bytes key,
final byte[] value,
final long timestamp,
final Headers headers,
final Position position) {
}

// Intentionally a no-op: this context implementation ignores active-task transitions.
@Override
public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
}
Expand Down
Loading