Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ public void onRestoreEnd(final TopicPartition topicPartition,
streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG),
streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
task00.toString(),
".checkpoint"
".checkpoint_" + stateStoreName
).toFile();
assertTrue(checkpointFile.exists());
final Map<TopicPartition, Long> checkpoints = new OffsetCheckpoint(checkpointFile).read();
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.ThreadCache;

Expand Down Expand Up @@ -77,7 +78,6 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;

/**
Expand Down Expand Up @@ -263,7 +263,7 @@ public void initializeStartupStores(final TopologyMetadata topologyMetadata,
try {
// We only handle TaskCorruptedException at this point. Any other exception is considered fatal.
StateManagerUtil.registerStateStores(log, threadLogPrefix, subTopology, temporaryStateManager, this, initContext);
temporaryStateManager.checkpoint();
temporaryStateManager.flush();
} catch (final TaskCorruptedException tce) {
// At this point, we only log a warning and continue with the startup store initialization.
// The task-corrupted exception will be handled in the first Task assignment phase.
Expand Down Expand Up @@ -426,13 +426,6 @@ private String getNamedTopologyDirName(final String topologyName) {
return "__" + topologyName + "__";
}

/**
* @return The File handle for the checkpoint in the given task's directory
*/
File checkpointFileFor(final TaskId taskId) {
return new File(getOrCreateDirectoryForTask(taskId), StateManagerUtil.CHECKPOINT_FILE_NAME);
}

/**
* Decide if the directory of the task is empty or not
*/
Expand All @@ -444,7 +437,7 @@ boolean directoryForTaskIsEmpty(final TaskId taskId) {

private boolean taskDirIsEmpty(final File taskDir) {
final File[] storeDirs = taskDir.listFiles(pathname ->
!pathname.getName().equals(CHECKPOINT_FILE_NAME));
!pathname.getName().equals(LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be updated, as it looks like the filter is going to miss the new checkpoint file names of checkpoint_<store name>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good spot, I also found another place that does a similar check. Both have been updated to use startsWith instead of equals, to handle both legacy and per-store checkpoint file names.


boolean taskDirEmpty = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ static void registerStateStores(final Logger log,
// We should only load checkpoint AFTER the corresponding state directory lock has been acquired and
// the state stores have been registered; we should not try to load at the state manager construction time.
// See https://issues.apache.org/jira/browse/KAFKA-8574
stateMgr.initializeStoreOffsetsFromCheckpoint(storeDirsEmpty);
stateMgr.initializeStoreOffsets(storeDirsEmpty);
log.debug("Initialized state stores");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;

import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -404,14 +403,6 @@ public void resume() {
// just transit the state without any logical changes: suspended and restoring states
// are not actually any different for inner modules

// Deleting checkpoint file before transition to RESTORING state (KAFKA-10362)
try {
stateMgr.deleteCheckPointFileIfEOSEnabled();
log.debug("Deleted check point file upon resuming with EOS enabled");
} catch (final IOException ioe) {
log.error("Encountered error while deleting the checkpoint file due to this exception", ioe);
}

transitionTo(State.RESTORING);
log.info("Resumed to restoring state");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,7 @@ private void createTasks() {
when(builder.buildSubtopology(0)).thenReturn(topology);
when(topology.sinkTopics()).thenReturn(emptySet());
when(stateDirectory.getOrCreateDirectoryForTask(task00)).thenReturn(mock(File.class));
when(stateDirectory.checkpointFileFor(task00)).thenReturn(mock(File.class));
when(stateDirectory.getOrCreateDirectoryForTask(task01)).thenReturn(mock(File.class));
when(stateDirectory.checkpointFileFor(task01)).thenReturn(mock(File.class));
when(topology.source("topic")).thenReturn(sourceNode);
when(sourceNode.timestampExtractor()).thenReturn(mock(TimestampExtractor.class));
when(topology.sources()).thenReturn(Collections.singleton(sourceNode));
Expand Down
Loading
Loading