diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index c78134c72ecf2..d530e9c01e738 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -737,7 +737,13 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception) } synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) { - maybeTransitionToErrorState(exception); + // Compare the batch with the current ProducerIdAndEpoch. If the producer IDs are the *same* but the epochs + // are *different*, consider the batch as stale. + boolean isStaleBatch = batch.producerId() == producerIdAndEpoch.producerId && batch.producerEpoch() != producerIdAndEpoch.epoch; + + if (!isStaleBatch && !hasFatalError()) + maybeTransitionToErrorState(exception); + removeInFlightBatch(batch); if (hasFatalError()) { @@ -745,6 +751,11 @@ synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException except "since the producer is already in fatal error state", batch, batch.producerId(), batch.producerEpoch(), batch.baseSequence(), exception); return; + } else if (isStaleBatch) { + log.debug("Ignoring stale batch {} with producer id {}, epoch {}, and sequence number {} " + + "since the producer has been re-initialized with producer id {} and epoch {}", batch, batch.producerId(), + batch.producerEpoch(), batch.baseSequence(), producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, exception); + return; } if (exception instanceof OutOfOrderSequenceException && !isTransactional()) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 0d582bf80168d..78bf30ac079e2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InvalidTxnStateException; @@ -3948,6 +3949,44 @@ public void testTransactionAbortableExceptionInAddOffsetsToTxn() { assertAbortableError(TransactionAbortableException.class); } + @Test + public void testBatchesReceivedAfterAbortableError() { + doInitTransactions(); + transactionManager.beginTransaction(); + + ProducerBatch batch = writeIdempotentBatchWithValue(transactionManager, tp1, "first"); + + // The producer's connection to the broker is tenuous, so this mimics the catch block for ApiException in + // KafkaProducer.doSend(). + transactionManager.maybeTransitionToErrorState(new DisconnectException("test")); + + // The above error is bubbled up to the user who then aborts the transaction... + TransactionalRequestResult result = transactionManager.beginAbort(); + + // The transaction manager handles the abort internally and re-initializes the epoch + short bumpedEpoch = epoch + 1; + prepareInitPidResponse(Errors.NONE, false, producerId, bumpedEpoch); + runUntil(result::isCompleted); + + // This mimics a slower produce response that receives the timeout on the client after the above rollback + // has completed. The failed batch should not attempt to change the state since it's stale. + transactionManager.handleFailedBatch(batch, new TimeoutException(), false); + } + + @Test + public void testBatchesReceivedAfterFatalError() { + doInitTransactions(); + transactionManager.beginTransaction(); + + ProducerBatch batch = writeIdempotentBatchWithValue(transactionManager, tp1, "first"); + + // This mimics something that causes the transaction manager to enter its FATAL_ERROR state. + transactionManager.transitionToFatalError(Errors.PRODUCER_FENCED.exception()); + + // However, even with this failure, the failed batch should not attempt to update to ABORTABLE_ERROR. + transactionManager.handleFailedBatch(batch, new TimeoutException(), false); + } + @Test public void testTransactionAbortableExceptionInTxnOffsetCommit() { final TopicPartition tp = new TopicPartition("foo", 0);