Skip to content

Commit 8756298

Browse files
committed
KAFKA-18201: testGroupMetadataMessageFormatter fails for new consumer protocol
Signed-off-by: PoAn Yang <[email protected]>
1 parent ae661de commit 8756298

File tree

1 file changed

+46
-7
lines changed

1 file changed

+46
-7
lines changed

tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import org.apache.kafka.clients.consumer.Consumer;
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
2323
import org.apache.kafka.clients.consumer.ConsumerRecords;
24+
import org.apache.kafka.clients.consumer.GroupProtocol;
2425
import org.apache.kafka.clients.consumer.KafkaConsumer;
2526
import org.apache.kafka.clients.consumer.MockConsumer;
26-
import org.apache.kafka.clients.consumer.RangeAssignor;
2727
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
2828
import org.apache.kafka.clients.producer.KafkaProducer;
2929
import org.apache.kafka.clients.producer.Producer;
@@ -34,6 +34,7 @@
3434
import org.apache.kafka.common.internals.Topic;
3535
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
3636
import org.apache.kafka.common.serialization.ByteArraySerializer;
37+
import org.apache.kafka.common.test.api.ClusterConfigProperty;
3738
import org.apache.kafka.common.test.api.ClusterInstance;
3839
import org.apache.kafka.common.test.api.ClusterTest;
3940
import org.apache.kafka.common.test.api.ClusterTestExtensions;
@@ -75,19 +76,22 @@
7576
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
7677
import static org.apache.kafka.clients.consumer.ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG;
7778
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
79+
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
7880
import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
7981
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
80-
import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
8182
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
8283
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
8384
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
8485
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
8586
import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
8687
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
88+
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
89+
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
8790
import static org.junit.jupiter.api.Assertions.assertEquals;
8891
import static org.junit.jupiter.api.Assertions.assertNotNull;
8992
import static org.junit.jupiter.api.Assertions.assertNull;
9093
import static org.junit.jupiter.api.Assertions.assertThrows;
94+
import static org.junit.jupiter.api.Assertions.assertTrue;
9195
import static org.mockito.ArgumentMatchers.any;
9296
import static org.mockito.ArgumentMatchers.eq;
9397
import static org.mockito.Mockito.mock;
@@ -374,8 +378,11 @@ public void testOffsetsMessageFormatter(ClusterInstance cluster) throws Exceptio
374378
}
375379
}
376380

377-
@ClusterTest(brokers = 3)
378-
public void testGroupMetadataMessageFormatter(ClusterInstance cluster) throws Exception {
381+
@ClusterTest(serverProperties = {
382+
@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
383+
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
384+
})
385+
public void testGroupMetadataMessageFormatterWithClassicGroupProtocol(ClusterInstance cluster) throws Exception {
379386
try (Admin admin = cluster.admin()) {
380387

381388
NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
@@ -388,7 +395,7 @@ public void testGroupMetadataMessageFormatter(ClusterInstance cluster) throws Ex
388395

389396
ConsoleConsumerOptions options = new ConsoleConsumerOptions(groupMetadataMessageFormatter);
390397
ConsoleConsumer.ConsumerWrapper consumerWrapper =
391-
new ConsoleConsumer.ConsumerWrapper(options, createGroupMetaDataConsumer(cluster));
398+
new ConsoleConsumer.ConsumerWrapper(options, createGroupMetaDataConsumer(cluster, GroupProtocol.CLASSIC));
392399

393400
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
394401
PrintStream output = new PrintStream(out)) {
@@ -418,6 +425,38 @@ public void testGroupMetadataMessageFormatter(ClusterInstance cluster) throws Ex
418425
}
419426
}
420427

428+
@ClusterTest(serverProperties = {
429+
@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
430+
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
431+
})
432+
public void testGroupMetadataMessageFormatterWithConsumerGroupProtocol(ClusterInstance cluster) throws Exception {
433+
try (Admin admin = cluster.admin()) {
434+
435+
NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
436+
admin.createTopics(singleton(newTopic));
437+
produceMessages(cluster);
438+
439+
String[] groupMetadataMessageFormatter = createConsoleConsumerArgs(cluster,
440+
Topic.GROUP_METADATA_TOPIC_NAME,
441+
"org.apache.kafka.tools.consumer.GroupMetadataMessageFormatter");
442+
443+
ConsoleConsumerOptions options = new ConsoleConsumerOptions(groupMetadataMessageFormatter);
444+
ConsoleConsumer.ConsumerWrapper consumerWrapper =
445+
new ConsoleConsumer.ConsumerWrapper(options, createGroupMetaDataConsumer(cluster, GroupProtocol.CONSUMER));
446+
447+
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
448+
PrintStream output = new PrintStream(out)) {
449+
ConsoleConsumer.process(1, options.formatter(), consumerWrapper, output, true);
450+
451+
JsonNode jsonNode = objectMapper.reader().readTree(out.toByteArray());
452+
// Consumer group protocol does not write GROUP_METADATA records, so the result is empty
453+
assertTrue(jsonNode.isEmpty());
454+
} finally {
455+
consumerWrapper.cleanup();
456+
}
457+
}
458+
}
459+
421460
private void produceMessagesWithTxn(ClusterInstance cluster) {
422461
try (Producer<byte[], byte[]> producer = createTxnProducer(cluster)) {
423462
producer.initTransactions();
@@ -462,9 +501,10 @@ private Consumer<byte[], byte[]> createOffsetConsumer(ClusterInstance cluster) {
462501
return new KafkaConsumer<>(props);
463502
}
464503

465-
private Consumer<byte[], byte[]> createGroupMetaDataConsumer(ClusterInstance cluster) {
504+
private Consumer<byte[], byte[]> createGroupMetaDataConsumer(ClusterInstance cluster, GroupProtocol groupProtocol) {
466505
Properties props = consumerProps(cluster);
467506
props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
507+
props.put(GROUP_PROTOCOL_CONFIG, groupProtocol.name);
468508
return new KafkaConsumer<>(props);
469509
}
470510

@@ -481,7 +521,6 @@ private Properties consumerProps(ClusterInstance cluster) {
481521
props.put(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
482522
props.put(KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
483523
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
484-
props.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
485524
props.put(GROUP_ID_CONFIG, groupId);
486525
return props;
487526
}

0 commit comments

Comments
 (0)