Skip to content

Commit 9c69272

Browse files
committed
Add TTL-based expiration to PartitionLeaderCache
Cached partition leader mappings previously lived forever, which could cause AdminClient operations to target stale brokers after leadership changes. This adds a TTL (driven by metadata.max.age.ms) so that expired entries are transparently filtered out of get() results, forcing a fresh metadata lookup via the existing AdminApiDriver mechanism.
1 parent 78a66fa commit 9c69272

3 files changed

Lines changed: 175 additions & 5 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,7 @@ private KafkaAdminClient(AdminClientConfig config,
632632
CommonClientConfigs.RETRY_BACKOFF_JITTER);
633633
this.clientTelemetryReporter = clientTelemetryReporter;
634634
this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
635-
this.partitionLeaderCache = new PartitionLeaderCache();
635+
this.partitionLeaderCache = new PartitionLeaderCache(time, config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
636636
this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);
637637
config.logUnused();
638638
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());

clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderCache.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,46 @@
1717
package org.apache.kafka.clients.admin.internals;
1818

1919
import org.apache.kafka.common.TopicPartition;
20+
import org.apache.kafka.common.utils.Time;
2021

2122
import java.util.Collection;
2223
import java.util.HashMap;
2324
import java.util.Map;
2425

2526
public class PartitionLeaderCache {
2627

27-
private final Map<TopicPartition, Integer> cache = new HashMap<>();
28+
private final Map<TopicPartition, CacheEntry> cache = new HashMap<>();
29+
private final Time time;
30+
private final long ttlMs;
31+
32+
/**
 * Creates a cache that reads the system clock and uses a TTL of {@code Long.MAX_VALUE}
 * milliseconds, so entries effectively never expire.
 */
public PartitionLeaderCache() {
    this(Time.SYSTEM, Long.MAX_VALUE);
}
35+
36+
/**
 * Creates a cache whose entries expire once they are at least {@code ttlMs} milliseconds old.
 *
 * @param time  clock used to timestamp entries when they are stored and to judge their age
 *              when they are read
 * @param ttlMs entry time-to-live in milliseconds
 */
public PartitionLeaderCache(Time time, long ttlMs) {
    this.ttlMs = ttlMs;
    this.time = time;
}
2840

2941
public Map<TopicPartition, Integer> get(Collection<TopicPartition> keys) {
3042
Map<TopicPartition, Integer> result = new HashMap<>();
43+
long now = time.milliseconds();
3144
synchronized (cache) {
3245
for (TopicPartition key : keys) {
33-
if (cache.containsKey(key)) {
34-
result.put(key, cache.get(key));
46+
CacheEntry entry = cache.get(key);
47+
if (entry != null && !entry.isExpired(now, ttlMs)) {
48+
result.put(key, entry.brokerId);
3549
}
3650
}
3751
}
3852
return result;
3953
}
4054

4155
public void put(Map<TopicPartition, Integer> values) {
56+
long now = time.milliseconds();
4257
synchronized (cache) {
43-
cache.putAll(values);
58+
values.forEach((tp, brokerId) ->
59+
cache.put(tp, new CacheEntry(brokerId, now)));
4460
}
4561
}
4662

@@ -51,4 +67,18 @@ public void remove(Collection<TopicPartition> keys) {
5167
}
5268
}
5369
}
70+
71+
private static class CacheEntry {
72+
final int brokerId;
73+
final long timestampMs;
74+
75+
CacheEntry(int brokerId, long timestampMs) {
76+
this.brokerId = brokerId;
77+
this.timestampMs = timestampMs;
78+
}
79+
80+
boolean isExpired(long nowMs, long ttlMs) {
81+
return (nowMs - timestampMs) >= ttlMs;
82+
}
83+
}
5484
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.admin.internals;
18+
19+
import org.apache.kafka.common.TopicPartition;
20+
import org.apache.kafka.common.utils.MockTime;
21+
import org.apache.kafka.common.utils.Time;
22+
23+
import org.junit.jupiter.api.Test;
24+
25+
import java.util.Arrays;
26+
import java.util.Collections;
27+
import java.util.HashMap;
28+
import java.util.Map;
29+
30+
import static org.junit.jupiter.api.Assertions.assertEquals;
31+
import static org.junit.jupiter.api.Assertions.assertTrue;
32+
33+
class PartitionLeaderCacheTest {
34+
35+
private static final long TTL_MS = 10_000L;
36+
private static final TopicPartition TP0 = new TopicPartition("topic", 0);
37+
private static final TopicPartition TP1 = new TopicPartition("topic", 1);
38+
private static final TopicPartition TP2 = new TopicPartition("topic", 2);
39+
40+
@Test
41+
void nonExpiredEntries() {
42+
MockTime time = new MockTime();
43+
PartitionLeaderCache cache = new PartitionLeaderCache(time, TTL_MS);
44+
45+
Map<TopicPartition, Integer> entries = new HashMap<>();
46+
entries.put(TP0, 0);
47+
entries.put(TP1, 1);
48+
cache.put(entries);
49+
50+
// Advance time but stay under TTL
51+
time.sleep(TTL_MS - 1);
52+
53+
Map<TopicPartition, Integer> result = cache.get(Arrays.asList(TP0, TP1));
54+
assertEquals(2, result.size());
55+
assertEquals(0, result.get(TP0));
56+
assertEquals(1, result.get(TP1));
57+
}
58+
59+
@Test
60+
void filtersExpiredEntries() {
61+
MockTime time = new MockTime();
62+
PartitionLeaderCache cache = new PartitionLeaderCache(time, TTL_MS);
63+
64+
Map<TopicPartition, Integer> entries = new HashMap<>();
65+
entries.put(TP0, 0);
66+
entries.put(TP1, 1);
67+
cache.put(entries);
68+
69+
// Advance time to exactly TTL (should expire)
70+
time.sleep(TTL_MS);
71+
72+
Map<TopicPartition, Integer> result = cache.get(Arrays.asList(TP0, TP1));
73+
assertTrue(result.isEmpty());
74+
}
75+
76+
@Test
77+
void putRefreshesTimestamp() {
78+
MockTime time = new MockTime();
79+
PartitionLeaderCache cache = new PartitionLeaderCache(time, TTL_MS);
80+
81+
cache.put(Collections.singletonMap(TP0, 0));
82+
83+
// Advance close to TTL
84+
time.sleep(TTL_MS - 2);
85+
86+
// Re-put to refresh the timestamp
87+
cache.put(Collections.singletonMap(TP0, 0));
88+
89+
// Advance again close to TTL from the refresh point
90+
time.sleep(TTL_MS - 2);
91+
92+
// Should still be valid since we refreshed
93+
Map<TopicPartition, Integer> result = cache.get(Collections.singletonList(TP0));
94+
assertEquals(1, result.size());
95+
assertEquals(0, result.get(TP0));
96+
}
97+
98+
@Test
99+
public void partialExpiration() {
100+
MockTime time = new MockTime();
101+
PartitionLeaderCache cache = new PartitionLeaderCache(time, TTL_MS);
102+
103+
// Put TP0 first
104+
cache.put(Collections.singletonMap(TP0, 0));
105+
106+
// Advance half the TTL
107+
time.sleep(TTL_MS / 2);
108+
109+
// Put TP1 later
110+
cache.put(Collections.singletonMap(TP1, 1));
111+
112+
// Advance so TP0 expires but TP1 does not
113+
time.sleep((TTL_MS / 2) + 1);
114+
115+
Map<TopicPartition, Integer> result = cache.get(Arrays.asList(TP0, TP1));
116+
assertEquals(1, result.size());
117+
assertEquals(1, result.get(TP1));
118+
}
119+
120+
@Test
121+
void noArgConstructorNeverExpires() { // backwards compatibility test
122+
PartitionLeaderCache cache = new PartitionLeaderCache();
123+
124+
Map<TopicPartition, Integer> entries = new HashMap<>();
125+
entries.put(TP0, 0);
126+
entries.put(TP1, 1);
127+
entries.put(TP2, 2);
128+
cache.put(entries);
129+
130+
Time.SYSTEM.sleep(TTL_MS * 2);
131+
132+
// With the no-arg constructor using Long.MAX_VALUE TTL and Time.SYSTEM,
133+
// entries should effectively never expire
134+
Map<TopicPartition, Integer> result = cache.get(Arrays.asList(TP0, TP1, TP2));
135+
assertEquals(3, result.size());
136+
assertEquals(0, result.get(TP0));
137+
assertEquals(1, result.get(TP1));
138+
assertEquals(2, result.get(TP2));
139+
}
140+
}

0 commit comments

Comments
 (0)