|
20 | 20 | package org.apache.druid.testing.embedded.indexing; |
21 | 21 |
|
22 | 22 | import org.apache.druid.testing.embedded.EmbeddedDruidCluster; |
23 | | -import org.apache.kafka.clients.admin.Admin; |
24 | | -import org.apache.kafka.clients.admin.ConfigEntry; |
25 | | -import org.apache.kafka.clients.admin.DescribeConfigsResult; |
26 | 23 | import org.apache.kafka.clients.consumer.AcknowledgeType; |
27 | 24 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
28 | 25 | import org.apache.kafka.clients.consumer.ConsumerRecords; |
29 | 26 | import org.apache.kafka.clients.consumer.KafkaShareConsumer; |
30 | 27 | import org.apache.kafka.clients.producer.KafkaProducer; |
31 | 28 | import org.apache.kafka.clients.producer.ProducerRecord; |
32 | | -import org.apache.kafka.common.config.ConfigResource; |
33 | 29 | import org.apache.kafka.common.serialization.ByteArrayDeserializer; |
34 | 30 | import org.apache.kafka.common.serialization.ByteArraySerializer; |
35 | 31 | import org.junit.jupiter.api.AfterEach; |
@@ -179,29 +175,8 @@ public void probe_recordsProducedAfterSubscribe_areConsumed() throws Exception |
179 | 175 | final String groupId = "probe_group_" + System.currentTimeMillis(); |
180 | 176 |
|
181 | 177 | kafkaServer.createTopicWithPartitions(topic, 2); |
182 | | - |
183 | | - try (Admin admin = kafkaServer.newAdminClient()) { |
184 | | - final ConfigResource brokerCfg = new ConfigResource(ConfigResource.Type.BROKER, "1"); |
185 | | - final DescribeConfigsResult res = admin.describeConfigs(List.of(brokerCfg)); |
186 | | - for (ConfigEntry e : res.all().get().get(brokerCfg).entries()) { |
187 | | - if (e.name().startsWith("group.share")) { |
188 | | - System.out.println("BROKER_CFG: " + e.name() + "=" + e.value() + " source=" + e.source()); |
189 | | - } |
190 | | - } |
191 | | - } |
192 | | - |
193 | 178 | kafkaServer.setShareGroupAutoOffsetReset(groupId, "earliest"); |
194 | 179 |
|
195 | | - try (Admin admin = kafkaServer.newAdminClient()) { |
196 | | - final ConfigResource grp = new ConfigResource(ConfigResource.Type.GROUP, groupId); |
197 | | - final DescribeConfigsResult res = admin.describeConfigs(List.of(grp)); |
198 | | - for (ConfigEntry e : res.all().get().get(grp).entries()) { |
199 | | - if (e.name().startsWith("share")) { |
200 | | - System.out.println("GROUP_CFG: " + e.name() + "=" + e.value() + " source=" + e.source()); |
201 | | - } |
202 | | - } |
203 | | - } |
204 | | - |
205 | 180 | final Properties props = new Properties(); |
206 | 181 | props.put("bootstrap.servers", kafkaServer.getBootstrapServerUrl()); |
207 | 182 | props.put("group.id", groupId); |
|
0 commit comments