
Commit 03c8602

luoyuxia and ipolyzos authored

[kv] Supports compacted row as change log (apache#2108)

Co-authored-by: ipolyzos <[email protected]>

1 parent e91106c commit 03c8602
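
The feature in brief: a primary key table can now declare a COMPACTED changelog via the `table.log.format` property. A minimal sketch using the client API that appears in the tests below; table and column names are illustrative, and the import paths for the client classes are assumed from the project layout:

    import org.apache.fluss.config.ConfigOptions;
    import org.apache.fluss.metadata.Schema;          // import paths assumed
    import org.apache.fluss.metadata.TableDescriptor;
    import org.apache.fluss.metadata.TablePath;
    import org.apache.fluss.types.DataTypes;

    public class CompactedChangelogExample {
        public static void main(String[] args) {
            // Illustrative schema: primary key table keyed on "a".
            Schema schema =
                    Schema.newBuilder()
                            .column("a", DataTypes.INT())
                            .column("b", DataTypes.STRING())
                            .primaryKey("a")
                            .build();

            // Select the new COMPACTED changelog format via table.log.format.
            TableDescriptor descriptor =
                    TableDescriptor.builder()
                            .schema(schema)
                            .property(ConfigOptions.TABLE_LOG_FORMAT.key(), "COMPACTED")
                            .build();

            // Hypothetical table path; in the ITCases such a descriptor is
            // passed to a createTable(...) helper.
            TablePath tablePath = TablePath.of("test_db_1", "pk_table_compacted_log");
        }
    }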

File tree

21 files changed: +1007 −223 lines


fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 1 addition & 1 deletion
@@ -659,7 +659,7 @@ void testCreateTableWithInvalidProperty() {
                         .cause()
                         .isInstanceOf(InvalidConfigException.class)
                         .hasMessageContaining(
-                                "Currently, Primary Key Table only supports ARROW log format if kv format is COMPACTED.");
+                                "Currently, Primary Key Table supports ARROW or COMPACTED log format when kv format is COMPACTED.");
     }
 
     @Test
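
The updated message reflects the relaxed rule: with a COMPACTED kv format, the changelog may now be either ARROW or COMPACTED rather than ARROW only. A hypothetical sketch of that check, not the actual server-side validation (the KvFormat enum and the local variables are assumed from the message text):

    // Hypothetical validation sketch, not the actual Fluss server code.
    if (kvFormat == KvFormat.COMPACTED
            && logFormat != LogFormat.ARROW
            && logFormat != LogFormat.COMPACTED) {
        throw new InvalidConfigException(
                "Currently, Primary Key Table supports ARROW or COMPACTED log format "
                        + "when kv format is COMPACTED.");
    }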

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 13 additions & 3 deletions
@@ -753,6 +753,11 @@ void testPutAndPoll(String kvFormat) throws Exception {
         verifyAppendOrPut(false, "ARROW", kvFormat);
     }
 
+    @Test
+    void testPutAndPollCompacted() throws Exception {
+        verifyAppendOrPut(false, "COMPACTED", "COMPACTED");
+    }
+
     void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvFormat)
             throws Exception {
         Schema schema =

@@ -911,8 +916,9 @@ void testAppendAndProject(String format) throws Exception {
         }
     }
 
-    @Test
-    void testPutAndProject() throws Exception {
+    @ParameterizedTest
+    @ValueSource(strings = {"ARROW", "COMPACTED"})
+    void testPutAndProject(String changelogFormat) throws Exception {
         Schema schema =
                 Schema.newBuilder()
                         .column("a", DataTypes.INT())

@@ -921,7 +927,11 @@ void testPutAndProject() throws Exception {
                         .column("d", DataTypes.BIGINT())
                         .primaryKey("a")
                         .build();
-        TableDescriptor tableDescriptor = TableDescriptor.builder().schema(schema).build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .property(ConfigOptions.TABLE_LOG_FORMAT.key(), changelogFormat)
+                        .build();
         TablePath tablePath = TablePath.of("test_db_1", "test_pk_table_1");
         createTable(tablePath, tableDescriptor, false);
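
The projection test now runs once per changelog format via JUnit 5 parameterization, which relies on the standard annotations:

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.ValueSource;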

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 1 addition & 1 deletion
@@ -1230,7 +1230,7 @@ public class ConfigOptions {
                     .defaultValue(LogFormat.ARROW)
                     .withDescription(
                             "The format of the log records in log store. The default value is `arrow`. "
-                                    + "The supported formats are `arrow` and `indexed`.");
+                                    + "The supported formats are `arrow`, `indexed` and `compacted`.");
 
     public static final ConfigOption<ArrowCompressionType> TABLE_LOG_ARROW_COMPRESSION_TYPE =
             key("table.log.arrow.compression.type")
fluss-common/src/main/java/org/apache/fluss/metadata/LogFormat.java

Lines changed: 20 additions & 4 deletions
@@ -18,10 +18,15 @@
 package org.apache.fluss.metadata;
 
 import org.apache.fluss.record.MemoryLogRecordsArrowBuilder;
+import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder;
 import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder;
+import org.apache.fluss.row.compacted.CompactedRow;
 import org.apache.fluss.row.indexed.IndexedRow;
 
-/** The format of the log records in log store. The supported formats are 'arrow' and 'indexed'. */
+/**
+ * The format of the log records in log store. The supported formats are 'arrow', 'indexed' and
+ * 'compacted'.
+ */
 public enum LogFormat {
 
     /**

@@ -41,18 +46,29 @@ public enum LogFormat {
      *
      * @see MemoryLogRecordsIndexedBuilder
      */
-    INDEXED;
+    INDEXED,
+
+    /**
+     * The log record batches are stored in {@link CompactedRow} format which is a compact
+     * row-oriented format optimized for primary key tables to reduce storage while trading CPU for
+     * reads.
+     *
+     * @see MemoryLogRecordsCompactedBuilder
+     */
+    COMPACTED;
 
     /**
-     * Creates a {@link LogFormat} from the given string. The string must be either 'arrow' or
-     * 'indexed'.
+     * Creates a {@link LogFormat} from the given string. The string must be either 'arrow',
+     * 'indexed' or 'compacted'.
      */
     public static LogFormat fromString(String format) {
         switch (format.toUpperCase()) {
             case "ARROW":
                 return ARROW;
             case "INDEXED":
                 return INDEXED;
+            case "COMPACTED":
+                return COMPACTED;
             default:
                 throw new IllegalArgumentException("Unsupported log format: " + format);
         }
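
As the switch above shows, parsing is case-insensitive and unknown names are rejected:

    LogFormat.fromString("compacted"); // LogFormat.COMPACTED
    LogFormat.fromString("ARROW");     // LogFormat.ARROW
    LogFormat.fromString("parquet");   // IllegalArgumentException: Unsupported log format: parquet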
fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java

Lines changed: 227 additions & 0 deletions

@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.memory.AbstractPagedOutputView;
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.memory.MemorySegmentOutputView;
+import org.apache.fluss.record.bytesview.BytesView;
+import org.apache.fluss.record.bytesview.MultiBytesView;
+import org.apache.fluss.utils.crc.Crc32C;
+
+import java.io.IOException;
+
+import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_LENGTH;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.lastOffsetDeltaOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/** Abstract base builder for row-based MemoryLogRecords builders sharing common logic. */
+abstract class AbstractRowMemoryLogRecordsBuilder<T> implements AutoCloseable {
+    protected static final int BUILDER_DEFAULT_OFFSET = 0;
+
+    protected final long baseLogOffset;
+    protected final int schemaId;
+    // The max bytes can be appended.
+    protected final int writeLimit;
+    protected final byte magic;
+    protected final AbstractPagedOutputView pagedOutputView;
+    protected final MemorySegment firstSegment;
+    protected final boolean appendOnly;
+
+    private BytesView builtBuffer = null;
+    private long writerId;
+    private int batchSequence;
+    private int currentRecordNumber;
+    private int sizeInBytes;
+    private volatile boolean isClosed;
+    private boolean aborted = false;
+
+    protected AbstractRowMemoryLogRecordsBuilder(
+            long baseLogOffset,
+            int schemaId,
+            int writeLimit,
+            byte magic,
+            AbstractPagedOutputView pagedOutputView,
+            boolean appendOnly) {
+        this.appendOnly = appendOnly;
+        checkArgument(
+                schemaId <= Short.MAX_VALUE,
+                "schemaId shouldn't be greater than the max value of short: " + Short.MAX_VALUE);
+        this.baseLogOffset = baseLogOffset;
+        this.schemaId = schemaId;
+        this.writeLimit = writeLimit;
+        this.magic = magic;
+        this.pagedOutputView = pagedOutputView;
+        this.firstSegment = pagedOutputView.getCurrentSegment();
+        this.writerId = NO_WRITER_ID;
+        this.batchSequence = NO_BATCH_SEQUENCE;
+        this.currentRecordNumber = 0;
+        this.isClosed = false;
+
+        // Skip header initially; will be written in build()
+        int headerSize = recordBatchHeaderSize(magic);
+        this.pagedOutputView.setPosition(headerSize);
+        this.sizeInBytes = headerSize;
+    }
+
+    /** Implement to return size of the record (including length field). */
+    protected abstract int sizeOf(T row);
+
+    /** Implement to write the record and return total written bytes including length field. */
+    protected abstract int writeRecord(ChangeType changeType, T row) throws IOException;
+
+    public boolean hasRoomFor(T row) {
+        return sizeInBytes + sizeOf(row) <= writeLimit;
+    }
+
+    public void append(ChangeType changeType, T row) throws Exception {
+        appendRecord(changeType, row);
+    }
+
+    private void appendRecord(ChangeType changeType, T row) throws IOException {
+        if (aborted) {
+            throw new IllegalStateException(
+                    "Tried to append a record, but "
+                            + getClass().getSimpleName()
+                            + " has already been aborted");
+        }
+        if (isClosed) {
+            throw new IllegalStateException(
+                    "Tried to append a record, but MemoryLogRecordsBuilder is closed for record appends");
+        }
+        if (appendOnly && changeType != ChangeType.APPEND_ONLY) {
+            throw new IllegalArgumentException(
+                    "Only append-only change type is allowed for append-only row log builder, but got "
+                            + changeType);
+        }
+
+        int recordByteSizes = writeRecord(changeType, row);
+        currentRecordNumber++;
+        sizeInBytes += recordByteSizes;
+    }
+
+    public BytesView build() throws IOException {
+        if (aborted) {
+            throw new IllegalStateException("Attempting to build an aborted record batch");
+        }
+        if (builtBuffer != null) {
+            return builtBuffer;
+        }
+        writeBatchHeader();
+        builtBuffer =
+                MultiBytesView.builder()
+                        .addMemorySegmentByteViewList(pagedOutputView.getWrittenSegments())
+                        .build();
+        return builtBuffer;
+    }
+
+    public void setWriterState(long writerId, int batchBaseSequence) {
+        this.writerId = writerId;
+        this.batchSequence = batchBaseSequence;
+    }
+
+    public void resetWriterState(long writerId, int batchSequence) {
+        // trigger to rewrite batch header
+        this.builtBuffer = null;
+        this.writerId = writerId;
+        this.batchSequence = batchSequence;
+    }
+
+    public long writerId() {
+        return writerId;
+    }
+
+    public int batchSequence() {
+        return batchSequence;
+    }
+
+    public boolean isClosed() {
+        return isClosed;
+    }
+
+    public void abort() {
+        aborted = true;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (aborted) {
+            throw new IllegalStateException(
+                    "Cannot close "
+                            + getClass().getSimpleName()
+                            + " as it has already been aborted");
+        }
+        isClosed = true;
+    }
+
+    public int getSizeInBytes() {
+        return sizeInBytes;
+    }
+
+    // ----------------------- internal methods -------------------------------
+    private void writeBatchHeader() throws IOException {
+        // pagedOutputView doesn't support seek to previous segment,
+        // so we create a new output view on the first segment
+        MemorySegmentOutputView outputView = new MemorySegmentOutputView(firstSegment);
+        outputView.setPosition(0);
+        // update header.
+        outputView.writeLong(baseLogOffset);
+        outputView.writeInt(sizeInBytes - BASE_OFFSET_LENGTH - LENGTH_LENGTH);
+        outputView.writeByte(magic);
+
+        // write empty timestamp which will be overridden on server side
+        outputView.writeLong(0);
+
+        // write empty leaderEpoch which will be overridden on server side
+        if (magic >= LOG_MAGIC_VALUE_V1) {
+            outputView.writeInt(NO_LEADER_EPOCH);
+        }
+
+        // write empty crc first.
+        outputView.writeUnsignedInt(0);
+
+        outputView.writeShort((short) schemaId);
+        // write attributes (currently only appendOnly flag)
+        outputView.writeBoolean(appendOnly);
+        // skip write attribute byte for now.
+        outputView.setPosition(lastOffsetDeltaOffset(magic));
+        if (currentRecordNumber > 0) {
+            outputView.writeInt(currentRecordNumber - 1);
+        } else {
+            // If there is no record, we write 0 for field lastOffsetDelta, see the comments about
+            // the field 'lastOffsetDelta' in DefaultLogRecordBatch.
+            outputView.writeInt(0);
+        }
+        outputView.writeLong(writerId);
+        outputView.writeInt(batchSequence);
+        outputView.writeInt(currentRecordNumber);
+
+        // Update crc.
+        long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), schemaIdOffset(magic));
+        outputView.setPosition(crcOffset(magic));
+        outputView.writeUnsignedInt(crc);
+    }
+}
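
The new base class factors the batch-header and writer-state bookkeeping out of the row builders, leaving subclasses only sizeOf and writeRecord. A hypothetical toy subclass illustrating the two template methods; this is not the actual MemoryLogRecordsCompactedBuilder (which encodes CompactedRow rather than raw bytes), and ChangeType.toByteValue() plus the DataOutputView-style write methods on AbstractPagedOutputView are assumptions:

    package org.apache.fluss.record; // base class is package-private

    import org.apache.fluss.memory.AbstractPagedOutputView;

    import java.io.IOException;

    // Hypothetical subclass: records whose payload is a plain byte[].
    class BytesLogRecordsBuilder extends AbstractRowMemoryLogRecordsBuilder<byte[]> {

        BytesLogRecordsBuilder(
                long baseLogOffset,
                int schemaId,
                int writeLimit,
                byte magic,
                AbstractPagedOutputView pagedOutputView) {
            // appendOnly = false: allow all change types, as a changelog builder would.
            super(baseLogOffset, schemaId, writeLimit, magic, pagedOutputView, false);
        }

        @Override
        protected int sizeOf(byte[] row) {
            // 1 byte change type + 4 byte length field + payload, matching writeRecord below.
            return 1 + 4 + row.length;
        }

        @Override
        protected int writeRecord(ChangeType changeType, byte[] row) throws IOException {
            // Hypothetical record layout: [changeType][length][payload].
            pagedOutputView.writeByte(changeType.toByteValue());
            pagedOutputView.writeInt(row.length);
            pagedOutputView.write(row);
            return sizeOf(row);
        }
    }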
