From fdb213438aae55ee46ad6a81157b1cd5809219ba Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 27 May 2026 13:37:37 -0700 Subject: [PATCH] KAFKA-20632: Added ChunkedBufferPool that can allocate multiple byte buffers at once. Added unimplemented CompositeProducerBatch and DynamicRecordAccumulator as a scaffolding for future changes. --- .../producer/internals/BufferPool.java | 2 +- .../producer/internals/ChunkedBufferPool.java | 80 +++++++++++++ .../internals/CompositeProducerBatch.java | 27 +++++ .../internals/DynamicRecordAccumulator.java | 55 +++++++++ .../producer/internals/ProducerBatch.java | 2 +- .../producer/internals/RecordAccumulator.java | 6 +- .../internals/ChunkedBufferPoolTest.java | 105 ++++++++++++++++++ 7 files changed, 274 insertions(+), 3 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedBufferPool.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/internals/CompositeProducerBatch.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/internals/DynamicRecordAccumulator.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/producer/internals/ChunkedBufferPoolTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index 517a2bd9ca7a6..3a4ea1da38fa3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -54,7 +54,7 @@ public class BufferPool { /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. 
*/ private long nonPooledAvailableMemory; private final Metrics metrics; - private final Time time; + protected final Time time; private final Sensor waitTime; private boolean closed; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedBufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedBufferPool.java new file mode 100644 index 0000000000000..f92b57db96011 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ChunkedBufferPool.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.producer.internals; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Time; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class ChunkedBufferPool extends BufferPool { + + /** + * Create a new buffer pool + * + * @param memory The maximum amount of memory that this buffer pool can allocate + * @param poolableSize The buffer size to cache in the free list rather than deallocating + * @param metrics instance of Metrics + * @param time time instance + * @param metricGrpName logical group name for metrics + */ + public ChunkedBufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) { + super(memory, poolableSize, metrics, time, metricGrpName); + } + + /** + * Allocate {@code ceil(totalSize / chunkSize)} chunk-sized buffers. Blocks up to + * {@code maxTimeToBlockMs} total across all chunk acquisitions, shrinking the budget after + * each one. On any failure (timeout, close, interruption), chunks already acquired in this + * call are returned to the pool before the exception propagates. 
+ * + * @param totalSize Minimum total capacity required across the returned chunks + * @param maxTimeToBlockMs Maximum time in milliseconds to block waiting for memory + * @return list of {@code ceil(totalSize / chunkSize)} ByteBuffers, each of capacity {@code chunkSize} + * @throws InterruptedException If interrupted while waiting + * @throws IllegalArgumentException If {@code totalSize <= 0} + */ + public List allocateChunks(int totalSize, long maxTimeToBlockMs) throws InterruptedException { + if (totalSize > totalMemory()) { + throw new IllegalArgumentException("Attempt to allocate " + totalSize + + " bytes across chunks, but there is a hard limit of " + + totalMemory() + " on memory allocations."); + } + + int chunkSize = poolableSize(); + int numChunks = (totalSize + chunkSize - 1) / chunkSize; // ceil division + List chunks = new ArrayList<>(numChunks); + long deadlineMs = time.milliseconds() + maxTimeToBlockMs; + try { + for (int i = 0; i < numChunks; i++) { + long remainingMs = Math.max(0L, deadlineMs - time.milliseconds()); + chunks.add(allocate(chunkSize, remainingMs)); + } + List result = chunks; + chunks = null; + return result; + } finally { + if (chunks != null) { + for (ByteBuffer chunk : chunks) { + deallocate(chunk); + } + } + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/CompositeProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/CompositeProducerBatch.java new file mode 100644 index 0000000000000..23939fb2d45e1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/CompositeProducerBatch.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.internal.MemoryRecordsBuilder;
+
+public class CompositeProducerBatch extends ProducerBatch {
+
+    public CompositeProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs) {
+        super(tp, recordsBuilder, createdMs);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DynamicRecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DynamicRecordAccumulator.java
new file mode 100644
index 0000000000000..c6678ae0d14eb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DynamicRecordAccumulator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.internal.MemoryRecordsBuilder;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.internals.LogContext;
+
+/**
+ * A {@link RecordAccumulator} that creates its batches as {@link CompositeProducerBatch} instances by
+ * overriding {@link #newProducerBatch(TopicPartition, long, MemoryRecordsBuilder)}.
+ */
+public class DynamicRecordAccumulator extends RecordAccumulator {
+
+    // Assigned in the constructor; not read anywhere in this class yet.
+    private final int chunkSize;
+    // The same pool instance that the constructor passes to the superclass.
+    private final ChunkedBufferPool free;
+
+    /**
+     * Create a new accumulator. All arguments except {@code chunkSize} are forwarded unchanged to the
+     * {@link RecordAccumulator} constructor.
+     *
+     * @param chunkSize stored in this class's {@code chunkSize} field
+     * @param bufferPool passed to the superclass and also stored in this class's {@code free} field
+     */
+    public DynamicRecordAccumulator(LogContext logContext,
+                                    int batchSize,
+                                    int chunkSize,
+                                    Compression compression,
+                                    int lingerMs,
+                                    long retryBackoffMs,
+                                    long retryBackoffMaxMs,
+                                    int deliveryTimeoutMs,
+                                    PartitionerConfig partitionerConfig,
+                                    Metrics metrics,
+                                    String metricGrpName,
+                                    Time time,
+                                    TransactionManager transactionManager,
+                                    ChunkedBufferPool bufferPool) {
+        super(logContext, batchSize, compression, lingerMs, retryBackoffMs, retryBackoffMaxMs, deliveryTimeoutMs,
+                partitionerConfig, metrics, metricGrpName, time, transactionManager, bufferPool);
+        this.chunkSize = chunkSize;
+        this.free = bufferPool;
+    }
+
+    /**
+     * Wrap the given builder in a {@link CompositeProducerBatch} rather than a plain {@link ProducerBatch}.
+     */
+    @Override
+    protected ProducerBatch newProducerBatch(TopicPartition topicPartition, long nowMs, MemoryRecordsBuilder recordsBuilder) {
+        return new CompositeProducerBatch(topicPartition, recordsBuilder, nowMs);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index
14d0d6ef6334d..88a52eedc7b7f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -57,7 +57,7 @@
  *
  * This class is not thread safe and external synchronization must be used when modifying it
  */
-public final class ProducerBatch {
+public class ProducerBatch {
 
     private static final Logger log = LoggerFactory.getLogger(ProducerBatch.class);
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index b1eced17e4f25..76d730b8cbc6c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -396,7 +396,7 @@ private RecordAppendResult appendNewBatch(String topic,
         }
 
         MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer);
-        ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
+        ProducerBatch batch = newProducerBatch(new TopicPartition(topic, partition), nowMs, recordsBuilder);
         FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                 callbacks, nowMs));
 
@@ -406,6 +406,19 @@ private RecordAppendResult appendNewBatch(String topic,
         return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, batch.estimatedSizeInBytes());
     }
 
+    /**
+     * Create the batch that {@code appendNewBatch} appends the first record to. Subclasses can
+     * override this to return a {@link ProducerBatch} subtype.
+     *
+     * @param topicPartition partition the batch is created for
+     * @param nowMs passed to the batch constructor as its last argument (after {@code recordsBuilder})
+     * @param recordsBuilder builder the batch is constructed with
+     * @return the new batch
+     */
+    protected ProducerBatch newProducerBatch(TopicPartition topicPartition, long nowMs, MemoryRecordsBuilder recordsBuilder) {
+        return new ProducerBatch(topicPartition, recordsBuilder, nowMs);
+    }
+
     private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer) {
         return MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression, TimestampType.CREATE_TIME, 0L);
     }
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ChunkedBufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ChunkedBufferPoolTest.java
new file mode 100644
index 0000000000000..816cf32197b38
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ChunkedBufferPoolTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Unit tests for {@link ChunkedBufferPool#allocateChunks(int, long)}.
+ */
+public class ChunkedBufferPoolTest {
+    private final MockTime time = new MockTime();
+    private final Metrics metrics = new Metrics(time);
+    private final long maxBlockTimeMs = 10;
+    private final String metricGroup = "TestMetrics";
+
+    @AfterEach
+    public void teardown() {
+        this.metrics.close();
+    }
+
+    @Test
+    public void testAllocateExactMultipleOfChunkSize() throws Exception {
+        int chunkSize = 1024;
+        ChunkedBufferPool pool = new ChunkedBufferPool(64 * 1024, chunkSize, metrics, time, metricGroup);
+        List<ByteBuffer> chunks = pool.allocateChunks(3 * chunkSize, maxBlockTimeMs);
+        assertEquals(3, chunks.size());
+        for (ByteBuffer chunk : chunks) {
+            assertEquals(chunkSize, chunk.capacity());
+        }
+    }
+
+    @Test
+    public void testAllocateCeilsToWholeChunk() throws Exception {
+        int chunkSize = 1024;
+        ChunkedBufferPool pool = new ChunkedBufferPool(64 * 1024, chunkSize, metrics, time, metricGroup);
+        List<ByteBuffer> chunks = pool.allocateChunks(2 * chunkSize + 1, maxBlockTimeMs);
+        assertEquals(3, chunks.size());
+        for (ByteBuffer chunk : chunks) {
+            assertEquals(chunkSize, chunk.capacity());
+        }
+    }
+
+    @Test
+    public void testThrowsWhenTotalSizeExceedsTotalMemory() {
+        int chunkSize = 1024;
+        long totalMemory = 4L * chunkSize;
+        ChunkedBufferPool pool = new ChunkedBufferPool(totalMemory, chunkSize, metrics, time, metricGroup);
+        assertThrows(IllegalArgumentException.class,
+            () -> pool.allocateChunks((int) totalMemory + 1, maxBlockTimeMs));
+    }
+
+    @Test
+    public void testDeallocateReturnsMemoryToPool() throws Exception {
+        int chunkSize = 1024;
+        long totalMemory = 4L * chunkSize;
+        ChunkedBufferPool pool = new ChunkedBufferPool(totalMemory, chunkSize, metrics, time, metricGroup);
+        List<ByteBuffer> chunks = pool.allocateChunks(3 * chunkSize, maxBlockTimeMs);
+        assertEquals(totalMemory - 3L * chunkSize, pool.availableMemory());
+        for (ByteBuffer chunk : chunks) {
+            pool.deallocate(chunk);
+        }
+        assertEquals(totalMemory, pool.availableMemory());
+    }
+
+    @Test
+    public void testPartialFailureRollsBack() throws Exception {
+        int chunkSize = 1024;
+        long totalMemory = 2L * chunkSize;
+        ChunkedBufferPool pool = new ChunkedBufferPool(totalMemory, chunkSize, metrics, time, metricGroup);
+
+        // Hold one of the pool's two chunks, leaving room for only one more.
+        ByteBuffer held = pool.allocate(chunkSize, maxBlockTimeMs);
+        assertEquals(chunkSize, pool.availableMemory());
+
+        // Two chunks are requested but only one is free: the call must fail and hand its chunk back.
+        assertThrows(BufferExhaustedException.class,
+            () -> pool.allocateChunks(2 * chunkSize, maxBlockTimeMs));
+
+        assertEquals(chunkSize, pool.availableMemory());
+        assertEquals(0, pool.queued());
+
+        pool.deallocate(held);
+        assertEquals(totalMemory, pool.availableMemory());
+    }
+}