Skip to content

Commit 7e429b0

Browse files
committed
List internal topics as well for replicas table
1 parent a9f9e7c commit 7e429b0

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

src/main/java/kmql/table/ReplicasTable.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.Set;
99

1010
import org.apache.kafka.clients.admin.AdminClient;
11+
import org.apache.kafka.clients.admin.ListTopicsOptions;
1112
import org.apache.kafka.clients.admin.TopicDescription;
1213
import org.apache.kafka.common.Node;
1314
import org.apache.kafka.common.TopicPartitionInfo;
@@ -37,7 +38,8 @@ public void create(Connection connection) throws Exception {
3738

3839
@Override
3940
public void prepare(Connection connection, AdminClient adminClient) throws Exception {
40-
Set<String> topics = adminClient.listTopics().names().get();
41+
ListTopicsOptions options = new ListTopicsOptions().listInternal(true);
42+
Set<String> topics = adminClient.listTopics(options).names().get();
4143
Map<String, TopicDescription> topicInfo = adminClient.describeTopics(topics).all().get();
4244
try (PreparedStatement stmt = connection.prepareStatement(
4345
"INSERT INTO replicas (topic, partition, broker_id, is_leader, is_preferred_leader, is_in_sync, replica_order)"

0 commit comments

Comments
 (0)