diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 30334abc53e1b..ff5160384483d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -178,6 +178,7 @@ public String toString() { private final FixedOrderMap stores = new FixedOrderMap<>(); private final FixedOrderMap globalStores = new FixedOrderMap<>(); + private final StateDirectory stateDirectory; private final File baseDir; private final OffsetCheckpoint checkpointFile; private final boolean stateUpdaterEnabled; @@ -218,6 +219,7 @@ public ProcessorStateManager(final TaskId taskId, this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId); this.checkpointFile = new OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId)); + this.stateDirectory = stateDirectory; log.debug("Created state store manager for task {}", taskId); } @@ -330,6 +332,8 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { } } + stateDirectory.updateTaskOffsets(taskId, changelogOffsets()); + if (!loadedCheckpoints.isEmpty()) { log.warn("Some loaded checkpoint offsets cannot find their corresponding state stores: {}", loadedCheckpoints); } @@ -512,10 +516,13 @@ void restore(final StateStoreMetadata storeMetadata, final List writtenOffset store.stateStore.name(), store.offset, store.changelogPartition); } } + + stateDirectory.updateTaskOffsets(taskId, changelogOffsets()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 04a62bad1bfef..67e47480bd571 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.internals.StreamsConfigUtils; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.streams.state.internals.ThreadCache; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; @@ -114,6 +115,7 @@ public StateDirectoryProcessFile() { private final StreamsConfig config; private final ConcurrentMap tasksForLocalState = new ConcurrentHashMap<>(); + private final Map taskOffsetSums = new ConcurrentHashMap<>(); /** * Ensures that the state base directory as well as the application's sub-directory are created. @@ -301,6 +303,46 @@ private void closeStartupTasks(final Predicate predicate) { } } + public Map taskOffsetSums(final Set tasks) { + return taskOffsetSums.entrySet().stream() + .filter(e -> tasks.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + public void updateTaskOffsets(final TaskId taskId, final Map changelogOffsets) { + if (changelogOffsets.isEmpty()) { + return; + } + + taskOffsetSums.put(taskId, sumOfChangelogOffsets(taskId, changelogOffsets)); + } + + public void removeTaskOffsets(final TaskId taskId) { + taskOffsetSums.remove(taskId); + } + + private long sumOfChangelogOffsets(final TaskId taskId, final Map changelogOffsets) { + long offsetSum = 0L; + for (final Map.Entry changelogEntry : changelogOffsets.entrySet()) { + final long offset = changelogEntry.getValue(); + + if (offset != OffsetCheckpoint.OFFSET_UNKNOWN) { + if (offset < 0) { + throw new StreamsException( + new IllegalStateException("Expected not to get a sentinel offset, but got: " + changelogEntry), + taskId); + } + offsetSum += offset; + if (offsetSum < 0) { + log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", taskId); + return Long.MAX_VALUE; + } + } + } + + return offsetSum; + } + public UUID initializeProcessId() { if (!hasPersistentStores) { final UUID processId = UUID.randomUUID(); @@ -509,6 +551,7 @@ Thread lockOwner(final TaskId taskId) { public void close() { if (hasPersistentStores) { closeStartupTasks(); + taskOffsetSums.clear(); try { stateDirLock.release(); stateDirLockChannel.close(); @@ -589,6 +632,7 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { final long now = time.milliseconds(); final long lastModifiedMs = taskDir.file().lastModified(); if (now - cleanupDelayMs > lastModifiedMs) { + removeTaskOffsets(id); log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs); Utils.delete(taskDir.file()); @@ -626,7 +670,10 @@ private IOException maybeCleanEmptyNamedTopologyDirs(final boolean logExceptionA ); if (namedTopologyDirs != null) { for (final File namedTopologyDir : namedTopologyDirs) { - closeStartupTasks(task -> task.id().topologyName().equals(parseNamedTopologyFromDirectory(namedTopologyDir.getName()))); + final String topologyName = parseNamedTopologyFromDirectory(namedTopologyDir.getName()); + closeStartupTasks(task -> task.id().topologyName().equals(topologyName)); + final Set taskKeys = taskOffsetSums.keySet(); + taskKeys.removeIf(taskId -> taskId.topologyName().equals(topologyName)); final File[] contents = namedTopologyDir.listFiles(); if (contents != null && contents.length == 0) { try { @@ -665,6 +712,8 @@ public void clearLocalStateForNamedTopology(final String topologyName) { } try { closeStartupTasks(task -> task.id().topologyName().equals(topologyName)); + final Set taskKeys = taskOffsetSums.keySet(); + taskKeys.removeIf(taskId -> taskId.topologyName().equals(topologyName)); Utils.delete(namedTopologyDir); } catch (final IOException e) { log.error("Hit an unexpected error while clearing local state for topology " + topologyName, e); @@ -678,6 +727,7 @@ private void cleanStateAndTaskDirectoriesCalledByUser() throws Exception { log.warn("Found some still-locked task directories when user requested to cleaning up the state, " + "since Streams is not running any more these will be ignored to complete the cleanup"); } + taskOffsetSums.clear(); final AtomicReference firstException = new AtomicReference<>(); for (final TaskDirectory taskDir : listAllTaskDirectories()) { final String dirName = taskDir.file().getName(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java index 4fff7b600fc0a..e2274f14b7792 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java @@ -137,6 +137,7 @@ static void closeStateManager(final Logger log, try { if (wipeStateStore) { log.debug("Wiping state stores for {} task {}", taskType, id); + stateDirectory.removeTaskOffsets(id); // we can just delete the whole dir of the task, including the state store images and the checkpoint files, // and then we write an empty checkpoint file indicating that the previous close is graceful and we just // need to re-bootstrap the restoration from the beginning diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 8a6e27b4c9944..f6c97e6d1e0d5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -39,15 +39,12 @@ import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.assignment.ProcessId; -import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory; import org.apache.kafka.streams.processor.internals.Task.State; import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager; -import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.slf4j.Logger; import java.io.File; -import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -1337,7 +1334,6 @@ public void signalResume() { * Does not include stateless or non-logged tasks. */ public Map taskOffsetSums() { - final Map taskOffsetSums = new HashMap<>(); // Not all tasks will create directories, and there may be directories for tasks we don't currently own, // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should @@ -1345,27 +1341,13 @@ public Map taskOffsetSums() { final Map tasks = allTasks(); final Set lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks = union(HashSet::new, lockedTaskDirectories, tasks.keySet()); - for (final Task task : tasks.values()) { - if (task.state() != State.CREATED && task.state() != State.CLOSED) { - final Map changelogOffsets = task.changelogOffsets(); - if (changelogOffsets.isEmpty()) { - log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", - task.id()); - } else { - taskOffsetSums.put(task.id(), sumOfChangelogOffsets(task.id(), changelogOffsets)); - } - lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks.remove(task.id()); - } - } - for (final TaskId id : lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks) { - final File checkpointFile = stateDirectory.checkpointFileFor(id); - try { - if (checkpointFile.exists()) { - taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read())); - } - } catch (final IOException e) { - log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e); + final Map taskOffsetSums = stateDirectory.taskOffsetSums(lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks); + + // overlay latest offsets from assigned tasks + for (final Task task : tasks.values()) { + if (task.isActive() && task.state() == State.RUNNING && taskOffsetSums.put(task.id(), Task.LATEST_OFFSET) == null) { + log.error("Could not find cached offset for assigned ACTIVE Task {}", task.id()); } } @@ -1384,7 +1366,7 @@ private void tryToLockAllNonEmptyTaskDirectories() { lockedTaskDirectories.clear(); final Map allTasks = allTasks(); - for (final TaskDirectory taskDir : stateDirectory.listNonEmptyTaskDirectories()) { + for (final StateDirectory.TaskDirectory taskDir : stateDirectory.listNonEmptyTaskDirectories()) { final File dir = taskDir.file(); final String namedTopology = taskDir.namedTopology(); try { @@ -1438,34 +1420,6 @@ private void releaseLockedUnassignedTaskDirectories() { } } - private long sumOfChangelogOffsets(final TaskId id, final Map changelogOffsets) { - long offsetSum = 0L; - for (final Map.Entry changelogEntry : changelogOffsets.entrySet()) { - final long offset = changelogEntry.getValue(); - - - if (offset == Task.LATEST_OFFSET) { - // this condition can only be true for active tasks; never for standby - // for this case, the offset of all partitions is set to `LATEST_OFFSET` - // and we "forward" the sentinel value directly - return Task.LATEST_OFFSET; - } else if (offset != OffsetCheckpoint.OFFSET_UNKNOWN) { - if (offset < 0) { - throw new StreamsException( - new IllegalStateException("Expected not to get a sentinel offset, but got: " + changelogEntry), - id); - } - offsetSum += offset; - if (offsetSum < 0) { - log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id); - return Long.MAX_VALUE; - } - } - } - - return offsetSum; - } - private void closeTaskDirty(final Task task, final boolean removeFromTasksRegistry) { try { // we call this function only to flush the case if necessary diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java index 4bb8ca5b2dac8..a96e92fded384 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java @@ -171,6 +171,7 @@ public void testCloseStateManagerWithStateStoreWipeOut() { "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); inOrder.verify(stateManager).close(); + inOrder.verify(stateDirectory).removeTaskOffsets(taskId); inOrder.verify(stateDirectory).unlock(taskId); verifyNoMoreInteractions(stateManager, stateDirectory); } @@ -211,6 +212,7 @@ public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException( } inOrder.verify(stateManager).close(); + inOrder.verify(stateDirectory).removeTaskOffsets(taskId); inOrder.verify(stateDirectory).unlock(taskId); verifyNoMoreInteractions(stateManager, stateDirectory); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index b55c9a12fcfd3..f2203e198a334 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -1890,6 +1890,7 @@ public void shouldComputeOffsetSumForRestoringActiveTaskWithStateUpdater() throw when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask)); taskManager.handleRebalanceStart(singleton("topic")); + when(stateDirectory.taskOffsetSums(Collections.singleton(taskId00))).thenReturn(mkMap(mkEntry(taskId00, changelogOffset))); assertThat(taskManager.taskOffsetSums(), is(mkMap(mkEntry(taskId00, changelogOffset)))); } @@ -1907,6 +1908,7 @@ public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() thro final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask)); taskManager.handleRebalanceStart(singleton("topic")); + when(stateDirectory.taskOffsetSums(Collections.singleton(taskId00))).thenReturn(mkMap(mkEntry(taskId00, changelogOffset))); assertThat(taskManager.taskOffsetSums(), is(mkMap(mkEntry(taskId00, changelogOffset)))); } @@ -1919,7 +1921,7 @@ public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStat .inState(State.RESTORING).build(); final StandbyTask restoringStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions) .inState(State.RUNNING).build(); - final long changelogOffsetOfRunningTask = 42L; + final long changelogOffsetOfRunningTask = -2L; final long changelogOffsetOfRestoringStatefulTask = 24L; final long changelogOffsetOfRestoringStandbyTask = 84L; when(runningStatefulTask.changelogOffsets()) @@ -1932,6 +1934,12 @@ public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStat final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask))); when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask, restoringStatefulTask)); + when(stateDirectory.taskOffsetSums(Set.of(taskId00, taskId01, taskId02))) + .thenReturn(mkMap( + mkEntry(taskId00, changelogOffsetOfRunningTask), + mkEntry(taskId01, changelogOffsetOfRestoringStatefulTask), + mkEntry(taskId02, changelogOffsetOfRestoringStandbyTask) + )); assertThat( taskManager.taskOffsetSums(), @@ -1968,6 +1976,7 @@ private void computeOffsetSumAndVerify(final Map changelog ).get(taskId00); restoringTask.setChangelogOffsets(changelogOffsets); + when(stateDirectory.taskOffsetSums(any())).thenReturn(expectedOffsetSums); assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums)); } @@ -1991,6 +2000,7 @@ public void shouldComputeOffsetSumForStandbyTask() throws Exception { ).get(taskId00); restoringTask.setChangelogOffsets(changelogOffsets); + when(stateDirectory.taskOffsetSums(any())).thenReturn(expectedOffsetSums); assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums)); } @@ -2004,7 +2014,7 @@ public void shouldComputeOffsetSumForUnassignedTaskWeCanLock() throws Exception expectLockObtainedFor(taskId00); makeTaskFolders(taskId00.toString()); - writeCheckpointFile(taskId00, changelogOffsets); + when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums); taskManager.handleRebalanceStart(singleton("topic")); @@ -2021,10 +2031,10 @@ public void shouldComputeOffsetSumFromCheckpointFileForUninitializedTask() throw expectLockObtainedFor(taskId00); makeTaskFolders(taskId00.toString()); - writeCheckpointFile(taskId00, changelogOffsets); + when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums); taskManager.handleRebalanceStart(singleton("topic")); - final StateMachineTask uninitializedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask uninitializedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singleton(uninitializedTask)); taskManager.handleAssignment(taskId00Assignment, emptyMap()); @@ -2046,7 +2056,7 @@ public void shouldComputeOffsetSumFromCheckpointFileForClosedTask() throws Excep makeTaskFolders(taskId00.toString()); writeCheckpointFile(taskId00, changelogOffsets); - final StateMachineTask closedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask closedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); taskManager.handleRebalanceStart(singleton("topic")); @@ -2058,6 +2068,7 @@ public void shouldComputeOffsetSumFromCheckpointFileForClosedTask() throws Excep closedTask.closeClean(); assertThat(closedTask.state(), is(State.CLOSED)); + when(stateDirectory.taskOffsetSums(Collections.singleton(taskId00))).thenReturn(expectedOffsetSums); assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums)); } @@ -2076,7 +2087,6 @@ public void shouldNotReportOffsetSumsAndReleaseLockForUnassignedTaskWithoutCheck expectLockObtainedFor(taskId00); makeTaskFolders(taskId00.toString()); expectDirectoryNotEmpty(taskId00); - when(stateDirectory.checkpointFileFor(taskId00)).thenReturn(getCheckpointFile(taskId00)); taskManager.handleRebalanceStart(singleton("topic")); assertTrue(taskManager.taskOffsetSums().isEmpty()); @@ -2094,7 +2104,7 @@ public void shouldPinOffsetSumToLongMaxValueInCaseOfOverflow() throws Exception expectLockObtainedFor(taskId00); makeTaskFolders(taskId00.toString()); - writeCheckpointFile(taskId00, changelogOffsets); + when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums); taskManager.handleRebalanceStart(singleton("topic")); assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums)); @@ -2102,7 +2112,7 @@ public void shouldPinOffsetSumToLongMaxValueInCaseOfOverflow() throws Exception @Test public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets); @@ -2126,7 +2136,7 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { @Test public void shouldCloseDirtyActiveUnassignedTasksWhenErrorCleanClosingTask() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public void closeClean() { throw new RuntimeException("KABOOM!"); @@ -2153,8 +2163,8 @@ public void closeClean() { @Test public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateDirectory, stateManager); // `handleAssignment` when(consumer.assignment()).thenReturn(assignment); @@ -2238,7 +2248,7 @@ public void shouldReviveCorruptTasks() { final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); final AtomicBoolean enforcedCheckpoint = new AtomicBoolean(false); - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public void postCommit(final boolean enforceCheckpoint) { if (enforceCheckpoint) { @@ -2275,7 +2285,7 @@ public void postCommit(final boolean enforceCheckpoint) { public void shouldReviveCorruptTasksEvenIfTheyCannotCloseClean() { final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); @@ -2307,8 +2317,8 @@ public void suspend() { public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); - final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); + final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); final Map> firstAssignment = new HashMap<>(taskId00Assignment); firstAssignment.putAll(taskId01Assignment); @@ -2343,8 +2353,8 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { public void shouldNotCommitNonRunningNonCorruptedTasks() { final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); - final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final StateMachineTask nonRunningNonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); + final StateMachineTask nonRunningNonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); nonRunningNonCorruptedTask.setCommitNeeded(); @@ -2431,8 +2441,8 @@ public void shouldNotCommitNonCorruptedRestoringActiveTasksAndCommitRunningStand public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() { final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); - final StateMachineTask corruptedStandby = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager); - final StateMachineTask runningNonCorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { + final StateMachineTask corruptedStandby = new StateMachineTask(taskId00, taskId00Partitions, false, stateDirectory, stateManager); + final StateMachineTask runningNonCorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager) { @Override public Map prepareCommit() { throw new TaskMigratedException("You dropped out of the group!", new RuntimeException()); @@ -2469,10 +2479,10 @@ public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() { final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(new ArrayList<>()); - final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); // make sure this will attempt to be committed and throw - final StateMachineTask uncorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask uncorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); final Map offsets = singletonMap(t1p1, new OffsetAndMetadata(0L, null)); uncorruptedActive.setCommitNeeded(); @@ -2513,8 +2523,8 @@ public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() { public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithAlos() { final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); - final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final StateMachineTask uncorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { + final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); + final StateMachineTask uncorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager) { @Override public void markChangelogAsCorrupted(final Collection partitions) { fail("Should not try to mark changelogs as corrupted for uncorrupted task"); @@ -2575,7 +2585,7 @@ public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCo final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false); - final StateMachineTask corruptedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final StateMachineTask corruptedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public void markChangelogAsCorrupted(final Collection partitions) { super.markChangelogAsCorrupted(partitions); @@ -2584,7 +2594,7 @@ public void markChangelogAsCorrupted(final Collection partitions }; final AtomicBoolean uncorruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false); - final StateMachineTask uncorruptedActiveTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { + final StateMachineTask uncorruptedActiveTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager) { @Override public void markChangelogAsCorrupted(final Collection partitions) { super.markChangelogAsCorrupted(partitions); @@ -2650,12 +2660,12 @@ public void markChangelogAsCorrupted(final Collection partitions @Test public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithAlos() { - final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); final Map offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); revokedActiveTask.setCommittableOffsetsAndMetadata(offsets00); revokedActiveTask.setCommitNeeded(); - final StateMachineTask unrevokedActiveTaskWithCommitNeeded = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { + final StateMachineTask unrevokedActiveTaskWithCommitNeeded = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager) { @Override public void markChangelogAsCorrupted(final Collection partitions) { fail("Should not try to mark changelogs as corrupted for uncorrupted task"); @@ -2665,7 +2675,7 @@ public void markChangelogAsCorrupted(final Collection partitions unrevokedActiveTaskWithCommitNeeded.setCommittableOffsetsAndMetadata(offsets01); unrevokedActiveTaskWithCommitNeeded.setCommitNeeded(); - final StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); + final StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new StateMachineTask(taskId02, taskId02Partitions, true, stateDirectory, stateManager); final Map expectedCommittedOffsets = new HashMap<>(); expectedCommittedOffsets.putAll(offsets00); @@ -2706,13 +2716,13 @@ public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCo when(activeTaskCreator.streamsProducer()).thenReturn(producer); final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); - final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); final Map revokedActiveTaskOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); revokedActiveTask.setCommittableOffsetsAndMetadata(revokedActiveTaskOffsets); revokedActiveTask.setCommitNeeded(); final AtomicBoolean unrevokedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false); - final StateMachineTask unrevokedActiveTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { + final StateMachineTask unrevokedActiveTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager) { @Override public void markChangelogAsCorrupted(final Collection partitions) { super.markChangelogAsCorrupted(partitions); @@ -2723,7 +2733,7 @@ public void markChangelogAsCorrupted(final Collection partitions unrevokedActiveTask.setCommittableOffsetsAndMetadata(unrevokedTaskOffsets); unrevokedActiveTask.setCommitNeeded(); - final StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); + final StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new StateMachineTask(taskId02, taskId02Partitions, true, stateDirectory, stateManager); final Map expectedCommittedOffsets = new HashMap<>(); expectedCommittedOffsets.putAll(revokedActiveTaskOffsets); @@ -2770,7 +2780,7 @@ public void markChangelogAsCorrupted(final Collection partitions @Test public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() { - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager); + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false, stateDirectory, stateManager); when(consumer.assignment()).thenReturn(assignment); when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(task00)); @@ -2787,8 +2797,8 @@ public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() { @Test public void shouldAddNonResumedSuspendedTasks() { - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager); + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); + final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateDirectory, stateManager); when(consumer.assignment()).thenReturn(assignment); when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); @@ -2812,7 +2822,7 @@ public void shouldAddNonResumedSuspendedTasks() { @Test public void shouldUpdateInputPartitionsAfterRebalance() { - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); when(consumer.assignment()).thenReturn(assignment); when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); @@ -2836,7 +2846,7 @@ public void shouldUpdateInputPartitionsAfterRebalance() { @Test public void shouldAddNewActiveTasks() { final Map> assignment = taskId00Assignment; - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00)); @@ -2860,13 +2870,13 @@ public void shouldNotCompleteRestorationIfTasksCannotInitialize() { mkEntry(taskId00, taskId00Partitions), mkEntry(taskId01, taskId01Partitions) ); - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public void initializeIfNeeded() { throw new LockException("can't lock"); } }; - final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { + final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager) { @Override public void initializeIfNeeded() { throw new TimeoutException("timed out"); @@ -2898,7 +2908,7 @@ public void shouldNotCompleteRestorationIfTaskCannotCompleteRestoration() { final Map> assignment = mkMap( mkEntry(taskId00, taskId00Partitions) ); - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public void completeRestoration(final java.util.function.Consumer> offsetResetter) { throw new TimeoutException("timeout!"); @@ -2925,7 +2935,7 @@ public void completeRestoration(final java.util.function.Consumer offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets); @@ -2945,21 +2955,21 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo final StreamsProducer producer = mock(StreamsProducer.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); final Map offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets00); task00.setCommitNeeded(); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); final Map offsets01 = singletonMap(t1p1, new OffsetAndMetadata(1L, null)); task01.setCommittableOffsetsAndMetadata(offsets01); task01.setCommitNeeded(); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); + final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateDirectory, stateManager); final Map offsets02 = singletonMap(t1p2, new OffsetAndMetadata(2L, null)); task02.setCommittableOffsetsAndMetadata(offsets02); - final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager); + final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateDirectory, stateManager); final Map expectedCommittedOffsets = new HashMap<>(); expectedCommittedOffsets.putAll(offsets00); @@ -3010,21 +3020,21 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo @Test public void shouldCommitAllNeededTasksOnHandleRevocation() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); final Map offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets00); task00.setCommitNeeded(); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); final Map offsets01 = singletonMap(t1p1, new OffsetAndMetadata(1L, null)); task01.setCommittableOffsetsAndMetadata(offsets01); task01.setCommitNeeded(); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); + final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateDirectory, stateManager); final Map offsets02 = singletonMap(t1p2, new OffsetAndMetadata(2L, null)); task02.setCommittableOffsetsAndMetadata(offsets02); - final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager); + final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateDirectory, stateManager); final Map expectedCommittedOffsets = new HashMap<>(); expectedCommittedOffsets.putAll(offsets00); @@ -3067,12 +3077,12 @@ public void shouldCommitAllNeededTasksOnHandleRevocation() { @Test public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); final Map offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets00); task00.setCommitNeeded(); - final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager); + final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateDirectory, stateManager); final Map> assignmentActive = singletonMap(taskId00, taskId00Partitions); final Map> assignmentStandby = singletonMap(taskId10, taskId10Partitions); @@ -3095,12 +3105,12 @@ public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() { @Test public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); final Map offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets00); task00.setCommitNeeded(); - final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager); + final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateDirectory, stateManager); final Map> assignmentActive = singletonMap(taskId00, taskId00Partitions); final Map> assignmentStandby = singletonMap(taskId10, taskId10Partitions); @@ -3122,7 +3132,7 @@ public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() { @Test public void shouldNotCommitCreatedTasksOnRevocationOrClosure() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); @@ -3138,7 +3148,7 @@ public void shouldNotCommitCreatedTasksOnRevocationOrClosure() { @Test public void shouldPassUpIfExceptionDuringSuspend() { - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); @@ -3177,7 +3187,7 @@ private void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(final P mkEntry(taskId02, taskId02Partitions), mkEntry(taskId03, taskId03Partitions) ); - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public Set changelogPartitions() { return singleton(changelog); @@ -3186,7 +3196,7 @@ public Set changelogPartitions() { final AtomicBoolean closedDirtyTask01 = new AtomicBoolean(false); final AtomicBoolean closedDirtyTask02 = new AtomicBoolean(false); final AtomicBoolean closedDirtyTask03 = new AtomicBoolean(false); - final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { + final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); @@ -3199,7 +3209,7 @@ public void closeDirty() { closedDirtyTask01.set(true); } }; - final Task task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager) { + final Task task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); @@ -3212,7 +3222,7 @@ public void closeDirty() { closedDirtyTask02.set(true); } }; - final Task task03 = new StateMachineTask(taskId03, taskId03Partitions, true, stateManager) { + final Task task03 = new StateMachineTask(taskId03, taskId03Partitions, true, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); @@ -3282,7 +3292,7 @@ public void shouldCloseActiveTasksAndPropagateStreamsProducerExceptionsOnCleanSh final Map> assignment = mkMap( mkEntry(taskId00, taskId00Partitions) ); - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public Set changelogPartitions() { return singleton(changelog); @@ -3325,9 +3335,9 @@ public Set changelogPartitions() { public void shouldOnlyCommitRevokedStandbyTaskAndPropagatePrepareCommitException() { setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager); + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false, stateDirectory, stateManager); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) { + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateDirectory, stateManager) { @Override public Map prepareCommit() { throw new RuntimeException("task 0_1 prepare commit boom!"); @@ -3354,9 +3364,9 @@ public Map prepareCommit() { @Test public void shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() { - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); @@ -3364,7 +3374,7 @@ public void suspend() { } }; - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); + final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateDirectory, stateManager); taskManager.addTask(task00); taskManager.addTask(task01); @@ -3390,20 +3400,20 @@ public void shouldCloseActiveTasksAndIgnoreExceptionsOnUncleanShutdown() { mkEntry(taskId01, taskId01Partitions), mkEntry(taskId02, taskId02Partitions) ); - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public Set changelogPartitions() { return singleton(changelog); } }; - final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { + final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); throw new TaskMigratedException("migrated", new RuntimeException("cause")); } }; - final Task task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager) { + final Task task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); @@ -3453,7 +3463,7 @@ public void suspend() { @Test public void shouldCloseStandbyTasksOnShutdown() { final Map> assignment = singletonMap(taskId00, taskId00Partitions); - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager); + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false, stateDirectory, stateManager); // `handleAssignment` when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00)); @@ -3585,7 +3595,7 @@ public void shouldShutDownStateUpdaterAndCloseDirtyTasksFailedDuringRemoval() { @Test public void shouldInitializeNewActiveTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); when(consumer.assignment()).thenReturn(assignment); when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))) @@ -3603,7 +3613,7 @@ public void shouldInitializeNewActiveTasks() { @Test public void shouldInitialiseNewStandbyTasks() { - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateDirectory, stateManager); when(consumer.assignment()).thenReturn(assignment); when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(task01)); @@ -3630,10 +3640,10 @@ public void shouldHandleRebalanceEvents() { @Test public void shouldCommitActiveAndStandbyTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateDirectory, stateManager); when(consumer.assignment()).thenReturn(assignment); when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))) @@ -3659,12 +3669,12 @@ public void shouldCommitActiveAndStandbyTasks() { @Test public void shouldCommitProvidedTasksIfNeeded() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); - final StateMachineTask task03 = new StateMachineTask(taskId03, taskId03Partitions, false, stateManager); - final StateMachineTask task04 = new StateMachineTask(taskId04, taskId04Partitions, false, stateManager); - final StateMachineTask task05 = new StateMachineTask(taskId05, taskId05Partitions, false, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); + final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateDirectory, stateManager); + final StateMachineTask task03 = new StateMachineTask(taskId03, taskId03Partitions, false, stateDirectory, stateManager); + final StateMachineTask task04 = new StateMachineTask(taskId04, taskId04Partitions, false, stateDirectory, stateManager); + final StateMachineTask task05 = new StateMachineTask(taskId05, taskId05Partitions, false, stateDirectory, stateManager); final Map> assignmentActive = mkMap( mkEntry(taskId00, taskId00Partitions), @@ -3705,7 +3715,7 @@ public void shouldCommitProvidedTasksIfNeeded() { @Test public void shouldNotCommitOffsetsIfOnlyStandbyTasksAssigned() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, false, stateDirectory, stateManager); when(consumer.assignment()).thenReturn(assignment); when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(task00)); @@ -3723,8 +3733,8 @@ public void shouldNotCommitOffsetsIfOnlyStandbyTasksAssigned() { @Test public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throws Exception { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateDirectory, stateManager); makeTaskFolders(taskId00.toString(), taskId01.toString()); expectDirectoryNotEmpty(taskId00, taskId01); @@ -3759,7 +3769,7 @@ public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throw @Test public void shouldCommitViaConsumerIfEosDisabled() { - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); final Map offsets = singletonMap(t1p1, new OffsetAndMetadata(0L, null)); task01.setCommittableOffsetsAndMetadata(offsets); task01.setCommitNeeded(); @@ -3783,11 +3793,11 @@ public void shouldCommitViaProducerIfEosV2Enabled() { final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); task01.setCommittableOffsetsAndMetadata(offsetsT01); task01.setCommitNeeded(); taskManager.addTask(task01); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); + final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateDirectory, stateManager); task02.setCommittableOffsetsAndMetadata(offsetsT02); task02.setCommitNeeded(); taskManager.addTask(task02); @@ -3802,7 +3812,7 @@ public void shouldCommitViaProducerIfEosV2Enabled() { @Test public void shouldPropagateExceptionFromActiveCommit() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public Map prepareCommit() { throw new RuntimeException("opsh."); @@ -3826,7 +3836,7 @@ public Map prepareCommit() { @Test public void shouldPropagateExceptionFromStandbyCommit() { - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) { + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateDirectory, stateManager) { @Override public Map prepareCommit() { throw new RuntimeException("opsh."); @@ -3858,7 +3868,7 @@ public void shouldSendPurgeData() { final InOrder inOrder = inOrder(adminClient); final Map purgableOffsets = new HashMap<>(); - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public Map purgeableOffsets() { return purgableOffsets; @@ -3891,7 +3901,7 @@ public void shouldNotSendPurgeDataIfPreviousNotDone() { .thenReturn(new DeleteRecordsResult(singletonMap(t1p1, futureDeletedRecords))); final Map purgableOffsets = new HashMap<>(); - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public Map purgeableOffsets() { return purgableOffsets; @@ -3918,7 +3928,7 @@ public Map purgeableOffsets() { @Test public void shouldIgnorePurgeDataErrors() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); when(consumer.assignment()).thenReturn(assignment); @@ -3940,17 +3950,17 @@ public void shouldIgnorePurgeDataErrors() { @Test public void shouldMaybeCommitAllActiveTasksThatNeedCommit() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); final Map offsets0 = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets0); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); final Map offsets1 = singletonMap(t1p1, new OffsetAndMetadata(1L, null)); task01.setCommittableOffsetsAndMetadata(offsets1); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); + final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateDirectory, stateManager); final Map offsets2 = singletonMap(t1p2, new OffsetAndMetadata(2L, null)); task02.setCommittableOffsetsAndMetadata(offsets2); - final StateMachineTask task03 = new StateMachineTask(taskId03, taskId03Partitions, true, stateManager); - final StateMachineTask task04 = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager); + final StateMachineTask task03 = new StateMachineTask(taskId03, taskId03Partitions, true, stateDirectory, stateManager); + final StateMachineTask task04 = new StateMachineTask(taskId10, taskId10Partitions, false, stateDirectory, stateManager); final Map expectedCommittedOffsets = new HashMap<>(); expectedCommittedOffsets.putAll(offsets0); @@ -4002,8 +4012,8 @@ public void shouldMaybeCommitAllActiveTasksThatNeedCommit() { @Test public void shouldProcessActiveTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); final Map> firstAssignment = new HashMap<>(); firstAssignment.put(taskId00, taskId00Partitions); @@ -4054,10 +4064,10 @@ public void shouldNotFailOnTimeoutException() { final AtomicReference timeoutException = new AtomicReference<>(); timeoutException.set(new TimeoutException("Skip me!")); - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); task00.transitionTo(State.RESTORING); task00.transitionTo(State.RUNNING); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager) { @Override public boolean process(final long wallClockTime) { final TimeoutException exception = timeoutException.get(); @@ -4069,7 +4079,7 @@ public boolean process(final long wallClockTime) { }; task01.transitionTo(State.RESTORING); task01.transitionTo(State.RUNNING); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); + final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateDirectory, stateManager); task02.transitionTo(State.RESTORING); task02.transitionTo(State.RUNNING); @@ -4114,7 +4124,7 @@ public boolean process(final long wallClockTime) { @Test public void shouldPropagateTaskMigratedExceptionsInProcessActiveTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public boolean process(final long wallClockTime) { throw new TaskMigratedException("migrated", new RuntimeException("cause")); @@ -4137,7 +4147,7 @@ public boolean process(final long wallClockTime) { @Test public void shouldWrapRuntimeExceptionsInProcessActiveTasksAndSetTaskId() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public boolean process(final long wallClockTime) { throw new RuntimeException("oops"); @@ -4164,7 +4174,7 @@ public boolean process(final long wallClockTime) { @Test public void shouldPropagateTaskMigratedExceptionsInPunctuateActiveTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public boolean maybePunctuateStreamTime() { throw new TaskMigratedException("migrated", new RuntimeException("cause")); @@ -4184,7 +4194,7 @@ public boolean maybePunctuateStreamTime() { @Test public void shouldPropagateKafkaExceptionsInPunctuateActiveTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public boolean maybePunctuateStreamTime() { throw new KafkaException("oops"); @@ -4204,7 +4214,7 @@ public boolean maybePunctuateStreamTime() { @Test public void shouldPunctuateActiveTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public boolean maybePunctuateStreamTime() { return true; @@ -4230,7 +4240,7 @@ public boolean maybePunctuateSystemTime() { @Test public void shouldReturnFalseWhenThereAreStillNonRunningTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public Set changelogPartitions() { return singleton(new TopicPartition("fake", 0)); @@ -4247,7 +4257,7 @@ public Set changelogPartitions() { @Test public void shouldHaveRemainingPartitionsUncleared() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets); @@ -4276,7 +4286,7 @@ public void shouldHaveRemainingPartitionsUncleared() { @Test public void shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() { - final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) { + final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); @@ -4284,7 +4294,7 @@ public void suspend() { } }; - final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false, stateManager) { + final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); @@ -4308,7 +4318,7 @@ public void suspend() { @Test public void shouldThrowRuntimeExceptionWhenEncounteredUnknownExceptionDuringTaskClose() { - final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) { + final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); @@ -4316,7 +4326,7 @@ public void suspend() { } }; - final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false, stateManager) { + final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); @@ -4338,7 +4348,7 @@ public void suspend() { @Test public void shouldThrowSameKafkaExceptionWhenEncounteredDuringTaskClose() { - final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) { + final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); @@ -4346,7 +4356,7 @@ public void suspend() { } }; - final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false, stateManager) { + final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); @@ -4388,13 +4398,13 @@ private Map handleAssignment(final Map> standbyAssignment, final Map> restoringActiveAssignment) { final Set runningTasks = runningActiveAssignment.entrySet().stream() - .map(t -> new StateMachineTask(t.getKey(), t.getValue(), true, stateManager)) + .map(t -> new StateMachineTask(t.getKey(), t.getValue(), true, stateDirectory, stateManager)) .collect(Collectors.toSet()); final Set standbyTasks = standbyAssignment.entrySet().stream() - .map(t -> new StateMachineTask(t.getKey(), t.getValue(), false, stateManager)) + .map(t -> new StateMachineTask(t.getKey(), t.getValue(), false, stateDirectory, stateManager)) .collect(Collectors.toSet()); final Set restoringTasks = restoringActiveAssignment.entrySet().stream() - .map(t -> new StateMachineTask(t.getKey(), t.getValue(), true, stateManager)) + .map(t -> new StateMachineTask(t.getKey(), t.getValue(), true, stateDirectory, stateManager)) .collect(Collectors.toSet()); // give the restoring tasks some uncompleted changelog partitions so they'll stay in restoring restoringTasks.forEach(t -> ((StateMachineTask) t).setChangelogOffsets(singletonMap(new TopicPartition("changelog", 0), 0L))); @@ -4451,7 +4461,7 @@ private void expectDirectoryNotEmpty(final TaskId... tasks) { @Test public void shouldThrowTaskMigratedExceptionOnCommitFailed() { - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task01.setCommittableOffsetsAndMetadata(offsets); task01.setCommitNeeded(); @@ -4476,8 +4486,8 @@ public void shouldThrowTaskMigratedExceptionOnCommitFailed() { @SuppressWarnings("unchecked") @Test public void shouldNotFailForTimeoutExceptionOnConsumerCommit() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); task00.setCommittableOffsetsAndMetadata(taskId00Partitions.stream().collect(Collectors.toMap(p -> p, p -> new OffsetAndMetadata(0)))); task01.setCommittableOffsetsAndMetadata(taskId00Partitions.stream().collect(Collectors.toMap(p -> p, p -> new OffsetAndMetadata(0)))); @@ -4511,11 +4521,11 @@ public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV doThrow(new TimeoutException("KABOOM!")).doNothing().when(producer).commitTransaction(allOffsets, null); - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager); task00.setCommittableOffsetsAndMetadata(offsetsT00); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); task01.setCommittableOffsetsAndMetadata(offsetsT01); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); + final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateDirectory, stateManager); task00.setCommitNeeded(); task01.setCommitNeeded(); @@ -4534,7 +4544,7 @@ public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV @Test public void shouldStreamsExceptionOnCommitError() { - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task01.setCommittableOffsetsAndMetadata(offsets); task01.setCommitNeeded(); @@ -4554,7 +4564,7 @@ public void shouldStreamsExceptionOnCommitError() { @Test public void shouldFailOnCommitFatal() { - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task01.setCommittableOffsetsAndMetadata(offsets); task01.setCommitNeeded(); @@ -4573,14 +4583,14 @@ public void shouldFailOnCommitFatal() { @Test public void shouldSuspendAllTasksButSkipCommitIfSuspendingFailsDuringRevocation() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateDirectory, stateManager) { @Override public void suspend() { super.suspend(); throw new RuntimeException("KABOOM!"); } }; - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateDirectory, stateManager); final Map> assignment = new HashMap<>(taskId00Assignment); assignment.putAll(taskId01Assignment); @@ -4827,8 +4837,9 @@ private static class StateMachineTask extends AbstractTask implements Task { StateMachineTask(final TaskId id, final Set partitions, final boolean active, + final StateDirectory stateDirectory, final ProcessorStateManager processorStateManager) { - super(id, null, null, processorStateManager, partitions, (new TopologyConfig(new DummyStreamsConfig())).getTaskConfig(), "test-task", StateMachineTask.class); + super(id, null, stateDirectory, processorStateManager, partitions, (new TopologyConfig(new DummyStreamsConfig())).getTaskConfig(), "test-task", StateMachineTask.class); this.active = active; } @@ -4994,6 +5005,7 @@ public Map purgeableOffsets() { void setChangelogOffsets(final Map changelogOffsets) { this.changelogOffsets = changelogOffsets; + stateDirectory.updateTaskOffsets(id, changelogOffsets); } @Override