Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ private void shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(final boolean ap
+ File.separator
+ "global");
assertTrue(globalStateDir.mkdirs());
final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(globalStateDir, ".checkpoint"));
final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(globalStateDir, ".checkpoint_" + globalStore));

// set the checkpointed offset to the commit marker of partition-1
// even if `poll()` won't return any data for partition-1, we should still finish the restore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.CommitCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
Expand All @@ -44,13 +45,12 @@
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
import org.apache.kafka.streams.state.internals.RecordConverter;

import org.slf4j.Logger;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -65,7 +65,6 @@

import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.apache.kafka.streams.processor.internals.RecordDeserializer.handleDeserializationFailure;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;

Expand Down Expand Up @@ -102,19 +101,20 @@ private static class StateStoreMetadata {

private final Time time;
private final Logger log;
private final String logPrefix;
private final StateDirectory stateDirectory;
private final File baseDir;
private final long taskTimeoutMs;
private final ProcessorTopology topology;
private final OffsetCheckpoint checkpointFile;
private final Duration pollMsPlusRequestTimeout;
private final Consumer<byte[], byte[]> globalConsumer;
private final StateRestoreListener stateRestoreListener;
private final Map<TopicPartition, Long> checkpointFileCache;
private final Map<TopicPartition, Long> currentOffsets;
private final Map<String, String> storeToChangelogTopic;
private final Set<String> globalStoreNames = new HashSet<>();
private final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>();
private final Map<String, StateStoreMetadata> storeMetadata = new HashMap<>();
private final boolean eosEnabled;
private InternalProcessorContext<?, ?> globalProcessorContext;
private DeserializationExceptionHandler deserializationExceptionHandler;
private ProcessingExceptionHandler processingExceptionHandler;
Expand All @@ -129,20 +129,18 @@ public GlobalStateManagerImpl(final LogContext logContext,
final StreamsConfig config) {
this.time = time;
this.topology = topology;
this.stateDirectory = stateDirectory;
baseDir = stateDirectory.globalStateDir();
storeToChangelogTopic = topology.storeToChangelogTopic();
checkpointFile = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
checkpointFileCache = new HashMap<>();
currentOffsets = new HashMap<>();

// Find non persistent store's topics
for (final StateStore store : topology.globalStateStores()) {
globalStoreNames.add(store.name());
if (!store.persistent()) {
globalNonPersistentStoresTopics.add(changelogFor(store.name()));
}
}

log = logContext.logger(GlobalStateManagerImpl.class);
logPrefix = logContext.logPrefix();
this.globalConsumer = globalConsumer;
this.stateRestoreListener = stateRestoreListener;

Expand All @@ -160,6 +158,7 @@ public GlobalStateManagerImpl(final LogContext logContext,
@SuppressWarnings("deprecation")
final boolean globalEnabled = config.getBoolean(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG);
processingExceptionHandler = globalEnabled ? config.processingExceptionHandler() : null;
eosEnabled = StreamsConfigUtils.eosEnabled(config);
}

@Override
Expand All @@ -169,41 +168,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext<?, ?> globa

@Override
public Set<String> initialize() {
try {
checkpointFileCache.putAll(checkpointFile.read());
} catch (final IOException e) {
throw new StreamsException("Failed to read checkpoints for global state globalStores", e);
}

droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
globalProcessorContext.taskId().toString(),
globalProcessorContext.metrics()
);

final Set<String> changelogTopics = new HashSet<>();
final Map<TopicPartition, StateStore> wrappedStores = new HashMap<>();
for (final StateStore stateStore : topology.globalStateStores()) {
final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
changelogTopics.add(sourceTopic);
stateStore.init(globalProcessorContext, stateStore);
}
final List<TopicPartition> storePartitions = topicPartitionsForStore(stateStore);
final StateStore maybeWrappedStore = LegacyCheckpointingStateStore.maybeWrapStore(
stateStore, eosEnabled, new HashSet<>(storePartitions), stateDirectory, null, logPrefix);
maybeWrappedStore.init(globalProcessorContext, maybeWrappedStore);

// make sure each topic-partition from checkpointFileCache is associated with a global state store
checkpointFileCache.keySet().forEach(tp -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We removed this check and it makes sense, since we're moving away from checkpoint files. With the new approach, I'm thinking there's no way for this to get out of sync now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I'm not really sure what this check was achieving. Does it matter if the checkpoint contains unknown topic-partitions? Sure, it would indicate a discrepancy, but do we want to crash, or just correct it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do want to keep this, the right place to do this check is LegacyCheckpointingStateStore.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I'm not really sure what this check was achieving. Does it matter if the checkpoint contains unknown topic-partitions? Sure, it would indicate a discrepancy, but do we want to crash, or just correct it?

Good point

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's just leave this is as it is for now and we can revisit

if (!changelogTopics.contains(tp.topic())) {
log.error(
"Encountered a topic-partition in the global checkpoint file not associated with any global" +
" state store, topic-partition: {}, checkpoint file: {}. If this topic-partition is no longer valid," +
" an application reset and state store directory cleanup will be required.",
tp.topic(),
checkpointFile
);
throw new StreamsException("Encountered a topic-partition not associated with any global state store");
for (final TopicPartition storePartition : storePartitions) {
wrappedStores.put(storePartition, maybeWrappedStore);
}
});
}

// migrate offsets from legacy checkpoint file into the stores
LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix, stateDirectory, null, wrappedStores);

