Skip to content

Commit a0203dc

Browse files
authored
KAFKA-20322: Add API version discovery true for transaction manager (apache#21782)
[KIP-1228](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1228%3A+Add+Transaction+Version+to+WriteTxnMarkersRequest) added a WriteTxnMarkersRequest v2 with a TransactionVersion field. However, TransactionMarkerChannelManager creates its NetworkClient with discoverBrokerVersions=false, which disables API version negotiation with peer brokers. Without version discovery, the ApiVersions cache is never populated — apiVersions.get(nodeId) returns null in NetworkClient.doSend(), causing it to fall through to builder.latestAllowedVersion() which blindly uses the highest version the sending broker knows about rather than negotiating a mutually supported version. In this fix we enable discovery and also fix the system test that could've caught this issue earlier. The system test was run locally and I verified that it failed without the fix and passed with the fix. Reviewers: Artem Livshits <alivshits@confluent.io>, David Jacot <djacot@confluent.io>
1 parent a80deda commit a0203dc

3 files changed

Lines changed: 62 additions & 14 deletions

File tree

core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ object TransactionMarkerChannelManager {
9595
config.connectionSetupTimeoutMs,
9696
config.connectionSetupTimeoutMaxMs,
9797
time,
98-
false,
98+
true,
9999
new ApiVersions,
100100
logContext,
101101
MetadataRecoveryStrategy.NONE

core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,14 @@ import org.apache.kafka.clients.{ClientResponse, NetworkClient}
2525
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
2626
import org.apache.kafka.common.record.internal.RecordBatch
2727
import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
28-
import org.apache.kafka.common.utils.MockTime
28+
import org.apache.kafka.common.utils.{LogContext, MockTime}
29+
import org.apache.kafka.common.metrics.Metrics
2930
import org.apache.kafka.common.{Node, TopicPartition}
3031
import org.apache.kafka.coordinator.transaction.{TransactionMetadata, TransactionState}
3132
import org.apache.kafka.metadata.MetadataCache
3233
import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
3334
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
34-
import org.apache.kafka.server.util.RequestAndCompletionHandler
35+
import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}
3536
import org.junit.jupiter.api.Assertions._
3637
import org.junit.jupiter.api.Test
3738
import org.junit.jupiter.params.ParameterizedTest
@@ -627,6 +628,35 @@ class TransactionMarkerChannelManagerTest {
627628
})
628629
}
629630

