Skip to content

KAFKA-14830: Illegal state error in transactional producer #17022

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: trunk
Choose a base branch
from

Conversation

kirktrue
Copy link
Contributor

@kirktrue kirktrue commented Aug 27, 2024

When the producer's transaction manager receives a notification that an
error has occurred during a transaction, it takes steps to abort the
transaction and reset its internal state.

Users have reported the following case where a producer experiences
timeouts while in a transaction:

  1. The TransactionManager (TM) starts with state READY and epoch
    set to 0
  2. A transaction (T1) begins and TM sets its internal state to
    IN_TRANSACTION
  3. Batches are created and sent off to their respective brokers
  4. A timeout threshold is hit
  5. T1 starts the abort process
    1. TM state is set to ABORTING_TRANSACTION
    2. The batches involved with T1 are marked as expired
    3. TM is reinitialized, bumping the epoch from 0 to 1 and
      setting its state to READY
  6. A moment later, in the Sender thread, one of the failed batches
    calls handleFailedBatch()
  7. handleFailedBatch() sets the TM state to ABORTABLE_ERROR which
    is an invalid state transition from READY, hence the exception

This change compares the transaction manager's current epoch (1)
with the batch's epoch (0). If they're different, the batch is
considered "stale" and can be ignored (though a DEBUG message is
logged).

@@ -667,14 +667,23 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception)
}

synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) {
maybeTransitionToErrorState(exception);
boolean isStaleBatch = batch.producerId() == producerIdAndEpoch.producerId && batch.producerEpoch() < producerIdAndEpoch.epoch;
Copy link
Member

@jolshan jolshan Aug 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if there are any cases where producerIdAndEpoch could have a race -- or is case there the ID and epoch are the same but the issue still happens

btw -- maybe not super common, but could the overflow case be missed here? (new producer id and epoch resets due to epoch reaching max value)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback @jolshan!

I'm wondering if there are any cases where producerIdAndEpoch could have a race -- or is case there the ID and epoch are the same but the issue still happens

There are a couple of bug reports with logs. I'll dig through those to see if it's happened in the wild.

btw -- maybe not super common, but could the overflow case be missed here? (new producer id and epoch resets due to epoch reaching max value)

Sounds super rare ;)

If an epoch overflowed, wouldn't that just be interpreted as 'not equal' to the last known epoch, and thus trigger the "stale batch" logic? Perhaps my understanding of staleness is too naive?

Thanks!

@jolshan
Copy link
Member

jolshan commented Aug 28, 2024

I think the overall approach makes sense. But I would like to see some tests to see if the issue is improved. If so the logging could also give us some more insight.

@github-actions github-actions bot added clients small Small PRs labels Oct 17, 2024
@kirktrue kirktrue added transactions Transactions and EOS ci-approved labels Oct 17, 2024
Copy link

This PR is being marked as stale since it has not had any activity in 90 days. If you
would like to keep this PR alive, please leave a comment asking for a review. If the PR has
merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions bot added the stale Stale PRs label Jan 16, 2025
@kirktrue kirktrue removed the stale Stale PRs label Jan 16, 2025
@kirktrue
Copy link
Contributor Author

Still needed, just lower priority 😞

@kirktrue
Copy link
Contributor Author

cc @k-raina

@kirktrue kirktrue marked this pull request as ready for review March 31, 2025 20:47
@kirktrue
Copy link
Contributor Author

kirktrue commented Mar 31, 2025

I think the overall approach makes sense. But I would like to see some tests to see if the issue is improved. If so the logging could also give us some more insight.

@jolshan—The unit tests mimic the use cases that were seen in the wild. What other test cases should we consider? Thanks!

@kirktrue kirktrue changed the title [WIP] KAFKA-14830: Illegal state error in transactional producer KAFKA-14830: Illegal state error in transactional producer Apr 7, 2025
@kirktrue kirktrue requested a review from jolshan April 7, 2025 18:48
* Returns {@code true} if the given {@link ProducerBatch} has the same producer ID but a different epoch than the
* {@link #producerIdAndEpoch cached producer ID and epoch}.
*/
synchronized boolean isStaleBatch(ProducerBatch batch) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this method need to by synchronized?

Also seems it could be private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this method need to by synchronized?

No, not at present.

Also seems it could be private?

Yes, it could. I might even just inline it in handleFailedBatch() to simplify things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I inlined the 'is stale batch' logic into handleFailedBatch(), so these issues are no longer present.

@@ -737,14 +737,21 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception)
}

synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) {
maybeTransitionToErrorState(exception);
if (!isStaleBatch(batch) && !hasFatalError())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I understand the !hasFatalError() condition. Can you elaborate? -- I thought we want to call maybeTransitionToErrorState(exception); for any non-stale batch, independent of the current error state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the type of the exception pass in, the logic in maybeTransitionToErrorState() may set the internal state to either FATAL_ERROR or ABORTABLE_ERROR. Assuming there's a race condition of failures, it's possible the transaction manager could be set to a FATAL_ERROR state, followed by a call to handleFailedBatch() that then attempts to set the state to ABORTABLE_ERROR. Transitioning from FATAL_ERROR to any other state results in an IllegalStateException.

This is an attempt to prevent that case. I will add another unit test or two to make sure this is a valid concern.

cc @jolshan

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add another unit test or two to make sure this is a valid concern.

Turns out I already had a unit test for that: testBatchesReceivedAfterFatalError(). If I remove the !hasFatalError() condition that test fails:

TransactionalId foobar: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR
java.lang.IllegalStateException: TransactionalId foobar: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR
	at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1082)
	at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:498)
	at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:735)
	at org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:745)
	at org.apache.kafka.clients.producer.internals.TransactionManagerTest.testBatchesReceivedAfterFatalError(TransactionManagerTest.java:3987)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)

Comment on lines +744 to +745
if (!isStaleBatch && !hasFatalError())
maybeTransitionToErrorState(exception);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this comment helpful or distracting?

Suggested change
if (!isStaleBatch && !hasFatalError())
maybeTransitionToErrorState(exception);
// It's possible the transaction manager is already in the FATAL_ERROR
// state at this point. Depending on the incoming exception type,
// maybeTransitionToErrorState() could attempt to set the state to
// ABORTABLE_ERROR. For example, suppose a fatal error occurred during a
// transaction, and then moments later one of the batches in that
// transaction failed with a TimeoutException.
// maybeTransitionToErrorState() would then (blindly) attempt to
// transition to ABORTABLE_ERROR, which is invalid and would result
// in an IllegalStateException.
//
// Therefore, only attempt to transition to the FATAL_ERROR state if
// the batch is "fresh" *and* the transaction manager is not already
// in the FATAL_ERROR state.
if (!isStaleBatch && !hasFatalError())
maybeTransitionToErrorState(exception);

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants