Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ public void onRestoreEnd(final TopicPartition topicPartition,
streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG),
streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
task00.toString(),
".checkpoint"
".checkpoint_" + stateStoreName
).toFile();
assertTrue(checkpointFile.exists());
final Map<TopicPartition, Long> checkpoints = new OffsetCheckpoint(checkpointFile).read();
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.ThreadCache;

Expand Down Expand Up @@ -77,7 +78,6 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;

/**
Expand Down Expand Up @@ -263,7 +263,7 @@ public void initializeStartupStores(final TopologyMetadata topologyMetadata,
try {
// We only handle TaskCorruptedException at this point. Any other exception is considered fatal.
StateManagerUtil.registerStateStores(log, threadLogPrefix, subTopology, temporaryStateManager, this, initContext);
temporaryStateManager.checkpoint();
temporaryStateManager.flush();
} catch (final TaskCorruptedException tce) {
// At this point, we only log a warning and continue with the startup store initialization.
// The task-corrupted exception will be handled in the first Task assignment phase.
Expand Down Expand Up @@ -426,13 +426,6 @@ private String getNamedTopologyDirName(final String topologyName) {
return "__" + topologyName + "__";
}

/**
* @return The File handle for the checkpoint in the given task's directory
*/
File checkpointFileFor(final TaskId taskId) {
return new File(getOrCreateDirectoryForTask(taskId), StateManagerUtil.CHECKPOINT_FILE_NAME);
}

/**
* Decide if the directory of the task is empty or not
*/
Expand All @@ -444,7 +437,7 @@ boolean directoryForTaskIsEmpty(final TaskId taskId) {

private boolean taskDirIsEmpty(final File taskDir) {
final File[] storeDirs = taskDir.listFiles(pathname ->
!pathname.getName().equals(CHECKPOINT_FILE_NAME));
!pathname.getName().startsWith(LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME));

boolean taskDirEmpty = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ static void registerStateStores(final Logger log,
// We should only load checkpoint AFTER the corresponding state directory lock has been acquired and
// the state stores have been registered; we should not try to load at the state manager construction time.
// See https://issues.apache.org/jira/browse/KAFKA-8574
stateMgr.initializeStoreOffsetsFromCheckpoint(storeDirsEmpty);
stateMgr.initializeStoreOffsets(storeDirsEmpty);
log.debug("Initialized state stores");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;

import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -404,14 +403,6 @@ public void resume() {
// just transit the state without any logical changes: suspended and restoring states
// are not actually any different for inner modules

// Deleting checkpoint file before transition to RESTORING state (KAFKA-10362)
try {
stateMgr.deleteCheckPointFileIfEOSEnabled();
log.debug("Deleted check point file upon resuming with EOS enabled");
} catch (final IOException ioe) {
log.error("Encountered error while deleting the checkpoint file due to this exception", ioe);
}

transitionTo(State.RESTORING);
log.info("Resumed to restoring state");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,7 @@ private void createTasks() {
when(builder.buildSubtopology(0)).thenReturn(topology);
when(topology.sinkTopics()).thenReturn(emptySet());
when(stateDirectory.getOrCreateDirectoryForTask(task00)).thenReturn(mock(File.class));
when(stateDirectory.checkpointFileFor(task00)).thenReturn(mock(File.class));
when(stateDirectory.getOrCreateDirectoryForTask(task01)).thenReturn(mock(File.class));
when(stateDirectory.checkpointFileFor(task01)).thenReturn(mock(File.class));
when(topology.source("topic")).thenReturn(sourceNode);
when(sourceNode.timestampExtractor()).thenReturn(mock(TimestampExtractor.class));
when(topology.sources()).thenReturn(Collections.singleton(sourceNode));
Expand Down
Loading
Loading