Skip to content

Commit a666908

Browse files
committed
Merge remote-tracking branch 'upstream/trunk' into kip848_await_async_commit_legacy
2 parents f299290 + e4d34dd commit a666908

File tree

529 files changed

+6649
-3509
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

529 files changed

+6649
-3509
lines changed

Diff for: build.gradle

+22-4
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,13 @@ ext {
114114
repo = file("$rootDir/.git").isDirectory() ? Grgit.open(currentDir: project.getRootDir()) : null
115115

116116
commitId = determineCommitId()
117+
118+
addParametersForTests = { name, options ->
119+
// -parameters generates arguments with parameter names in TestInfo#getDisplayName.
120+
// ref: https://github.com/junit-team/junit5/blob/4c0dddad1b96d4a20e92a2cd583954643ac56ac0/junit-jupiter-params/src/main/java/org/junit/jupiter/params/ParameterizedTest.java#L161-L164
121+
if (name == "compileTestJava" || name == "compileTestScala")
122+
options.compilerArgs << "-parameters"
123+
}
117124
}
118125

119126
allprojects {
@@ -278,11 +285,9 @@ subprojects {
278285
// --source/--target 8 is deprecated in Java 20, suppress warning until Java 8 support is dropped in Kafka 4.0
279286
if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_20))
280287
options.compilerArgs << "-Xlint:-options"
281-
}
282288

283-
// -parameters generates arguments with parameter names in TestInfo#getDisplayName.
284-
// ref: https://github.com/junit-team/junit5/blob/4c0dddad1b96d4a20e92a2cd583954643ac56ac0/junit-jupiter-params/src/main/java/org/junit/jupiter/params/ParameterizedTest.java#L161-L164
285-
compileTestJava.options.compilerArgs.add "-parameters"
289+
addParametersForTests(name, options)
290+
}
286291

