Skip to content

Commit a832488

Browse files
swuferhongwuchong
authored andcommitted
[log] Bump LogRecordBatch's CURRENT_LOG_MAGIC_VALUE to V1 to support leaderEpoch (apache#778)
Co-authored-by: Jark Wu <[email protected]>
1 parent e1bc849 commit a832488

37 files changed

+937
-286
lines changed

fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceBucketEntry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import java.util.TreeSet;
3030
import java.util.function.Consumer;
3131

32-
import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
32+
import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
3333

3434
/** Entry to store the idempotence information of each table-bucket. */
3535
@Internal

fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.fluss.metadata.PhysicalTablePath;
2626
import org.apache.fluss.metadata.TableBucket;
2727
import org.apache.fluss.metadata.TablePath;
28-
import org.apache.fluss.record.LogRecordBatch;
2928
import org.apache.fluss.rpc.gateway.TabletServerGateway;
3029
import org.apache.fluss.rpc.messages.InitWriterRequest;
3130
import org.apache.fluss.rpc.protocol.Errors;
@@ -40,7 +39,8 @@
4039
import java.util.Set;
4140
import java.util.stream.Collectors;
4241

43-
import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
42+
import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
43+
import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
4444

4545
/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
4646
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -182,10 +182,10 @@ synchronized void removeInFlightBatch(ReadyWriteBatch batch) {
182182
*/
183183
synchronized int firstInFlightBatchSequence(TableBucket tableBucket) {
184184
if (!hasInflightBatches(tableBucket)) {
185-
return LogRecordBatch.NO_BATCH_SEQUENCE;
185+
return NO_BATCH_SEQUENCE;
186186
}
187187
WriteBatch batch = nextBatchBySequence(tableBucket);
188-
return batch == null ? LogRecordBatch.NO_BATCH_SEQUENCE : batch.batchSequence();
188+
return batch == null ? NO_BATCH_SEQUENCE : batch.batchSequence();
189189
}
190190

191191
synchronized void handleCompletedBatch(ReadyWriteBatch readyWriteBatch) {

fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.fluss.metadata.TableBucket;
3333
import org.apache.fluss.metadata.TableInfo;
3434
import org.apache.fluss.metrics.MetricNames;
35-
import org.apache.fluss.record.LogRecordBatch;
3635
import org.apache.fluss.row.arrow.ArrowWriter;
3736
import org.apache.fluss.row.arrow.ArrowWriterPool;
3837
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
@@ -61,7 +60,8 @@
6160
import java.util.concurrent.ConcurrentMap;
6261
import java.util.concurrent.atomic.AtomicInteger;
6362

64-
import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
63+
import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
64+
import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
6565
import static org.apache.fluss.utils.Preconditions.checkNotNull;
6666

6767
/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
@@ -757,7 +757,7 @@ private boolean shouldStopDrainBatchesForBucket(WriteBatch first, TableBucket ta
757757
// flight request count to 1.
758758
int firstInFlightSequence = idempotenceManager.firstInFlightBatchSequence(tableBucket);
759759
boolean isFirstInFlightBatch =
760-
firstInFlightSequence == LogRecordBatch.NO_BATCH_SEQUENCE
760+
firstInFlightSequence == NO_BATCH_SEQUENCE
761761
|| (first.hasBatchSequence()
762762
&& first.batchSequence() == firstInFlightSequence);
763763

@@ -824,7 +824,7 @@ private void insertInSequenceOrder(
824824
Deque<WriteBatch> deque, WriteBatch batch, TableBucket tableBucket) {
825825
// When we are re-enqueue and have enabled idempotence, the re-enqueued batch must always
826826
// have a batch sequence.
827-
if (batch.batchSequence() == LogRecordBatch.NO_BATCH_SEQUENCE) {
827+
if (batch.batchSequence() == NO_BATCH_SEQUENCE) {
828828
throw new IllegalStateException(
829829
"Trying to re-enqueue a batch which doesn't have a sequence even "
830830
+ "though idempotence is enabled.");

fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.fluss.memory.MemorySegment;
2222
import org.apache.fluss.memory.MemorySegmentPool;
2323
import org.apache.fluss.metadata.PhysicalTablePath;
24-
import org.apache.fluss.record.LogRecordBatch;
2524
import org.apache.fluss.record.bytesview.BytesView;
2625

2726
import org.slf4j.Logger;
@@ -35,6 +34,7 @@
3534
import java.util.concurrent.atomic.AtomicInteger;
3635
import java.util.concurrent.atomic.AtomicReference;
3736

37+
import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
3838
import static org.apache.fluss.utils.Preconditions.checkNotNull;
3939

4040
/** The abstract write batch contains write callback object to wait write request feedback. */
@@ -113,7 +113,7 @@ public abstract boolean tryAppend(WriteRecord writeRecord, WriteCallback callbac
113113
public abstract void abortRecordAppends();
114114

115115
public boolean hasBatchSequence() {
116-
return batchSequence() != LogRecordBatch.NO_BATCH_SEQUENCE;
116+
return batchSequence() != NO_BATCH_SEQUENCE;
117117
}
118118

119119
public void resetWriterState(long writerId, int batchSequence) {

fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@
2121
import org.apache.fluss.metadata.PhysicalTablePath;
2222
import org.apache.fluss.record.DefaultKvRecord;
2323
import org.apache.fluss.record.DefaultKvRecordBatch;
24-
import org.apache.fluss.record.DefaultLogRecordBatch;
2524
import org.apache.fluss.record.IndexedLogRecord;
2625
import org.apache.fluss.row.BinaryRow;
2726
import org.apache.fluss.row.InternalRow;
2827
import org.apache.fluss.row.indexed.IndexedRow;
2928

3029
import javax.annotation.Nullable;
3130

31+
import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
32+
import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
3233
import static org.apache.fluss.utils.Preconditions.checkNotNull;
3334

3435
/**
@@ -85,7 +86,7 @@ public static WriteRecord forIndexedAppend(
8586
PhysicalTablePath tablePath, IndexedRow row, @Nullable byte[] bucketKey) {
8687
checkNotNull(row);
8788
int estimatedSizeInBytes =
88-
IndexedLogRecord.sizeOf(row) + DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
89+
IndexedLogRecord.sizeOf(row) + recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
8990
return new WriteRecord(
9091
tablePath,
9192
null,

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,22 @@
3939
import org.junit.jupiter.api.Test;
4040
import org.junit.jupiter.api.io.TempDir;
4141
import org.junit.jupiter.params.ParameterizedTest;
42+
import org.junit.jupiter.params.provider.Arguments;
43+
import org.junit.jupiter.params.provider.MethodSource;
4244
import org.junit.jupiter.params.provider.ValueSource;
4345

4446
import java.io.File;
4547
import java.nio.ByteBuffer;
48+
import java.util.ArrayList;
4649
import java.util.Arrays;
50+
import java.util.Collection;
4751
import java.util.HashMap;
4852
import java.util.List;
4953
import java.util.Map;
5054

5155
import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
56+
import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
57+
import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
5258
import static org.apache.fluss.record.TestData.DATA2;
5359
import static org.apache.fluss.record.TestData.DATA2_ROW_TYPE;
5460
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
@@ -78,14 +84,15 @@ void beforeEach() {
7884
logScannerStatus.assignScanBuckets(scanBuckets);
7985
}
8086

81-
@Test
82-
void testSimple() throws Exception {
87+
@ParameterizedTest
88+
@ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
89+
void testSimple(byte recordBatchMagic) throws Exception {
8390
long fetchOffset = 0L;
8491
int bucketId = 0; // records for 0-10.
8592
TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
8693
FetchLogResultForBucket resultForBucket0 =
8794
new FetchLogResultForBucket(
88-
tb, createMemoryLogRecords(DATA2, LogFormat.ARROW), 10L);
95+
tb, createMemoryLogRecords(DATA2, LogFormat.ARROW, recordBatchMagic), 10L);
8996
DefaultCompletedFetch defaultCompletedFetch =
9097
makeCompletedFetch(tb, resultForBucket0, fetchOffset);
9198
List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(8);
@@ -100,14 +107,15 @@ void testSimple() throws Exception {
100107
assertThat(scanRecords.size()).isEqualTo(0);
101108
}
102109

103-
@Test
104-
void testNegativeFetchCount() throws Exception {
110+
@ParameterizedTest
111+
@ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
112+
void testNegativeFetchCount(byte recordBatchMagic) throws Exception {
105113
long fetchOffset = 0L;
106114
int bucketId = 0; // records for 0-10.
107115
TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
108116
FetchLogResultForBucket resultForBucket0 =
109117
new FetchLogResultForBucket(
110-
tb, createMemoryLogRecords(DATA2, LogFormat.ARROW), 10L);
118+
tb, createMemoryLogRecords(DATA2, LogFormat.ARROW, recordBatchMagic), 10L);
111119
DefaultCompletedFetch defaultCompletedFetch =
112120
makeCompletedFetch(tb, resultForBucket0, fetchOffset);
113121
List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(-10);
@@ -128,9 +136,8 @@ void testNoRecordsInFetch() {
128136
}
129137

130138
@ParameterizedTest
131-
@ValueSource(strings = {"INDEXED", "ARROW"})
132-
void testProjection(String format) throws Exception {
133-
LogFormat logFormat = LogFormat.fromString(format);
139+
@MethodSource("typeAndMagic")
140+
void testProjection(LogFormat logFormat, byte magic) throws Exception {
134141
Schema schema =
135142
Schema.newBuilder()
136143
.column("a", DataTypes.INT())
@@ -158,9 +165,9 @@ void testProjection(String format) throws Exception {
158165
Projection projection = Projection.of(new int[] {0, 2});
159166
MemoryLogRecords memoryLogRecords;
160167
if (logFormat == LogFormat.ARROW) {
161-
memoryLogRecords = genRecordsWithProjection(DATA2, projection);
168+
memoryLogRecords = genRecordsWithProjection(DATA2, projection, magic);
162169
} else {
163-
memoryLogRecords = createMemoryLogRecords(DATA2, LogFormat.INDEXED);
170+
memoryLogRecords = createMemoryLogRecords(DATA2, LogFormat.INDEXED, magic);
164171
}
165172
FetchLogResultForBucket resultForBucket0 =
166173
new FetchLogResultForBucket(tb, memoryLogRecords, 10L);
@@ -224,19 +231,28 @@ private DefaultCompletedFetch makeCompletedFetch(
224231
offset);
225232
}
226233

227-
private MemoryLogRecords createMemoryLogRecords(List<Object[]> objects, LogFormat logFormat)
228-
throws Exception {
234+
private static Collection<Arguments> typeAndMagic() {
235+
List<Arguments> params = new ArrayList<>();
236+
params.add(Arguments.arguments(LogFormat.ARROW, LOG_MAGIC_VALUE_V1));
237+
params.add(Arguments.arguments(LogFormat.INDEXED, LOG_MAGIC_VALUE_V1));
238+
params.add(Arguments.arguments(LogFormat.ARROW, LOG_MAGIC_VALUE_V0));
239+
params.add(Arguments.arguments(LogFormat.INDEXED, LOG_MAGIC_VALUE_V0));
240+
return params;
241+
}
242+
243+
private MemoryLogRecords createMemoryLogRecords(
244+
List<Object[]> objects, LogFormat logFormat, byte magic) throws Exception {
229245
return createRecordsWithoutBaseLogOffset(
230-
rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, logFormat);
246+
rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, magic, objects, logFormat);
231247
}
232248

233-
private MemoryLogRecords genRecordsWithProjection(List<Object[]> objects, Projection projection)
234-
throws Exception {
249+
private MemoryLogRecords genRecordsWithProjection(
250+
List<Object[]> objects, Projection projection, byte magic) throws Exception {
235251
File logFile = FlussPaths.logFile(tempDir, 0L);
236252
FileLogRecords fileLogRecords = FileLogRecords.open(logFile, false, 1024 * 1024, false);
237253
fileLogRecords.append(
238254
createRecordsWithoutBaseLogOffset(
239-
rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, LogFormat.ARROW));
255+
rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, magic, objects, LogFormat.ARROW));
240256
fileLogRecords.flush();
241257

242258
FileLogProjection fileLogProjection = new FileLogProjection();

fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.fluss.memory.PreAllocatedPagedOutputView;
2222
import org.apache.fluss.metadata.TableBucket;
2323
import org.apache.fluss.record.ChangeType;
24-
import org.apache.fluss.record.DefaultLogRecordBatch;
2524
import org.apache.fluss.record.IndexedLogRecord;
2625
import org.apache.fluss.record.LogRecord;
2726
import org.apache.fluss.record.LogRecordBatch;
@@ -40,6 +39,8 @@
4039
import java.util.List;
4140
import java.util.concurrent.CompletableFuture;
4241

42+
import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
43+
import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
4344
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
4445
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
4546
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
@@ -72,7 +73,7 @@ void testTryAppendWithWriteLimit() throws Exception {
7273

7374
for (int i = 0;
7475
i
75-
< (writeLimit - DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE)
76+
< (writeLimit - recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
7677
/ estimatedSizeInBytes;
7778
i++) {
7879
boolean appendResult =

fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@
6666
import java.util.concurrent.TimeUnit;
6767
import java.util.stream.Collectors;
6868

69-
import static org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
69+
import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
70+
import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
7071
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
7172
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
7273
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
@@ -614,7 +615,7 @@ private void verifyTableBucketInBatches(
614615

615616
/** Return the offset delta. */
616617
private int expectedNumAppends(IndexedRow row, int batchSize) {
617-
int size = RECORD_BATCH_HEADER_SIZE;
618+
int size = recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
618619
int offsetDelta = 0;
619620
while (true) {
620621
int recordSize = IndexedLogRecord.sizeOf(row);
@@ -652,7 +653,8 @@ private RecordAccumulator createTestRecordAccumulator(
652653
}
653654

654655
private long getTestBatchSize(BinaryRow row) {
655-
return RECORD_BATCH_HEADER_SIZE + DefaultKvRecord.sizeOf(new byte[4], row);
656+
return recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE)
657+
+ DefaultKvRecord.sizeOf(new byte[4], row);
656658
}
657659

658660
private int getBatchNumInAccum(RecordAccumulator accum) {

fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.fluss.config.MemorySize;
2727
import org.apache.fluss.exception.TimeoutException;
2828
import org.apache.fluss.metadata.TableBucket;
29-
import org.apache.fluss.record.LogRecordBatch;
3029
import org.apache.fluss.record.MemoryLogRecords;
3130
import org.apache.fluss.row.GenericRow;
3231
import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
@@ -51,6 +50,7 @@
5150
import java.util.Set;
5251
import java.util.concurrent.CompletableFuture;
5352

53+
import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
5454
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
5555
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
5656
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
@@ -797,7 +797,7 @@ private IdempotenceManager createIdempotenceManager(boolean idempotenceEnabled)
797797

798798
private static boolean hasIdempotentRecords(TableBucket tb, ProduceLogRequest request) {
799799
MemoryLogRecords memoryLogRecords = getProduceLogData(request).get(tb);
800-
return memoryLogRecords.batchIterator().next().writerId() != LogRecordBatch.NO_WRITER_ID;
800+
return memoryLogRecords.batchIterator().next().writerId() != NO_WRITER_ID;
801801
}
802802

803803
private static void assertBatchSequenceEquals(

0 commit comments

Comments
 (0)