Skip to content

Commit d04461c

Browse files
committed
Add kafka-topic-stats utility
1 parent 782e81d commit d04461c

File tree

6 files changed

+204
-0
lines changed

6 files changed

+204
-0
lines changed

Diff for: README.md

+3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ In each sample, we demonstrate a single connection path for our Standard/Enterpr
3333
* [spring kafka tutorial](https://developer.ibm.com/tutorials/use-spring-kafka-to-access-an-event-streams-service/) : Tutorial to quickly get you up and running using IBM Event Streams.
3434
* [spring-kafka](https://github.com/wkorando/event-stream-kafka) : Sample app to connect to Event Streams using Spring Kafka
3535

36+
### Utilities:
37+
* [kafka-topic-stats](/kafka-topic-stats/README.md): Utility for displaying Kafka topic usage and configuration.
38+
3639
## Get Further Assistance
3740

3841
If you have any issues, just ask us a question (tagged with `ibm-eventstreams`) on [StackOverflow.com](http://stackoverflow.com/questions/tagged/ibm-eventstreams).

Diff for: kafka-topic-stats/.gitignore

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
.gradle
2+
build
3+
bin
4+
.vscode

Diff for: kafka-topic-stats/README.md

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# kafka-topic-stats
2+
Prints usage and topic configuration for [Event Streams](https://cloud.ibm.com/catalog/services/event-streams) topic partitions.
3+
4+
## Building
5+
This project requires Java 11.
6+
7+
```
8+
./gradle jar
9+
```
10+
11+
## Running
12+
```
13+
export API_KEY=your api key here
14+
export BOOTSTRAP_ENDPOINTS=kafka-0.example.org:9093,kafka-1.example.org:9093
15+
java -jar ./build/libs/kafka-topic-stats.jar
16+
```
17+
18+
## Example output
19+
```
20+
Topic Name, Partition ID, Used Bytes, retention.bytes, segment.bytes, cleanup.policy, retention.ms
21+
mytopic, 0, 0, 1073741824, 536870912, delete, 86400000
22+
mytopic, 1, 0, 1073741824, 536870912, delete, 86400000
23+
mytopic, 2, 0, 1073741824, 536870912, delete, 86400000
24+
```

Diff for: kafka-topic-stats/build.gradle

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
plugins {
2+
id 'application'
3+
}
4+
5+
repositories {
6+
mavenCentral()
7+
}
8+
9+
dependencies {
10+
implementation 'org.apache.kafka:kafka-clients:3.0.1'
11+
implementation 'org.slf4j:slf4j-nop:1.7.36'
12+
}
13+
14+
jar {
15+
manifest {
16+
attributes "Main-Class": "com.eventstreams.samples.TopicStats"
17+
}
18+
19+
from {
20+
configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
21+
}
22+
}

Diff for: kafka-topic-stats/settings.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
rootProject.name = 'kafka-topic-stats'
2+
include('app')
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package com.eventstreams.samples;
2+
3+
import java.util.Collection;
4+
import java.util.HashMap;
5+
import java.util.TreeMap;
6+
import java.util.concurrent.ExecutionException;
7+
import java.util.stream.Collectors;
8+
9+
import org.apache.kafka.clients.CommonClientConfigs;
10+
import org.apache.kafka.clients.admin.AdminClient;
11+
import org.apache.kafka.clients.admin.Config;
12+
import org.apache.kafka.clients.admin.ReplicaInfo;
13+
import org.apache.kafka.common.TopicPartition;
14+
import org.apache.kafka.common.config.ConfigResource;
15+
import org.apache.kafka.common.config.ConfigResource.Type;
16+
import org.apache.kafka.common.config.SaslConfigs;
17+
import org.apache.kafka.common.config.TopicConfig;
18+
19+
public class TopicStats {
20+
public static void main(String[] args) {
21+
final var apiKey = System.getenv("API_KEY");
22+
final var bootstrap = System.getenv("BOOTSTRAP_ENDPOINTS");
23+
24+
var shouldExit = false;
25+
if (apiKey == null || apiKey.isEmpty()) {
26+
System.err.println("API_KEY environment variable is not set");
27+
shouldExit = true;
28+
}
29+
if (bootstrap == null || bootstrap.isEmpty()) {
30+
System.err.println("BOOTSTRAP_ENDPOINTS environment variable is not set");
31+
shouldExit = true;
32+
}
33+
if (shouldExit) {
34+
System.exit(1);
35+
}
36+
37+
final var configs = new HashMap<String, Object>();
38+
configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
39+
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
40+
configs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
41+
configs.put(SaslConfigs.SASL_JAAS_CONFIG,
42+
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\""
43+
+ apiKey + "\";");
44+
45+
try (final AdminClient admin = AdminClient.create(configs)) {
46+
// Discover brokers
47+
final var nodes = admin.describeCluster().nodes().get();
48+
final var brokers = nodes.stream().map(v -> v.id()).collect(Collectors.toSet());
49+
50+
// Get replica information for each broker and reduce into a map of TopicPartition -> ReplicaInfo.
51+
final var brokerToLogDir = admin.describeLogDirs(brokers).allDescriptions().get();
52+
final var logDirs = brokerToLogDir.values().stream().map(v -> v.values()).flatMap(Collection::stream).collect(Collectors.toSet());
53+
final var replicaInfo = logDirs.stream().map(v -> v.replicaInfos()).reduce((m1, m2) -> {
54+
var r = new HashMap<TopicPartition, ReplicaInfo>();
55+
r.putAll(m1);
56+
// All of a partition's replicas should be about the same size. Be pessimistic for
57+
// the case that the partition is not fully in-sync, and pick the largest sized replica.
58+
m2.forEach((k, v) -> {
59+
final var m1v = m1.get(k);
60+
if (m1v == null || m1v.size() < v.size()) {
61+
r.put(k, v);
62+
}
63+
});
64+
return r;
65+
}).get();
66+
67+
// Get all the topics, their descriptions, and their configuration
68+
final var topicNames = admin.listTopics().names().get();
69+
final var topicsDesc = admin.describeTopics(topicNames).all().get();
70+
final var topicsConfigResources = topicNames.stream().map(v -> new ConfigResource(Type.TOPIC, v)).collect(Collectors.toSet());
71+
final var topicsConfigs = admin.describeConfigs(topicsConfigResources).all().get();
72+
73+
class PartitionInfo {
74+
final Config config;
75+
final long usedBytes;
76+
77+
PartitionInfo(Config config, long usedBytes) {
78+
this.config = config;
79+
this.usedBytes = usedBytes;
80+
}
81+
}
82+
83+
// Wraps TopicPartition in Comparable.
84+
class ComparableTopicPartition implements Comparable<ComparableTopicPartition> {
85+
final String topic;
86+
final int partition;
87+
88+
ComparableTopicPartition(TopicPartition tp) {
89+
this.topic = tp.topic();
90+
this.partition = tp.partition();
91+
}
92+
93+
@Override
94+
public int compareTo(ComparableTopicPartition o) {
95+
var result = this.topic.compareTo(o.topic);
96+
if (result == 0) { // topic names are the same, compare based on partition number
97+
result = this.partition - o.partition;
98+
}
99+
return result;
100+
}
101+
}
102+
103+
final var usageInfo = new TreeMap<ComparableTopicPartition, PartitionInfo>(); // TreeMap as it implements SortedMap.
104+
topicsConfigs.forEach((resource, config) -> {
105+
final var topicName = resource.name();
106+
final var topicDescription = topicsDesc.get(topicName);
107+
if (topicDescription == null) {
108+
// skip if there isn't a description. The topic information is gathered
109+
// at slightly different times, so it's possible there will be inconsistencies.
110+
return;
111+
}
112+
for (final var partition : topicDescription.partitions()) {
113+
final var tp = new TopicPartition(topicName, partition.partition());
114+
final var info = replicaInfo.get(tp);
115+
if (info == null) {
116+
// skip if there isn't replica information. As per above, the data gathered
117+
// might not be completely consistent.
118+
continue;
119+
}
120+
final var usedBytes = info.size();
121+
usageInfo.put(new ComparableTopicPartition(tp), new PartitionInfo(config, usedBytes));
122+
}
123+
});
124+
125+
// Output in CSV format
126+
System.out.println("Topic Name, Partition ID, Used Bytes, retention.bytes, segment.bytes, cleanup.policy, retention.ms"); // column titles
127+
usageInfo.forEach((k, v) -> {
128+
System.out.printf("%s, %s, %d, %s, %s, %s, %s\n",
129+
k.topic,
130+
k.partition,
131+
v.usedBytes,
132+
v.config.get(TopicConfig.RETENTION_BYTES_CONFIG).value(),
133+
v.config.get(TopicConfig.SEGMENT_BYTES_CONFIG).value(),
134+
v.config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(),
135+
v.config.get(TopicConfig.RETENTION_MS_CONFIG).value());
136+
});
137+
138+
} catch(final ExecutionException e) {
139+
// ExecutionException typically wraps the exception we *actually* care about...
140+
if (e.getCause() != null) {
141+
e.getCause().printStackTrace();
142+
} else {
143+
e.printStackTrace();
144+
}
145+
} catch(final Exception e) {
146+
e.printStackTrace();
147+
}
148+
}
149+
}

0 commit comments

Comments
 (0)