// restore or reprocess each registered store using the now-populated currentOffsets
for (final StateStoreMetadata metadata : storeMetadata.values()) {
// load the committed offsets from the store
final StateStore store = metadata.stateStore;
if (store.persistent()) {
for (final TopicPartition partition : metadata.changelogPartitions) {
final Long offset = store.committedOffset(partition);
if (offset != null) {
currentOffsets.put(partition, offset);
}
}
}

// restore or reprocess each registered store using the now-populated currentOffsets
try {
if (metadata.reprocessFactory.isPresent()) {
reprocessState(metadata);
Expand All @@ -219,7 +217,7 @@ public Set<String> initialize() {
}

public StateStore globalStore(final String name) {
return globalStores.getOrDefault(name, Optional.empty()).orElse(null);
return LegacyCheckpointingStateStore.maybeUnwrapStore(globalStores.getOrDefault(name, Optional.empty()).orElse(null));
}

@Override
Expand Down Expand Up @@ -267,9 +265,9 @@ public void registerStore(final StateStore store,
);

final Optional<InternalTopologyBuilder.ReprocessFactory<?, ?, ?, ?>> reprocessFactory = topology
.storeNameToReprocessOnRestore().getOrDefault(store.name(), Optional.empty());
.storeNameToReprocessOnRestore().getOrDefault(store.name(), Optional.empty());
storeMetadata.put(store.name(), new StateStoreMetadata(
store, topicPartitions, reprocessFactory, stateRestoreCallback, converterForStore(store), highWatermarks));
store, topicPartitions, reprocessFactory, stateRestoreCallback, converterForStore(store), highWatermarks));
}

private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
Expand Down Expand Up @@ -310,7 +308,7 @@ private void reprocessState(final StateStoreMetadata storeMetadata) {

globalConsumer.assign(Collections.singletonList(topicPartition));
long offset;
final Long checkpoint = checkpointFileCache.get(topicPartition);
final Long checkpoint = currentOffsets.get(topicPartition);
if (checkpoint != null) {
globalConsumer.seek(topicPartition, checkpoint);
offset = checkpoint;
Expand Down Expand Up @@ -441,7 +439,7 @@ private void reprocessState(final StateStoreMetadata storeMetadata) {
stateRestoreListener.onBatchRestored(topicPartition, storeMetadata.stateStore.name(), offset, batchRestoreCount);
}
stateRestoreListener.onRestoreEnd(topicPartition, storeMetadata.stateStore.name(), restoreCount);
checkpointFileCache.put(topicPartition, offset);
currentOffsets.put(topicPartition, offset);

}
}
Expand All @@ -452,7 +450,7 @@ private void restoreState(final StateStoreMetadata storeMetadata) {

globalConsumer.assign(Collections.singletonList(topicPartition));
long offset;
final Long checkpoint = checkpointFileCache.get(topicPartition);
final Long checkpoint = currentOffsets.get(topicPartition);
if (checkpoint != null) {
globalConsumer.seek(topicPartition, checkpoint);
offset = checkpoint;
Expand Down Expand Up @@ -497,7 +495,7 @@ private void restoreState(final StateStoreMetadata storeMetadata) {
restoreCount += restoreRecords.size();
}
stateRestoreListener.onRestoreEnd(topicPartition, storeMetadata.stateStore.name(), restoreCount);
checkpointFileCache.put(topicPartition, offset);
currentOffsets.put(topicPartition, offset);
}
}

Expand Down Expand Up @@ -561,7 +559,17 @@ public void flush() {
final StateStore store = entry.getValue().get();
try {
log.trace("Committing global store={}", store.name());
store.commit(Map.of());
// construct per-store Map of offsets to commit
final List<TopicPartition> storePartitions = storeMetadata.get(store.name()).changelogPartitions;
final Map<TopicPartition, Long> storeOffsets = new HashMap<>(storePartitions.size());

// only add offsets for persistent stores
if (store.persistent()) {
for (final TopicPartition storePartition : storePartitions) {
storeOffsets.put(storePartition, currentOffsets.get(storePartition));
}
}
store.commit(storeOffsets);
} catch (final RuntimeException e) {
throw new ProcessorStateException(
String.format("Failed to commit global state store %s", store.name()),
Expand All @@ -574,6 +582,10 @@ public void flush() {
}
}

@Override
public void checkpoint() {
    // Intentionally a no-op. The standalone global checkpoint file is no longer
    // written here: persistent stores now receive their changelog offsets via
    // store.commit(storeOffsets) in flush(), and legacy checkpoint-file offsets
    // are migrated into the stores during initialize() (see
    // LegacyCheckpointingStateStore.migrateLegacyOffsets). The empty override
    // satisfies the StateManager contract without duplicating offset persistence.
}

@Override
public void close() {
if (globalStores.isEmpty()) {
Expand Down Expand Up @@ -606,28 +618,7 @@ public void close() {

@Override
public void updateChangelogOffsets(final Map<TopicPartition, Long> offsets) {
checkpointFileCache.putAll(offsets);
}

@Override
public void checkpoint() {
final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();

// Skip non persistent store
for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointFileCache.entrySet()) {
final String topic = topicPartitionOffset.getKey().topic();
if (!globalNonPersistentStoresTopics.contains(topic)) {
filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
}
}

try {
checkpointFile.write(filteredOffsets);
} catch (final IOException e) {
log.warn("Failed to write offset checkpoint file to {} for global stores." +
" This may occur if OS cleaned the state.dir in case when it is located in the (default) ${java.io.tmpdir}/kafka-streams directory." +
" Changing the location of state.dir may resolve the problem", checkpointFile, e);
}
currentOffsets.putAll(offsets);
}

@Override
Expand All @@ -637,7 +628,7 @@ public TaskType taskType() {

@Override
public Map<TopicPartition, Long> changelogOffsets() {
return Collections.unmodifiableMap(checkpointFileCache);
return Collections.unmodifiableMap(currentOffsets);
}

public final String changelogFor(final String storeName) {
Expand Down
Loading
Loading