Skip to content

MINOR: Add Append KRaft Version Record to BatchAccumulator #18956

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
Expand Down Expand Up @@ -104,23 +105,22 @@ public BatchAccumulator(
/**
* Append to the accumulator.
*
* @param epoch the leader epoch to append at
* @param records the records to append
* @param epoch the leader epoch to append at
* @param records the records to append
* @param delayDrain whether the records could be drained
* @return the offset of the last record
*
* @throws NotLeaderException indicates that an append operation cannot be completed because the
* provided leader epoch was too old
* @throws NotLeaderException indicates that an append operation cannot be completed because the
* provided leader epoch was too old
* @throws IllegalArgumentException indicates that an append operation cannot be completed
* because the provided leader epoch was too new
* @throws IllegalStateException if the number of accumulated batches reaches the maximum
* number of batches
* because the provided leader epoch was too new
* @throws IllegalStateException if the number of accumulated batches reaches the maximum
* number of batches
*/
public long append(int epoch, List<T> records, boolean delayDrain) {
int numberOfCompletedBatches = completed.size();
if (epoch < this.epoch) {
throw new NotLeaderException("Append failed because the given epoch " + epoch + " is stale. " +
"Current leader epoch = " + this.epoch());
"Current leader epoch = " + this.epoch());
} else if (epoch > this.epoch) {
throw new IllegalArgumentException("Attempt to append from epoch " + epoch +
" which is larger than the current epoch " + this.epoch);
Expand Down Expand Up @@ -221,12 +221,12 @@ public void allowDrain() {

/**
* Append a control batch from a supplied memory record.
*
* <p>
* See the {@code valueCreator} parameter description for requirements on this function.
*
* @param valueCreator a function that uses the passed buffer to create the control
* batch that will be appended. The memory records returned must contain one
* control batch and that control batch have at least one record.
* batch that will be appended. The memory records returned must contain one
* control batch and that control batch have at least one record.
* @return the last of offset of the records created
*/
public long appendControlMessages(MemoryRecordsCreator valueCreator) {
Expand Down Expand Up @@ -311,7 +311,7 @@ private int validateMemoryRecordsAndReturnCount(MemoryRecords memoryRecords) {
/**
* Append a {@link VotersRecord} record to the batch
*
* @param voters the record to append
* @param voters the record to append
* @param currentTimestamp the current time in milliseconds
* @return the last of offset of the records created
* @throws IllegalStateException on failure to allocate a buffer for the record
Expand All @@ -336,7 +336,7 @@ public long appendVotersRecord(
* Append a {@link LeaderChangeMessage} record to the batch
*
* @param leaderChangeMessage The message to append
* @param currentTimestamp The current time in milliseconds
* @param currentTimestamp The current time in milliseconds
* @throws IllegalStateException on failure to allocate a buffer for the record
*/
public void appendLeaderChangeMessage(
Expand All @@ -354,12 +354,33 @@ public void appendLeaderChangeMessage(
);
}

/**
* Append a {@link KRaftVersionRecord} record to the batch
*
* @param kraftVersionRecord The message to append
* @param currentTimestamp The current time in milliseconds
* @throws IllegalStateException on failure to allocate a buffer for the record
*/
public void appendKRaftVersionRecord(
KRaftVersionRecord kraftVersionRecord,
long currentTimestamp
) {
appendControlMessages((baseOffset, epoch, compression, buffer) ->
MemoryRecords.withKRaftVersionRecord(
baseOffset,
currentTimestamp,
epoch,
buffer,
kraftVersionRecord
)
);
}

/**
* Append a {@link SnapshotHeaderRecord} record to the batch
*
* @param snapshotHeaderRecord The record to append
* @param currentTimestamp The current time in milliseconds
* @param currentTimestamp The current time in milliseconds
* @throws IllegalStateException on failure to allocate a buffer for the record
*/
public void appendSnapshotHeaderRecord(
Expand All @@ -381,7 +402,7 @@ public void appendSnapshotHeaderRecord(
* Append a {@link SnapshotFooterRecord} record to the batch
*
* @param snapshotFooterRecord The record to append
* @param currentTimestamp The current time in milliseconds
* @param currentTimestamp The current time in milliseconds
* @throws IllegalStateException on failure to allocate a buffer for the record
*/
public void appendSnapshotFooterRecord(
Expand Down Expand Up @@ -476,16 +497,16 @@ public int epoch() {
/**
* Drain completed batches. The caller is expected to first check whether
* {@link #needsDrain(long)} returns true in order to avoid unnecessary draining.
*
* <p>
* Note on thread-safety: this method is safe in the presence of concurrent
* appends, but it assumes a single thread is responsible for draining.
*
* <p>
* This call will not block, but the drain may require multiple attempts before
* it can be completed if the thread responsible for appending is holding the
* append lock. In the worst case, the append will be completed on the next
* call to {@link #append(int, List, boolean)} following the
* initial call to this method.
*
* <p>
* The caller should respect the time to the next flush as indicated by
* {@link #timeUntilDrain(long)}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,35 @@ public void testLeaderChangeMessageWritten() {
Mockito.verify(memoryPool).release(buffer);
}

@Test
public void testKRaftVersionRecordWritten() {
int leaderEpoch = 17;
long baseOffset = 0;
int lingerMs = 50;
int maxBatchSize = 512;

ByteBuffer buffer = ByteBuffer.allocate(maxBatchSize);
Mockito.when(memoryPool.tryAllocate(maxBatchSize))
.thenReturn(buffer);

BatchAccumulator<String> acc = buildAccumulator(
leaderEpoch,
baseOffset,
lingerMs,
maxBatchSize
);

acc.appendKRaftVersionRecord(new KRaftVersionRecord(), time.milliseconds());
assertTrue(acc.needsDrain(time.milliseconds()));

List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
assertEquals(1, batches.size());

BatchAccumulator.CompletedBatch<String> batch = batches.get(0);
batch.release();
Mockito.verify(memoryPool).release(buffer);
}

@Test
public void testForceDrain() {
int leaderEpoch = 17;
Expand Down Expand Up @@ -475,24 +504,24 @@ public void testMultipleControlRecords() {

ByteBuffer buffer = ByteBuffer.allocate(maxBatchSize);
Mockito.when(memoryPool.tryAllocate(maxBatchSize))
.thenReturn(buffer);
.thenReturn(buffer);

try (BatchAccumulator<String> acc = buildAccumulator(
leaderEpoch,
baseOffset,
lingerMs,
maxBatchSize
)
leaderEpoch,
baseOffset,
lingerMs,
maxBatchSize
)
) {
acc.appendControlMessages((offset, epoch, compression, buf) -> {
long now = 1234;
try (MemoryRecordsBuilder builder = controlRecordsBuilder(
offset,
epoch,
compression,
now,
buf
)
offset,
epoch,
compression,
now,
buf
)
) {
builder.appendSnapshotHeaderMessage(
now,
Expand Down Expand Up @@ -533,17 +562,17 @@ public void testInvalidControlRecordOffset() {

ByteBuffer buffer = ByteBuffer.allocate(maxBatchSize);
Mockito.when(memoryPool.tryAllocate(maxBatchSize))
.thenReturn(buffer);
.thenReturn(buffer);

BatchAccumulator.MemoryRecordsCreator creator = (offset, epoch, compression, buf) -> {
long now = 1234;
try (MemoryRecordsBuilder builder = controlRecordsBuilder(
offset + 1,
epoch,
compression,
now,
buf
)
offset + 1,
epoch,
compression,
now,
buf
)
) {
builder.appendSnapshotHeaderMessage(
now,
Expand All @@ -557,11 +586,11 @@ public void testInvalidControlRecordOffset() {
};

try (BatchAccumulator<String> acc = buildAccumulator(
leaderEpoch,
baseOffset,
lingerMs,
maxBatchSize
)
leaderEpoch,
baseOffset,
lingerMs,
maxBatchSize
)
) {
assertThrows(IllegalArgumentException.class, () -> acc.appendControlMessages(creator));
}
Expand All @@ -576,17 +605,17 @@ public void testInvalidControlRecordEpoch() {

ByteBuffer buffer = ByteBuffer.allocate(maxBatchSize);
Mockito.when(memoryPool.tryAllocate(maxBatchSize))
.thenReturn(buffer);
.thenReturn(buffer);

BatchAccumulator.MemoryRecordsCreator creator = (offset, epoch, compression, buf) -> {
long now = 1234;
try (MemoryRecordsBuilder builder = controlRecordsBuilder(
offset,
epoch + 1,
compression,
now,
buf
)
offset,
epoch + 1,
compression,
now,
buf
)
) {
builder.appendSnapshotHeaderMessage(
now,
Expand All @@ -600,11 +629,11 @@ public void testInvalidControlRecordEpoch() {
};

try (BatchAccumulator<String> acc = buildAccumulator(
leaderEpoch,
baseOffset,
lingerMs,
maxBatchSize
)
leaderEpoch,
baseOffset,
lingerMs,
maxBatchSize
)
) {
assertThrows(IllegalArgumentException.class, () -> acc.appendControlMessages(creator));
}
Expand All @@ -619,29 +648,29 @@ public void testEmptyControlBatch() {

ByteBuffer buffer = ByteBuffer.allocate(maxBatchSize);
Mockito.when(memoryPool.tryAllocate(maxBatchSize))
.thenReturn(buffer);
.thenReturn(buffer);

BatchAccumulator.MemoryRecordsCreator creator = (offset, epoch, compression, buf) -> {
long now = 1234;
try (MemoryRecordsBuilder builder = controlRecordsBuilder(
offset,
epoch,
compression,
now,
buf
)
offset,
epoch,
compression,
now,
buf
)
) {
// Create a control batch without any records
return builder.build();
}
};

try (BatchAccumulator<String> acc = buildAccumulator(
leaderEpoch,
baseOffset,
lingerMs,
maxBatchSize
)
leaderEpoch,
baseOffset,
lingerMs,
maxBatchSize
)
) {
assertThrows(IllegalArgumentException.class, () -> acc.appendControlMessages(creator));
}
Expand Down