Skip to content

KAFKA-14830: Illegal state error in transactional producer #17022

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -737,14 +737,21 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception)
}

synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) {
maybeTransitionToErrorState(exception);
if (!isStaleBatch(batch) && !hasFatalError())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I understand the !hasFatalError() condition. Can you elaborate? -- I thought we want to call maybeTransitionToErrorState(exception); for any non-stale batch, independent of the current error state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the type of the exception pass in, the logic in maybeTransitionToErrorState() may set the internal state to either FATAL_ERROR or ABORTABLE_ERROR. Assuming there's a race condition of failures, it's possible the transaction manager could be set to a FATAL_ERROR state, followed by a call to handleFailedBatch() that then attempts to set the state to ABORTABLE_ERROR. Transitioning from FATAL_ERROR to any other state results in an IllegalStateException.

This is an attempt to prevent that case. I will add another unit test or two to make sure this is a valid concern.

cc @jolshan

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add another unit test or two to make sure this is a valid concern.

Turns out I already had a unit test for that: testBatchesReceivedAfterFatalError(). If I remove the !hasFatalError() condition that test fails:

TransactionalId foobar: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR
java.lang.IllegalStateException: TransactionalId foobar: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR
	at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1082)
	at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:498)
	at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:735)
	at org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:745)
	at org.apache.kafka.clients.producer.internals.TransactionManagerTest.testBatchesReceivedAfterFatalError(TransactionManagerTest.java:3987)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the question here? I think we want to avoid extra errors if we are already in fatal.

maybeTransitionToErrorState(exception);

removeInFlightBatch(batch);

if (hasFatalError()) {
log.debug("Ignoring batch {} with producer id {}, epoch {}, and sequence number {} " +
"since the producer is already in fatal error state", batch, batch.producerId(),
batch.producerEpoch(), batch.baseSequence(), exception);
return;
} else if (isStaleBatch(batch)) {
log.debug("Ignoring stale batch {} with producer id {}, epoch {}, and sequence number {} " +
"since the producer has been re-initialized with producer id {} and epoch {}", batch, batch.producerId(),
batch.producerEpoch(), batch.baseSequence(), producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, exception);
return;
}

if (exception instanceof OutOfOrderSequenceException && !isTransactional()) {
Expand Down Expand Up @@ -772,6 +779,14 @@ synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException except
}
}

/**
* Returns {@code true} if the given {@link ProducerBatch} has the same producer ID but a different epoch than the
* {@link #producerIdAndEpoch cached producer ID and epoch}.
*/
synchronized boolean isStaleBatch(ProducerBatch batch) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this method need to by synchronized?

Also seems it could be private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this method need to by synchronized?

No, not at present.

Also seems it could be private?

Yes, it could. I might even just inline it in handleFailedBatch() to simplify things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I inlined the 'is stale batch' logic into handleFailedBatch(), so these issues are no longer present.

return batch.producerId() == producerIdAndEpoch.producerId && batch.producerEpoch() != producerIdAndEpoch.epoch;
}

synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
return txnPartitionMap.getOrCreate(topicPartition).hasInflightBatches();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
Expand Down Expand Up @@ -3948,6 +3949,44 @@ public void testTransactionAbortableExceptionInAddOffsetsToTxn() {
assertAbortableError(TransactionAbortableException.class);
}

@Test
public void testBatchesReceivedAfterAbortableError() {
doInitTransactions();
transactionManager.beginTransaction();

ProducerBatch batch = writeIdempotentBatchWithValue(transactionManager, tp1, "first");

// The producer's connection to the broker is tenuous, so this mimics the catch block for ApiException in
// KafkaProducer.doSend().
transactionManager.maybeTransitionToErrorState(new DisconnectException("test"));

// The above error is bubbled up to the user who then aborts the transaction...
TransactionalRequestResult result = transactionManager.beginAbort();

// The transaction manager handles the abort internally and re-initializes the epoch
short bumpedEpoch = epoch + 1;
prepareInitPidResponse(Errors.NONE, false, producerId, bumpedEpoch);
runUntil(result::isCompleted);

// This mimics a slower produce response that receives the timeout on the client after the above rollback
// has completed. The failed batch should not attempt to change the state since it's stale.
transactionManager.handleFailedBatch(batch, new TimeoutException(), false);
}

@Test
public void testBatchesReceivedAfterFatalError() {
doInitTransactions();
transactionManager.beginTransaction();

ProducerBatch batch = writeIdempotentBatchWithValue(transactionManager, tp1, "first");

// This mimics something that causes the transaction manager to enter its FATAL_ERROR state.
transactionManager.transitionToFatalError(Errors.PRODUCER_FENCED.exception());

// However, even with this failure, the failed batch should not attempt to update to ABORTABLE_ERROR.
transactionManager.handleFailedBatch(batch, new TimeoutException(), false);
}

@Test
public void testTransactionAbortableExceptionInTxnOffsetCommit() {
final TopicPartition tp = new TopicPartition("foo", 0);
Expand Down
Loading