Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
throw new UnsupportedOperationException("This task is read-only");
}

@Override
public void flush() {
throw new UnsupportedOperationException("This task is read-only");
}

@Override
public void postCommit(final boolean enforceCheckpoint) {
throw new UnsupportedOperationException("This task is read-only");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
return Collections.emptyMap();
}

@Override
public void flush() {
throw new UnsupportedOperationException("Flushing behavior is not required for standby tasks.");
}

@Override
public void postCommit(final boolean enforceCheckpoint) {
switch (state()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ public void resume() {
timeCurrentIdlingStarted = Optional.empty();
}


@Override
public void flush() {
stateMgr.flushCache();
recordCollector.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ default boolean maybePunctuateSystemTime() {
*/
Map<TopicPartition, OffsetAndMetadata> prepareCommit();

void flush();

void postCommit(boolean enforceCheckpoint);

default Map<TopicPartition, Long> purgeableOffsets() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1479,7 +1479,7 @@ private void closeTaskDirty(final Task task, final boolean removeFromTasksRegist
try {
// we call this function only to flush the case if necessary
// before suspending and closing the topology
task.prepareCommit();
task.flush();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SteamTask.prepareCommit() set a boolean flag hasPending TxCommit if using EOS and the commitNeeded flag is true - is it OK to bypass that? I'm thinking so but I'd like to confirm.

} catch (final RuntimeException swallow) {
log.warn("Error flushing cache of dirty task {}. " +
"Since the task is closing dirty, the following exception is swallowed: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ public void shouldRemoveUnusedFailedActiveTaskFromStateUpdaterAndCloseDirty() {

taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());

verify(activeTaskToClose).prepareCommit();
verify(activeTaskToClose).flush();
verify(activeTaskToClose).suspend();
verify(activeTaskToClose).closeDirty();
verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
Expand Down Expand Up @@ -500,7 +500,7 @@ public void shouldRemoveUnusedFailedStandbyTaskFromStateUpdaterAndCloseDirty() {

taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());

verify(standbyTaskToClose).prepareCommit();
verify(standbyTaskToClose).flush();
verify(standbyTaskToClose).suspend();
verify(standbyTaskToClose).closeDirty();
verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
Expand Down Expand Up @@ -1536,10 +1536,10 @@ public void shouldCloseDirtyWhenRemoveFailedActiveTasksFromStateUpdaterOnPartiti

taskManager.handleLostAll();

verify(task1).prepareCommit();
verify(task1).flush();
verify(task1).suspend();
verify(task1).closeDirty();
verify(task2).prepareCommit();
verify(task2).flush();
verify(task2).suspend();
verify(task2).closeDirty();
}
Expand Down Expand Up @@ -1569,7 +1569,7 @@ public void shouldCloseTasksWhenRemoveFailedActiveTasksFromStateUpdaterOnPartiti

verify(task1).suspend();
verify(task1).closeClean();
verify(task2).prepareCommit();
verify(task2).flush();
verify(task2).suspend();
verify(task2).closeDirty();
verify(task3).suspend();
Expand Down Expand Up @@ -2179,7 +2179,7 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception {

// `handleLostAll`
taskManager.handleLostAll();
assertThat(task00.commitPrepared, is(true));
assertThat(task00.flushed, is(true));
assertThat(task00.state(), is(Task.State.CLOSED));
assertThat(task01.state(), is(Task.State.RUNNING));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
Expand Down Expand Up @@ -3560,7 +3560,7 @@ public void shouldShutDownStateUpdaterAndCloseFailedTasksDirty() {

verify(activeTaskCreator).close();
verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
verify(failedStatefulTask).prepareCommit();
verify(failedStatefulTask).flush();
verify(failedStatefulTask).suspend();
verify(failedStatefulTask).closeDirty();
}
Expand Down Expand Up @@ -3634,16 +3634,16 @@ public void shouldShutDownStateUpdaterAndCloseDirtyTasksFailedDuringRemoval() {
verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
verify(tasks).addTask(removedStatefulTask);
verify(tasks).addTask(removedStandbyTask);
verify(removedFailedStatefulTask).prepareCommit();
verify(removedFailedStatefulTask).flush();
verify(removedFailedStatefulTask).suspend();
verify(removedFailedStatefulTask).closeDirty();
verify(removedFailedStandbyTask).prepareCommit();
verify(removedFailedStandbyTask).flush();
verify(removedFailedStandbyTask).suspend();
verify(removedFailedStandbyTask).closeDirty();
verify(removedFailedStatefulTaskDuringRemoval).prepareCommit();
verify(removedFailedStatefulTaskDuringRemoval).flush();
verify(removedFailedStatefulTaskDuringRemoval).suspend();
verify(removedFailedStatefulTaskDuringRemoval).closeDirty();
verify(removedFailedStandbyTaskDuringRemoval).prepareCommit();
verify(removedFailedStandbyTaskDuringRemoval).flush();
verify(removedFailedStandbyTaskDuringRemoval).suspend();
verify(removedFailedStandbyTaskDuringRemoval).closeDirty();
}
Expand Down Expand Up @@ -4880,6 +4880,7 @@ private static class StateMachineTask extends AbstractTask implements Task {
private boolean commitNeeded = false;
private boolean commitRequested = false;
private boolean commitPrepared = false;
private boolean flushed = false;
private boolean commitCompleted = false;
private Map<TopicPartition, OffsetAndMetadata> committableOffsets = Collections.emptyMap();
private Map<TopicPartition, Long> purgeableOffsets;
Expand Down Expand Up @@ -4949,6 +4950,11 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
}
}

@Override
public void flush() {
flushed = true;
}

@Override
public void postCommit(final boolean enforceCheckpoint) {
commitNeeded = false;
Expand Down