
Commit 420c612

swuferhong authored and wuchong committed
[log] Bump LogRecordBatch's CURRENT_LOG_MAGIC_VALUE to V1 to support leaderEpoch (apache#778)
Co-authored-by: Jark Wu <[email protected]>

# Conflicts:
#	fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
#	fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
1 parent 140464c · commit 420c612

36 files changed: +915 -271 lines changed
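
In short: the batch-level constants (NO_WRITER_ID, NO_BATCH_SEQUENCE, the magic values) and the header-size lookup move into LogRecordBatchFormat, the header size becomes a function of the batch magic so that a V1 header can carry a leaderEpoch, and CURRENT_LOG_MAGIC_VALUE (still read from LogRecordBatch) is bumped to V1. A rough sketch of that shape, using placeholder values and sizes rather than the real on-disk layout:

// Rough sketch only: the real class is org.apache.fluss.record.LogRecordBatchFormat;
// every value and size below is a placeholder, not the actual format.
public final class LogRecordBatchFormatSketch {

    public static final byte LOG_MAGIC_VALUE_V0 = 0;
    public static final byte LOG_MAGIC_VALUE_V1 = 1;

    // Sentinels for non-idempotent batches (placeholder values).
    public static final long NO_WRITER_ID = -1L;
    public static final int NO_BATCH_SEQUENCE = -1;

    // Assumption: V1 extends the V0 header with a leaderEpoch field.
    private static final int V0_HEADER_SIZE_PLACEHOLDER = 48;
    private static final int LEADER_EPOCH_LENGTH_PLACEHOLDER = 4;

    public static int recordBatchHeaderSize(byte magic) {
        switch (magic) {
            case LOG_MAGIC_VALUE_V0:
                return V0_HEADER_SIZE_PLACEHOLDER;
            case LOG_MAGIC_VALUE_V1:
                return V0_HEADER_SIZE_PLACEHOLDER + LEADER_EPOCH_LENGTH_PLACEHOLDER;
            default:
                throw new IllegalArgumentException("Unsupported record batch magic: " + magic);
        }
    }

    private LogRecordBatchFormatSketch() {}
}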

fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceBucketEntry.java

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@
 import java.util.TreeSet;
 import java.util.function.Consumer;
 
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 
 /** Entry to store the idempotence information of each table-bucket. */
 @Internal

fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java

Lines changed: 4 additions & 4 deletions
@@ -25,7 +25,6 @@
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
 import org.apache.fluss.rpc.messages.InitWriterRequest;
 import org.apache.fluss.rpc.protocol.Errors;
@@ -40,7 +39,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 
 /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -182,10 +182,10 @@ synchronized void removeInFlightBatch(ReadyWriteBatch batch) {
      */
     synchronized int firstInFlightBatchSequence(TableBucket tableBucket) {
         if (!hasInflightBatches(tableBucket)) {
-            return LogRecordBatch.NO_BATCH_SEQUENCE;
+            return NO_BATCH_SEQUENCE;
         }
         WriteBatch batch = nextBatchBySequence(tableBucket);
-        return batch == null ? LogRecordBatch.NO_BATCH_SEQUENCE : batch.batchSequence();
+        return batch == null ? NO_BATCH_SEQUENCE : batch.batchSequence();
     }
 
     synchronized void handleCompletedBatch(ReadyWriteBatch readyWriteBatch) {

fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java

Lines changed: 4 additions & 4 deletions
@@ -32,7 +32,6 @@
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metrics.MetricNames;
-import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.row.arrow.ArrowWriter;
 import org.apache.fluss.row.arrow.ArrowWriterPool;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
@@ -61,7 +60,8 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
@@ -766,7 +766,7 @@ private boolean shouldStopDrainBatchesForBucket(WriteBatch first, TableBucket ta
         // flight request count to 1.
         int firstInFlightSequence = idempotenceManager.firstInFlightBatchSequence(tableBucket);
         boolean isFirstInFlightBatch =
-                firstInFlightSequence == LogRecordBatch.NO_BATCH_SEQUENCE
+                firstInFlightSequence == NO_BATCH_SEQUENCE
                         || (first.hasBatchSequence()
                                 && first.batchSequence() == firstInFlightSequence);
 
@@ -833,7 +833,7 @@ private void insertInSequenceOrder(
             Deque<WriteBatch> deque, WriteBatch batch, TableBucket tableBucket) {
         // When we are re-enqueue and have enabled idempotence, the re-enqueued batch must always
         // have a batch sequence.
-        if (batch.batchSequence() == LogRecordBatch.NO_BATCH_SEQUENCE) {
+        if (batch.batchSequence() == NO_BATCH_SEQUENCE) {
            throw new IllegalStateException(
                    "Trying to re-enqueue a batch which doesn't have a sequence even "
                            + "though idempotence is enabled.");

fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java

Lines changed: 2 additions & 2 deletions
@@ -21,7 +21,6 @@
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.memory.MemorySegmentPool;
 import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.record.bytesview.BytesView;
 
 import org.slf4j.Logger;
@@ -35,6 +34,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /** The abstract write batch contains write callback object to wait write request feedback. */
@@ -113,7 +113,7 @@ public abstract boolean tryAppend(WriteRecord writeRecord, WriteCallback callbac
     public abstract void abortRecordAppends();
 
     public boolean hasBatchSequence() {
-        return batchSequence() != LogRecordBatch.NO_BATCH_SEQUENCE;
+        return batchSequence() != NO_BATCH_SEQUENCE;
     }
 
     public void resetWriterState(long writerId, int batchSequence) {

fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java

Lines changed: 3 additions & 2 deletions
@@ -22,7 +22,6 @@
 import org.apache.fluss.record.CompactedLogRecord;
 import org.apache.fluss.record.DefaultKvRecord;
 import org.apache.fluss.record.DefaultKvRecordBatch;
-import org.apache.fluss.record.DefaultLogRecordBatch;
 import org.apache.fluss.record.IndexedLogRecord;
 import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.InternalRow;
@@ -31,6 +30,8 @@
 
 import javax.annotation.Nullable;
 
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /**
@@ -87,7 +88,7 @@ public static WriteRecord forIndexedAppend(
             PhysicalTablePath tablePath, IndexedRow row, @Nullable byte[] bucketKey) {
         checkNotNull(row);
         int estimatedSizeInBytes =
-                IndexedLogRecord.sizeOf(row) + DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+                IndexedLogRecord.sizeOf(row) + recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
         return new WriteRecord(
                 tablePath,
                 null,
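
The WriteRecord change above is the calling-side pattern: the single RECORD_BATCH_HEADER_SIZE constant is gone, and size estimates ask for the header size of the magic they are about to write. A minimal, hypothetical helper in the same spirit (class name, method name, and the IndexedRow import path are illustrative, not part of the commit):

import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;

import org.apache.fluss.record.IndexedLogRecord;
import org.apache.fluss.row.indexed.IndexedRow; // assumed package for IndexedRow

final class BatchSizeEstimator {

    private BatchSizeEstimator() {}

    // Estimated wire size of a batch holding a single indexed row,
    // written with the current (V1) batch magic.
    static int estimateSingleRecordBatchSize(IndexedRow row) {
        return recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE) + IndexedLogRecord.sizeOf(row);
    }
}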

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java

Lines changed: 33 additions & 17 deletions
@@ -39,16 +39,22 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
 import static org.apache.fluss.record.TestData.DATA2;
 import static org.apache.fluss.record.TestData.DATA2_ROW_TYPE;
 import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
@@ -78,14 +84,15 @@ void beforeEach() {
         logScannerStatus.assignScanBuckets(scanBuckets);
     }
 
-    @Test
-    void testSimple() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testSimple(byte recordBatchMagic) throws Exception {
         long fetchOffset = 0L;
         int bucketId = 0; // records for 0-10.
         TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
         FetchLogResultForBucket resultForBucket0 =
                 new FetchLogResultForBucket(
-                        tb, createMemoryLogRecords(DATA2, LogFormat.ARROW), 10L);
+                        tb, createMemoryLogRecords(DATA2, LogFormat.ARROW, recordBatchMagic), 10L);
         DefaultCompletedFetch defaultCompletedFetch =
                 makeCompletedFetch(tb, resultForBucket0, fetchOffset);
         List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(8);
@@ -100,14 +107,15 @@ void testSimple() throws Exception {
         assertThat(scanRecords.size()).isEqualTo(0);
     }
 
-    @Test
-    void testNegativeFetchCount() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testNegativeFetchCount(byte recordBatchMagic) throws Exception {
         long fetchOffset = 0L;
         int bucketId = 0; // records for 0-10.
         TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
         FetchLogResultForBucket resultForBucket0 =
                 new FetchLogResultForBucket(
-                        tb, createMemoryLogRecords(DATA2, LogFormat.ARROW), 10L);
+                        tb, createMemoryLogRecords(DATA2, LogFormat.ARROW, recordBatchMagic), 10L);
         DefaultCompletedFetch defaultCompletedFetch =
                 makeCompletedFetch(tb, resultForBucket0, fetchOffset);
         List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(-10);
@@ -128,9 +136,8 @@ void testNoRecordsInFetch() {
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"INDEXED", "ARROW"})
-    void testProjection(String format) throws Exception {
-        LogFormat logFormat = LogFormat.fromString(format);
+    @MethodSource("typeAndMagic")
+    void testProjection(LogFormat logFormat, byte magic) throws Exception {
         Schema schema =
                 Schema.newBuilder()
                         .column("a", DataTypes.INT())
@@ -158,9 +165,9 @@ void testProjection(String format) throws Exception {
         Projection projection = Projection.of(new int[] {0, 2});
         MemoryLogRecords memoryLogRecords;
         if (logFormat == LogFormat.ARROW) {
-            memoryLogRecords = genRecordsWithProjection(DATA2, projection);
+            memoryLogRecords = genRecordsWithProjection(DATA2, projection, magic);
         } else {
-            memoryLogRecords = createMemoryLogRecords(DATA2, LogFormat.INDEXED);
+            memoryLogRecords = createMemoryLogRecords(DATA2, LogFormat.INDEXED, magic);
         }
         FetchLogResultForBucket resultForBucket0 =
                 new FetchLogResultForBucket(tb, memoryLogRecords, 10L);
@@ -224,19 +231,28 @@ private DefaultCompletedFetch makeCompletedFetch(
                 offset);
     }
 
-    private MemoryLogRecords createMemoryLogRecords(List<Object[]> objects, LogFormat logFormat)
-            throws Exception {
+    private static Collection<Arguments> typeAndMagic() {
+        List<Arguments> params = new ArrayList<>();
+        params.add(Arguments.arguments(LogFormat.ARROW, LOG_MAGIC_VALUE_V1));
+        params.add(Arguments.arguments(LogFormat.INDEXED, LOG_MAGIC_VALUE_V1));
+        params.add(Arguments.arguments(LogFormat.ARROW, LOG_MAGIC_VALUE_V0));
+        params.add(Arguments.arguments(LogFormat.INDEXED, LOG_MAGIC_VALUE_V0));
+        return params;
+    }
+
+    private MemoryLogRecords createMemoryLogRecords(
+            List<Object[]> objects, LogFormat logFormat, byte magic) throws Exception {
         return createRecordsWithoutBaseLogOffset(
-                rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, logFormat);
+                rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, magic, objects, logFormat);
     }
 
-    private MemoryLogRecords genRecordsWithProjection(List<Object[]> objects, Projection projection)
-            throws Exception {
+    private MemoryLogRecords genRecordsWithProjection(
+            List<Object[]> objects, Projection projection, byte magic) throws Exception {
         File logFile = FlussPaths.logFile(tempDir, 0L);
         FileLogRecords fileLogRecords = FileLogRecords.open(logFile, false, 1024 * 1024, false);
         fileLogRecords.append(
                 createRecordsWithoutBaseLogOffset(
-                        rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, LogFormat.ARROW));
+                        rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, magic, objects, LogFormat.ARROW));
         fileLogRecords.flush();
 
         FileLogProjection fileLogProjection = new FileLogProjection();
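
The test changes above all follow one pattern: every scan test now runs once per batch magic, either through @ValueSource(bytes = ...) or the typeAndMagic() cross product. The same trick works for any helper that takes the magic byte; a small, hypothetical example (class name and assertion are illustrative, not part of this commit):

import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class RecordBatchMagicSketchTest {

    // Runs once per magic value, the same way testSimple(byte) does above.
    @ParameterizedTest
    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
    void headerSizeIsDefinedForEveryMagic(byte magic) {
        assertThat(recordBatchHeaderSize(magic)).isPositive();
    }
}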

fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java

Lines changed: 2 additions & 0 deletions
@@ -40,6 +40,8 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;

fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java

Lines changed: 5 additions & 3 deletions
@@ -66,7 +66,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
 import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
@@ -614,7 +615,7 @@ private void verifyTableBucketInBatches(
 
     /** Return the offset delta. */
     private int expectedNumAppends(IndexedRow row, int batchSize) {
-        int size = RECORD_BATCH_HEADER_SIZE;
+        int size = recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
         int offsetDelta = 0;
         while (true) {
             int recordSize = IndexedLogRecord.sizeOf(row);
@@ -652,7 +653,8 @@ private RecordAccumulator createTestRecordAccumulator(
     }
 
     private long getTestBatchSize(BinaryRow row) {
-        return RECORD_BATCH_HEADER_SIZE + DefaultKvRecord.sizeOf(new byte[4], row);
+        return recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE)
+                + DefaultKvRecord.sizeOf(new byte[4], row);
    }
 
    private int getBatchNumInAccum(RecordAccumulator accum) {
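
expectedNumAppends is only partially visible in this hunk; the intent is to count how many indexed records fit in one batch once the magic-dependent header has been accounted for. A plausible completion of the loop, for illustration only (the file's actual body may differ; it relies on the static imports added in the hunk above):

/** Return the offset delta. */
private int expectedNumAppends(IndexedRow row, int batchSize) {
    int size = recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
    int offsetDelta = 0;
    while (true) {
        int recordSize = IndexedLogRecord.sizeOf(row);
        if (size + recordSize > batchSize) {
            // The next record would overflow the batch, so stop counting.
            return offsetDelta;
        }
        offsetDelta += 1;
        size += recordSize;
    }
}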

fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java

Lines changed: 2 additions & 2 deletions
@@ -26,7 +26,6 @@
 import org.apache.fluss.config.MemorySize;
 import org.apache.fluss.exception.TimeoutException;
 import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
@@ -51,6 +50,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
@@ -797,7 +797,7 @@ private IdempotenceManager createIdempotenceManager(boolean idempotenceEnabled)
 
     private static boolean hasIdempotentRecords(TableBucket tb, ProduceLogRequest request) {
         MemoryLogRecords memoryLogRecords = getProduceLogData(request).get(tb);
-        return memoryLogRecords.batchIterator().next().writerId() != LogRecordBatch.NO_WRITER_ID;
+        return memoryLogRecords.batchIterator().next().writerId() != NO_WRITER_ID;
     }
 
     private static void assertBatchSequenceEquals(
