Skip to content

Commit 58d208f

Browse files
authored
KAFKA-20282: Add message to classic consumer startup guiding to next-gen protocol (apache#21900)
Add info msg if Consumer created using the classic protocol, to guide users towards the next-gen Consumer rebalance protocol (GA). Show only if using group rebalance capabilities (group.id defined). Exclude Streams applications to avoid showing the log on streams (should follow separate evolution to the new streams protocol). This is the first step in KIP-1274 (evolution path towards the new Consumer protocol) Reviewers: Andrew Schofield <aschofield@confluent.io>, Lan Ding <isDing_L@163.com>
1 parent 7303c1e commit 58d208f

3 files changed

Lines changed: 97 additions & 0 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.HashMap;
7373
import java.util.HashSet;
7474
import java.util.List;
75+
import java.util.Locale;
7576
import java.util.Map;
7677
import java.util.Optional;
7778
import java.util.OptionalLong;
@@ -212,6 +213,22 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
212213
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
213214
);
214215

216+
// If the classic rebalance protocol is used, log message to guide users towards upgrading to the
217+
// next-generation consumer rebalance protocol
218+
if (groupId.isPresent()) {
219+
boolean isStreamsConsumer = assignors.stream()
220+
.anyMatch(a -> a.getClass().getName().contains("StreamsPartitionAssignor"));
221+
if (!isStreamsConsumer) {
222+
log.info("\n" +
223+
"****************************************************************\n" +
224+
"* The consumer rebalance protocol (KIP-848) is production ready!\n" +
225+
"* Set the consumer configuration {}={} to try it out.\n" +
226+
"* See https://kafka.apache.org/documentation/#consumer_rebalance_protocol\n" +
227+
"****************************************************************",
228+
ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
229+
}
230+
}
231+
215232
// no coordinator will be constructed for the default (null) group id
216233
if (groupId.isEmpty()) {
217234
config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113

114114
import org.apache.logging.log4j.Level;
115115
import org.junit.jupiter.api.AfterEach;
116+
import org.junit.jupiter.api.Test;
116117
import org.junit.jupiter.api.Timeout;
117118
import org.junit.jupiter.params.ParameterizedTest;
118119
import org.junit.jupiter.params.provider.EnumSource;
@@ -620,6 +621,72 @@ public void testConstructorInvalidMetricReporters(GroupProtocol groupProtocol) {
620621
assertEquals("Class an.invalid.class cannot be found", e.getCause().getMessage());
621622
}
622623

624+
@Test
625+
public void testClassicProtocolLogsRecommendationToTryConsumerProtocol() {
626+
Properties props = new Properties();
627+
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
628+
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
629+
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
630+
631+
try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
632+
appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
633+
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
634+
assertTrue(
635+
appender.getMessages().stream().anyMatch(m -> m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
636+
"Log message about consumer protocol not showing as expected when starting a consumer using the classic protocol"
637+
);
638+
}
639+
}
640+
641+
@Test
642+
public void testConsumerProtocolDoesNotLogRecommendation() {
643+
Properties props = new Properties();
644+
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name());
645+
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
646+
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
647+
648+
try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
649+
appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
650+
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
651+
assertFalse(
652+
appender.getMessages().stream().anyMatch(m -> m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
653+
"Should not log recommendation when already using consumer protocol"
654+
);
655+
}
656+
}
657+
658+
@Test
659+
public void testDefaultProtocolLogsRecommendationToTryConsumerProtocol() {
660+
Properties props = new Properties();
661+
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
662+
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
663+
664+
try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
665+
appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
666+
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
667+
assertTrue(
668+
appender.getMessages().stream().anyMatch(m -> m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
669+
"Log message about consumer protocol not showing as expected when starting a consumer using the default (classic) protocol"
670+
);
671+
}
672+
}
673+
674+
@Test
675+
public void testNoGroupIdDoesNotLogGroupProtocolMessage() {
676+
Properties props = new Properties();
677+
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
678+
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
679+
680+
try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
681+
appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
682+
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
683+
assertFalse(
684+
appender.getMessages().stream().anyMatch(m -> m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
685+
"Should not log recommendation when no group.id is set"
686+
);
687+
}
688+
}
689+
623690
@ParameterizedTest
624691
@EnumSource(GroupProtocol.class)
625692
public void testOsDefaultSocketBufferSizes(GroupProtocol groupProtocol) {

tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
package org.apache.kafka.tools.consumer;
1818

1919
import org.apache.kafka.clients.consumer.Consumer;
20+
import org.apache.kafka.clients.consumer.ConsumerConfig;
2021
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.apache.kafka.clients.consumer.GroupProtocol;
2123
import org.apache.kafka.clients.consumer.KafkaConsumer;
2224
import org.apache.kafka.common.MessageFormatter;
2325
import org.apache.kafka.common.TopicPartition;
@@ -70,6 +72,7 @@ public static void main(String[] args) throws Exception {
7072
public static void run(ConsoleConsumerOptions opts) {
7173
messageCount = 0;
7274
Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
75+
maybePrintConsumerProtocolMessage(opts);
7376
ConsumerWrapper consumerWrapper = new ConsumerWrapper(opts, consumer);
7477

7578
addShutdownHook(consumerWrapper, opts);
@@ -145,6 +148,16 @@ static boolean checkErr(PrintStream output) {
145148
return gotError;
146149
}
147150

151+
static void maybePrintConsumerProtocolMessage(ConsoleConsumerOptions opts) {
152+
String protocol = (String) opts.consumerProps().get(ConsumerConfig.GROUP_PROTOCOL_CONFIG);
153+
if (protocol == null || GroupProtocol.CLASSIC.name().equalsIgnoreCase(protocol)) {
154+
// Only print if INFO logging is not enabled (otherwise ClassicKafkaConsumer already logs it)
155+
if (!LoggerFactory.getLogger("org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer").isInfoEnabled()) {
156+
System.err.println("The consumer rebalance protocol (KIP-848) is production ready! Set group.protocol=consumer to try it out. See https://kafka.apache.org/documentation/#consumer_rebalance_protocol");
157+
}
158+
}
159+
}
160+
148161
public static class ConsumerWrapper {
149162
final Time time = Time.SYSTEM;
150163
final long timeoutMs;

0 commit comments

Comments
 (0)