Commit a8f0c91

fix test
1 parent afede12 commit a8f0c91

13 files changed (+88, -56 lines)

Diff for: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (+2, -1)

```diff
@@ -936,7 +936,8 @@ private Map<String, Uuid> getTopicIdsFromBatches(List<ProducerBatch> batches) {
         return batches.stream()
                 .collect(Collectors.toMap(
                         b -> b.topicPartition.topic(),
-                        b -> metadata.topicIds().getOrDefault(b.topicPartition.topic(), Uuid.ZERO_UUID))
+                        b -> metadata.topicIds().getOrDefault(b.topicPartition.topic(), Uuid.ZERO_UUID),
+                        (existing, replacement) -> replacement)
                 );
     }
```

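Without a merge function, `Collectors.toMap` throws `IllegalStateException` on the first duplicate key, and duplicates are routine here: a produce request can carry several batches for different partitions of the same topic, all mapping to one topic name. A minimal JDK-only sketch of the failure mode and the fix (the topic names and value function below are made up for illustration):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ToMapMergeSketch {
    public static void main(String[] args) {
        // Two batches for "orders" (e.g. partitions 0 and 1) yield a duplicate key.
        List<String> batchTopics = List.of("orders", "orders", "payments");

        // Throws IllegalStateException ("Duplicate key orders"):
        // batchTopics.stream().collect(Collectors.toMap(t -> t, String::length));

        // The three-argument overload resolves collisions instead of throwing.
        Map<String, Integer> byTopic = batchTopics.stream()
                .collect(Collectors.toMap(t -> t, String::length,
                        (existing, replacement) -> replacement));
        System.out.println(byTopic); // e.g. {orders=6, payments=8}
    }
}
```
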
Diff for: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java (+5, -2)

```diff
@@ -49,9 +49,12 @@ public static Builder forMagic(byte magic, ProduceRequestData data) {
 
         final short minVersion;
         final short maxVersion;
-        if (magic < RecordBatch.MAGIC_VALUE_V2 || canNotSupportTopicId(data)) {
+        if (magic < RecordBatch.MAGIC_VALUE_V2) {
             minVersion = 2;
             maxVersion = 2;
+        } else if (canNotSupportTopicId(data)) {
+            minVersion = 3;
+            maxVersion = 11;
         } else {
             minVersion = 3;
             maxVersion = ApiKeys.PRODUCE.latestVersion();
@@ -60,7 +63,7 @@ public static Builder forMagic(byte magic, ProduceRequestData data) {
     }
 
     private static boolean canNotSupportTopicId(ProduceRequestData data) {
-        return data.topicData().stream().anyMatch(d -> d.topicId() == Uuid.ZERO_UUID);
+        return data.topicData().stream().anyMatch(d -> d.topicId() == null || d.topicId() == Uuid.ZERO_UUID);
     }
 
     public static Builder forCurrentMagic(ProduceRequestData data) {
```

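The old condition lumped "missing topic ID" in with "old message format" and pinned both to version 2; the fix gives each case its own range: magic below v2 still pins to v2, topics without IDs may use v3 through v11 (the last version that addresses topics by name), and topics with IDs get the full range (v12+ addresses topics by UUID). A standalone sketch of that three-way choice, assuming stand-ins for the Kafka constants (`MAGIC_V2`, `LATEST_VERSION`, `ZERO_UUID`, and `TopicData` are not the real identifiers):

```java
import java.util.List;
import java.util.UUID;

public class ProduceVersionRangeSketch {
    static final byte MAGIC_V2 = 2;
    static final short LATEST_VERSION = 12;          // v12 is the first topic-id version
    static final UUID ZERO_UUID = new UUID(0L, 0L);  // sentinel for "no topic id"

    record TopicData(UUID topicId) {}

    static short[] versionRange(byte magic, List<TopicData> topics) {
        boolean missingTopicId = topics.stream()
                .anyMatch(t -> t.topicId() == null || ZERO_UUID.equals(t.topicId()));
        if (magic < MAGIC_V2)
            return new short[]{2, 2};               // old message format: pin to v2
        else if (missingTopicId)
            return new short[]{3, 11};              // name-addressed: cap below v12
        else
            return new short[]{3, LATEST_VERSION};  // id-addressed: allow the full range
    }

    public static void main(String[] args) {
        TopicData noId = new TopicData(ZERO_UUID);
        System.out.println(versionRange(MAGIC_V2, List.of(noId))[1]); // 11
    }
}
```
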
Diff for: clients/src/main/java/org/apache/kafka/common/utils/Utils.java (+1, -1)

```diff
@@ -1655,7 +1655,7 @@ public static void require(boolean requirement) {
     }
 
     /**
-     * Convert map keys to another type.
+     * Convert a map's keys to another type.
      */
     public static <K1, K2, V> Map<K2, V> convertKeys(Map<K1, V> originalMap, Function<K1, K2> converter) {
         Map<K2, V> newMap = new HashMap<>();
```

Diff for: clients/src/test/java/org/apache/kafka/common/message/MessageTest.java (+5, -4)

```diff
@@ -915,6 +915,8 @@ private void testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(short from
     @Test
     public void testProduceResponseVersions() throws Exception {
         String topicName = "topic";
+        Uuid topicId = Uuid.fromString("klZ9sa2rSvig6QpgGXzALT");
+
         int partitionIndex = 0;
         short errorCode = Errors.INVALID_TOPIC_EXCEPTION.code();
         long baseOffset = 12L;
@@ -971,12 +973,11 @@ public void testProduceResponseVersions() throws Exception {
                 responseData.setThrottleTimeMs(0);
             }
 
-            if (version <= 11) {
-                responseData.responses().iterator().next().setName(topicName);
-            }
 
             if (version >= 12) {
-                responseData.responses().iterator().next().setTopicId(Uuid.randomUuid());
+                responseData.responses().iterator().next().setTopicId(topicId);
+            } else {
+                responseData.responses().iterator().next().setName(topicName);
             }
 
             if (version >= 3 && version <= 4) {
```

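The rewritten branch mirrors the schema rule: a ProduceResponse topic is addressed by name up to v11 and by topic ID from v12 on, so the test sets exactly one of the two fields per version. A hedged sketch of that rule (the generated inner class name `TopicProduceResponse` is assumed from memory; the setters appear in the diff above):

```java
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ProduceResponseData;

public class TopicAddressingSketch {
    // Set exactly one identifying field, depending on the response version.
    static void addressTopic(ProduceResponseData.TopicProduceResponse topic,
                             short version, String name, Uuid id) {
        if (version >= 12)
            topic.setTopicId(id);   // v12+: topics are identified by UUID
        else
            topic.setName(name);    // <= v11: topics are identified by name
    }
}
```
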
Diff for: clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java (+26, -2)

```diff
@@ -18,6 +18,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
 import org.apache.kafka.common.message.ProduceRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -120,15 +121,38 @@ public void testBuildWithCurrentMessageFormat() {
         ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE,
             new ProduceRequestData()
                 .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
-                    new ProduceRequestData.TopicProduceData().setName("test").setPartitionData(Collections.singletonList(
-                        new ProduceRequestData.PartitionProduceData().setIndex(9).setRecords(builder.build()))))
+                    new ProduceRequestData.TopicProduceData()
+                        .setName("test")
+                        .setTopicId(Uuid.fromString("H3Emm3vW7AKKO4NTRPaCWt"))
+                        .setPartitionData(Collections.singletonList(
+                            new ProduceRequestData.PartitionProduceData().setIndex(9).setRecords(builder.build()))))
                     .iterator()))
                 .setAcks((short) 1)
                 .setTimeoutMs(5000));
         assertEquals(3, requestBuilder.oldestAllowedVersion());
         assertEquals(ApiKeys.PRODUCE.latestVersion(), requestBuilder.latestAllowedVersion());
     }
 
+    @Test
+    public void testBuildWithCurrentMessageFormatWithoutTopicId() {
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
+            CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+        builder.append(10L, null, "a".getBytes());
+        ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE,
+            new ProduceRequestData()
+                .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+                    new ProduceRequestData.TopicProduceData()
+                        .setName("test")
+                        .setPartitionData(Collections.singletonList(
+                            new ProduceRequestData.PartitionProduceData().setIndex(9).setRecords(builder.build()))))
+                    .iterator()))
+                .setAcks((short) 1)
+                .setTimeoutMs(5000));
+        assertEquals(3, requestBuilder.oldestAllowedVersion());
+        assertEquals(11, requestBuilder.latestAllowedVersion());
+    }
+
     @Test
     public void testV3AndAboveShouldContainOnlyOneRecordBatch() {
         ByteBuffer buffer = ByteBuffer.allocate(256);
```

Diff for: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java (+4, -2)

```diff
@@ -74,7 +74,8 @@ public void produceResponseV5Test() {
     @Test
     public void produceResponseVersionTest() {
         Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
-        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "test");
+        Uuid topicId = Uuid.fromString("5JkYABorYD4w0AQXe9TvBG");
+        TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, 0, "test");
         responseData.put(topicIdPartition, new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100));
         ProduceResponse v0Response = new ProduceResponse(responseData);
         ProduceResponse v1Response = new ProduceResponse(responseData, 10);
@@ -103,7 +104,8 @@ public void produceResponseVersionTest() {
     @Test
     public void produceResponseRecordErrorsTest() {
         Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
-        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), 0, "test");
+        Uuid topicId = Uuid.fromString("4w0AQXe9TvBG5JkYABorYD");
+        TopicIdPartition tp = new TopicIdPartition(topicId, 0, "test");
         ProduceResponse.PartitionResponse partResponse = new ProduceResponse.PartitionResponse(Errors.NONE,
             10000, RecordBatch.NO_TIMESTAMP, 100,
             Collections.singletonList(new ProduceResponse.RecordError(3, "Record error")),
```

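This and the following test diffs all make the same substitution: `Uuid.randomUuid()` becomes a hard-coded `Uuid.fromString(...)` literal, so every run serializes the same bytes and failures are reproducible. A small hedged sketch of the round-trip property being relied on (Kafka's `Uuid` renders as 22 URL-safe base64 characters):

```java
import org.apache.kafka.common.Uuid;

public class FixedUuidSketch {
    public static void main(String[] args) {
        // Same literal as the test above; any 22-char URL-safe base64 id works.
        Uuid fixed = Uuid.fromString("5JkYABorYD4w0AQXe9TvBG");

        // toString() round-trips, so serialized test output is stable per run.
        System.out.println(fixed);                                            // 5JkYABorYD4w0AQXe9TvBG
        System.out.println(fixed.equals(Uuid.fromString(fixed.toString()))); // true
    }
}
```
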
Diff for: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java (+5, -3)

```diff
@@ -445,7 +445,7 @@ public void cannotUseFindCoordinatorV0ToFindTransactionCoordinator() {
 
     @Test
     public void testProduceRequestPartitionSize() {
-        Uuid topicId = Uuid.randomUuid();
+        Uuid topicId = Uuid.fromString("e9TvBGX5JkYAB0AQorYD4w");
         TopicIdPartition tp0 = new TopicIdPartition(topicId, 0, "test");
         TopicIdPartition tp1 = new TopicIdPartition(topicId, 1, "test");
         MemoryRecords records0 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2,
@@ -2368,15 +2368,17 @@ private ProduceRequest createProduceRequest(short version) {
     @SuppressWarnings("deprecation")
     private ProduceResponse createProduceResponse() {
         Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
-        responseData.put(new TopicIdPartition(Uuid.randomUuid(), 0, "test"), new ProduceResponse.PartitionResponse(Errors.NONE,
+        Uuid topicId = Uuid.fromString("0AQorYD4we9TvBGX5JkYAB");
+        responseData.put(new TopicIdPartition(topicId, 0, "test"), new ProduceResponse.PartitionResponse(Errors.NONE,
             10000, RecordBatch.NO_TIMESTAMP, 100));
         return new ProduceResponse(responseData, 0);
     }
 
     @SuppressWarnings("deprecation")
     private ProduceResponse createProduceResponseWithErrorMessage() {
         Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
-        responseData.put(new TopicIdPartition(Uuid.randomUuid(), 0, "test"), new ProduceResponse.PartitionResponse(Errors.NONE,
+        Uuid topicId = Uuid.fromString("0AQorYD4we9TvBGX5JkYAB");
+        responseData.put(new TopicIdPartition(topicId, 0, "test"), new ProduceResponse.PartitionResponse(Errors.NONE,
             10000, RecordBatch.NO_TIMESTAMP, 100, singletonList(new ProduceResponse.RecordError(0, "error message")),
             "global error message"));
         return new ProduceResponse(responseData, 0);
```

Diff for: core/src/main/scala/kafka/server/KafkaApis.scala (+5, -6)

```diff
@@ -608,13 +608,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     val nonExistingTopicResponses = mutable.Map[TopicIdPartition, PartitionResponse]()
     val invalidRequestResponses = mutable.Map[TopicIdPartition, PartitionResponse]()
     val authorizedRequestInfo = mutable.Map[TopicIdPartition, MemoryRecords]()
-    val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition, ProduceRequestData.PartitionProduceData)]
+    val topicIdToPartitionData = new mutable.ArrayBuffer[(TopicIdPartition, ProduceRequestData.PartitionProduceData)]
 
     produceRequest.data.topicData.forEach { topic =>
       topic.partitionData.forEach { partition =>
         val topicIdIsMissing = topic.topicId == null || topic.topicId == Uuid.ZERO_UUID
-
-        val topicName: String = metadataCache.getTopicName(topic.topicId).getOrElse(topic.name)
+        val topicName: String = if (topicIdIsMissing) topic.name else metadataCache.getTopicName(topic.topicId).getOrElse(topic.name)
         val topicId: Uuid = if (topicIdIsMissing) metadataCache.getTopicId(topicName) else topic.topicId
 
         val topicPartition = new TopicPartition(topicName, partition.index())
@@ -623,13 +622,13 @@ class KafkaApis(val requestChannel: RequestChannel,
         else if (!metadataCache.contains(topicPartition))
           nonExistingTopicResponses += new TopicIdPartition(topicId, topicPartition) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
         else
-          partitionDatas += new TopicIdPartition(topicId, topicPartition) -> partition
+          topicIdToPartitionData += new TopicIdPartition(topicId, topicPartition) -> partition
       }
     }
     // cache the result to avoid redundant authorization calls
-    val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC, partitionDatas)(_._1.topic)
+    val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC, topicIdToPartitionData)(_._1.topic)
 
-    partitionDatas.foreach { case (topicIdPartition, partition) =>
+    topicIdToPartitionData.foreach { case (topicIdPartition, partition) =>
       // This caller assumes the type is MemoryRecords and that is true on current serialization
       // We cast the type to avoid causing big change to code base.
       // https://issues.apache.org/jira/browse/KAFKA-10698
```

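The behavioral fix here is the `topicName` resolution: previously the broker always consulted the metadata cache with the request's topic ID, even when the client never sent one (pre-v12 requests), so a null or zero ID could shadow the name the client did send. A hedged Java sketch of the corrected resolution order (`MetadataCache` below is a hypothetical stand-in for the broker's cache, not the real Scala trait):

```java
import java.util.Optional;

import org.apache.kafka.common.Uuid;

public class TopicResolutionSketch {
    // Hypothetical stand-in for the broker's metadata cache.
    interface MetadataCache {
        Optional<String> getTopicName(Uuid topicId);
    }

    static String resolveTopicName(Uuid requestTopicId, String requestTopicName,
                                   MetadataCache cache) {
        boolean topicIdIsMissing =
                requestTopicId == null || Uuid.ZERO_UUID.equals(requestTopicId);
        if (topicIdIsMissing)
            return requestTopicName;                 // pre-v12 request: the name is authoritative
        return cache.getTopicName(requestTopicId)    // v12+: resolve the id first...
                .orElse(requestTopicName);           // ...fall back to the name if unknown
    }
}
```
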
Diff for: core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala (+1, -1)

```diff
@@ -99,7 +99,7 @@ class CoordinatorPartitionWriterTest {
       Collections.emptyMap(),
       new Properties()
     )))
-    val topicId = Uuid.randomUuid()
+    val topicId = Uuid.fromString("JkYAB4AQe905orYDTvBGXw")
     when(replicaManager.getTopicIdPartition(tp)).thenReturn(new TopicIdPartition(topicId, tp))
 
     val recordsCapture: ArgumentCaptor[Map[TopicIdPartition, MemoryRecords]] =
```

Diff for: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala (+1, -1)

```diff
@@ -92,7 +92,7 @@ class GroupCoordinatorTest {
   private val protocolSuperset = List((protocolName, metadata), ("roundrobin", metadata))
   private val requireStable = true
   private var groupPartitionId: Int = -1
-  val groupMetadataTopicId = Uuid.randomUuid()
+  val groupMetadataTopicId = Uuid.fromString("JaTH2JYK2ed2GzUapg8tgg")
 
   // we use this string value since its hashcode % #.partitions is different
   private val otherGroupId = "otherGroup"
```

Diff for: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala (+14, -12)

```diff
@@ -2458,7 +2458,7 @@
   @Test
   def shouldReplaceProducerFencedWithInvalidProducerEpochInProduceResponse(): Unit = {
     val topic = "topic"
-    val topicId = Uuid.randomUuid()
+    val topicId = Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg")
     val tp = new TopicIdPartition(topicId, 0, "topic")
     addTopicToMetadataCache(topic, numPartitions = 2, topicId = topicId)
 
@@ -2519,7 +2519,7 @@
   @Test
   def testProduceResponseContainsNewLeaderOnNotLeaderOrFollower(): Unit = {
     val topic = "topic"
-    val topicId = Uuid.randomUuid()
+    val topicId = Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg")
     addTopicToMetadataCache(topic, numPartitions = 2, numBrokers = 3, topicId = topicId)
 
     for (version <- 10 to ApiKeys.PRODUCE.latestVersion) {
@@ -2590,7 +2590,7 @@
   @Test
   def testProduceResponseReplicaManagerLookupErrorOnNotLeaderOrFollower(): Unit = {
     val topic = "topic"
-    val topicId = Uuid.randomUuid()
+    val topicId = Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg")
     addTopicToMetadataCache(topic, numPartitions = 2, numBrokers = 3, topicId = topicId)
 
     for (version <- 10 to ApiKeys.PRODUCE.latestVersion) {
@@ -2657,7 +2657,7 @@
   @Test
   def testProduceResponseMetadataLookupErrorOnNotLeaderOrFollower(): Unit = {
     val topic = "topic"
-    val topicId = Uuid.randomUuid()
+    val topicId = Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg")
     metadataCache = mock(classOf[ZkMetadataCache])
 
     for (version <- 10 to ApiKeys.PRODUCE.latestVersion) {
@@ -2669,10 +2669,11 @@
       val tp = new TopicIdPartition(topicId, 0, topic)
 
       val topicProduceData = new ProduceRequestData.TopicProduceData()
-        .setName(tp.topic)
 
       if (version >= 12 ) {
        topicProduceData.setTopicId(topicId)
+      } else {
+        topicProduceData.setName(tp.topic)
       }
 
       val produceRequest = ProduceRequest.forCurrentMagic(new ProduceRequestData()
@@ -2707,8 +2708,8 @@
       when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
         any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
       when(metadataCache.contains(tp.topicPartition())).thenAnswer(_ => true)
-      when(metadataCache.getTopicName(any())).thenReturn(Some(topicProduceData.name()))
-      when(metadataCache.getTopicId(any())).thenReturn(topicProduceData.topicId())
+      when(metadataCache.getTopicName(tp.topicId())).thenReturn(Some(tp.topic()))
+      when(metadataCache.getTopicId(tp.topic())).thenReturn(tp.topicId())
       when(metadataCache.getPartitionInfo(tp.topic(), tp.partition())).thenAnswer(_ => Option.empty)
       when(metadataCache.getAliveBrokerNode(any(), any())).thenReturn(Option.empty)
       kafkaApis = createKafkaApis()
@@ -2732,7 +2733,8 @@
     val topic = "topic"
     val transactionalId = "txn1"
 
-    val tp = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val topicId = Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg")
+    val tp = new TopicIdPartition(topicId, 0, "topic")
     addTopicToMetadataCache(topic, numPartitions = 2, topicId = tp.topicId())
 
     for (version <- 3 to ApiKeys.PRODUCE.latestVersion) {
@@ -2913,7 +2915,7 @@
   def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
     val tp1 = new TopicPartition("t", 0)
     val tp2 = new TopicPartition("t1", 0)
-    val topicId = Uuid.randomUuid()
+    val topicId = Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg")
     val (_, request) = createWriteTxnMarkersRequest(asList(tp1, tp2))
     val expectedErrors = Map(tp1 -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, tp2 -> Errors.NONE).asJava
 
@@ -3046,7 +3048,7 @@
   def shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
     val tp1 = new TopicPartition("t", 0)
     val tp2 = new TopicPartition("t1", 0)
-    val topicId = Uuid.randomUuid()
+    val topicId = Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg")
     val (_, request) = createWriteTxnMarkersRequest(asList(tp1, tp2))
     val expectedErrors = Map(tp1 -> Errors.UNKNOWN_TOPIC_OR_PARTITION, tp2 -> Errors.NONE).asJava
 
@@ -3131,8 +3133,8 @@
     val foo1 = new TopicPartition("foo", 1)
 
     val topicIds = Map(
-      Topic.GROUP_METADATA_TOPIC_NAME -> Uuid.randomUuid(),
-      "foo" -> Uuid.randomUuid())
+      Topic.GROUP_METADATA_TOPIC_NAME -> Uuid.fromString("JaTH2JYK2ed2GzUapg8tgg"),
+      "foo" -> Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg"))
     val allPartitions = List(
       offset0,
       offset1,
```

Diff for: core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala (+7, -3)

```diff
@@ -20,7 +20,7 @@ package kafka.server
 import java.nio.ByteBuffer
 import java.util.{Collections, Properties}
 import kafka.utils.TestUtils
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.message.ProduceRequestData
 import org.apache.kafka.common.protocol.Errors
@@ -91,6 +91,7 @@ class ProduceRequestTest extends BaseRequestTest {
     topicConfig.setProperty(messageTimeStampConfig, "1000")
     val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1, servers, topicConfig)
     val leader = partitionToLeader(partition)
+    val topicDescription = TestUtils.describeTopic(createAdminClient(), topic)
 
     def createRecords(magicValue: Byte, timestamp: Long, codec: CompressionType): MemoryRecords = {
       val buf = ByteBuffer.allocate(512)
@@ -102,11 +103,12 @@ class ProduceRequestTest extends BaseRequestTest {
     }
 
     val records = createRecords(RecordBatch.MAGIC_VALUE_V2, recordTimestamp, CompressionType.GZIP)
-    val topicPartition = new TopicPartition("topic", partition)
+    val topicPartition = new TopicIdPartition(topicDescription.topicId(), partition, "topic")
     val produceResponse = sendProduceRequest(leader, ProduceRequest.forCurrentMagic(new ProduceRequestData()
       .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
         new ProduceRequestData.TopicProduceData()
           .setName(topicPartition.topic())
+          .setTopicId(topicPartition.topicId())
          .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData()
            .setIndex(topicPartition.partition())
            .setRecords(records)))).iterator))
@@ -118,7 +120,9 @@ class ProduceRequestTest extends BaseRequestTest {
     val topicProduceResponse = produceResponse.data.responses.asScala.head
     assertEquals(1, topicProduceResponse.partitionResponses.size)
     val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head
-    val tp = new TopicPartition(topicProduceResponse.name, partitionProduceResponse.index)
+    val tp = new TopicIdPartition(topicProduceResponse.topicId(),
+      partitionProduceResponse.index,
+      getTopicNames().get(topicProduceResponse.topicId()).getOrElse(""))
     assertEquals(topicPartition, tp)
     assertEquals(Errors.INVALID_TIMESTAMP, Errors.forCode(partitionProduceResponse.errorCode))
     // there are 3 records with InvalidTimestampException created from inner function createRecords
```
