@@ -1052,8 +1052,8 @@ class KafkaApisTest extends Logging {
10521052 .setMemberId("member")
10531053 .setTopics(util.List.of(
10541054 new OffsetCommitRequestData.OffsetCommitRequestTopic()
1055- .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID )
1056- .setName(if (version < 10) topicName else "" )
1055+ .setTopicId(topicId)
1056+ .setName(topicName)
10571057 .setPartitions(util.List.of(
10581058 new OffsetCommitRequestData.OffsetCommitRequestPartition()
10591059 .setPartitionIndex(0)
@@ -1113,8 +1113,8 @@ class KafkaApisTest extends Logging {
11131113 .setMemberId("member")
11141114 .setTopics(util.List.of(
11151115 new OffsetCommitRequestData.OffsetCommitRequestTopic()
1116- .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID )
1117- .setName(if (version < 10) topicName else "" )
1116+ .setTopicId(topicId)
1117+ .setName(topicName)
11181118 .setPartitions(util.List.of(
11191119 new OffsetCommitRequestData.OffsetCommitRequestPartition()
11201120 .setPartitionIndex(0)
@@ -1162,13 +1162,15 @@ class KafkaApisTest extends Logging {
11621162 assertEquals(expectedOffsetCommitResponse, response.data)
11631163 }
11641164
1165- @Test
1166- def testHandleOffsetCommitRequestTopicsAndPartitionsValidationWithTopicIds(): Unit = {
1165+ @ParameterizedTest
1166+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
1167+ def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(version: Short): Unit = {
11671168 val fooId = Uuid.randomUuid()
11681169 val barId = Uuid.randomUuid()
11691170 val zarId = Uuid.randomUuid()
11701171 val fooName = "foo"
11711172 val barName = "bar"
1173+ val zarName = "zar"
11721174 addTopicToMetadataCache(fooName, topicId = fooId, numPartitions = 2)
11731175 addTopicToMetadataCache(barName, topicId = barId, numPartitions = 2)
11741176
@@ -1179,6 +1181,7 @@ class KafkaApisTest extends Logging {
11791181 // foo exists but only has 2 partitions.
11801182 new OffsetCommitRequestData.OffsetCommitRequestTopic()
11811183 .setTopicId(fooId)
1184+ .setName(fooName)
11821185 .setPartitions(util.List.of(
11831186 new OffsetCommitRequestData.OffsetCommitRequestPartition()
11841187 .setPartitionIndex(0)
@@ -1192,6 +1195,7 @@ class KafkaApisTest extends Logging {
11921195 // bar exists.
11931196 new OffsetCommitRequestData.OffsetCommitRequestTopic()
11941197 .setTopicId(barId)
1198+ .setName(barName)
11951199 .setPartitions(util.List.of(
11961200 new OffsetCommitRequestData.OffsetCommitRequestPartition()
11971201 .setPartitionIndex(0)
@@ -1202,6 +1206,7 @@ class KafkaApisTest extends Logging {
12021206 // zar does not exist.
12031207 new OffsetCommitRequestData.OffsetCommitRequestTopic()
12041208 .setTopicId(zarId)
1209+ .setName(zarName)
12051210 .setPartitions(util.List.of(
12061211 new OffsetCommitRequestData.OffsetCommitRequestPartition()
12071212 .setPartitionIndex(0)
@@ -1210,7 +1215,9 @@ class KafkaApisTest extends Logging {
12101215 .setPartitionIndex(1)
12111216 .setCommittedOffset(70)))))
12121217
1213- val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest).build())
1218+ val requestChannelRequest = buildRequest(
1219+ OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest).build(version)
1220+ )
12141221
12151222 // This is the request expected by the group coordinator.
12161223 val expectedOffsetCommitRequest = new OffsetCommitRequestData()
@@ -1255,159 +1262,8 @@ class KafkaApisTest extends Logging {
12551262 val offsetCommitResponse = new OffsetCommitResponseData()
12561263 .setTopics(util.List.of(
12571264 new OffsetCommitResponseData.OffsetCommitResponseTopic()
1258- .setTopicId(fooId)
1259- .setName(fooName)
1260- .setPartitions(util.List.of(
1261- new OffsetCommitResponseData.OffsetCommitResponsePartition()
1262- .setPartitionIndex(0)
1263- .setErrorCode(Errors.NONE.code),
1264- new OffsetCommitResponseData.OffsetCommitResponsePartition()
1265- .setPartitionIndex(1)
1266- .setErrorCode(Errors.NONE.code))),
1267- new OffsetCommitResponseData.OffsetCommitResponseTopic()
1268- .setTopicId(barId)
1269- .setName(barName)
1270- .setPartitions(util.List.of(
1271- new OffsetCommitResponseData.OffsetCommitResponsePartition()
1272- .setPartitionIndex(0)
1273- .setErrorCode(Errors.NONE.code),
1274- new OffsetCommitResponseData.OffsetCommitResponsePartition()
1275- .setPartitionIndex(1)
1276- .setErrorCode(Errors.NONE.code)))))
1277-
1278- val expectedOffsetCommitResponse = new OffsetCommitResponseData()
1279- .setTopics(util.List.of(
1280- new OffsetCommitResponseData.OffsetCommitResponseTopic()
1281- .setTopicId(fooId)
1282- .setPartitions(util.List.of(
1283- // foo-2 is first because partitions failing the validation
1284- // are put in the response first.
1285- new OffsetCommitResponseData.OffsetCommitResponsePartition()
1286- .setPartitionIndex(2)
1287- .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
1288- new OffsetCommitResponseData.OffsetCommitResponsePartition()
1289- .setPartitionIndex(0)
1290- .setErrorCode(Errors.NONE.code),
1291- new OffsetCommitResponseData.OffsetCommitResponsePartition()
1292- .setPartitionIndex(1)
1293- .setErrorCode(Errors.NONE.code))),
1294- // zar is before bar because topics failing the validation are
1295- // put in the response first.
1296- new OffsetCommitResponseData.OffsetCommitResponseTopic()
1297- .setTopicId(zarId)
1298- .setPartitions(util.List.of(
1299- new OffsetCommitResponseData.OffsetCommitResponsePartition()
1300- .setPartitionIndex(0)
1301- .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code),
1302- new OffsetCommitResponseData.OffsetCommitResponsePartition()
1303- .setPartitionIndex(1)
1304- .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))),
1305- new OffsetCommitResponseData.OffsetCommitResponseTopic()
1306- .setTopicId(barId)
1307- .setPartitions(util.List.of(
1308- new OffsetCommitResponseData.OffsetCommitResponsePartition()
1309- .setPartitionIndex(0)
1310- .setErrorCode(Errors.NONE.code),
1311- new OffsetCommitResponseData.OffsetCommitResponsePartition()
1312- .setPartitionIndex(1)
1313- .setErrorCode(Errors.NONE.code)))))
1314-
1315- future.complete(offsetCommitResponse)
1316- val response = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
1317- assertEquals(expectedOffsetCommitResponse, response.data)
1318- }
1319-
1320- @Test
1321- def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {
1322- val fooId = Uuid.randomUuid()
1323- val barId = Uuid.randomUuid()
1324- addTopicToMetadataCache("foo", numPartitions = 2, topicId = fooId)
1325- addTopicToMetadataCache("bar", numPartitions = 2, topicId = barId)
1326-
1327- val offsetCommitRequest = new OffsetCommitRequestData()
1328- .setGroupId("group")
1329- .setMemberId("member")
1330- .setTopics(util.List.of(
1331- // foo exists but only has 2 partitions.
1332- new OffsetCommitRequestData.OffsetCommitRequestTopic()
1333- .setName("foo")
1334- .setPartitions(util.List.of(
1335- new OffsetCommitRequestData.OffsetCommitRequestPartition()
1336- .setPartitionIndex(0)
1337- .setCommittedOffset(10),
1338- new OffsetCommitRequestData.OffsetCommitRequestPartition()
1339- .setPartitionIndex(1)
1340- .setCommittedOffset(20),
1341- new OffsetCommitRequestData.OffsetCommitRequestPartition()
1342- .setPartitionIndex(2)
1343- .setCommittedOffset(30))),
1344- // bar exists.
1345- new OffsetCommitRequestData.OffsetCommitRequestTopic()
1346- .setName("bar")
1347- .setPartitions(util.List.of(
1348- new OffsetCommitRequestData.OffsetCommitRequestPartition()
1349- .setPartitionIndex(0)
1350- .setCommittedOffset(40),
1351- new OffsetCommitRequestData.OffsetCommitRequestPartition()
1352- .setPartitionIndex(1)
1353- .setCommittedOffset(50))),
1354- // zar does not exist.
1355- new OffsetCommitRequestData.OffsetCommitRequestTopic()
1356- .setName("zar")
1357- .setPartitions(util.List.of(
1358- new OffsetCommitRequestData.OffsetCommitRequestPartition()
1359- .setPartitionIndex(0)
1360- .setCommittedOffset(60),
1361- new OffsetCommitRequestData.OffsetCommitRequestPartition()
1362- .setPartitionIndex(1)
1363- .setCommittedOffset(70)))))
1364-
1365- val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicNames(offsetCommitRequest).build())
1366-
1367- // This is the request expected by the group coordinator.
1368- val expectedOffsetCommitRequest = new OffsetCommitRequestData()
1369- .setGroupId("group")
1370- .setMemberId("member")
1371- .setTopics(util.List.of(
1372- // foo exists but only has 2 partitions.
1373- new OffsetCommitRequestData.OffsetCommitRequestTopic()
1374- .setName("foo")
1375- .setTopicId(fooId)
1376- .setPartitions(util.List.of(
1377- new OffsetCommitRequestData.OffsetCommitRequestPartition()
1378- .setPartitionIndex(0)
1379- .setCommittedOffset(10),
1380- new OffsetCommitRequestData.OffsetCommitRequestPartition()
1381- .setPartitionIndex(1)
1382- .setCommittedOffset(20))),
1383- new OffsetCommitRequestData.OffsetCommitRequestTopic()
1384- .setName("bar")
1385- .setTopicId(barId)
1386- .setPartitions(util.List.of(
1387- new OffsetCommitRequestData.OffsetCommitRequestPartition()
1388- .setPartitionIndex(0)
1389- .setCommittedOffset(40),
1390- new OffsetCommitRequestData.OffsetCommitRequestPartition()
1391- .setPartitionIndex(1)
1392- .setCommittedOffset(50)))))
1393-
1394- val future = new CompletableFuture[OffsetCommitResponseData]()
1395- when(groupCoordinator.commitOffsets(
1396- requestChannelRequest.context,
1397- expectedOffsetCommitRequest,
1398- RequestLocal.noCaching.bufferSupplier
1399- )).thenReturn(future)
1400- kafkaApis = createKafkaApis()
1401- kafkaApis.handle(
1402- requestChannelRequest,
1403- RequestLocal.noCaching
1404- )
1405-
1406- // This is the response returned by the group coordinator.
1407- val offsetCommitResponse = new OffsetCommitResponseData()
1408- .setTopics(util.List.of(
1409- new OffsetCommitResponseData.OffsetCommitResponseTopic()
1410- .setName("foo")
1265+ .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID)
1266+ .setName(if (version < 10) fooName else "")
14111267 .setPartitions(util.List.of(
14121268 new OffsetCommitResponseData.OffsetCommitResponsePartition()
14131269 .setPartitionIndex(0)
@@ -1416,7 +1272,8 @@ class KafkaApisTest extends Logging {
14161272 .setPartitionIndex(1)
14171273 .setErrorCode(Errors.NONE.code))),
14181274 new OffsetCommitResponseData.OffsetCommitResponseTopic()
1419- .setName("bar")
1275+ .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
1276+ .setName(if (version < 10) barName else "")
14201277 .setPartitions(util.List.of(
14211278 new OffsetCommitResponseData.OffsetCommitResponsePartition()
14221279 .setPartitionIndex(0)
@@ -1425,10 +1282,13 @@ class KafkaApisTest extends Logging {
14251282 .setPartitionIndex(1)
14261283 .setErrorCode(Errors.NONE.code)))))
14271284
1285+ // For v10+, the unknown topic returns UNKNOWN_TOPIC_ID; for v0-9 it returns
1286+ // UNKNOWN_TOPIC_OR_PARTITION.
14281287 val expectedOffsetCommitResponse = new OffsetCommitResponseData()
14291288 .setTopics(util.List.of(
14301289 new OffsetCommitResponseData.OffsetCommitResponseTopic()
1431- .setName("foo")
1290+ .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID)
1291+ .setName(if (version < 10) fooName else "")
14321292 .setPartitions(util.List.of(
14331293 // foo-2 is first because partitions failing the validation
14341294 // are put in the response first.
@@ -1444,16 +1304,18 @@ class KafkaApisTest extends Logging {
14441304 // zar is before bar because topics failing the validation are
14451305 // put in the response first.
14461306 new OffsetCommitResponseData.OffsetCommitResponseTopic()
1447- .setName("zar")
1307+ .setTopicId(if (version >= 10) zarId else Uuid.ZERO_UUID)
1308+ .setName(if (version < 10) zarName else "")
14481309 .setPartitions(util.List.of(
14491310 new OffsetCommitResponseData.OffsetCommitResponsePartition()
14501311 .setPartitionIndex(0)
1451- .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
1312+ .setErrorCode(if (version >= 10) Errors.UNKNOWN_TOPIC_ID.code else Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
14521313 new OffsetCommitResponseData.OffsetCommitResponsePartition()
14531314 .setPartitionIndex(1)
1454- .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))),
1315+ .setErrorCode(if (version >= 10) Errors.UNKNOWN_TOPIC_ID.code else Errors.UNKNOWN_TOPIC_OR_PARTITION.code))),
14551316 new OffsetCommitResponseData.OffsetCommitResponseTopic()
1456- .setName("bar")
1317+ .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
1318+ .setName(if (version < 10) barName else "")
14571319 .setPartitions(util.List.of(
14581320 new OffsetCommitResponseData.OffsetCommitResponsePartition()
14591321 .setPartitionIndex(0)
0 commit comments