Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public boolean maybePunctuateSystemTime() {
}

@Override
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) {
throw new UnsupportedOperationException("This task is read-only");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void resume() {
* or flushing state store get IO errors; such error should cause the thread to die
*/
@Override
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) {
switch (state()) {
case CREATED:
log.debug("Skipped preparing created task for commit");
Expand All @@ -189,15 +189,19 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
case RUNNING:
case SUSPENDED:
// do not need to flush state store caches in pre-commit since nothing would be sent for standby tasks
log.debug("Prepared {} task for committing", state());
if (!clean) {
log.debug("Skipped preparing {} standby task with id {} for commit since the task is getting closed dirty.", state(), id);
} else {
log.debug("Prepared {} task for committing", state());
}

break;

default:
throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing ");
}

return Collections.emptyMap();
return clean ? Collections.emptyMap() : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,6 @@ public void resume() {
timeCurrentIdlingStarted = Optional.empty();
}


public void flush() {
stateMgr.flushCache();
recordCollector.flush();
Expand All @@ -429,7 +428,7 @@ public void flush() {
* @return offsets that should be committed for this task
*/
@Override
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) {
switch (state()) {
case CREATED:
case RESTORING:
Expand All @@ -444,6 +443,10 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
//
// TODO: this should be removed after we decouple caching with emitting
flush();
if (!clean) {
log.debug("Skipped preparing {} task with id {} for commit since the task is getting closed dirty.", state(), id);
return null;
}
hasPendingTxCommit = eosEnabled;

log.debug("Prepared {} task for committing", state());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ default boolean maybePunctuateSystemTime() {
/**
* @throws StreamsException fatal error, should close the thread
*/
Map<TopicPartition, OffsetAndMetadata> prepareCommit();
Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean);

void postCommit(boolean enforceCheckpoint);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCo
for (final Task task : tasksToCommit) {
// we need to call commitNeeded first since we need to update committable offsets
if (task.commitNeeded()) {
final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit(true);
if (!offsetAndMetadata.isEmpty()) {
consumedOffsetsAndMetadata.put(task, offsetAndMetadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
// we do not need to take the returned offsets since we are not going to commit anyways;
// this call is only used for active tasks to flush the cache before suspending and
// closing the topology
task.prepareCommit();
task.prepareCommit(false);
} catch (final RuntimeException swallow) {
log.warn("Error flushing cache for corrupted task {}. " +
"Since the task is closing dirty, the following exception is swallowed: {}",
Expand Down Expand Up @@ -812,7 +812,7 @@ private Map<TaskId, RuntimeException> closeAndRecycleTasks(final Map<Task, Set<T
// and their changelog positions should not change at all postCommit would not write the checkpoint again.
// 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably
// write the checkpoint file.
final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit(true);
if (!offsets.isEmpty()) {
log.error("Task {} should have been committed when it was suspended, but it reports non-empty " +
"offsets {} to commit; this means it failed during last commit and hence should be closed dirty",
Expand Down Expand Up @@ -1264,7 +1264,7 @@ private void prepareCommitAndAddOffsetsToMap(final Set<Task> tasksToPrepare,
final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
for (final Task task : tasksToPrepare) {
try {
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(true);
if (!committableOffsets.isEmpty()) {
consumedOffsetsPerTask.put(task, committableOffsets);
}
Expand Down Expand Up @@ -1479,7 +1479,7 @@ private void closeTaskDirty(final Task task, final boolean removeFromTasksRegist
try {
// we call this function only to flush the case if necessary
// before suspending and closing the topology
task.prepareCommit();
task.prepareCommit(false);
} catch (final RuntimeException swallow) {
log.warn("Error flushing cache of dirty task {}. " +
"Since the task is closing dirty, the following exception is swallowed: {}",
Expand Down Expand Up @@ -1630,7 +1630,7 @@ private Collection<Task> tryCloseCleanActiveTasks(final Collection<Task> activeT
// first committing all tasks and then suspend and close them clean
for (final Task task : activeTasksToClose) {
try {
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(true);
tasksToCommit.add(task);
if (!committableOffsets.isEmpty()) {
consumedOffsetsAndMetadataPerTask.put(task, committableOffsets);
Expand Down Expand Up @@ -1719,7 +1719,7 @@ private Collection<Task> tryCloseCleanStandbyTasks(final Collection<Task> standb
// first committing and then suspend / close clean
for (final Task task : standbyTasksToClose) {
try {
task.prepareCommit();
task.prepareCommit(true);
task.postCommit(true);
task.suspend();
closeTaskClean(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void shouldThrowIfCommittingOnIllegalState() {
task.suspend();
task.closeClean();

assertThrows(IllegalStateException.class, task::prepareCommit);
assertThrows(IllegalStateException.class, () -> task.prepareCommit(true));
}

@Test
Expand Down Expand Up @@ -261,13 +261,13 @@ public void shouldFlushAndCheckpointStateManagerOnCommit() {

task = createStandbyTask();
task.initializeIfNeeded();
task.prepareCommit();
task.prepareCommit(true);
task.postCommit(false); // this should not checkpoint

task.prepareCommit();
task.prepareCommit(true);
task.postCommit(false); // this should checkpoint

task.prepareCommit();
task.prepareCommit(true);
task.postCommit(false); // this should not checkpoint

verify(stateManager).checkpoint();
Expand Down Expand Up @@ -322,7 +322,7 @@ public void shouldSuspendAndCommitBeforeCloseClean() {
task = createStandbyTask();
task.initializeIfNeeded();
task.suspend();
task.prepareCommit();
task.prepareCommit(true);
task.postCommit(true);
task.closeClean();

Expand Down Expand Up @@ -360,7 +360,7 @@ public void shouldOnlyNeedCommitWhenChangelogOffsetChanged() {
// could commit if the offset advanced beyond threshold
assertTrue(task.commitNeeded());

task.prepareCommit();
task.prepareCommit(true);
task.postCommit(true);
}

Expand Down Expand Up @@ -389,7 +389,7 @@ public void shouldThrowOnCloseCleanCheckpointError() {
task = createStandbyTask();
task.initializeIfNeeded();

task.prepareCommit();
task.prepareCommit(true);
assertThrows(RuntimeException.class, () -> task.postCommit(true));

assertEquals(RUNNING, task.state());
Expand Down
Loading