287292
// We should only set this if Java version is < 9 (--release is recommended for >= 9), but the Scala plugin for IntelliJ sets
288293
// `-target` incorrectly if this is unset
@@ -703,6 +708,8 @@ subprojects {
703708
if (versions.baseScala == "2.13" || JavaVersion.current().isJava9Compatible())
704709
scalaCompileOptions.additionalParameters += ["-release", String.valueOf(minJavaVersion)]
705710

711+
addParametersForTests(name, options)
712+
706713
configure(scalaCompileOptions.forkOptions) {
707714
memoryMaximumSize = defaultMaxHeapSize
708715
jvmArgs = defaultJvmArgs
@@ -1837,6 +1844,7 @@ project(':storage') {
18371844
testImplementation project(':clients').sourceSets.test.output
18381845
testImplementation project(':core')
18391846
testImplementation project(':core').sourceSets.test.output
1847+
testImplementation project(':server')
18401848
testImplementation project(':server-common')
18411849
testImplementation project(':server-common').sourceSets.test.output
18421850
testImplementation libs.hamcrest
@@ -2025,6 +2033,7 @@ project(':tools') {
20252033
testImplementation project(':connect:runtime')
20262034
testImplementation project(':connect:runtime').sourceSets.test.output
20272035
testImplementation project(':storage:storage-api').sourceSets.main.output
2036+
testImplementation project(':group-coordinator')
20282037
testImplementation libs.junitJupiter
20292038
testImplementation libs.mockitoCore
20302039
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
@@ -2177,10 +2186,13 @@ project(':streams') {
21772186
testCompileOnly project(':streams:test-utils')
21782187

21792188
testImplementation project(':clients').sourceSets.test.output
2189+
testImplementation project(':server')
21802190
testImplementation project(':core')
21812191
testImplementation project(':tools')
21822192
testImplementation project(':core').sourceSets.test.output
21832193
testImplementation project(':storage')
2194+
testImplementation project(':group-coordinator')
2195+
testImplementation project(':transaction-coordinator')
21842196
testImplementation project(':server-common')
21852197
testImplementation project(':server-common').sourceSets.test.output
21862198
testImplementation project(':server')
@@ -2192,6 +2204,7 @@ project(':streams') {
21922204
testImplementation libs.hamcrest
21932205
testImplementation libs.mockitoCore
21942206
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
2207+
testImplementation project(':group-coordinator')
21952208

21962209
testRuntimeOnly project(':streams:test-utils')
21972210
testRuntimeOnly libs.slf4jlog4j
@@ -2328,6 +2341,7 @@ project(':streams:streams-scala') {
23282341
// So we make sure to not include it in the dependencies.
23292342
api libs.scalaCollectionCompat
23302343
}
2344+
testImplementation project(':group-coordinator')
23312345
testImplementation project(':core')
23322346
testImplementation project(':core').sourceSets.test.output
23332347
testImplementation project(':server-common').sourceSets.test.output
@@ -2991,12 +3005,15 @@ project(':connect:runtime') {
29913005

29923006
testImplementation project(':clients').sourceSets.test.output
29933007
testImplementation project(':core')
3008+
testImplementation project(':server')
29943009
testImplementation project(':metadata')
29953010
testImplementation project(':core').sourceSets.test.output
29963011
testImplementation project(':server-common')
29973012
testImplementation project(':server')
3013+
testImplementation project(':group-coordinator')
29983014
testImplementation project(':storage')
29993015
testImplementation project(':connect:test-plugins')
3016+
testImplementation project(':group-coordinator')
30003017

30013018
testImplementation libs.easymock
30023019
testImplementation libs.junitJupiterApi
@@ -3207,6 +3224,7 @@ project(':connect:mirror') {
32073224
testImplementation project(':connect:runtime').sourceSets.test.output
32083225
testImplementation project(':core')
32093226
testImplementation project(':core').sourceSets.test.output
3227+
testImplementation project(':server')
32103228

32113229
testRuntimeOnly project(':connect:runtime')
32123230
testRuntimeOnly libs.slf4jlog4j

Diff for: checkstyle/import-control-core.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
<allow pkg="kafka.serializer" />
3838
<allow pkg="org.apache.kafka.common" />
3939
<allow pkg="org.mockito" class="AssignmentsManagerTest"/>
40-
40+
<allow pkg="org.apache.kafka.server"/>
4141
<!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the global default yammer metrics registry
4242
https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable -->
4343
<disallow class="com.yammer.metrics.Metrics" />

Diff for: checkstyle/import-control-server-common.xml

+4
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,13 @@
111111
<allow class="org.apache.kafka.server.util.ShutdownableThread" />
112112
</subpackage>
113113
</subpackage>
114+
<subpackage name="config">
115+
<allow pkg="org.apache.kafka.server"/>
116+
</subpackage>
114117
</subpackage>
115118

116119
<subpackage name="admin">
117120
<allow pkg="org.apache.kafka.server.common" />
118121
</subpackage>
122+
119123
</import-control>

Diff for: checkstyle/import-control-server.xml

+3
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@
8282
<allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
8383
<allow pkg="org.apache.kafka.server.telemetry" />
8484
</subpackage>
85+
<subpackage name="config">
86+
<allow pkg="org.apache.kafka.server" />
87+
</subpackage>
8588
</subpackage>
8689

8790
<subpackage name="security">

Diff for: checkstyle/import-control-storage.xml

+1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@
111111
<allow pkg="org.apache.kafka.server.log" />
112112
<allow pkg="org.apache.kafka.server.log.remote" />
113113
<allow pkg="org.apache.kafka.server.log.remote.storage" />
114+
<allow pkg="org.apache.kafka.server.config" />
114115

115116
<allow pkg="org.apache.kafka.test" />
116117
<subpackage name="actions">

Diff for: checkstyle/import-control.xml

+8-1
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@
303303
<allow pkg="kafka.admin" />
304304
<allow pkg="kafka.server" />
305305
<allow pkg="org.apache.kafka.storage.internals" />
306+
<allow pkg="org.apache.kafka.server.config" />
306307
<allow pkg="org.apache.kafka.server.common" />
307308
<allow pkg="org.apache.kafka.clients" />
308309
<allow pkg="org.apache.kafka.clients.admin" />
@@ -323,8 +324,8 @@
323324

324325
<subpackage name="consumer">
325326
<allow pkg="org.apache.kafka.tools"/>
326-
327327
<subpackage name="group">
328+
<allow pkg="org.apache.kafka.coordinator.group"/>
328329
<allow pkg="kafka.api"/>
329330
<allow pkg="kafka.security"/>
330331
<allow pkg="kafka.zk" />
@@ -413,6 +414,8 @@
413414
<allow pkg="org.apache.kafka.tools" />
414415
<allow pkg="org.apache.kafka.server.config" />
415416
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
417+
<allow class="org.apache.kafka.coordinator.transaction.TransactionLogConfigs" />
418+
<allow pkg="org.apache.kafka.coordinator.group" />
416419
</subpackage>
417420

418421
<subpackage name="test">
@@ -483,6 +486,8 @@
483486
<allow pkg="org.apache.kafka.connect.components"/>
484487
<allow pkg="org.apache.kafka.clients" />
485488
<allow pkg="org.apache.kafka.test"/>
489+
<!-- for testing -->
490+
<allow pkg="org.apache.kafka.coordinator.group" />
486491

487492
<subpackage name="source">
488493
<allow pkg="org.apache.kafka.connect.connector" />
@@ -532,6 +537,7 @@
532537
<!-- for tests -->
533538
<allow pkg="org.apache.kafka.connect.integration" />
534539
<allow pkg="org.apache.kafka.connect.mirror" />
540+
<allow pkg="org.apache.kafka.server.config" />
535541
<allow pkg="kafka.server" />
536542
<subpackage name="rest">
537543
<allow pkg="javax.ws.rs" />
@@ -595,6 +601,7 @@
595601
<allow pkg="com.fasterxml.jackson.annotation" />
596602
<allow pkg="com.fasterxml.jackson.databind" />
597603
<subpackage name="clusters">
604+
<allow pkg="org.apache.kafka.server.config" />
598605
<allow pkg="kafka.cluster" />
599606
<allow pkg="kafka.server" />
600607
<allow pkg="kafka.zk" />

Diff for: checkstyle/suppressions.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@
9999
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
100100

101101
<suppress checks="NPathComplexity"
102-
files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler|RecordAccumulator).java"/>
102+
files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
103103

104104
<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
105105
files="CoordinatorClient.java"/>

Diff for: clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,7 @@ public InFlightRequests(int maxInFlightRequestsPerConnection) {
4444
*/
4545
public void add(NetworkClient.InFlightRequest request) {
4646
String destination = request.destination;
47-
Deque<NetworkClient.InFlightRequest> reqs = this.requests.get(destination);
48-
if (reqs == null) {
49-
reqs = new ArrayDeque<>();
50-
this.requests.put(destination, reqs);
51-
}
47+
Deque<NetworkClient.InFlightRequest> reqs = this.requests.computeIfAbsent(destination, k -> new ArrayDeque<>());
5248
reqs.addFirst(request);
5349
inFlightRequestCount.incrementAndGet();
5450
}

Diff for: clients/src/main/java/org/apache/kafka/clients/KafkaClient.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public interface KafkaClient extends Closeable {
7070

7171
/**
7272
* Check if the connection of the node has failed, based on the connection state. Such connection failure are
73-
* usually transient and can be resumed in the next {@link #ready(org.apache.kafka.common.Node, long)} }
73+
* usually transient and can be resumed in the next {@link #ready(org.apache.kafka.common.Node, long)}
7474
* call, but there are cases where transient failures needs to be caught and re-acted upon.
7575
*
7676
* @param node the node to check

Diff for: clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
*
2626
* Client code should use the newer {@link Admin} interface in preference to this class.
2727
*
28-
* This class may be removed in a later release, but has not be marked as deprecated to avoid unnecessary noise.
28+
* This class may be removed in a later release, but has not been marked as deprecated to avoid unnecessary noise.
2929
*/
3030
public abstract class AdminClient implements Admin {
3131

Diff for: clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ protected CreateTopicsResult(Map<String, KafkaFuture<TopicMetadataAndConfig>> fu
4646
*/
4747
public Map<String, KafkaFuture<Void>> values() {
4848
return futures.entrySet().stream()
49-
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> (Void) null)));
49+
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> null)));
5050
}
5151

5252
/**

Diff for: clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java

+10-13
Original file line numberDiff line numberDiff line change
@@ -53,21 +53,18 @@ public Map<ConfigResource, KafkaFuture<Config>> values() {
5353
*/
5454
public KafkaFuture<Map<ConfigResource, Config>> all() {
5555
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
56-
thenApply(new KafkaFuture.BaseFunction<Void, Map<ConfigResource, Config>>() {
57-
@Override
58-
public Map<ConfigResource, Config> apply(Void v) {
59-
Map<ConfigResource, Config> configs = new HashMap<>(futures.size());
60-
for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : futures.entrySet()) {
61-
try {
62-
configs.put(entry.getKey(), entry.getValue().get());
63-
} catch (InterruptedException | ExecutionException e) {
64-
// This should be unreachable, because allOf ensured that all the futures
65-
// completed successfully.
66-
throw new RuntimeException(e);
67-
}
56+
thenApply(v -> {
57+
Map<ConfigResource, Config> configs = new HashMap<>(futures.size());
58+
for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : futures.entrySet()) {
59+
try {
60+
configs.put(entry.getKey(), entry.getValue().get());
61+
} catch (InterruptedException | ExecutionException e) {
62+
// This should be unreachable, because allOf ensured that all the futures
63+
// completed successfully.
64+
throw new RuntimeException(e);
6865
}
69-
return configs;
7066
}
67+
return configs;
7168
});
7269
}
7370
}

Diff for: clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java

-2
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ public class DescribeLogDirsResult {
4848
* @deprecated Deprecated Since Kafka 2.7. Use {@link #descriptions()}.
4949
*/
5050
@Deprecated
51-
@SuppressWarnings("deprecation")
5251
public Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> values() {
5352
return descriptions().entrySet().stream()
5453
.collect(Collectors.toMap(
@@ -87,7 +86,6 @@ public Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions()
8786
* @deprecated Deprecated Since Kafka 2.7. Use {@link #allDescriptions()}.
8887
*/
8988
@Deprecated
90-
@SuppressWarnings("deprecation")
9189
public KafkaFuture<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> all() {
9290
return allDescriptions().thenApply(map -> map.entrySet().stream().collect(Collectors.toMap(
9391
entry -> entry.getKey(),

Diff for: clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java

+11
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions> {
3131

3232
private boolean includeAuthorizedOperations;
33+
private int partitionSizeLimitPerResponse = 2000;
3334

3435
/**
3536
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
@@ -47,8 +48,18 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz
4748
return this;
4849
}
4950

51+
// Note that, partitionSizeLimitPerResponse will not be effective if it is larger than the config
52+
// max.request.partition.size.limit on the server side.
53+
public DescribeTopicsOptions partitionSizeLimitPerResponse(int partitionSizeLimitPerResponse) {
54+
this.partitionSizeLimitPerResponse = partitionSizeLimitPerResponse;
55+
return this;
56+
}
57+
5058
public boolean includeAuthorizedOperations() {
5159
return includeAuthorizedOperations;
5260
}
5361

62+
public int partitionSizeLimitPerResponse() {
63+
return partitionSizeLimitPerResponse;
64+
}
5465
}

Diff for: clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java

+9-12
Original file line numberDiff line numberDiff line change
@@ -57,20 +57,17 @@ public KafkaFuture<Void> all() {
5757
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
5858

5959
partitions().whenComplete(
60-
new KafkaFuture.BiConsumer<Map<TopicPartition, Optional<Throwable>>, Throwable>() {
61-
@Override
62-
public void accept(Map<TopicPartition, Optional<Throwable>> topicPartitions, Throwable throwable) {
63-
if (throwable != null) {
64-
result.completeExceptionally(throwable);
65-
} else {
66-
for (Optional<Throwable> exception : topicPartitions.values()) {
67-
if (exception.isPresent()) {
68-
result.completeExceptionally(exception.get());
69-
return;
70-
}
60+
(topicPartitions, throwable) -> {
61+
if (throwable != null) {
62+
result.completeExceptionally(throwable);
63+
} else {
64+
for (Optional<Throwable> exception : topicPartitions.values()) {
65+
if (exception.isPresent()) {
66+
result.completeExceptionally(exception.get());
67+
return;
7168
}
72-
result.complete(null);
7369
}
70+
result.complete(null);
7471
}
7572
});
7673

0 commit comments

Comments
 (0)