Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class BufferPool {
/** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. */
private long nonPooledAvailableMemory;
private final Metrics metrics;
private final Time time;
protected final Time time;
private final Sensor waitTime;
private boolean closed;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class ChunkedBufferPool extends BufferPool {

/**
* Create a new buffer pool
*
* @param memory The maximum amount of memory that this buffer pool can allocate
* @param poolableSize The buffer size to cache in the free list rather than deallocating
* @param metrics instance of Metrics
* @param time time instance
* @param metricGrpName logical group name for metrics
*/
public ChunkedBufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
super(memory, poolableSize, metrics, time, metricGrpName);
}

/**
* Allocate {@code ceil(totalSize / chunkSize)} chunk-sized buffers. Blocks up to
* {@code maxTimeToBlockMs} total across all chunk acquisitions, shrinking the budget after
* each one. On any failure (timeout, close, interruption), chunks already acquired in this
* call are returned to the pool before the exception propagates.
*
* @param totalSize Minimum total capacity required across the returned chunks
* @param maxTimeToBlockMs Maximum time in milliseconds to block waiting for memory
* @return list of {@code ceil(totalSize / chunkSize)} ByteBuffers, each of capacity {@code chunkSize}
* @throws InterruptedException If interrupted while waiting
* @throws IllegalArgumentException If {@code totalSize <= 0}
*/
public List<ByteBuffer> allocateChunks(int totalSize, long maxTimeToBlockMs) throws InterruptedException {
if (totalSize > totalMemory()) {
throw new IllegalArgumentException("Attempt to allocate " + totalSize
+ " bytes across chunks, but there is a hard limit of "
+ totalMemory() + " on memory allocations.");
}

int chunkSize = poolableSize();
int numChunks = (totalSize + chunkSize - 1) / chunkSize; // ceil division
List<ByteBuffer> chunks = new ArrayList<>(numChunks);
long deadlineMs = time.milliseconds() + maxTimeToBlockMs;
try {
for (int i = 0; i < numChunks; i++) {
long remainingMs = Math.max(0L, deadlineMs - time.milliseconds());
chunks.add(allocate(chunkSize, remainingMs));
}
List<ByteBuffer> result = chunks;
chunks = null;
return result;
} finally {
if (chunks != null) {
for (ByteBuffer chunk : chunks) {
deallocate(chunk);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.internal.MemoryRecordsBuilder;

public class CompositeProducerBatch extends ProducerBatch {

public CompositeProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs) {
super(tp, recordsBuilder, createdMs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.internal.MemoryRecordsBuilder;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.internals.LogContext;

public class DynamicRecordAccumulator extends RecordAccumulator {

private final int chunkSize;
private final ChunkedBufferPool free;

public DynamicRecordAccumulator(LogContext logContext,
int batchSize,
int chunkSize,
Compression compression,
int lingerMs,
long retryBackoffMs,
long retryBackoffMaxMs,
int deliveryTimeoutMs,
PartitionerConfig partitionerConfig,
Metrics metrics,
String metricGrpName,
Time time,
TransactionManager transactionManager,
ChunkedBufferPool bufferPool) {
super(logContext, batchSize, compression, lingerMs, retryBackoffMs, retryBackoffMaxMs, deliveryTimeoutMs,
partitionerConfig, metrics, metricGrpName, time, transactionManager, bufferPool);
this.chunkSize = chunkSize;
this.free = bufferPool;
}

@Override
protected ProducerBatch newProducerBatch(TopicPartition topicPartition, long nowMs, MemoryRecordsBuilder recordsBuilder) {
return new CompositeProducerBatch(topicPartition, recordsBuilder, nowMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
*
* This class is not thread safe and external synchronization must be used when modifying it
*/
public final class ProducerBatch {
public class ProducerBatch {

private static final Logger log = LoggerFactory.getLogger(ProducerBatch.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ private RecordAppendResult appendNewBatch(String topic,
}

MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer);
ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
ProducerBatch batch = newProducerBatch(new TopicPartition(topic, partition), nowMs, recordsBuilder);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callbacks, nowMs));

Expand All @@ -406,6 +406,10 @@ private RecordAppendResult appendNewBatch(String topic,
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, batch.estimatedSizeInBytes());
}

protected ProducerBatch newProducerBatch(TopicPartition topicPartition, long nowMs, MemoryRecordsBuilder recordsBuilder) {
return new ProducerBatch(topicPartition, recordsBuilder, nowMs);
}

private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer) {
return MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression, TimestampType.CREATE_TIME, 0L);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class ChunkedBufferPoolTest {
private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics(time);
private final long maxBlockTimeMs = 10;
private final String metricGroup = "TestMetrics";

@AfterEach
public void teardown() {
this.metrics.close();
}

@Test
public void testAllocateExactMultipleOfChunkSize() throws Exception {
int chunkSize = 1024;
ChunkedBufferPool pool = new ChunkedBufferPool(64 * 1024, chunkSize, metrics, time, metricGroup);
List<ByteBuffer> chunks = pool.allocateChunks(3 * chunkSize, maxBlockTimeMs);
assertEquals(3, chunks.size());
for (ByteBuffer chunk : chunks) {
assertEquals(chunkSize, chunk.capacity());
}
}

@Test
public void testAllocateCeilsToWholeChunk() throws Exception {
int chunkSize = 1024;
ChunkedBufferPool pool = new ChunkedBufferPool(64 * 1024, chunkSize, metrics, time, metricGroup);
List<ByteBuffer> chunks = pool.allocateChunks(2 * chunkSize + 1, maxBlockTimeMs);
assertEquals(3, chunks.size());
for (ByteBuffer chunk : chunks) {
assertEquals(chunkSize, chunk.capacity());
}
}

@Test
public void testThrowsWhenTotalSizeExceedsTotalMemory() {
int chunkSize = 1024;
long totalMemory = 4L * chunkSize;
ChunkedBufferPool pool = new ChunkedBufferPool(totalMemory, chunkSize, metrics, time, metricGroup);
assertThrows(IllegalArgumentException.class,
() -> pool.allocateChunks((int) totalMemory + 1, maxBlockTimeMs));
}

@Test
public void testDeallocateReturnsMemoryToPool() throws Exception {
int chunkSize = 1024;
long totalMemory = 4L * chunkSize;
ChunkedBufferPool pool = new ChunkedBufferPool(totalMemory, chunkSize, metrics, time, metricGroup);
List<ByteBuffer> chunks = pool.allocateChunks(3 * chunkSize, maxBlockTimeMs);
assertEquals(totalMemory - 3L * chunkSize, pool.availableMemory());
for (ByteBuffer chunk : chunks) {
pool.deallocate(chunk);
}
assertEquals(totalMemory, pool.availableMemory());
}

@Test
public void testPartialFailureRollsBack() throws Exception {
int chunkSize = 1024;
long totalMemory = 2L * chunkSize;
ChunkedBufferPool pool = new ChunkedBufferPool(totalMemory, chunkSize, metrics, time, metricGroup);

ByteBuffer held = pool.allocate(chunkSize, maxBlockTimeMs);
assertEquals(chunkSize, pool.availableMemory());

assertThrows(BufferExhaustedException.class,
() -> pool.allocateChunks(2 * chunkSize, maxBlockTimeMs));

assertEquals(chunkSize, pool.availableMemory());
assertEquals(0, pool.queued());

pool.deallocate(held);
assertEquals(totalMemory, pool.availableMemory());
}
}