Skip to content

Commit 8c3602b

Browse files
committed
fix integration tests
1 parent a8f0c91 commit 8c3602b

File tree

2 files changed

+21
-16
lines changed

2 files changed

+21
-16
lines changed

Diff for: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

+16-13
Original file line number | Diff line number | Diff line change
@@ -92,10 +92,11 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
9292
val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) => Map[ApiKeys, Nothing => Errors](
9393
ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
9494
ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => {
95-
val topicId: Uuid = topicNames.find(topicName => topicName._2 == topic).map(_._1).getOrElse(Uuid.ZERO_UUID)
95+
val topicId = topicNames.find(topicName => topicName._2 == topic).map(_._1).getOrElse(Uuid.ZERO_UUID)
96+
val topicName = if (version >= 12) "" else topic
9697
Errors.forCode(
9798
resp.data
98-
.responses.find(topic, topicId)
99+
.responses.find(topicName, topicId)
99100
.partitionResponses.asScala.find(_.index == part).get
100101
.errorCode
101102
)
@@ -250,18 +251,20 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
250251
new requests.MetadataRequest.Builder(List(topic).asJava, allowAutoTopicCreation).build()
251252
}
252253

253-
private def createProduceRequest =
254+
private def createProduceRequestWithId(id: Uuid) = {
254255
requests.ProduceRequest.forCurrentMagic(new ProduceRequestData()
255-
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
256-
Collections.singletonList(new ProduceRequestData.TopicProduceData()
257-
.setName(tp.topic).setPartitionData(Collections.singletonList(
258-
new ProduceRequestData.PartitionProduceData()
259-
.setIndex(tp.partition)
260-
.setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))))))
261-
.iterator))
262-
.setAcks(1.toShort)
263-
.setTimeoutMs(5000))
256+
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
257+
Collections.singletonList(new ProduceRequestData.TopicProduceData()
258+
.setName(tp.topic).setTopicId(id).setPartitionData(Collections.singletonList(
259+
new ProduceRequestData.PartitionProduceData()
260+
.setIndex(tp.partition)
261+
.setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))))))
262+
.iterator))
263+
.setAcks(1.toShort)
264+
.setTimeoutMs(5000))
264265
.build()
266+
}
267+
private def createProduceRequest = createProduceRequestWithId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
265268

266269
private def createFetchRequest = {
267270
val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData]
@@ -708,7 +711,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
708711
val topicNames = Map(id -> "topic")
709712
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
710713
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false),
711-
ApiKeys.PRODUCE -> createProduceRequest,
714+
ApiKeys.PRODUCE -> createProduceRequestWithId(id),
712715
ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, ApiKeys.FETCH.latestVersion()),
713716
ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
714717
ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,

Diff for: core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala

+5-3
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,6 @@ import java.io.{DataInputStream, DataOutputStream}
2121
import java.net.Socket
2222
import java.nio.ByteBuffer
2323
import java.util.Collections
24-
2524
import kafka.integration.KafkaServerTestHarness
2625
import kafka.network.SocketServer
2726
import kafka.utils._
@@ -33,7 +32,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRec
3332
import org.apache.kafka.common.requests.{ProduceResponse, ResponseHeader}
3433
import org.apache.kafka.common.security.auth.SecurityProtocol
3534
import org.apache.kafka.common.utils.ByteUtils
36-
import org.apache.kafka.common.{TopicPartition, requests}
35+
import org.apache.kafka.common.{TopicPartition, Uuid, requests}
3736
import org.apache.kafka.server.config.ServerLogConfigs
3837
import org.junit.jupiter.api.Assertions._
3938
import org.junit.jupiter.params.ParameterizedTest
@@ -129,10 +128,13 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
129128
val version = ApiKeys.PRODUCE.latestVersion: Short
130129
val (serializedBytes, responseHeaderVersion) = {
131130
val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, version, "", correlationId)
131+
val topicId = getTopicIds().getOrElse(topicPartition.topic(), Uuid.ZERO_UUID)
132132
val request = requests.ProduceRequest.forCurrentMagic(new ProduceRequestData()
133133
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
134134
Collections.singletonList(new ProduceRequestData.TopicProduceData()
135-
.setName(topicPartition.topic()).setPartitionData(Collections.singletonList(
135+
.setName(topicPartition.topic())
136+
.setTopicId(topicId)
137+
.setPartitionData(Collections.singletonList(
136138
new ProduceRequestData.PartitionProduceData()
137139
.setIndex(topicPartition.partition())
138140
.setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("message".getBytes))))))

0 commit comments

Comments (0)