Skip to content

Conversation

@Nikita-Shupletsov
Copy link
Contributor

Added logic to close pending tasks to init.
Made standby task closure similar to the one for active tasks. Added a separate method for getting standby tasks from task registry. Added an integration test that reproduces the issue.

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

Added logic to close pending tasks to init.
Made standby task closure similar to the one for active tasks.
Added a separate method for getting standby tasks from task registry.
Added an integration test that reproduces the issue.
@github-actions github-actions bot added triage PRs from the community streams labels Jan 27, 2026
@mjsax mjsax added ci-approved and removed triage PRs from the community labels Jan 29, 2026
Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a first pass.

Can you update the PR description adding context on what the bug exactly is, and when we hit it? It seems to be related to not closing "pending tasks", but might be good to give some more context.


streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the default IIRC -- no need to set it

streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same. Already the default.

@Override
public boolean setFlushListener(final CacheFlushListener<Bytes, byte[]> listener,
final boolean sendOldValues) {
return false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not return super.setFlushListener(...)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we just implement CachedStateStore, there is no super


@Override
public void flushCache() {
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call super.flushCache()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we just implement CachedStateStore, there is no super

}

@Test
public void shouldClosePendingTasksToInitAfterRebalance() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to add a test description at the top, explaining what we are exactly testing.

Starting two KS app
Blocking the first KS app during the rebalance (which is started when the second apps starts) because <explanation>

and so forth. I am not sure I understand the test yet, and I am sure if somebody come back to it at some point in the future they might struggle, too.

thread.setStateListener((t, newState, oldState) -> {
originalListener.onChange(t, newState, oldState);
listener.onChange(t, newState, oldState);
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this change? It seems you try to allow to register a second state listener? In general, we allow to only register one -- why do we need two? (Also, if we need multiple, why limit to two?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because it overrides the default one, and KafkaStreams#state doesn't work.
with this change if we add a special state listener, we keep the old one as well.
if for some reason we would like to add two additional state listeners, this implementation will work too. we will just need to call it twice. then it will wrap it twice and all three listeners will be called instead of one.

the reason I changed the method was pretty much because I wanted to have a listener and a to be able to call KafkaStreams#state and get the actual state as well

}

if (taskToRemove.isActive()) {
if (pendingTasksToInit.contains(taskToRemove)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We throw above if a task is not CLOSED or SUSPENDED, but pendingTaskToInit should be in state CREATED ? -- Do we need to update the above condition?

Are we testing this code path properly? If no test fails, it seems we never call removeTask() with a pending task?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's supposed to be CLOSED too.
I added it because in TaskManager when we close a task, we remove it from the task registry(e.g. https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L1405), so we expect the pending task to be closed as well.

final AtomicReference<RuntimeException> firstException) {
if (!clean) {
return activeTaskIterable();
return activeTasksToClose;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own understanding. The original code here was not wrong in the sense that before this PR the activeTasksToClose that got passed in was the same as activeTaskIterable()? But with this PR, we also include pending task in activeTasksToClose, and that's why we need to update the code here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason why I decided to change this was that if we follow the call:

  • https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L1405, here we pass two lists of tasks to close. the active ones used to accept activeTaskIterable as well, but it was changed at some point.
  • then we get into closeAndCleanUpTasks, then into tryCloseCleanActiveTasks, tryCloseCleanStandbyTasks
  • at this point we check if we even need to try to close them cleanly. if not Returns the set of active tasks that must be closed dirty
  • at this point it makes sense to return the list we received, not a different list. because it's a potential place for a bug: we ask the method to close one set of tasks, but it closes a different set of tasks.

even without my change it's already a bit incorrect, because we take the active tasks from the task registry and from the state updater: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L1708-L1709, which isn't really a big deal, because we handle the state updater before this call, so there should be nothing there.
but if it changes, it will be pretty hard to notice that slight change in the behavior.

on top of that, as you mentioned, this list doesn't contain the pending tasks. so if we keep it as is, when we shutdown dirtily, we will not close them.

}

@Override
public synchronized void removeTask(final Task taskToRemove) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why we also need a fix for this method in more details? It make sense to me, that we need to fix shutdown(), but not totally sure about this one.

Not saying we don't need to fix it, I just don't see the full picture of the bug yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as I mentioned above, this method is called during shutdown when we finish closing a task(cleanly or dirtily). without this change this method will be throwing Attempted to remove an active task that is not owned or Attempted to remove a standby task that is not owned

…ms/integration/RebalanceTaskClosureIntegrationTest.java

Co-authored-by: Matthias J. Sax <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants