Skip to content

Commit 8209af2

Browse files
KAFKA-20064: Made PartitionLeaderCache thread safe. (apache#21335)
* Introduced a new class, PartitionLeaderCache. * Changed the usage of the cache to make calls atomic (e.g. getting cached and non-cached values in one call instead of two, and deleting cached values in one call rather than one by one). * Added an integration test that exercises concurrent access to the cache. Reviewers: Andrew Schofield <aschofield@confluent.io>
1 parent a899cfc commit 8209af2

11 files changed

Lines changed: 310 additions & 64 deletions

File tree

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.admin;
18+
19+
import org.apache.kafka.clients.CommonClientConfigs;
20+
import org.apache.kafka.clients.DefaultHostResolver;
21+
import org.apache.kafka.clients.NetworkClient;
22+
import org.apache.kafka.clients.admin.internals.PartitionLeaderCache;
23+
import org.apache.kafka.common.IsolationLevel;
24+
import org.apache.kafka.common.TopicPartition;
25+
import org.apache.kafka.common.errors.TimeoutException;
26+
import org.apache.kafka.common.test.ClusterInstance;
27+
import org.apache.kafka.common.test.api.ClusterTest;
28+
import org.apache.kafka.common.test.api.ClusterTestDefaults;
29+
import org.apache.kafka.common.test.api.Type;
30+
import org.apache.kafka.common.utils.Utils;
31+
import org.apache.kafka.test.TestUtils;
32+
33+
import org.junit.jupiter.api.AfterEach;
34+
import org.junit.jupiter.api.BeforeEach;
35+
36+
import java.net.InetAddress;
37+
import java.net.UnknownHostException;
38+
import java.util.ArrayList;
39+
import java.util.Collection;
40+
import java.util.List;
41+
import java.util.Map;
42+
import java.util.concurrent.CountDownLatch;
43+
import java.util.concurrent.ExecutionException;
44+
import java.util.concurrent.TimeUnit;
45+
import java.util.concurrent.atomic.AtomicBoolean;
46+
import java.util.concurrent.atomic.AtomicInteger;
47+
import java.util.function.Function;
48+
import java.util.stream.Collectors;
49+
50+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
51+
import static org.junit.jupiter.api.Assertions.assertThrows;
52+
53+
@ClusterTestDefaults(
54+
types = {Type.KRAFT},
55+
brokers = 3
56+
)
57+
public class ConcurrentListOffsetsRequestTest {
58+
private static final String TOPIC = "topic";
59+
private static final short REPLICAS = 1;
60+
private static final int PARTITION = 2;
61+
private static final int TIMEOUT = 1000;
62+
private final ClusterInstance clusterInstance;
63+
private Admin adminClient;
64+
private NetworkClient networkClient;
65+
private final AtomicBoolean injectHostResolverError = new AtomicBoolean(false);
66+
67+
ConcurrentListOffsetsRequestTest(ClusterInstance clusterInstance) {
68+
this.clusterInstance = clusterInstance;
69+
}
70+
71+
@BeforeEach
72+
public void setup() throws Exception {
73+
clusterInstance.waitForReadyBrokers();
74+
clusterInstance.createTopic(TOPIC, PARTITION, REPLICAS);
75+
Map<String, Object> props = Map.of(
76+
"default.api.timeout.ms", TIMEOUT,
77+
"request.timeout.ms", TIMEOUT,
78+
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
79+
adminClient = KafkaAdminClient.createInternal(new AdminClientConfig(clusterInstance.setClientSaslConfig(props), true),
80+
null, new TestHostResolver());
81+
82+
networkClient = TestUtils.fieldValue(adminClient, KafkaAdminClient.class, "client");
83+
}
84+
85+
@AfterEach
86+
public void teardown() {
87+
Utils.closeQuietly(adminClient, "ListOffsetsAdminClient");
88+
}
89+
90+
@ClusterTest
91+
public void correctlyHandleConcurrentModificationOfPartitionLeaderCache() throws Exception {
92+
// making one request to prepopulate the partition leader cache so we have something to delete later
93+
listAllOffsets().all().get(TIMEOUT * 2, TimeUnit.SECONDS);
94+
95+
final CountDownLatch invalidationLatch = new CountDownLatch(1);
96+
// Replacing the partition leader cache in order to be able to synchronize the calls so that they happen in the right order to reproduce the issue
97+
TestPartitionLeaderCache testPartitionLeaderCache = replacePartitionLeaderCache(invalidationLatch);
98+
99+
// closing the connection to the first node. not using clusterInstance.shutdownBroker to reduce flakiness
100+
networkClient.close(testPartitionLeaderCache.get(getTopicPartitions()).values().iterator().next().toString());
101+
// as next call with try to resolve the host for the closed node, it's time to let it fail, which will lead to cache invalidation
102+
injectHostResolverError.set(true);
103+
104+
// making another request(this request will face the host resolver error and remove the node from the cache)
105+
ListOffsetsResult failInducingResult = listAllOffsets();
106+
// waiting until we get to the invalidation
107+
invalidationLatch.await();
108+
// making another request. at this point the fail inducing request is waiting for this one before it deletes the keys associated with the node
109+
// the TestPartitionLeaderCache class synchronizes the calls to mimic the race condition
110+
ListOffsetsResult failingResult = listAllOffsets();
111+
112+
// verifying that we correctly declined the call
113+
ExecutionException executionException = assertThrows(ExecutionException.class, () -> failInducingResult.all().get(TIMEOUT * 2, TimeUnit.MILLISECONDS));
114+
assertInstanceOf(TimeoutException.class, executionException.getCause());
115+
116+
// verifying that we correctly declined the call
117+
executionException = assertThrows(ExecutionException.class, () -> failingResult.all().get(TIMEOUT * 2, TimeUnit.MILLISECONDS));
118+
assertInstanceOf(TimeoutException.class, executionException.getCause());
119+
}
120+
121+
private TestPartitionLeaderCache replacePartitionLeaderCache(CountDownLatch invalidationLatch) throws Exception {
122+
PartitionLeaderCache oldPartitionLeaderCache = TestUtils.fieldValue(adminClient, KafkaAdminClient.class, "partitionLeaderCache");
123+
124+
TestPartitionLeaderCache partitionLeaderCache = new TestPartitionLeaderCache(oldPartitionLeaderCache.get(getTopicPartitions()), invalidationLatch);
125+
TestUtils.setFieldValue(adminClient, "partitionLeaderCache", partitionLeaderCache);
126+
return partitionLeaderCache;
127+
}
128+
129+
private ListOffsetsResult listAllOffsets() {
130+
List<TopicPartition> partitions = getTopicPartitions();
131+
132+
Map<TopicPartition, OffsetSpec> offsetSpecMap = partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()));
133+
return adminClient.listOffsets(offsetSpecMap, new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED));
134+
}
135+
136+
private List<TopicPartition> getTopicPartitions() {
137+
List<TopicPartition> partitions = new ArrayList<>();
138+
for (int i = 0; i < PARTITION; i++) {
139+
partitions.add(new TopicPartition(TOPIC, i));
140+
}
141+
return partitions;
142+
}
143+
144+
private static class TestPartitionLeaderCache extends PartitionLeaderCache {
145+
146+
private final AtomicInteger getCounter = new AtomicInteger(0);
147+
private final CountDownLatch invalidationLatch;
148+
private final CountDownLatch newRequestCheckLatch = new CountDownLatch(1);
149+
private final CountDownLatch removeCompleteLatch = new CountDownLatch(1);
150+
151+
public TestPartitionLeaderCache(Map<TopicPartition, Integer> oldPartitionLeaderCache, final CountDownLatch invalidationLatch) {
152+
put(oldPartitionLeaderCache);
153+
this.invalidationLatch = invalidationLatch;
154+
}
155+
156+
@Override
157+
public Map<TopicPartition, Integer> get(Collection<TopicPartition> keys) {
158+
Map<TopicPartition, Integer> result = super.get(keys);
159+
// waiting for the third call: first one was to close the network connection, second one was from the request that invalidates the cache
160+
if (getCounter.incrementAndGet() == 3) {
161+
newRequestCheckLatch.countDown();
162+
try {
163+
// letting the remove method proceed and actually remove the data
164+
removeCompleteLatch.await();
165+
} catch (InterruptedException e) {
166+
throw new RuntimeException(e);
167+
}
168+
}
169+
170+
return result;
171+
}
172+
173+
@Override
174+
public void remove(Collection<TopicPartition> keys) {
175+
try {
176+
// letting the caller know that we've reached the invalidation step, and it's time to send the second request
177+
invalidationLatch.countDown();
178+
// waiting for the second request to reach get
179+
newRequestCheckLatch.await();
180+
} catch (InterruptedException e) {
181+
throw new RuntimeException(e);
182+
}
183+
super.remove(keys);
184+
// once the value removed, we are letting the get method proceed and return the value
185+
removeCompleteLatch.countDown();
186+
}
187+
}
188+
189+
private class TestHostResolver extends DefaultHostResolver {
190+
191+
@Override
192+
public InetAddress[] resolve(String host) throws UnknownHostException {
193+
if (injectHostResolverError.get()) {
194+
throw new UnknownHostException();
195+
}
196+
return super.resolve(host);
197+
}
198+
}
199+
}

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.kafka.clients.admin.internals.ListOffsetsHandler;
6464
import org.apache.kafka.clients.admin.internals.ListShareGroupOffsetsHandler;
6565
import org.apache.kafka.clients.admin.internals.ListTransactionsHandler;
66+
import org.apache.kafka.clients.admin.internals.PartitionLeaderCache;
6667
import org.apache.kafka.clients.admin.internals.PartitionLeaderStrategy;
6768
import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler;
6869
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -407,7 +408,7 @@ public class KafkaAdminClient extends AdminClient {
407408
private final long retryBackoffMaxMs;
408409
private final ExponentialBackoff retryBackoff;
409410
private final MetadataRecoveryStrategy metadataRecoveryStrategy;
410-
private final Map<TopicPartition, Integer> partitionLeaderCache;
411+
private final PartitionLeaderCache partitionLeaderCache;
411412
private final AdminFetchMetricsManager adminFetchMetricsManager;
412413
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
413414

@@ -631,7 +632,7 @@ private KafkaAdminClient(AdminClientConfig config,
631632
CommonClientConfigs.RETRY_BACKOFF_JITTER);
632633
this.clientTelemetryReporter = clientTelemetryReporter;
633634
this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
634-
this.partitionLeaderCache = new HashMap<>();
635+
this.partitionLeaderCache = new PartitionLeaderCache();
635636
this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);
636637
config.logUnused();
637638
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());

clients/src/main/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.slf4j.Logger;
3535

3636
import java.util.List;
37-
import java.util.Map;
3837
import java.util.Set;
3938

4039
import static java.util.Collections.singleton;
@@ -56,7 +55,7 @@ public AbortTransactionHandler(
5655

5756
public static PartitionLeaderStrategy.PartitionLeaderFuture<Void> newFuture(
5857
Set<TopicPartition> topicPartitions,
59-
Map<TopicPartition, Integer> partitionLeaderCache
58+
PartitionLeaderCache partitionLeaderCache
6059
) {
6160
return new PartitionLeaderStrategy.PartitionLeaderFuture<>(topicPartitions, partitionLeaderCache);
6261
}

clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,13 @@ public AdminApiDriver(
115115
// metadata. For all cached keys, they can proceed straight to the fulfillment map.
116116
// Note that the cache is only used on the initial calls, and any errors that result
117117
// in additional lookups use the full set of lookup keys.
118-
retryLookup(future.uncachedLookupKeys());
119-
future.cachedKeyBrokerIdMapping().forEach((key, brokerId) -> fulfillmentMap.put(new FulfillmentScope(brokerId), key));
118+
future.cachedKeyBrokerIdMapping().forEach((key, brokerId) -> {
119+
if (AdminApiFuture.UNKNOWN_BROKER_ID.equals(brokerId)) {
120+
unmap(key);
121+
} else {
122+
fulfillmentMap.put(new FulfillmentScope(brokerId), key);
123+
}
124+
});
120125
}
121126

122127
/**

clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@
1919
import org.apache.kafka.common.KafkaFuture;
2020
import org.apache.kafka.common.internals.KafkaFutureImpl;
2121

22-
import java.util.Collections;
22+
import java.util.HashMap;
2323
import java.util.Map;
2424
import java.util.Set;
2525
import java.util.function.Function;
2626
import java.util.stream.Collectors;
2727

2828
public interface AdminApiFuture<K, V> {
2929

30+
Integer UNKNOWN_BROKER_ID = -1;
31+
3032
/**
3133
* The initial set of lookup keys. Although this will usually match the fulfillment
3234
* keys, it does not necessarily have to. For example, in the case of
@@ -39,22 +41,17 @@ public interface AdminApiFuture<K, V> {
3941
Set<K> lookupKeys();
4042

4143
/**
42-
* The set of request keys that do not have cached key-broker id mappings. If there
43-
* is no cached key mapping, this will be the same as the lookup keys.
44-
* Can be empty, but only if the cached key mapping is not empty.
45-
*/
46-
default Set<K> uncachedLookupKeys() {
47-
return lookupKeys();
48-
}
49-
50-
/**
51-
* The cached key-broker id mapping. For lookup strategies that do not make use of a
52-
* cache of metadata, this will be empty.
44+
* The cached key-broker id mapping. For non-cached values(or lookup strategies that do not make use of a
45+
* cache of metadata) the broker id will be {@link #UNKNOWN_BROKER_ID}
5346
*
5447
* @return mapping of keys to broker ids
5548
*/
5649
default Map<K, Integer> cachedKeyBrokerIdMapping() {
57-
return Collections.emptyMap();
50+
Map<K, Integer> result = new HashMap<>();
51+
for (K key : lookupKeys()) {
52+
result.put(key, UNKNOWN_BROKER_ID);
53+
}
54+
return result;
5855
}
5956

6057
/**

clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
7373

7474
public static PartitionLeaderStrategy.PartitionLeaderFuture<DeletedRecords> newFuture(
7575
Collection<TopicPartition> topicPartitions,
76-
Map<TopicPartition, Integer> partitionLeaderCache
76+
PartitionLeaderCache partitionLeaderCache
7777
) {
7878
return new PartitionLeaderStrategy.PartitionLeaderFuture<>(new HashSet<>(topicPartitions), partitionLeaderCache);
7979
}

clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public DescribeProducersHandler(
6868

6969
public static PartitionLeaderStrategy.PartitionLeaderFuture<PartitionProducerState> newFuture(
7070
Collection<TopicPartition> topicPartitions,
71-
Map<TopicPartition, Integer> partitionLeaderCache
71+
PartitionLeaderCache partitionLeaderCache
7272
) {
7373
return new PartitionLeaderStrategy.PartitionLeaderFuture<>(new HashSet<>(topicPartitions), partitionLeaderCache);
7474
}

clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ public Map<TopicPartition, Throwable> handleUnsupportedVersionException(
223223

224224
public static PartitionLeaderStrategy.PartitionLeaderFuture<ListOffsetsResultInfo> newFuture(
225225
Collection<TopicPartition> topicPartitions,
226-
Map<TopicPartition, Integer> partitionLeaderCache
226+
PartitionLeaderCache partitionLeaderCache
227227
) {
228228
return new PartitionLeaderStrategy.PartitionLeaderFuture<>(new HashSet<>(topicPartitions), partitionLeaderCache);
229229
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.admin.internals;
18+
19+
import org.apache.kafka.common.TopicPartition;
20+
21+
import java.util.Collection;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
public class PartitionLeaderCache {
26+
27+
private final Map<TopicPartition, Integer> cache = new HashMap<>();
28+
29+
public Map<TopicPartition, Integer> get(Collection<TopicPartition> keys) {
30+
Map<TopicPartition, Integer> result = new HashMap<>();
31+
synchronized (cache) {
32+
for (TopicPartition key : keys) {
33+
if (cache.containsKey(key)) {
34+
result.put(key, cache.get(key));
35+
}
36+
}
37+
}
38+
return result;
39+
}
40+
41+
public void put(Map<TopicPartition, Integer> values) {
42+
synchronized (cache) {
43+
cache.putAll(values);
44+
}
45+
}
46+
47+
public void remove(Collection<TopicPartition> keys) {
48+
synchronized (cache) {
49+
for (TopicPartition key : keys) {
50+
cache.remove(key);
51+
}
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)