
KAFKA-14830: Illegal state error in transactional producer #17022


Open
wants to merge 12 commits into
base: trunk
from
@@ -667,14 +667,23 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception)
}

synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) {
maybeTransitionToErrorState(exception);
boolean isStaleBatch = batch.producerId() == producerIdAndEpoch.producerId && batch.producerEpoch() < producerIdAndEpoch.epoch;
Member

@jolshan jolshan Aug 28, 2024

I'm wondering if there are any cases where producerIdAndEpoch could have a race -- or is there a case where the ID and epoch are the same but the issue still happens?

btw -- maybe not super common, but could the overflow case be missed here? (new producer id and epoch resets due to epoch reaching max value)

Contributor Author

Thanks for the feedback @jolshan!

I'm wondering if there are any cases where producerIdAndEpoch could have a race -- or is there a case where the ID and epoch are the same but the issue still happens?

There are a couple of bug reports with logs. I'll dig through those to see if it's happened in the wild.

btw -- maybe not super common, but could the overflow case be missed here? (new producer id and epoch resets due to epoch reaching max value)

Sounds super rare ;)

If an epoch overflowed, wouldn't that just be interpreted as 'not equal' to the last known epoch, and thus trigger the "stale batch" logic? Perhaps my understanding of staleness is too naive?

Thanks!
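
To make the overflow question concrete, here is a minimal sketch that evaluates a predicate of the same shape as `isStaleBatch` in the diff against made-up values. It assumes, as the parenthetical in the review comment describes, that epoch exhaustion produces a new producer ID with the epoch reset. The class name, method signature, and numbers are hypothetical and are not part of the PR.

```java
public class StaleBatchSketch {
    // Same shape as the isStaleBatch predicate in the diff, with the
    // current producer ID and epoch passed in as plain parameters.
    static boolean isStale(long batchProducerId, short batchEpoch,
                           long currentProducerId, short currentEpoch) {
        return batchProducerId == currentProducerId && batchEpoch < currentEpoch;
    }

    public static void main(String[] args) {
        // Ordinary epoch bump: same producer ID, higher current epoch.
        System.out.println(isStale(100L, (short) 5, 100L, (short) 6));

        // Assumed overflow case: the producer now has a new ID and the
        // epoch has been reset to 0 (hypothetical values).
        System.out.println(isStale(100L, (short) (Short.MAX_VALUE - 1), 101L, (short) 0));
    }
}
```

Under that assumption the first call yields `true` and the second `false`, because the producer IDs differ. A batch from before the rollover would therefore not be classified as stale by this predicate alone.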


if (!isStaleBatch && !hasFatalError())
maybeTransitionToErrorState(exception);
Comment on lines +744 to +745
Contributor Author

Is this comment helpful or distracting?

Suggested change
if (!isStaleBatch && !hasFatalError())
maybeTransitionToErrorState(exception);
// It's possible the transaction manager is already in the FATAL_ERROR
// state at this point. Depending on the incoming exception type,
// maybeTransitionToErrorState() could attempt to set the state to
// ABORTABLE_ERROR. For example, suppose a fatal error occurred during a
// transaction, and then moments later one of the batches in that
// transaction failed with a TimeoutException.
// maybeTransitionToErrorState() would then (blindly) attempt to
// transition to ABORTABLE_ERROR, which is invalid and would result
// in an IllegalStateException.
//
// Therefore, only attempt to transition to the FATAL_ERROR state if
// the batch is "fresh" *and* the transaction manager is not already
// in the FATAL_ERROR state.
if (!isStaleBatch && !hasFatalError())
maybeTransitionToErrorState(exception);
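
The scenario in the proposed comment can be reproduced with a toy state machine. This is only a sketch under stated assumptions: the `TransitionGuardSketch` class, its three states, and its transition rule are hypothetical simplifications and do not mirror the real `TransactionManager`. The one rule it encodes is taken from the discussion here: once in `FATAL_ERROR`, a transition to `ABORTABLE_ERROR` is invalid and throws `IllegalStateException`.

```java
public class TransitionGuardSketch {
    // Hypothetical, reduced set of states for illustration only.
    enum State { IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    private State state = State.IN_TRANSACTION;

    State state() { return state; }

    boolean hasFatalError() { return state == State.FATAL_ERROR; }

    // Assumed rule: FATAL_ERROR may only be followed by FATAL_ERROR.
    void transitionTo(State target) {
        if (state == State.FATAL_ERROR && target != State.FATAL_ERROR)
            throw new IllegalStateException("Invalid transition from " + state + " to " + target);
        state = target;
    }

    // Mimics the pre-change behavior: always attempt the transition.
    void handleFailedBatchUnguarded() {
        transitionTo(State.ABORTABLE_ERROR);
    }

    // Mimics the guarded check from the diff.
    void handleFailedBatchGuarded(boolean isStaleBatch) {
        if (!isStaleBatch && !hasFatalError())
            transitionTo(State.ABORTABLE_ERROR);
    }

    public static void main(String[] args) {
        TransitionGuardSketch manager = new TransitionGuardSketch();
        manager.transitionTo(State.FATAL_ERROR);

        // Guarded path: the late failure is ignored and the state is unchanged.
        manager.handleFailedBatchGuarded(false);
        System.out.println(manager.state());

        // Unguarded path: the late failure trips the invalid transition.
        try {
            manager.handleFailedBatchUnguarded();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the guard leaves the state at `FATAL_ERROR`, while the unguarded call throws, which is the failure mode the comment describes.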

Contributor

@k-raina k-raina Apr 22, 2025

        // Therefore, only attempt to transition to the FATAL_ERROR state if
        // the batch is "fresh" *and* the transaction manager is not already
        // in the FATAL_ERROR state.

FATAL_ERROR to FATAL_ERROR is not an invalid transition. Do you mean "only attempt to transition to the ABORTABLE_ERROR state"?

EDIT: Okay, I see that the check is trying to say "Do not allow any transitions if the transaction manager is already in the fatal state."


removeInFlightBatch(batch);

if (hasFatalError()) {
log.debug("Ignoring batch {} with producer id {}, epoch {}, and sequence number {} " +
"since the producer is already in fatal error state", batch, batch.producerId(),
batch.producerEpoch(), batch.baseSequence(), exception);
return;
} else if (isStaleBatch) {
log.debug("Ignoring stale batch {} with producer id {}, epoch {}, and sequence number {} " +
"since the producer has been re-initialized with producer id {} and epoch {}", batch, batch.producerId(),
batch.producerEpoch(), batch.baseSequence(), producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, exception);
return;
}

if (exception instanceof OutOfOrderSequenceException && !isTransactional()) {
@@ -28,6 +28,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
@@ -3610,6 +3611,44 @@ public void testTransactionAbortableExceptionInAddOffsetsToTxn() {
assertAbortableError(TransactionAbortableException.class);
}

@Test
public void testBatchesReceivedAfterAbortableError() {
doInitTransactions();
transactionManager.beginTransaction();

ProducerBatch batch = writeIdempotentBatchWithValue(transactionManager, tp1, "first");

// The producer's connection to the broker is tenuous, so this mimics the catch block for ApiException in
// KafkaProducer.doSend().
transactionManager.maybeTransitionToErrorState(new DisconnectException("test"));

// The above error is bubbled up to the user who then aborts the transaction...
TransactionalRequestResult result = transactionManager.beginAbort();

// The transaction manager handles the abort internally and re-initializes the epoch
short bumpedEpoch = epoch + 1;
prepareInitPidResponse(Errors.NONE, false, producerId, bumpedEpoch);
runUntil(result::isCompleted);

// This mimics a slower produce response that receives the timeout on the client after the above rollback
// has completed. The failed batch should not attempt to change the state since it's stale.
transactionManager.handleFailedBatch(batch, new TimeoutException(), false);
}

@Test
public void testBatchesReceivedAfterFatalError() {
doInitTransactions();
transactionManager.beginTransaction();

ProducerBatch batch = writeIdempotentBatchWithValue(transactionManager, tp1, "first");

// This mimics something that causes the transaction manager to enter its FATAL_ERROR state.
transactionManager.transitionToFatalError(Errors.PRODUCER_FENCED.exception());

// However, even with this failure, the failed batch should not attempt to update to ABORTABLE_ERROR.
transactionManager.handleFailedBatch(batch, new TimeoutException(), false);
}

@Test
public void testTransactionAbortableExceptionInTxnOffsetCommit() {
final TopicPartition tp = new TopicPartition("foo", 0);