KAFKA-17573: Move KRaftMetadataCache to metadata module #19232
Signed-off-by: PoAn Yang <[email protected]>
@chia7712, could you help review this when you have time? This PR improves some performance like
@FrankYang0529 please fix the conflicts.
@chia7712, I fixed the conflicts. Thanks.
@FrankYang0529 thanks for this patch!
    } else {
        List<Integer> res = new ArrayList<>(brokers.length);
        for (int brokerId : brokers) {
            Optional.ofNullable(image.cluster().broker(brokerId)).ifPresent(b -> {
If this is a hotspot, we should not allocate an Optional object for every broker. A plain null check does the same work:
    for (int brokerId : brokers) {
        var broker = image.cluster().broker(brokerId);
        if (broker != null && !broker.fenced() && broker.listeners().containsKey(listenerName.value()))
            res.add(brokerId);
    }
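The suggestion above can be tried in isolation with a small sketch. The `Broker` record and the `Map`-based cluster below are hypothetical stand-ins for `image.cluster()` and its broker registrations, not Kafka's real types; only the shape of the loop follows the review comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class AliveReplicaFilterSketch {
    // Hypothetical stand-in for a broker registration; not Kafka's class.
    record Broker(int id, boolean fenced, Set<String> listeners) {}

    // Keeps only the replica ids whose broker is known, unfenced, and exposes the listener.
    // No Optional is allocated per broker; a null check covers the unknown-broker case.
    static List<Integer> filterAlive(Map<Integer, Broker> cluster, int[] replicas, String listener) {
        List<Integer> res = new ArrayList<>(replicas.length);
        for (int brokerId : replicas) {
            Broker broker = cluster.get(brokerId); // null when the broker is unknown
            if (broker != null && !broker.fenced() && broker.listeners().contains(listener)) {
                res.add(brokerId);
            }
        }
        return res;
    }

    public static void main(String[] args) {
        Map<Integer, Broker> cluster = Map.of(
            1, new Broker(1, false, Set.of("PLAINTEXT")),
            2, new Broker(2, true, Set.of("PLAINTEXT")),
            3, new Broker(3, false, Set.of("SSL")));
        // Broker 2 is fenced, broker 3 lacks the listener, broker 4 is unknown.
        System.out.println(filterAlive(cluster, new int[] {1, 2, 3, 4}, "PLAINTEXT"));
    }
}
```

Whether the saved allocation is measurable depends on the JIT's escape analysis, so a benchmark run is the only way to confirm the gain.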
        this.log = new LogContext("[MetadataCache brokerId=" + brokerId + "] ").logger(KRaftMetadataCache.class);
    }

    // This method is the main hotspot when it comes to the performance of metadata requests,
could you please format those docs?
        return currentImage;
    }

    // errorUnavailableEndpoints exists to support v0 MetadataResponses
ditto
    // errorUnavailableEndpoints exists to support v0 MetadataResponses
    // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker.
    // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below).
    private Optional<Iterator<MetadataResponsePartition>> getPartitionMetadata(
Why not return List<MetadataResponsePartition>? We eventually need the collection, right?
        int startIndex,
        int maxCount
    ) {
        TopicImage topic = image.topics().getTopic(topicName);
Could you please try to streamline the code? For example:
    var topic = image.topics().getTopic(topicName);
    if (topic == null) return Map.entry(Optional.empty(), -1);
    List<DescribeTopicPartitionsResponsePartition> result = new ArrayList<>();
    final var partitions = topic.partitions().keySet();
    final var upperIndex = Math.min(topic.partitions().size(), startIndex + maxCount);
    for (int partitionId = startIndex; partitionId < upperIndex; partitionId++) {
        var partition = topic.partitions().get(partitionId);
        if (partition == null) {
            log.warn("The partition {} does not exist for {}", partitionId, topicName);
            continue;
        }
        var filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, listenerName, false);
        var filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false);
        var offlineReplicas = getOfflineReplicas(image, partition, listenerName);
        var maybeLeader = getAliveEndpoint(image, partition.leader, listenerName);
        result.add(new DescribeTopicPartitionsResponsePartition()
            .setPartitionIndex(partitionId)
            .setLeaderId(maybeLeader.map(Node::id).orElse(MetadataResponse.NO_LEADER_ID))
            .setLeaderEpoch(partition.leaderEpoch)
            .setReplicaNodes(filteredReplicas)
            .setIsrNodes(filteredIsr)
            .setOfflineReplicas(offlineReplicas)
            .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
            .setLastKnownElr(Replicas.toList(partition.lastKnownElr)));
    }
    return Map.entry(Optional.of(result), (upperIndex < partitions.size()) ? upperIndex : -1);
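The paging rule in the snippet above (clamp the upper bound, then return either the next start index or -1) can be checked on its own. The sketch below is a simplified illustration that pages over plain partition ids; the `Page` record and the `page` method are hypothetical names and are not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionPagingSketch {
    // One page of partition ids plus the index to resume from, or -1 when nothing is left.
    record Page(List<Integer> partitionIds, int nextIndex) {}

    // Mirrors the cursor arithmetic of the suggestion: the window is
    // [startIndex, min(partitionCount, startIndex + maxCount)).
    static Page page(int partitionCount, int startIndex, int maxCount) {
        int upperIndex = Math.min(partitionCount, startIndex + maxCount);
        List<Integer> ids = new ArrayList<>();
        for (int id = startIndex; id < upperIndex; id++) {
            ids.add(id);
        }
        return new Page(ids, upperIndex < partitionCount ? upperIndex : -1);
    }

    public static void main(String[] args) {
        // Walk a topic with 5 partitions, 2 partitions per page.
        int cursor = 0;
        while (cursor != -1) {
            Page p = page(5, cursor, 2);
            System.out.println(p.partitionIds() + " next=" + p.nextIndex());
            cursor = p.nextIndex();
        }
    }
}
```

The sketch assumes partition ids are dense (0 to partitionCount - 1); the suggested code handles a missing id by logging a warning and skipping it.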
    }

    private List<Integer> getOfflineReplicas(MetadataImage image, PartitionRegistration partition, ListenerName listenerName) {
        List<Integer> offlineReplicas = new ArrayList<>(0);
    List<Integer> offlineReplicas = new ArrayList<>(0);
    for (var brokerId : partition.replicas) {
        var broker = image.cluster().broker(brokerId);
        if (broker == null || isReplicaOffline(partition, listenerName, broker))
            offlineReplicas.add(brokerId);
    }
    return offlineReplicas;
        return getRandomAliveBroker(currentImage);
    }

    private Optional<Integer> getRandomAliveBroker(MetadataImage image) {
please inline it
Changes
Performance
org.apache.kafka.jmh.fetcher.ReplicaFetcherThreadBenchmark
Benchmark                                  (partitionCount)  Mode  Cnt       Score  Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt    2    4669.047         ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt    2   24722.430         ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt    2   57837.371         ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt    2  485991.588         ns/op

Benchmark                                  (partitionCount)  Mode  Cnt       Score  Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt    2    4581.417         ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt    2   24503.970         ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt    2   56348.142         ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt    2  470634.225         ns/op
./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.metadata.KRaftMetadataRequestBenchmark
Benchmark                                                      (partitionCount)  (topicCount)  Mode  Cnt         Score  Error  Units
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10           500  avgt    2    943495.449         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          1000  avgt    2   1923336.954         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          5000  avgt    2  22469603.257         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20           500  avgt    2   1564799.213         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          1000  avgt    2   3218482.586         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          5000  avgt    2  29796675.484         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50           500  avgt    2   3604029.278         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          1000  avgt    2   7617695.388         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          5000  avgt    2  55070054.797         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              10           500  avgt    2       331.724         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              10          1000  avgt    2       337.840         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              10          5000  avgt    2       337.959         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              20           500  avgt    2       334.827         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              20          1000  avgt    2       342.852         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              20          5000  avgt    2       322.059         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              50           500  avgt    2       328.329         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              50          1000  avgt    2       336.541         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              50          5000  avgt    2       334.077         ns/op

Benchmark                                                      (partitionCount)  (topicCount)  Mode  Cnt         Score  Error  Units
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10           500  avgt    2    671028.571         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          1000  avgt    2   1435193.244         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          5000  avgt    2  19727430.833         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20           500  avgt    2   1114655.107         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          1000  avgt    2   2249700.266         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          5000  avgt    2  25811047.360         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50           500  avgt    2   2311887.438         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          1000  avgt    2   4537162.770         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          5000  avgt    2  44013921.418         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              10           500  avgt    2       330.509         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              10          1000  avgt    2       337.044         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              10          5000  avgt    2       332.035         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              20           500  avgt    2       341.664         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              20          1000  avgt    2       322.385         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              20          5000  avgt    2       333.220         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              50           500  avgt    2       335.865         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              50          1000  avgt    2       326.180         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                              50          5000  avgt    2       336.809         ns/op
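To compare the two KRaftMetadataRequestBenchmark result sets above, the relative change per row can be computed as (second - first) / first. The sketch below does this for two testMetadataRequestForAllTopics rows. The page does not label which result set is the baseline, so treating the first set as the reference is an assumption; the class and method names are illustrative only.

```java
final class BenchmarkDeltaSketch {
    // Relative change in percent from the first score to the second.
    // A negative value means the second score is lower (less time per op for avgt).
    static double percentChange(double first, double second) {
        return (second - first) / first * 100.0;
    }

    public static void main(String[] args) {
        // Scores in ns/op, copied from the two testMetadataRequestForAllTopics result sets.
        // Assumption: the first result set is the reference run.
        System.out.printf("partitionCount=10, topicCount=500:  %.1f%%%n",
            percentChange(943495.449, 671028.571));
        System.out.printf("partitionCount=50, topicCount=5000: %.1f%%%n",
            percentChange(55070054.797, 44013921.418));
    }
}
```

With only two measurement iterations per run (Cnt 2) and no reported error, small differences such as those in the testRequestToJson rows are likely within noise.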
./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.partition.PartitionMakeFollowerBenchmark
Benchmark                                        Mode  Cnt    Score  Error  Units
PartitionMakeFollowerBenchmark.testMakeFollower  avgt    2  158.161         ns/op

Benchmark                                        Mode  Cnt    Score  Error  Units
PartitionMakeFollowerBenchmark.testMakeFollower  avgt    2  156.056         ns/op
./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.partition.UpdateFollowerFetchStateBenchmark
Benchmark                                                                Mode  Cnt     Score  Error  Units
UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBench          avgt    2  5006.529         ns/op
UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBenchNoChange  avgt    2  4900.634         ns/op

Benchmark                                                                Mode  Cnt     Score  Error  Units
UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBench          avgt    2  5031.341         ns/op
UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBenchNoChange  avgt    2  4987.916         ns/op
./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.server.CheckpointBench
Benchmark                                         (numPartitions)  (numTopics)   Mode  Cnt  Score  Error   Units
CheckpointBench.measureCheckpointHighWatermarks                 3          100  thrpt    2  0.927         ops/ms
CheckpointBench.measureCheckpointHighWatermarks                 3         1000  thrpt    2  0.678         ops/ms
CheckpointBench.measureCheckpointHighWatermarks                 3         2000  thrpt    2  0.489         ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3          100  thrpt    2  0.998         ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3         1000  thrpt    2  0.719         ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3         2000  thrpt    2  0.572         ops/ms

Benchmark                                         (numPartitions)  (numTopics)   Mode  Cnt  Score  Error   Units
CheckpointBench.measureCheckpointHighWatermarks                 3          100  thrpt    2  0.975         ops/ms
CheckpointBench.measureCheckpointHighWatermarks                 3         1000  thrpt    2  0.673         ops/ms
CheckpointBench.measureCheckpointHighWatermarks                 3         2000  thrpt    2  0.483         ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3          100  thrpt    2  1.002         ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3         1000  thrpt    2  0.718         ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3         2000  thrpt    2  0.575         ops/ms
./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.server.PartitionCreationBench
Benchmark                            (numPartitions)  (useTopicIds)  Mode  Cnt  Score  Error  Units
PartitionCreationBench.makeFollower               20          false  avgt    2  6.200         ms/op
PartitionCreationBench.makeFollower               20           true  avgt    2  7.244         ms/op

Benchmark                            (numPartitions)  (useTopicIds)  Mode  Cnt  Score  Error  Units
PartitionCreationBench.makeFollower               20          false  avgt    2  6.144         ms/op
PartitionCreationBench.makeFollower               20           true  avgt    2  7.169         ms/op