Skip to content

Commit f8df406

Browse files
committed
Merge remote-tracking branch 'origin/kip-1170-format' into kip-1170-format
2 parents b83d676 + ce5faf6 commit f8df406

142 files changed

Lines changed: 4898 additions & 2645 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

build.gradle

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,6 +1004,8 @@ project(':server') {
10041004
archivesName = "kafka-server"
10051005
}
10061006

1007+
apply plugin: 'java-test-fixtures'
1008+
10071009
dependencies {
10081010
compileOnly libs.bndlib
10091011
compileOnly libs.spotbugs
@@ -1033,6 +1035,13 @@ project(':server') {
10331035
testImplementation project(':test-common:test-common-runtime')
10341036
testImplementation testFixtures(project(':storage:storage-api'))
10351037
testImplementation testFixtures(project(':server-common'))
1038+
testImplementation testFixtures(project(':metadata'))
1039+
1040+
testFixturesImplementation project(':clients')
1041+
testFixturesImplementation testFixtures(project(':clients'))
1042+
testFixturesImplementation project(':metadata')
1043+
testFixturesImplementation project(':server-common')
1044+
testFixturesImplementation libs.junitJupiter
10361045

10371046
testRuntimeOnly runtimeTestLibs
10381047
}
@@ -1152,7 +1161,7 @@ project(':core') {
11521161
testImplementation testFixtures(project(':raft'))
11531162
testImplementation testFixtures(project(':server-common'))
11541163
testImplementation testFixtures(project(':storage:storage-api'))
1155-
testImplementation project(':server').sourceSets.test.output
1164+
testImplementation testFixtures(project(':server'))
11561165
testImplementation project(':streams')
11571166
testImplementation project(':test-common:test-common-runtime')
11581167
testImplementation project(':test-common:test-common-internal-api')
@@ -2430,6 +2439,8 @@ project(':storage') {
24302439
archivesName = "kafka-storage"
24312440
}
24322441

2442+
apply plugin: 'java-test-fixtures'
2443+
24332444
configurations {
24342445
generator
24352446
}
@@ -2446,17 +2457,25 @@ project(':storage') {
24462457
implementation libs.jacksonDatabind
24472458
implementation libs.metrics
24482459

2460+
testFixturesImplementation project(':storage:storage-api')
2461+
testFixturesImplementation project(':clients')
2462+
testFixturesImplementation testFixtures(project(':clients'))
2463+
testFixturesImplementation project(':server-common')
2464+
testFixturesImplementation libs.slf4jApi
2465+
24492466
testImplementation project(':clients')
24502467
testImplementation testFixtures(project(':clients'))
24512468
testImplementation project(':core')
24522469
testImplementation project(':core').sourceSets.test.output
24532470
testImplementation testFixtures(project(':storage:storage-api'))
2471+
testImplementation project(':metadata')
24542472
testImplementation project(':test-common:test-common-internal-api')
24552473
testImplementation project(':test-common:test-common-runtime')
24562474
testImplementation project(':test-common:test-common-util')
24572475
testImplementation project(':server')
24582476
testImplementation project(':server-common')
24592477
testImplementation testFixtures(project(':server-common'))
2478+
testImplementation testFixtures(project(':metadata'))
24602479
testImplementation project(':transaction-coordinator')
24612480
testImplementation libs.hamcrest
24622481
testImplementation libs.jacksonDataformatYaml
@@ -2608,6 +2627,12 @@ project(':tools') {
26082627

26092628
configurations {
26102629
releaseOnly
2630+
// ApacheDS 2.0.0-M24 pulls in the stale bcprov-jdk15on:1.56, which ships the same
2631+
// RosstandartObjectIdentifiers class as the modern bcprov-jdk18on we already depend on
2632+
// but is missing fields referenced by bcpkix-jdk18on:1.84. When IntelliJ's test runner
2633+
// orders the old JAR first, BC provider registration fails with NoSuchFieldError. Drop it.
2634+
testCompileClasspath.exclude group: 'org.bouncycastle', module: 'bcprov-jdk15on'
2635+
testRuntimeClasspath.exclude group: 'org.bouncycastle', module: 'bcprov-jdk15on'
26112636
}
26122637

26132638
dependencies {
@@ -2642,8 +2667,8 @@ project(':tools') {
26422667
testImplementation project(':clients')
26432668
testImplementation testFixtures(project(':clients'))
26442669
testImplementation project(':server')
2645-
testImplementation project(':server').sourceSets.test.output
26462670
testImplementation project(':core')
2671+
testImplementation testFixtures(project(':server'))
26472672
testImplementation project(':core').sourceSets.test.output
26482673
testImplementation project(':test-common:test-common-internal-api')
26492674
testImplementation project(':test-common:test-common-runtime')
@@ -2653,7 +2678,7 @@ project(':tools') {
26532678
testImplementation project(':connect:runtime')
26542679
testImplementation project(':connect:runtime').sourceSets.test.output
26552680
testImplementation project(':storage:storage-api').sourceSets.main.output
2656-
testImplementation project(':storage').sourceSets.test.output
2681+
testImplementation testFixtures(project(':storage'))
26572682
testImplementation project(':streams')
26582683
testImplementation project(':streams').sourceSets.test.output
26592684
testImplementation project(':streams:integration-tests').sourceSets.test.output
@@ -3552,7 +3577,6 @@ project(':jmh-benchmarks') {
35523577
implementation testFixtures(project(':clients'))
35533578
implementation testFixtures(project(':server-common'))
35543579
implementation testFixtures(project(':metadata'))
3555-
implementation project(':server').sourceSets.test.output
35563580

35573581
implementation libs.jmhCore
35583582
annotationProcessor libs.jmhGeneratorAnnProcess
@@ -4117,8 +4141,10 @@ gradle.projectsEvaluated {
41174141
def protectedModules = [
41184142
'clients': ':clients',
41194143
'server-common': ':server-common',
4144+
'server': ':server',
41204145
'storage/api': ':storage:storage-api',
41214146
'coordinator-common': ':coordinator-common',
4147+
'storage': ':storage',
41224148
'group-coordinator': ':group-coordinator',
41234149
'share-coordinator': ':share-coordinator',
41244150
'metadata': ':metadata',

checkstyle/import-control-server-common.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@
134134

135135
<allow class="org.apache.kafka.server.util.TopicFilter.IncludeList" />
136136
<allow class="org.apache.kafka.test.TestUtils" />
137+
<allow class="org.apache.kafka.server.authorizer.Authorizer" />
137138
<!-- ServerTestUtils uses yammer metrics for test cleanup -->
138139
<allow pkg="org.apache.kafka.server.metrics" />
139140
<allow pkg="com.yammer.metrics.core" />

checkstyle/import-control-server.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,12 @@
114114
<allow pkg="org.apache.kafka.network" />
115115
<allow pkg="org.apache.kafka.server" />
116116
<allow pkg="org.apache.kafka.server.authorizer" />
117+
<allow class="org.apache.kafka.controller.MockAclMutator" />
117118
</subpackage>
118119

119120
<subpackage name="network">
120121
<allow pkg="com.fasterxml.jackson" />
122+
<allow pkg="org.apache.kafka.network.metrics" />
121123
</subpackage>
122124

123125
</import-control>

checkstyle/import-control-storage.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@
118118
<allow pkg="org.apache.kafka.tiered.storage" />
119119

120120
<allow pkg="kafka.api" />
121+
<allow pkg="kafka.integration" />
121122
<allow pkg="kafka.log" />
122123
<allow pkg="kafka.server" />
123124
<allow pkg="kafka.utils" />

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,34 @@ public synchronized void rebalance(Collection<TopicPartition> newAssignment) {
142142
this.subscriptions.rebalanceListener().ifPresent(crl -> crl.onPartitionsAssigned(added));
143143
}
144144

145+
/**
146+
* Simulates a partition loss event. Calls {@link ConsumerRebalanceListener#onPartitionsLost}
147+
* for the specified partitions and removes them from the current assignment. Unlike
148+
* {@link #rebalance(Collection)}, which calls {@link ConsumerRebalanceListener#onPartitionsRevoked},
149+
* this method models the case where the consumer loses partitions without a graceful revoke..
150+
*
151+
* <p>Only records belonging to the lost partitions are cleared; records for retained
152+
* partitions are unaffected.
153+
*
154+
* @param partitionsLost the partitions to lose; all must be currently assigned
155+
* @throws IllegalStateException if any partition is not currently assigned
156+
*/
157+
public synchronized void losePartitions(Collection<TopicPartition> partitionsLost) {
158+
Set<TopicPartition> currentAssignment = this.subscriptions.assignedPartitions();
159+
Set<TopicPartition> lost = new HashSet<>(partitionsLost);
160+
List<TopicPartition> notAssigned = lost.stream()
161+
.filter(tp -> !currentAssignment.contains(tp))
162+
.collect(Collectors.toList());
163+
if (!notAssigned.isEmpty())
164+
throw new IllegalStateException("Cannot lose partitions that are not currently assigned: " + notAssigned);
165+
lost.forEach(records::remove);
166+
this.subscriptions.rebalanceListener().ifPresent(crl -> crl.onPartitionsLost(lost));
167+
Set<TopicPartition> remaining = currentAssignment.stream()
168+
.filter(tp -> !lost.contains(tp))
169+
.collect(Collectors.toSet());
170+
this.subscriptions.assignFromSubscribed(remaining);
171+
}
172+
145173
@Override
146174
public synchronized Set<String> subscription() {
147175
return this.subscriptions.subscription();

clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ public boolean modified() {
460460

461461
private KeyStore createKeyStoreFromPem(String privateKeyPem, String certChainPem, char[] keyPassword) {
462462
try {
463-
KeyStore ks = KeyStore.getInstance("PKCS12");
463+
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
464464
ks.load(null, null);
465465
Key key = privateKey(privateKeyPem, keyPassword);
466466
Certificate[] certChain = certs(certChainPem);
@@ -473,7 +473,7 @@ private KeyStore createKeyStoreFromPem(String privateKeyPem, String certChainPem
473473

474474
private KeyStore createTrustStoreFromPem(String trustedCertsPem) {
475475
try {
476-
KeyStore ts = KeyStore.getInstance("PKCS12");
476+
KeyStore ts = KeyStore.getInstance(KeyStore.getDefaultType());
477477
ts.load(null, null);
478478
Certificate[] certs = certs(trustedCertsPem);
479479
for (int i = 0; i < certs.length; i++) {

clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.Iterator;
3333
import java.util.List;
3434
import java.util.Optional;
35+
import java.util.Set;
3536
import java.util.stream.IntStream;
3637

3738
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -230,4 +231,108 @@ public void shouldReturnMaxPollRecords() {
230231
assertTrue(records.isEmpty());
231232
}
232233

234+
@Test
235+
public void testLosePartitionsCallsOnPartitionsLost() {
236+
TopicPartition tp0 = new TopicPartition("test", 0);
237+
TopicPartition tp1 = new TopicPartition("test", 1);
238+
List<TopicPartition> assigned = List.of(tp0, tp1);
239+
240+
List<TopicPartition> lost = new ArrayList<>();
241+
List<TopicPartition> revoked = new ArrayList<>();
242+
consumer.subscribe(List.of("test"), new ConsumerRebalanceListener() {
243+
@Override
244+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
245+
revoked.addAll(partitions);
246+
}
247+
@Override
248+
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
249+
@Override
250+
public void onPartitionsLost(Collection<TopicPartition> partitions) {
251+
lost.addAll(partitions);
252+
}
253+
});
254+
255+
consumer.rebalance(assigned);
256+
consumer.losePartitions(List.of(tp0));
257+
258+
assertEquals(List.of(tp0), lost);
259+
assertTrue(revoked.isEmpty());
260+
}
261+
262+
@Test
263+
public void testLosePartitionsRemovesFromAssignment() {
264+
TopicPartition tp0 = new TopicPartition("test", 0);
265+
TopicPartition tp1 = new TopicPartition("test", 1);
266+
267+
consumer.subscribe(List.of("test"));
268+
consumer.rebalance(List.of(tp0, tp1));
269+
consumer.losePartitions(List.of(tp0));
270+
271+
assertFalse(consumer.assignment().contains(tp0));
272+
assertTrue(consumer.assignment().contains(tp1));
273+
}
274+
275+
@Test
276+
public void testLosePartitionsThrowsIfNotAssigned() {
277+
TopicPartition tp0 = new TopicPartition("test", 0);
278+
TopicPartition tp1 = new TopicPartition("test", 1);
279+
280+
consumer.subscribe(List.of("test"));
281+
consumer.rebalance(List.of(tp0));
282+
283+
assertThrows(IllegalStateException.class,
284+
() -> consumer.losePartitions(List.of(tp1)));
285+
}
286+
287+
@Test
288+
public void testLosePartitionsClearsOnlyLostRecords() {
289+
TopicPartition tp0 = new TopicPartition("test", 0);
290+
TopicPartition tp1 = new TopicPartition("test", 1);
291+
292+
consumer.subscribe(List.of("test"));
293+
consumer.rebalance(List.of(tp0, tp1));
294+
consumer.updateBeginningOffsets(new HashMap<>() {{
295+
put(tp0, 0L);
296+
put(tp1, 0L);
297+
}});
298+
consumer.seek(tp0, 0);
299+
consumer.seek(tp1, 0);
300+
301+
consumer.addRecord(new ConsumerRecord<>("test", 0, 0, null, null));
302+
consumer.addRecord(new ConsumerRecord<>("test", 1, 0, null, null));
303+
304+
consumer.losePartitions(List.of(tp0));
305+
306+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
307+
assertEquals(1, records.count());
308+
309+
var record = records.iterator().next();
310+
assertEquals(tp1, new TopicPartition(record.topic(), record.partition()));
311+
}
312+
313+
@Test
314+
public void testLosePartitionsThenRebalance() {
315+
TopicPartition tp0 = new TopicPartition("test", 0);
316+
TopicPartition tp1 = new TopicPartition("test", 1);
317+
TopicPartition tp2 = new TopicPartition("test", 2);
318+
319+
List<TopicPartition> assigned = new ArrayList<>();
320+
consumer.subscribe(List.of("test"), new ConsumerRebalanceListener() {
321+
@Override
322+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
323+
@Override
324+
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
325+
assigned.addAll(partitions);
326+
}
327+
});
328+
329+
consumer.rebalance(List.of(tp0, tp1));
330+
assigned.clear();
331+
332+
consumer.losePartitions(List.of(tp0));
333+
consumer.rebalance(Arrays.asList(tp1, tp2));
334+
335+
assertEquals(List.of(tp2), assigned);
336+
assertEquals(Set.of(tp1, tp2), consumer.assignment());
337+
}
233338
}

clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ public void testPemTrustStoreConfigWithOneCert() throws Exception {
221221
assertEquals(List.of("kafka0"), aliases);
222222
assertNotNull(trustStore.getCertificate("kafka0"), "Certificate not loaded");
223223
assertNull(trustStore.getKey("kafka0", null), "Unexpected private key");
224+
assertEquals(KeyStore.getDefaultType(), trustStore.getType());
224225
}
225226

226227
@Test
@@ -236,6 +237,7 @@ public void testPemTrustStoreConfigWithMultipleCerts() throws Exception {
236237
assertNull(trustStore.getKey("kafka0", null), "Unexpected private key");
237238
assertNotNull(trustStore.getCertificate("kafka1"), "Certificate not loaded");
238239
assertNull(trustStore.getKey("kafka1", null), "Unexpected private key");
240+
assertEquals(KeyStore.getDefaultType(), trustStore.getType());
239241
}
240242

241243
@Test
@@ -276,6 +278,7 @@ private void verifyPemKeyStoreConfig(String keyFileName, Password keyPassword) t
276278
assertNotNull(keyStore.getCertificate("kafka"), "Certificate not loaded");
277279
assertNotNull(keyStore.getKey("kafka", keyPassword == null ? null : keyPassword.value().toCharArray()),
278280
"Private key not loaded");
281+
assertEquals(KeyStore.getDefaultType(), keyStore.getType());
279282
}
280283

281284
@Test
@@ -289,6 +292,7 @@ public void testPemTrustStoreFile() throws Exception {
289292
assertEquals(List.of("kafka0"), aliases);
290293
assertNotNull(trustStore.getCertificate("kafka0"), "Certificate not found");
291294
assertNull(trustStore.getKey("kafka0", null), "Unexpected private key");
295+
assertEquals(KeyStore.getDefaultType(), trustStore.getType());
292296
}
293297

294298
@Test
@@ -304,6 +308,7 @@ public void testPemKeyStoreFileNoKeyPassword() throws Exception {
304308
assertEquals(List.of("kafka"), aliases);
305309
assertNotNull(keyStore.getCertificate("kafka"), "Certificate not loaded");
306310
assertNotNull(keyStore.getKey("kafka", null), "Private key not loaded");
311+
assertEquals(KeyStore.getDefaultType(), keyStore.getType());
307312
}
308313

309314
@Test
@@ -319,6 +324,7 @@ public void testPemKeyStoreFileWithKeyPassword() throws Exception {
319324
assertEquals(List.of("kafka"), aliases);
320325
assertNotNull(keyStore.getCertificate("kafka"), "Certificate not found");
321326
assertNotNull(keyStore.getKey("kafka", KEY_PASSWORD.value().toCharArray()), "Private key not found");
327+
assertEquals(KeyStore.getDefaultType(), keyStore.getType());
322328
}
323329

324330
private String pemFilePath(String pem) throws Exception {

0 commit comments

Comments
 (0)