@@ -58,6 +58,7 @@
import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.ListOffsetsHandler;
import org.apache.kafka.clients.admin.internals.ListTransactionsHandler;
import org.apache.kafka.clients.admin.internals.PartitionLeaderCache;
import org.apache.kafka.clients.admin.internals.PartitionLeaderStrategy;
import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -402,7 +403,7 @@ public class KafkaAdminClient extends AdminClient {
private final long retryBackoffMaxMs;
private final ExponentialBackoff retryBackoff;
private final MetadataRecoveryStrategy metadataRecoveryStrategy;
private final Map<TopicPartition, Integer> partitionLeaderCache;
private final PartitionLeaderCache partitionLeaderCache;
private final AdminFetchMetricsManager adminFetchMetricsManager;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;

@@ -629,7 +630,7 @@ private KafkaAdminClient(AdminClientConfig config,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
this.clientTelemetryReporter = clientTelemetryReporter;
this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
this.partitionLeaderCache = new HashMap<>();
this.partitionLeaderCache = new PartitionLeaderCache();
this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
@@ -34,7 +34,6 @@
import org.slf4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.singleton;
@@ -56,7 +55,7 @@ public AbortTransactionHandler(

public static PartitionLeaderStrategy.PartitionLeaderFuture<Void> newFuture(
Set<TopicPartition> topicPartitions,
Map<TopicPartition, Integer> partitionLeaderCache
PartitionLeaderCache partitionLeaderCache
) {
return new PartitionLeaderStrategy.PartitionLeaderFuture<>(topicPartitions, partitionLeaderCache);
}
@@ -115,8 +115,13 @@ public AdminApiDriver(
// metadata. For all cached keys, they can proceed straight to the fulfillment map.
// Note that the cache is only used on the initial calls, and any errors that result
// in additional lookups use the full set of lookup keys.
retryLookup(future.uncachedLookupKeys());
future.cachedKeyBrokerIdMapping().forEach((key, brokerId) -> fulfillmentMap.put(new FulfillmentScope(brokerId), key));
future.cachedKeyBrokerIdMapping().forEach((key, brokerId) -> {
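// Keys mapped to UNKNOWN_BROKER_ID have no cached leader and are sent back through the lookup
// stage; keys with a cached broker id proceed straight to fulfillment.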
if (AdminApiFuture.UNKNOWN_BROKER_ID.equals(brokerId)) {
unmap(key);
} else {
fulfillmentMap.put(new FulfillmentScope(brokerId), key);
}
});
}

/**
@@ -19,14 +19,16 @@
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public interface AdminApiFuture<K, V> {

/** Sentinel broker id used for lookup keys that have no cached leader. */
Integer UNKNOWN_BROKER_ID = -1;

/**
* The initial set of lookup keys. Although this will usually match the fulfillment
* keys, it does not necessarily have to. For example, in the case of
@@ -39,22 +41,17 @@ public interface AdminApiFuture<K, V> {
Set<K> lookupKeys();

/**
* The set of request keys that do not have cached key-broker id mappings. If there
* is no cached key mapping, this will be the same as the lookup keys.
* Can be empty, but only if the cached key mapping is not empty.
*/
default Set<K> uncachedLookupKeys() {
return lookupKeys();
}

/**
* The cached key-broker id mapping. For lookup strategies that do not make use of a
* cache of metadata, this will be empty.
* The cached key-broker id mapping. For non-cached values (or lookup strategies that do not make
* use of a cache of metadata), the broker id will be {@link #UNKNOWN_BROKER_ID}.
*
* @return mapping of keys to broker ids
*/
default Map<K, Integer> cachedKeyBrokerIdMapping() {
return Collections.emptyMap();
Map<K, Integer> result = new HashMap<>();
for (K key : lookupKeys()) {
result.put(key, UNKNOWN_BROKER_ID);
}
return result;
}

/**
@@ -73,7 +73,7 @@ public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {

public static PartitionLeaderStrategy.PartitionLeaderFuture<DeletedRecords> newFuture(
Collection<TopicPartition> topicPartitions,
Map<TopicPartition, Integer> partitionLeaderCache
PartitionLeaderCache partitionLeaderCache
) {
return new PartitionLeaderStrategy.PartitionLeaderFuture<>(new HashSet<>(topicPartitions), partitionLeaderCache);
}
@@ -68,7 +68,7 @@ public DescribeProducersHandler(

public static PartitionLeaderStrategy.PartitionLeaderFuture<PartitionProducerState> newFuture(
Collection<TopicPartition> topicPartitions,
Map<TopicPartition, Integer> partitionLeaderCache
PartitionLeaderCache partitionLeaderCache
) {
return new PartitionLeaderStrategy.PartitionLeaderFuture<>(new HashSet<>(topicPartitions), partitionLeaderCache);
}
@@ -218,7 +218,7 @@ public Map<TopicPartition, Throwable> handleUnsupportedVersionException(

public static PartitionLeaderStrategy.PartitionLeaderFuture<ListOffsetsResultInfo> newFuture(
Collection<TopicPartition> topicPartitions,
Map<TopicPartition, Integer> partitionLeaderCache
PartitionLeaderCache partitionLeaderCache
) {
return new PartitionLeaderStrategy.PartitionLeaderFuture<>(new HashSet<>(topicPartitions), partitionLeaderCache);
}
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;

import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * A thread-safe cache of the last known leader broker id for each topic partition, shared
 * across admin API calls that fan requests out to partition leaders.
 */
public class PartitionLeaderCache {

private final Map<TopicPartition, Integer> cache = new HashMap<>();

/**
 * Returns the cached leader broker id for each of the given partitions that has an entry;
 * partitions with no cached leader are omitted from the result.
 */
public Map<TopicPartition, Integer> get(Collection<TopicPartition> keys) {
Map<TopicPartition, Integer> result = new HashMap<>();
synchronized (cache) {
for (TopicPartition key : keys) {
if (cache.containsKey(key)) {
result.put(key, cache.get(key));
}
}
}
return result;
}

/**
 * Adds or replaces the cached leader broker id for each partition in the given mapping.
 */
public void put(Map<TopicPartition, Integer> values) {
synchronized (cache) {
cache.putAll(values);
}
}

/**
 * Evicts the given partitions from the cache, used when a request for those partitions failed
 * and the cached leaders may be stale.
 */
public void remove(Collection<TopicPartition> keys) {
synchronized (cache) {
for (TopicPartition key : keys) {
cache.remove(key);
}
}
}
}
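For reference, here is a minimal, self-contained sketch (not part of the patch) of how this cache is exercised by PartitionLeaderStrategy.PartitionLeaderFuture further below; the topic, partitions, and broker ids are made up for illustration.

import org.apache.kafka.clients.admin.internals.PartitionLeaderCache;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Set;

public class PartitionLeaderCacheExample {
    public static void main(String[] args) {
        PartitionLeaderCache cache = new PartitionLeaderCache();

        // completeLookup(): remember the leader broker id resolved for each partition.
        cache.put(Map.of(new TopicPartition("T", 0), 1, new TopicPartition("T", 1), 2));

        // cachedKeyBrokerIdMapping(): fetch whatever leaders are cached for the partitions of interest.
        Map<TopicPartition, Integer> leaders = cache.get(Set.of(new TopicPartition("T", 0)));
        System.out.println(leaders); // prints {T-0=1}

        // completeExceptionally(): evict partitions whose requests failed so the next admin call
        // re-resolves their leaders with a fresh metadata lookup.
        cache.remove(Set.of(new TopicPartition("T", 0)));
    }
}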
@@ -33,7 +33,6 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
@@ -208,10 +207,10 @@ public LookupResult<TopicPartition> handleResponse(
*/
public static class PartitionLeaderFuture<V> implements AdminApiFuture<TopicPartition, V> {
private final Set<TopicPartition> requestKeys;
private final Map<TopicPartition, Integer> partitionLeaderCache;
private final PartitionLeaderCache partitionLeaderCache;
private final Map<TopicPartition, KafkaFuture<V>> futures;

public PartitionLeaderFuture(Set<TopicPartition> requestKeys, Map<TopicPartition, Integer> partitionLeaderCache) {
public PartitionLeaderFuture(Set<TopicPartition> requestKeys, PartitionLeaderCache partitionLeaderCache) {
this.requestKeys = requestKeys;
this.partitionLeaderCache = partitionLeaderCache;
this.futures = requestKeys.stream().collect(Collectors.toUnmodifiableMap(
@@ -225,26 +224,12 @@ public Set<TopicPartition> lookupKeys() {
return futures.keySet();
}

@Override
public Set<TopicPartition> uncachedLookupKeys() {
Set<TopicPartition> keys = new HashSet<>();
requestKeys.forEach(tp -> {
if (!partitionLeaderCache.containsKey(tp)) {
keys.add(tp);
}
});
return keys;
}

@Override
public Map<TopicPartition, Integer> cachedKeyBrokerIdMapping() {
Map<TopicPartition, Integer> cache = partitionLeaderCache.get(requestKeys);

Map<TopicPartition, Integer> mapping = new HashMap<>();
requestKeys.forEach(tp -> {
Integer brokerId = partitionLeaderCache.get(tp);
if (brokerId != null) {
mapping.put(tp, brokerId);
}
});
requestKeys.forEach(tp -> mapping.put(tp, cache.getOrDefault(tp, UNKNOWN_BROKER_ID)));
return mapping;
}

@@ -263,16 +248,16 @@ private void complete(TopicPartition key, V value) {

@Override
public void completeLookup(Map<TopicPartition, Integer> brokerIdMapping) {
partitionLeaderCache.putAll(brokerIdMapping);
partitionLeaderCache.put(brokerIdMapping);
}

@Override
public void completeExceptionally(Map<TopicPartition, Throwable> errors) {
partitionLeaderCache.remove(errors.keySet());
errors.forEach(this::completeExceptionally);
}

private void completeExceptionally(TopicPartition key, Throwable t) {
partitionLeaderCache.remove(key);
futureOrThrow(key).completeExceptionally(t);
}

@@ -76,7 +76,7 @@ private AdminApiDriver<TopicPartition, Void> buildDriver(

@Test
public void testCachingRepeatedRequest() {
Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
PartitionLeaderCache partitionLeaderCache = new PartitionLeaderCache();

TopicPartition tp0 = new TopicPartition("T", 0);
TopicPartition tp1 = new TopicPartition("T", 1);
@@ -99,8 +99,9 @@ public void testCachingRepeatedRequest() {
assertFalse(result.all().get(tp0).isDone());
assertFalse(result.all().get(tp1).isDone());

assertEquals(1, partitionLeaderCache.get(tp0));
assertEquals(2, partitionLeaderCache.get(tp1));
Map<TopicPartition, Integer> cache = partitionLeaderCache.get(Set.of(tp0, tp1));
assertEquals(1, cache.get(tp0));
assertEquals(2, cache.get(tp1));

// Second, the fulfillment stage makes the actual requests
requestSpecs = driver.poll();
@@ -139,7 +140,7 @@ public void testCachingOverlappingRequests() {
// 2) for T-1 and T-2 (leadership data for T-1 should be cached from previous request)
// 3) for T-0, T-1 and T-2 (all leadership data should be cached already)
// 4) for T-0, T-1, T-2 and T-3 (just T-3 needs to be looked up)
Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
PartitionLeaderCache partitionLeaderCache = new PartitionLeaderCache();

TopicPartition tp0 = new TopicPartition("T", 0);
TopicPartition tp1 = new TopicPartition("T", 1);
@@ -168,8 +169,9 @@ public void testCachingOverlappingRequests() {
assertFalse(result.all().get(tp0).isDone());
assertFalse(result.all().get(tp1).isDone());

assertEquals(1, partitionLeaderCache.get(tp0));
assertEquals(2, partitionLeaderCache.get(tp1));
Map<TopicPartition, Integer> cache = partitionLeaderCache.get(Set.of(tp0, tp1));
assertEquals(1, cache.get(tp0));
assertEquals(2, cache.get(tp1));

// Second, the fulfillment stage makes the actual requests
requestSpecs = driver.poll();
@@ -206,9 +208,10 @@ public void testCachingOverlappingRequests() {
assertTrue(result.all().get(tp1).isDone()); // Already fulfilled
assertFalse(result.all().get(tp2).isDone());

assertEquals(1, partitionLeaderCache.get(tp0));
assertEquals(2, partitionLeaderCache.get(tp1));
assertEquals(1, partitionLeaderCache.get(tp2));
cache = partitionLeaderCache.get(Set.of(tp0, tp1, tp2));
assertEquals(1, cache.get(tp0));
assertEquals(2, cache.get(tp1));
assertEquals(1, cache.get(tp2));

// Finally, the fulfillment stage makes the actual request for the uncached topic-partition
requestSpecs = driver.poll();
@@ -268,10 +271,11 @@ public void testCachingOverlappingRequests() {
assertTrue(result.all().get(tp2).isDone()); // Already fulfilled
assertFalse(result.all().get(tp3).isDone());

assertEquals(1, partitionLeaderCache.get(tp0));
assertEquals(2, partitionLeaderCache.get(tp1));
assertEquals(1, partitionLeaderCache.get(tp2));
assertEquals(2, partitionLeaderCache.get(tp3));
cache = partitionLeaderCache.get(Set.of(tp0, tp1, tp2, tp3));
assertEquals(1, cache.get(tp0));
assertEquals(2, cache.get(tp1));
assertEquals(1, cache.get(tp2));
assertEquals(2, cache.get(tp3));

// Finally, the fulfillment stage makes the actual request for the uncached topic-partition
requestSpecs = driver.poll();
Expand All @@ -288,7 +292,7 @@ public void testCachingOverlappingRequests() {

@Test
public void testNotLeaderFulfillmentError() {
Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
PartitionLeaderCache partitionLeaderCache = new PartitionLeaderCache();

TopicPartition tp0 = new TopicPartition("T", 0);
TopicPartition tp1 = new TopicPartition("T", 1);
@@ -311,8 +315,9 @@ public void testNotLeaderFulfillmentError() {
assertFalse(result.all().get(tp0).isDone());
assertFalse(result.all().get(tp1).isDone());

assertEquals(1, partitionLeaderCache.get(tp0));
assertEquals(2, partitionLeaderCache.get(tp1));
Map<TopicPartition, Integer> cache = partitionLeaderCache.get(Set.of(tp0, tp1));
assertEquals(1, cache.get(tp0));
assertEquals(2, cache.get(tp1));

// Second, the fulfillment stage makes the actual requests
requestSpecs = driver.poll();
@@ -337,8 +342,9 @@ public void testNotLeaderFulfillmentError() {
assertTrue(result.all().get(tp0).isDone());
assertFalse(result.all().get(tp1).isDone());

assertEquals(1, partitionLeaderCache.get(tp0));
assertEquals(1, partitionLeaderCache.get(tp1));
cache = partitionLeaderCache.get(Set.of(tp0, tp1));
assertEquals(1, cache.get(tp0));
assertEquals(1, cache.get(tp1));

// And the fulfillment stage makes the actual request
requestSpecs = driver.poll();
@@ -354,7 +360,7 @@
@Test
public void testFatalLookupError() {
TopicPartition tp0 = new TopicPartition("T", 0);
Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
PartitionLeaderCache partitionLeaderCache = new PartitionLeaderCache();
PartitionLeaderStrategy.PartitionLeaderFuture<Void> result =
new PartitionLeaderStrategy.PartitionLeaderFuture<>(Collections.singleton(tp0), partitionLeaderCache);
AdminApiDriver<TopicPartition, Void> driver = buildDriver(result);
@@ -374,7 +380,7 @@
@Test
public void testRetryLookupAfterDisconnect() {
TopicPartition tp0 = new TopicPartition("T", 0);
Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
PartitionLeaderCache partitionLeaderCache = new PartitionLeaderCache();
PartitionLeaderStrategy.PartitionLeaderFuture<Void> result =
new PartitionLeaderStrategy.PartitionLeaderFuture<>(Collections.singleton(tp0), partitionLeaderCache);
AdminApiDriver<TopicPartition, Void> driver = buildDriver(result);