diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java index a895b71e4e952..dd5a2c6e1d79b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java @@ -180,7 +180,7 @@ public boolean maybePunctuateSystemTime() { } @Override - public Map prepareCommit() { + public Map prepareCommit(final boolean clean) { throw new UnsupportedOperationException("This task is read-only"); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 417f754ca2c36..4c6e6674bdbcf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -179,7 +179,7 @@ public void resume() { * or flushing state store get IO errors; such error should cause the thread to die */ @Override - public Map prepareCommit() { + public Map prepareCommit(final boolean clean) { switch (state()) { case CREATED: log.debug("Skipped preparing created task for commit"); @@ -189,7 +189,11 @@ public Map prepareCommit() { case RUNNING: case SUSPENDED: // do not need to flush state store caches in pre-commit since nothing would be sent for standby tasks - log.debug("Prepared {} task for committing", state()); + if (!clean) { + log.debug("Skipped preparing {} standby task with id {} for commit since the task is getting closed dirty.", state(), id); + } else { + log.debug("Prepared {} task for committing", state()); + } break; @@ -197,7 +201,7 @@ public Map prepareCommit() { throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing "); } - return Collections.emptyMap(); + return clean ? Collections.emptyMap() : null; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 424d6f7af6148..93737d8228933 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -417,7 +417,6 @@ public void resume() { timeCurrentIdlingStarted = Optional.empty(); } - public void flush() { stateMgr.flushCache(); recordCollector.flush(); @@ -429,7 +428,7 @@ public void flush() { * @return offsets that should be committed for this task */ @Override - public Map prepareCommit() { + public Map prepareCommit(final boolean clean) { switch (state()) { case CREATED: case RESTORING: @@ -444,6 +443,10 @@ public Map prepareCommit() { // // TODO: this should be removed after we decouple caching with emitting flush(); + if (!clean) { + log.debug("Skipped preparing {} task with id {} for commit since the task is getting closed dirty.", state(), id); + return null; + } hasPendingTxCommit = eosEnabled; log.debug("Prepared {} task for committing", state()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index 484c1ca574b6c..ba09700af8afb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -201,7 +201,7 @@ default boolean maybePunctuateSystemTime() { /** * @throws StreamsException fatal error, should close the thread */ - Map prepareCommit(); + Map prepareCommit(final boolean clean); void postCommit(boolean enforceCheckpoint); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java index c993787503e19..91deab0dd9dab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java @@ -142,7 +142,7 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection tasksToCo for (final Task task : tasksToCommit) { // we need to call commitNeeded first since we need to update committable offsets if (task.commitNeeded()) { - final Map offsetAndMetadata = task.prepareCommit(); + final Map offsetAndMetadata = task.prepareCommit(true); if (!offsetAndMetadata.isEmpty()) { consumedOffsetsAndMetadata.put(task, offsetAndMetadata); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index eccf0c8f33d86..9376e6887f359 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -278,7 +278,7 @@ private void closeDirtyAndRevive(final Collection taskWithChangelogs, fina // we do not need to take the returned offsets since we are not going to commit anyways; // this call is only used for active tasks to flush the cache before suspending and // closing the topology - task.prepareCommit(); + task.prepareCommit(false); } catch (final RuntimeException swallow) { log.warn("Error flushing cache for corrupted task {}. " + "Since the task is closing dirty, the following exception is swallowed: {}", @@ -812,7 +812,7 @@ private Map closeAndRecycleTasks(final Map offsets = task.prepareCommit(); + final Map offsets = task.prepareCommit(true); if (!offsets.isEmpty()) { log.error("Task {} should have been committed when it was suspended, but it reports non-empty " + "offsets {} to commit; this means it failed during last commit and hence should be closed dirty", @@ -1264,7 +1264,7 @@ private void prepareCommitAndAddOffsetsToMap(final Set tasksToPrepare, final Map> consumedOffsetsPerTask) { for (final Task task : tasksToPrepare) { try { - final Map committableOffsets = task.prepareCommit(); + final Map committableOffsets = task.prepareCommit(true); if (!committableOffsets.isEmpty()) { consumedOffsetsPerTask.put(task, committableOffsets); } @@ -1479,7 +1479,7 @@ private void closeTaskDirty(final Task task, final boolean removeFromTasksRegist try { // we call this function only to flush the case if necessary // before suspending and closing the topology - task.prepareCommit(); + task.prepareCommit(false); } catch (final RuntimeException swallow) { log.warn("Error flushing cache of dirty task {}. " + "Since the task is closing dirty, the following exception is swallowed: {}", @@ -1630,7 +1630,7 @@ private Collection tryCloseCleanActiveTasks(final Collection activeT // first committing all tasks and then suspend and close them clean for (final Task task : activeTasksToClose) { try { - final Map committableOffsets = task.prepareCommit(); + final Map committableOffsets = task.prepareCommit(true); tasksToCommit.add(task); if (!committableOffsets.isEmpty()) { consumedOffsetsAndMetadataPerTask.put(task, committableOffsets); @@ -1719,7 +1719,7 @@ private Collection tryCloseCleanStandbyTasks(final Collection standb // first committing and then suspend / close clean for (final Task task : standbyTasksToClose) { try { - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); task.suspend(); closeTaskClean(task); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index e953a61fc1f3e..768f3787d0b6d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -213,7 +213,7 @@ public void shouldThrowIfCommittingOnIllegalState() { task.suspend(); task.closeClean(); - assertThrows(IllegalStateException.class, task::prepareCommit); + assertThrows(IllegalStateException.class, () -> task.prepareCommit(true)); } @Test @@ -261,13 +261,13 @@ public void shouldFlushAndCheckpointStateManagerOnCommit() { task = createStandbyTask(); task.initializeIfNeeded(); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); // this should not checkpoint - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); // this should checkpoint - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); // this should not checkpoint verify(stateManager).checkpoint(); @@ -322,7 +322,7 @@ public void shouldSuspendAndCommitBeforeCloseClean() { task = createStandbyTask(); task.initializeIfNeeded(); task.suspend(); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); task.closeClean(); @@ -360,7 +360,7 @@ public void shouldOnlyNeedCommitWhenChangelogOffsetChanged() { // could commit if the offset advanced beyond threshold assertTrue(task.commitNeeded()); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); } @@ -389,7 +389,7 @@ public void shouldThrowOnCloseCleanCheckpointError() { task = createStandbyTask(); task.initializeIfNeeded(); - task.prepareCommit(); + task.prepareCommit(true); assertThrows(RuntimeException.class, () -> task.postCommit(true)); assertEquals(RUNNING, task.state()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index bcf24ee7df888..98807cd63423e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -645,6 +645,22 @@ public void shouldProcessInOrder() { assertEquals(asList(201, 202, 203), source2.values); } + @Test + public void shouldNotGetOffsetsIfPrepareCommitDirty() { + when(stateManager.taskId()).thenReturn(taskId); + when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); + task = createStatefulTask(createConfig("100"), false); + + task.addRecords(partition1, List.of(getConsumerRecordWithOffsetAsTimestamp(partition1, 0))); + task.addRecords(partition2, List.of(getConsumerRecordWithOffsetAsTimestamp(partition2, 0))); + + assertTrue(task.process(0L)); + assertTrue(task.commitNeeded()); + + // committableOffsetsAndMetadata() has not been called, otherwise prepareCommit() would have returned a map + assertNull(task.prepareCommit(false)); + } + @Test public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() { when(stateManager.taskId()).thenReturn(taskId); @@ -660,7 +676,7 @@ public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() { )); assertTrue(task.process(time.milliseconds())); - task.prepareCommit(); + task.prepareCommit(true); assertTrue(task.process(time.milliseconds())); task.postCommit(false); assertTrue(task.process(time.milliseconds())); @@ -683,7 +699,7 @@ public void shouldNotProcessRecordsAfterPrepareCommitWhenEosV2Enabled() { )); assertTrue(task.process(time.milliseconds())); - task.prepareCommit(); + task.prepareCommit(true); assertFalse(task.process(time.milliseconds())); task.postCommit(false); assertTrue(task.process(time.milliseconds())); @@ -1328,7 +1344,7 @@ public void shouldRespectCommitNeeded() { assertTrue(task.process(0L)); assertTrue(task.commitNeeded()); - task.prepareCommit(); + task.prepareCommit(true); assertTrue(task.commitNeeded()); task.postCommit(true); @@ -1338,7 +1354,7 @@ public void shouldRespectCommitNeeded() { assertTrue(task.maybePunctuateStreamTime()); assertTrue(task.commitNeeded()); - task.prepareCommit(); + task.prepareCommit(true); assertTrue(task.commitNeeded()); task.postCommit(true); @@ -1349,7 +1365,7 @@ public void shouldRespectCommitNeeded() { assertTrue(task.maybePunctuateSystemTime()); assertTrue(task.commitNeeded()); - task.prepareCommit(); + task.prepareCommit(true); assertTrue(task.commitNeeded()); task.postCommit(true); @@ -1374,7 +1390,7 @@ public void shouldCommitNextOffsetAndProcessorMetadataFromQueueIfAvailable() { task.process(0L); processorSystemTime.mockProcessor.addProcessorMetadata("key2", 200L); - final Map offsetsAndMetadata = task.prepareCommit(); + final Map offsetsAndMetadata = task.prepareCommit(true); final TopicPartitionMetadata expected = new TopicPartitionMetadata(3L, new ProcessorMetadata( mkMap( @@ -1413,7 +1429,7 @@ public void shouldCommitFetchedNextOffsetIfRecordQueueIsEmpty() { final TopicPartitionMetadata metadata = new TopicPartitionMetadata(0, new ProcessorMetadata()); assertTrue(task.commitNeeded()); - assertThat(task.prepareCommit(), equalTo( + assertThat(task.prepareCommit(true), equalTo( mkMap( mkEntry(partition1, new OffsetAndMetadata(3L, Optional.of(2), metadata.encode())) ) @@ -1430,7 +1446,7 @@ public void shouldCommitFetchedNextOffsetIfRecordQueueIsEmpty() { task.process(0L); assertTrue(task.commitNeeded()); - assertThat(task.prepareCommit(), equalTo( + assertThat(task.prepareCommit(true), equalTo( mkMap( mkEntry(partition1, new OffsetAndMetadata(3L, Optional.of(2), metadata.encode())), mkEntry(partition2, new OffsetAndMetadata(1L, Optional.of(0), metadata.encode())) @@ -1486,7 +1502,7 @@ public void shouldCommitOldProcessorMetadataWhenNotDirty() { assertTrue(task.commitNeeded()); - assertThat(task.prepareCommit(), equalTo( + assertThat(task.prepareCommit(true), equalTo( mkMap( mkEntry(partition1, new OffsetAndMetadata(1L, Optional.of(1), expectedMetadata1.encode())), mkEntry(partition2, new OffsetAndMetadata(2L, Optional.of(1), expectedMetadata2.encode())) @@ -1509,7 +1525,7 @@ public void shouldCommitOldProcessorMetadataWhenNotDirty() { assertTrue(task.commitNeeded()); // Processor metadata not updated, we just need to commit to partition1 again with new offset - assertThat(task.prepareCommit(), equalTo( + assertThat(task.prepareCommit(true), equalTo( mkMap(mkEntry(partition1, new OffsetAndMetadata(2L, Optional.of(1), expectedMetadata3.encode()))) )); task.postCommit(false); @@ -1526,7 +1542,7 @@ public void shouldFailOnCommitIfTaskIsClosed() { final IllegalStateException thrown = assertThrows( IllegalStateException.class, - task::prepareCommit + () -> task.prepareCommit(true) ); assertThat(thrown.getMessage(), is("Illegal state CLOSED while preparing active task 0_0 for committing")); @@ -1820,10 +1836,10 @@ public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() { task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); // should checkpoint - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); // should checkpoint - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); // should not checkpoint assertThat("Map was empty", task.highWaterMark().size() == 2); @@ -1847,10 +1863,10 @@ public void shouldCheckpointOffsetsOnCommitIfSnapshotMuchChanged() { task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); // should checkpoint - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); // should checkpoint - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); // should checkpoint since the offset delta is greater than the threshold assertThat("Map was empty", task.highWaterMark().size() == 2); @@ -1866,7 +1882,7 @@ public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() { task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); final File checkpointFile = new File( stateDirectory.getOrCreateDirectoryForTask(taskId), @@ -2011,7 +2027,7 @@ public void shouldMaybeReturnOffsetsForRepartitionTopicsForPurging(final boolean assertTrue(task.process(0L)); assertTrue(task.process(0L)); - task.prepareCommit(); + task.prepareCommit(true); if (doCommit) { task.updateCommittedOffsets(repartition, 10L); } @@ -2050,7 +2066,7 @@ public void shouldThrowIfCommittingOnIllegalState() { task.transitionTo(SUSPENDED); task.transitionTo(Task.State.CLOSED); - assertThrows(IllegalStateException.class, task::prepareCommit); + assertThrows(IllegalStateException.class, () -> task.prepareCommit(true)); } @Test @@ -2101,7 +2117,7 @@ public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() { task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); task.suspend(); @@ -2123,7 +2139,7 @@ public void shouldCheckpointForSuspendedRunningTaskWithLargeProgress() { task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); // should checkpoint - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); // should checkpoint since the offset delta is greater than the threshold task.suspend(); @@ -2207,7 +2223,7 @@ public void shouldCheckpointOnCloseRestoringIfNoProgress() { task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); // should flush and checkpoint task.suspend(); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); // should flush and checkpoint task.closeClean(); @@ -2277,7 +2293,7 @@ public void shouldCheckpointOffsetsOnPostCommit() { assertTrue(task.commitNeeded()); task.suspend(); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(false); assertEquals(SUSPENDED, task.state()); @@ -2307,7 +2323,7 @@ public void shouldThrowExceptionOnCloseCleanError() { assertTrue(task.commitNeeded()); task.suspend(); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); // should checkpoint assertThrows(ProcessorStateException.class, () -> task.closeClean()); @@ -2336,7 +2352,7 @@ public void shouldThrowOnCloseCleanFlushError() { task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, offset))); task.process(100L); - assertThrows(ProcessorStateException.class, task::prepareCommit); + assertThrows(ProcessorStateException.class, () -> task.prepareCommit(true)); assertEquals(RUNNING, task.state()); @@ -2369,7 +2385,7 @@ public void shouldThrowOnCloseCleanCheckpointError() { assertTrue(task.commitNeeded()); task.suspend(); - task.prepareCommit(); + task.prepareCommit(true); assertThrows(ProcessorStateException.class, () -> task.postCommit(true)); assertEquals(Task.State.SUSPENDED, task.state()); @@ -2672,7 +2688,7 @@ public void shouldUpdateOffsetIfAllRecordsHaveInvalidTimestamp() { assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset + 1, new TopicPartitionMetadata(RecordQueue.UNKNOWN, new ProcessorMetadata()).encode())))) @@ -2704,7 +2720,7 @@ public void shouldUpdateOffsetIfValidRecordFollowsInvalidTimestamp() { assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset + 1, new TopicPartitionMetadata(offset, new ProcessorMetadata()).encode())))) ); } @@ -2734,14 +2750,14 @@ public void shouldUpdateOffsetIfInvalidTimestampeRecordFollowsValid() { assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(1, new TopicPartitionMetadata(0, new ProcessorMetadata()).encode())))) ); assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(2, new TopicPartitionMetadata(0, new ProcessorMetadata()).encode())))) ); } @@ -2771,7 +2787,7 @@ public void shouldUpdateOffsetIfAllRecordsAreCorrupted() { assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset + 1, new TopicPartitionMetadata(RecordQueue.UNKNOWN, new ProcessorMetadata()).encode())))) @@ -2803,7 +2819,7 @@ public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() { assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset + 1, new TopicPartitionMetadata(offset, new ProcessorMetadata()).encode())))) ); } @@ -2834,14 +2850,14 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() { assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(1, new TopicPartitionMetadata(0, new ProcessorMetadata()).encode())))) ); assertTrue(task.process(offset)); assertTrue(task.commitNeeded()); assertThat( - task.prepareCommit(), + task.prepareCommit(true), equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(2, new TopicPartitionMetadata(0, new ProcessorMetadata()).encode())))) ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 9d7df53adbe95..d8bb35c000a5f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -459,7 +459,7 @@ public void shouldRemoveUnusedFailedActiveTaskFromStateUpdaterAndCloseDirty() { taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()); - verify(activeTaskToClose).prepareCommit(); + verify(activeTaskToClose).prepareCommit(false); verify(activeTaskToClose).suspend(); verify(activeTaskToClose).closeDirty(); verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); @@ -500,7 +500,7 @@ public void shouldRemoveUnusedFailedStandbyTaskFromStateUpdaterAndCloseDirty() { taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()); - verify(standbyTaskToClose).prepareCommit(); + verify(standbyTaskToClose).prepareCommit(false); verify(standbyTaskToClose).suspend(); verify(standbyTaskToClose).closeDirty(); verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); @@ -996,7 +996,7 @@ public void shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithState taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions))); - verify(activeTaskToRecycle).prepareCommit(); + verify(activeTaskToRecycle).prepareCommit(true); verify(tasks).addPendingTasksToInit(Set.of(standbyTask)); verify(tasks).removeTask(activeTaskToRecycle); verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); @@ -1019,7 +1019,7 @@ public void shouldAddRecycledStandbyTasksFromActiveToTaskRegistryWithStateUpdate taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions))); - verify(activeTaskToRecycle).prepareCommit(); + verify(activeTaskToRecycle).prepareCommit(true); verify(tasks).replaceActiveWithStandby(standbyTask); verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); verify(standbyTaskCreator).createTasks(Collections.emptyMap()); @@ -1059,7 +1059,7 @@ public void shouldAssignActiveTaskInTasksRegistryToBeClosedCleanlyWithStateUpdat taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()); verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); - verify(activeTaskToClose).prepareCommit(); + verify(activeTaskToClose).prepareCommit(true); verify(activeTaskToClose).closeClean(); verify(tasks).removeTask(activeTaskToClose); verify(standbyTaskCreator).createTasks(Collections.emptyMap()); @@ -1536,10 +1536,10 @@ public void shouldCloseDirtyWhenRemoveFailedActiveTasksFromStateUpdaterOnPartiti taskManager.handleLostAll(); - verify(task1).prepareCommit(); + verify(task1).prepareCommit(false); verify(task1).suspend(); verify(task1).closeDirty(); - verify(task2).prepareCommit(); + verify(task2).prepareCommit(false); verify(task2).suspend(); verify(task2).closeDirty(); } @@ -1569,7 +1569,7 @@ public void shouldCloseTasksWhenRemoveFailedActiveTasksFromStateUpdaterOnPartiti verify(task1).suspend(); verify(task1).closeClean(); - verify(task2).prepareCommit(); + verify(task2).prepareCommit(false); verify(task2).suspend(); verify(task2).closeDirty(); verify(task3).suspend(); @@ -2386,10 +2386,10 @@ public void shouldNotCommitNonCorruptedRestoringActiveTasksAndNotCommitRunningSt taskManager.handleCorruption(Set.of(taskId02)); verify(activeRestoringTask, never()).commitNeeded(); - verify(activeRestoringTask, never()).prepareCommit(); + verify(activeRestoringTask, never()).prepareCommit(true); verify(activeRestoringTask, never()).postCommit(anyBoolean()); verify(standbyTask, never()).commitNeeded(); - verify(standbyTask, never()).prepareCommit(); + verify(standbyTask, never()).prepareCommit(true); verify(standbyTask, never()).postCommit(anyBoolean()); } @@ -2418,9 +2418,9 @@ public void shouldNotCommitNonCorruptedRestoringActiveTasksAndCommitRunningStand taskManager.handleCorruption(Set.of(taskId02)); verify(activeRestoringTask, never()).commitNeeded(); - verify(activeRestoringTask, never()).prepareCommit(); + verify(activeRestoringTask, never()).prepareCommit(true); verify(activeRestoringTask, never()).postCommit(anyBoolean()); - verify(standbyTask).prepareCommit(); + verify(standbyTask).prepareCommit(true); verify(standbyTask).postCommit(anyBoolean()); } @@ -2431,7 +2431,7 @@ public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorrupte final StateMachineTask corruptedStandby = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager); final StateMachineTask runningNonCorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { @Override - public Map prepareCommit() { + public Map prepareCommit(final boolean clean) { throw new TaskMigratedException("You dropped out of the group!", new RuntimeException()); } }; @@ -3394,7 +3394,7 @@ public void shouldOnlyCommitRevokedStandbyTaskAndPropagatePrepareCommitException final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) { @Override - public Map prepareCommit() { + public Map prepareCommit(final boolean clean) { throw new RuntimeException("task 0_1 prepare commit boom!"); } }; @@ -3560,7 +3560,7 @@ public void shouldShutDownStateUpdaterAndCloseFailedTasksDirty() { verify(activeTaskCreator).close(); verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE)); - verify(failedStatefulTask).prepareCommit(); + verify(failedStatefulTask).prepareCommit(false); verify(failedStatefulTask).suspend(); verify(failedStatefulTask).closeDirty(); } @@ -3634,16 +3634,16 @@ public void shouldShutDownStateUpdaterAndCloseDirtyTasksFailedDuringRemoval() { verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE)); verify(tasks).addTask(removedStatefulTask); verify(tasks).addTask(removedStandbyTask); - verify(removedFailedStatefulTask).prepareCommit(); + verify(removedFailedStatefulTask).prepareCommit(false); verify(removedFailedStatefulTask).suspend(); verify(removedFailedStatefulTask).closeDirty(); - verify(removedFailedStandbyTask).prepareCommit(); + verify(removedFailedStandbyTask).prepareCommit(false); verify(removedFailedStandbyTask).suspend(); verify(removedFailedStandbyTask).closeDirty(); - verify(removedFailedStatefulTaskDuringRemoval).prepareCommit(); + verify(removedFailedStatefulTaskDuringRemoval).prepareCommit(false); verify(removedFailedStatefulTaskDuringRemoval).suspend(); verify(removedFailedStatefulTaskDuringRemoval).closeDirty(); - verify(removedFailedStandbyTaskDuringRemoval).prepareCommit(); + verify(removedFailedStandbyTaskDuringRemoval).prepareCommit(false); verify(removedFailedStandbyTaskDuringRemoval).suspend(); verify(removedFailedStandbyTaskDuringRemoval).closeDirty(); } @@ -3869,7 +3869,7 @@ public void shouldCommitViaProducerIfEosV2Enabled() { public void shouldPropagateExceptionFromActiveCommit() { final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { @Override - public Map prepareCommit() { + public Map prepareCommit(final boolean clean) { throw new RuntimeException("opsh."); } }; @@ -3893,7 +3893,7 @@ public Map prepareCommit() { public void shouldPropagateExceptionFromStandbyCommit() { final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) { @Override - public Map prepareCommit() { + public Map prepareCommit(final boolean clean) { throw new RuntimeException("opsh."); } }; @@ -4689,7 +4689,7 @@ public void shouldConvertStandbyTaskToActiveTask() { final StandbyTask standbyTask = mock(StandbyTask.class); when(standbyTask.id()).thenReturn(taskId00); when(standbyTask.isActive()).thenReturn(false); - when(standbyTask.prepareCommit()).thenReturn(Collections.emptyMap()); + when(standbyTask.prepareCommit(true)).thenReturn(Collections.emptyMap()); final StreamTask activeTask = mock(StreamTask.class); when(activeTask.id()).thenReturn(taskId00); @@ -4939,10 +4939,13 @@ public boolean commitRequested() { } @Override - public Map prepareCommit() { + public Map prepareCommit(final boolean clean) { commitPrepared = true; if (commitNeeded) { + if (!clean) { + return null; + } return committableOffsets; } else { return Collections.emptyMap(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java index aac2dd36b49da..d43670429b135 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java @@ -66,7 +66,7 @@ public void setUp() { when(task.isProcessable(anyLong())).thenReturn(true); when(task.id()).thenReturn(new TaskId(0, 0, "A")); when(task.process(anyLong())).thenReturn(true); - when(task.prepareCommit()).thenReturn(Collections.emptyMap()); + when(task.prepareCommit(true)).thenReturn(Collections.emptyMap()); } @AfterEach diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index a4cee67ad5fae..81c90d043cec4 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -591,7 +591,7 @@ private void completeAllProcessableWork() { // Process the record ... task.process(mockWallClockTime.milliseconds()); task.maybePunctuateStreamTime(); - commit(task.prepareCommit()); + commit(task.prepareCommit(true)); task.postCommit(true); captureOutputsAndReEnqueueInternalResults(); } @@ -709,7 +709,7 @@ public void advanceWallClockTime(final Duration advance) { mockWallClockTime.sleep(advance.toMillis()); if (task != null) { task.maybePunctuateSystemTime(); - commit(task.prepareCommit()); + commit(task.prepareCommit(true)); task.postCommit(true); } completeAllProcessableWork(); @@ -1130,7 +1130,7 @@ public SessionStore getSessionStore(final String name) { public void close() { if (task != null) { task.suspend(); - task.prepareCommit(); + task.prepareCommit(true); task.postCommit(true); task.closeClean(); }