Skip to content

Commit 9cfbc5e

Browse files
authored
Merge branch 'apache:trunk' into storage
2 parents dbaa4d6 + 5d86a6b commit 9cfbc5e

51 files changed

Lines changed: 2580 additions & 1834 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

build.gradle

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2467,6 +2467,7 @@ project(':storage') {
24672467
testImplementation project(':core')
24682468
testImplementation project(':core').sourceSets.test.output
24692469
testImplementation testFixtures(project(':storage:storage-api'))
2470+
testImplementation project(':metadata')
24702471
testImplementation project(':test-common:test-common-internal-api')
24712472
testImplementation project(':test-common:test-common-runtime')
24722473
testImplementation project(':test-common:test-common-util')
@@ -2625,6 +2626,12 @@ project(':tools') {
26252626

26262627
configurations {
26272628
releaseOnly
2629+
// ApacheDS 2.0.0-M24 pulls in the stale bcprov-jdk15on:1.56, which ships the same
2630+
// RosstandartObjectIdentifiers class as the modern bcprov-jdk18on we already depend on
2631+
// but is missing fields referenced by bcpkix-jdk18on:1.84. When IntelliJ's test runner
2632+
// orders the old JAR first, BC provider registration fails with NoSuchFieldError. Drop it.
2633+
testCompileClasspath.exclude group: 'org.bouncycastle', module: 'bcprov-jdk15on'
2634+
testRuntimeClasspath.exclude group: 'org.bouncycastle', module: 'bcprov-jdk15on'
26282635
}
26292636

26302637
dependencies {

checkstyle/import-control-storage.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@
118118
<allow pkg="org.apache.kafka.tiered.storage" />
119119

120120
<allow pkg="kafka.api" />
121+
<allow pkg="kafka.integration" />
121122
<allow pkg="kafka.log" />
122123
<allow pkg="kafka.server" />
123124
<allow pkg="kafka.utils" />

clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ public String metricsContentType() {
8585
return OTLP_CONTENT_TYPE;
8686
}
8787

88-
public ByteBuffer metricsData() {
88+
public ByteBuffer metricsData(int maxDecompressedBytes) {
8989
CompressionType cType = CompressionType.forId(this.data.compressionType());
9090
return (cType == CompressionType.NONE) ?
91-
this.data.metrics() : ClientTelemetryUtils.decompress(this.data.metrics(), cType);
91+
this.data.metrics() : ClientTelemetryUtils.decompress(this.data.metrics(), cType, maxDecompressedBytes);
9292
}
9393

9494
public static PushTelemetryRequest parse(Readable readable, short version) {

clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.kafka.common.KafkaException;
2020
import org.apache.kafka.common.Uuid;
2121
import org.apache.kafka.common.compress.Compression;
22+
import org.apache.kafka.common.errors.TelemetryTooLargeException;
2223
import org.apache.kafka.common.metrics.MetricsContext;
2324
import org.apache.kafka.common.protocol.Errors;
2425
import org.apache.kafka.common.record.internal.CompressionType;
@@ -52,6 +53,8 @@ public class ClientTelemetryUtils {
5253

5354
public static final Predicate<? super MetricKeyable> SELECTOR_ALL_METRICS = k -> true;
5455

56+
private static final int DECOMPRESS_READ_BUFFER_BYTES = 8 * 1024;
57+
5558
/**
5659
* Examine the response data and handle different error code accordingly:
5760
*
@@ -212,13 +215,18 @@ public static ByteBuffer compress(MetricsData metrics, CompressionType compressi
212215
}
213216
}
214217

215-
public static ByteBuffer decompress(ByteBuffer metrics, CompressionType compressionType) {
218+
public static ByteBuffer decompress(ByteBuffer metrics, CompressionType compressionType, int maxDecompressedBytes) {
216219
Compression compression = Compression.of(compressionType).build();
217220
try (InputStream in = compression.wrapForInput(metrics, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
218221
ByteBufferOutputStream out = new ByteBufferOutputStream(512)) {
219-
byte[] bytes = new byte[metrics.limit() * 2];
222+
byte[] bytes = new byte[Math.min(metrics.limit() * 2, DECOMPRESS_READ_BUFFER_BYTES)];
220223
int nRead;
224+
int totalRead = 0;
221225
while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
226+
totalRead += nRead;
227+
if (totalRead > maxDecompressedBytes) {
228+
throw new TelemetryTooLargeException("Decompressed telemetry metrics exceed maximum allowed size: " + maxDecompressedBytes);
229+
}
222230
out.write(bytes, 0, nRead);
223231
}
224232
out.buffer().flip();

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1407,8 +1407,9 @@ public void testNoCommittedOffsets(GroupProtocol groupProtocol) {
14071407
assertNull(committed.get(tp1));
14081408
}
14091409

1410+
// NOTE: the rebalance flow in prepareRebalance is specific to the CLASSIC consumer.
14101411
@ParameterizedTest
1411-
@EnumSource(GroupProtocol.class)
1412+
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
14121413
public void testAutoCommitSentBeforePositionUpdate(GroupProtocol groupProtocol) {
14131414
ConsumerMetadata metadata = createMetadata(subscription);
14141415
MockClient client = new MockClient(time, metadata);

clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public void testMetricsDataCompression(CompressionType compressionType) throws I
6262
MetricsData metricsData = getMetricsData();
6363
PushTelemetryRequest req = getPushTelemetryRequest(metricsData, compressionType);
6464

65-
ByteBuffer receivedMetricsBuffer = req.metricsData();
65+
ByteBuffer receivedMetricsBuffer = req.metricsData(1024 * 1024);
6666
assertNotNull(receivedMetricsBuffer);
6767
assertTrue(receivedMetricsBuffer.capacity() > 0);
6868

clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.common.telemetry.internals;
1818

1919
import org.apache.kafka.common.Uuid;
20+
import org.apache.kafka.common.errors.TelemetryTooLargeException;
2021
import org.apache.kafka.common.protocol.Errors;
2122
import org.apache.kafka.common.record.internal.CompressionType;
2223
import org.apache.kafka.common.utils.Utils;
@@ -154,7 +155,7 @@ public void testCompressDecompress(CompressionType compressionType) throws IOExc
154155
} else {
155156
assertArrayEquals(raw, Utils.toArray(compressed));
156157
}
157-
ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, compressionType);
158+
ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, compressionType, 1024 * 1024);
158159
assertNotNull(decompressed);
159160
byte[] actualResult = Utils.toArray(decompressed);
160161
assertArrayEquals(raw, actualResult);
@@ -191,4 +192,31 @@ private MetricsData getMetricsData() {
191192

192193
return builder.build();
193194
}
195+
196+
@Test
197+
public void testDecompressExceedingMaxSizeThrows() throws IOException {
198+
// Compress a large payload using the existing compress API (via MetricsData)
199+
// then verify decompression with a small limit throws
200+
MetricsData metricsData = getMetricsData();
201+
ByteBuffer compressed = ClientTelemetryUtils.compress(metricsData, CompressionType.GZIP);
202+
byte[] raw = metricsData.toByteArray();
203+
204+
// Set limit smaller than the actual decompressed size
205+
int smallLimit = raw.length - 1;
206+
TelemetryTooLargeException ex = assertThrows(TelemetryTooLargeException.class,
207+
() -> ClientTelemetryUtils.decompress(compressed.duplicate(), CompressionType.GZIP, smallLimit));
208+
assertTrue(ex.getMessage().contains("Decompressed telemetry metrics exceed maximum allowed size: " + smallLimit));
209+
}
210+
211+
@Test
212+
public void testDecompressWithPayloadSizeSucceeds() throws IOException {
213+
MetricsData metricsData = getMetricsData();
214+
byte[] raw = metricsData.toByteArray();
215+
ByteBuffer compressed = ClientTelemetryUtils.compress(metricsData, CompressionType.GZIP);
216+
217+
// Set limit to exact limit prior compression.
218+
ByteBuffer result = ClientTelemetryUtils.decompress(compressed, CompressionType.GZIP, raw.length);
219+
assertNotNull(result);
220+
assertArrayEquals(raw, Utils.toArray(result));
221+
}
194222
}

connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public interface TransactionContext {
4343
/**
4444
* Requests a transaction abort after the next batch of records from {@link SourceTask#poll()}. All of
4545
* the records in that transaction will be discarded and will not appear in a committed transaction.
46-
* However, offsets for that transaction will still be committed so than the records in that transaction
46+
* However, offsets for that transaction will still be committed so that the records in that transaction
4747
* are not reprocessed. If the data should instead be reprocessed, the task should not invoke this method
4848
* and should instead throw an exception.
4949
*/

connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,8 @@ public void shutdownClusters() throws Exception {
287287
} finally {
288288
Exit.resetExitProcedure();
289289
Exit.resetHaltProcedure();
290+
// Shared by all CollectAllMetricsReporter instances; avoid cross-test accumulation.
291+
CollectAllMetricsReporter.METRICS.clear();
290292
}
291293
}
292294
}
@@ -1628,5 +1630,10 @@ public void init(List<KafkaMetric> metrics) {
16281630
public void metricChange(KafkaMetric metric) {
16291631
METRICS.put(metric.metricName(), metric);
16301632
}
1633+
1634+
@Override
1635+
public void metricRemoval(KafkaMetric metric) {
1636+
METRICS.remove(metric.metricName());
1637+
}
16311638
}
16321639
}

core/src/main/java/kafka/server/builders/KafkaApisBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import kafka.coordinator.transaction.TransactionCoordinator;
2121
import kafka.network.RequestChannel;
22-
import kafka.server.AutoTopicCreationManager;
2322
import kafka.server.ForwardingManager;
2423
import kafka.server.KafkaApis;
2524
import kafka.server.KafkaConfig;
@@ -37,6 +36,7 @@
3736
import org.apache.kafka.metadata.MetadataCache;
3837
import org.apache.kafka.security.DelegationTokenManager;
3938
import org.apache.kafka.server.ApiVersionManager;
39+
import org.apache.kafka.server.AutoTopicCreationManager;
4040
import org.apache.kafka.server.ClientMetricsManager;
4141
import org.apache.kafka.server.FetchManager;
4242
import org.apache.kafka.server.authorizer.Authorizer;

0 commit comments

Comments
 (0)