631+
@Test
632+
def shouldEnableApiVersionDiscoveryInFactoryMethod(): Unit = {
633+
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1))
634+
val metrics = new Metrics()
635+
val logContext = new LogContext()
636+
try {
637+
val channelManager = TransactionMarkerChannelManager(
638+
config,
639+
metrics,
640+
metadataCache,
641+
txnStateManager,
642+
time,
643+
logContext
644+
)
645+
try {
646+
val field = classOf[InterBrokerSendThread].getDeclaredField("networkClient")
647+
field.setAccessible(true)
648+
val client = field.get(channelManager).asInstanceOf[NetworkClient]
649+
assertTrue(client.discoverBrokerVersions(),
650+
"TransactionMarkerChannelManager should enable API version discovery to " +
651+
"ensure compatibility during rolling upgrades")
652+
} finally {
653+
channelManager.shutdown()
654+
}
655+
} finally {
656+
metrics.close()
657+
}
658+
}
659+
630660
/**
631661
* Adjusts the transaction metadata based on the transaction version.
632662
* When transaction V2 is enabled, the producer epoch is incremented

tests/kafkatest/tests/core/transactions_mixed_versions_test.py

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from kafkatest.utils.transactions_utils import create_and_start_copiers
2323
from kafkatest.version import LATEST_3_3, LATEST_3_4, LATEST_3_5, \
2424
LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, LATEST_4_0, \
25-
LATEST_4_1, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
25+
LATEST_4_1, LATEST_4_2, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
2626

2727
from ducktape.tests.test import Test
2828
from ducktape.mark import matrix
@@ -147,30 +147,41 @@ def copy_messages_transactionally(self, input_topic, output_topic,
147147

148148
return self.drain_consumer(concurrent_consumer, num_messages_to_copy)
149149

150-
def setup_topics(self):
151-
assignment = ":".join(map(str, [self.kafka.idx(node) for node in self.kafka.nodes]))
152-
transaction_assignment = ",".join(map(str, [assignment[::-1]] * 50))
150+
def setup_topics(self, txn_state_leader_node, data_leader_node):
151+
all_node_ids = [self.kafka.idx(node) for node in self.kafka.nodes]
152+
153+
# Data topics: data_leader_node first
154+
data_leader_id = self.kafka.idx(data_leader_node)
155+
data_replicas = [data_leader_id] + [nid for nid in all_node_ids if nid != data_leader_id]
156+
data_assignment = ":".join(map(str, data_replicas))
157+
158+
# Internal topics: txn_state_leader_node first
159+
txn_leader_id = self.kafka.idx(txn_state_leader_node)
160+
txn_replicas = [txn_leader_id] + [nid for nid in all_node_ids if nid != txn_leader_id]
161+
txn_assignment = ":".join(map(str, txn_replicas))
162+
internal_topic_assignment = ",".join([txn_assignment] * 50)
163+
153164
self.kafka.topics = {
154165
self.input_topic: {
155166
"partitions": self.num_input_partitions,
156167
"replication-factor": self.replication_factor,
157-
"replica-assignment": assignment,
168+
"replica-assignment": data_assignment,
158169
"configs": {
159170
"min.insync.replicas": 2
160171
}
161172
},
162173
self.output_topic: {
163174
"partitions": self.num_output_partitions,
164175
"replication-factor": self.replication_factor,
165-
"replica-assignment": assignment,
176+
"replica-assignment": data_assignment,
166177
"configs": {
167178
"min.insync.replicas": 2
168179
}
169180
},
170181
"__transaction_state": {
171182
"partitions": 50,
172183
"replication-factor": self.replication_factor,
173-
"replica-assignment": transaction_assignment,
184+
"replica-assignment": internal_topic_assignment,
174185
"configs": {
175186
"min.insync.replicas": 2
176187
}
@@ -179,11 +190,12 @@ def setup_topics(self):
179190

180191
@cluster(num_nodes=8)
181192
@matrix(
182-
old_kafka_version=[str(LATEST_4_1), str(LATEST_4_0), str(LATEST_3_9), str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3)],
193+
old_kafka_version=[str(LATEST_4_2), str(LATEST_4_1), str(LATEST_4_0), str(LATEST_3_9), str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3)],
183194
metadata_quorum=[isolated_kraft],
184-
group_protocol=[None]
195+
group_protocol=[None],
196+
coordinator_on_new_broker=[True, False]
185197
)
186-
def test_transactions_mixed_versions(self, old_kafka_version, metadata_quorum=quorum.isolated_kraft, group_protocol=None):
198+
def test_transactions_mixed_versions(self, old_kafka_version, metadata_quorum=quorum.isolated_kraft, group_protocol=None, coordinator_on_new_broker=True):
187199
oldKafkaVersion = KafkaVersion(old_kafka_version)
188200
self.kafka = KafkaService(self.test_context,
189201
num_nodes=self.num_brokers,
@@ -193,6 +205,9 @@ def test_transactions_mixed_versions(self, old_kafka_version, metadata_quorum=qu
193205

194206
self.kafka.nodes[0].version = DEV_BRANCH
195207

208+
new_node = self.kafka.nodes[0] # DEV_BRANCH
209+
old_node = self.kafka.nodes[1] # old version
210+
196211
security_protocol = 'PLAINTEXT'
197212
self.kafka.security_protocol = security_protocol
198213
self.kafka.interbroker_security_protocol = security_protocol
@@ -202,7 +217,10 @@ def test_transactions_mixed_versions(self, old_kafka_version, metadata_quorum=qu
202217

203218
self.kafka.log_level = "DEBUG"
204219

205-
self.setup_topics()
220+
if coordinator_on_new_broker:
221+
self.setup_topics(txn_state_leader_node=new_node, data_leader_node=old_node)
222+
else:
223+
self.setup_topics(txn_state_leader_node=old_node, data_leader_node=new_node)
206224
self.kafka.start()
207225

208226
input_messages = self.seed_messages(self.input_topic, self.num_seed_messages)

0 commit comments

Comments
 (0)