Commit a908690

polyzos and wuchong authored
[common] Support COMPACTED format for log tables (#1605)
Co-authored-by: Jark Wu <[email protected]>
1 parent e0c4457 commit a908690
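
This commit extends the write path so that append-only log tables can use the COMPACTED row format on the wire, alongside the existing INDEXED and ARROW formats. A hedged sketch of how a client might exercise the feature follows; the entry points (ConnectionFactory.createConnection, Table#newAppend, GenericRow field setters) and the 'table.log.format' = 'compacted' table option are assumptions drawn from the public Fluss client API rather than from this commit, and may differ by version.

import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.writer.AppendWriter;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericRow;

// Sketch only: assumes a table created with 'table.log.format' = 'compacted'
// and the client entry points named in the lead-in above.
public class CompactedAppendExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("bootstrap.servers", "localhost:9123");
        try (Connection connection = ConnectionFactory.createConnection(conf)) {
            Table table = connection.getTable(TablePath.of("fluss", "access_log"));
            AppendWriter writer = table.newAppend().createWriter();

            GenericRow row = new GenericRow(2);
            row.setField(0, 42);
            row.setField(1, BinaryString.fromString("GET /index.html"));

            // AppendWriterImpl (first diff below) re-encodes this generic row
            // into a CompactedRow before it reaches the write batch.
            writer.append(row);
            writer.flush();
        }
    }
}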

File tree: 15 files changed, +845 −155 lines


fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java

Lines changed: 25 additions & 0 deletions

@@ -26,9 +26,12 @@
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.InternalRow.FieldGetter;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.row.encode.CompactedRowEncoder;
 import org.apache.fluss.row.encode.IndexedRowEncoder;
 import org.apache.fluss.row.encode.KeyEncoder;
 import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;
 
 import javax.annotation.Nullable;
@@ -44,6 +47,7 @@ class AppendWriterImpl extends AbstractTableWriter implements AppendWriter {
 
     private final LogFormat logFormat;
     private final IndexedRowEncoder indexedRowEncoder;
+    private final CompactedRowEncoder compactedRowEncoder;
     private final FieldGetter[] fieldGetters;
     private final TableInfo tableInfo;
 
@@ -58,8 +62,12 @@ class AppendWriterImpl extends AbstractTableWriter implements AppendWriter {
             this.bucketKeyEncoder = KeyEncoder.of(rowType, bucketKeys, lakeFormat);
         }
 
+        DataType[] fieldDataTypes =
+                tableInfo.getSchema().getRowType().getChildren().toArray(new DataType[0]);
+
         this.logFormat = tableInfo.getTableConfig().getLogFormat();
         this.indexedRowEncoder = new IndexedRowEncoder(tableInfo.getRowType());
+        this.compactedRowEncoder = new CompactedRowEncoder(fieldDataTypes);
         this.fieldGetters = InternalRow.createFieldGetters(tableInfo.getRowType());
         this.tableInfo = tableInfo;
     }
@@ -80,13 +88,30 @@ public CompletableFuture<AppendResult> append(InternalRow row) {
         if (logFormat == LogFormat.INDEXED) {
             IndexedRow indexedRow = encodeIndexedRow(row);
             record = WriteRecord.forIndexedAppend(tableInfo, physicalPath, indexedRow, bucketKey);
+        } else if (logFormat == LogFormat.COMPACTED) {
+            CompactedRow compactedRow = encodeCompactedRow(row);
+            record =
+                    WriteRecord.forCompactedAppend(
+                            tableInfo, physicalPath, compactedRow, bucketKey);
         } else {
             // ARROW format supports general internal row
             record = WriteRecord.forArrowAppend(tableInfo, physicalPath, row, bucketKey);
         }
         return send(record).thenApply(ignored -> APPEND_SUCCESS);
     }
 
+    private CompactedRow encodeCompactedRow(InternalRow row) {
+        if (row instanceof CompactedRow) {
+            return (CompactedRow) row;
+        }
+
+        compactedRowEncoder.startNewRow();
+        for (int i = 0; i < fieldCount; i++) {
+            compactedRowEncoder.encodeField(i, fieldGetters[i].getFieldOrNull(row));
+        }
+        return compactedRowEncoder.finishRow();
+    }
+
     private IndexedRow encodeIndexedRow(InternalRow row) {
         if (row instanceof IndexedRow) {
             return (IndexedRow) row;
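
The encodeCompactedRow helper above is a pass-through-or-re-encode step: a row that already arrives as a CompactedRow is forwarded untouched, and anything else is re-encoded field by field through the CompactedRowEncoder. A minimal sketch of that encoder loop in isolation, assuming DataTypes (org.apache.fluss.types.DataTypes) and BinaryString (org.apache.fluss.row.BinaryString) factories that are not shown in this diff:

// Mirrors the startNewRow/encodeField/finishRow calls in encodeCompactedRow.
// DataTypes.INT()/STRING() and BinaryString.fromString(...) are assumed helpers.
static CompactedRow encodeExampleRow() {
    DataType[] types = {DataTypes.INT(), DataTypes.STRING()};
    CompactedRowEncoder encoder = new CompactedRowEncoder(types);

    encoder.startNewRow();
    encoder.encodeField(0, 42);                               // INT field
    encoder.encodeField(1, BinaryString.fromString("hello")); // STRING field
    return encoder.finishRow();                               // encoder is reusable for the next row
}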

fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java

Lines changed: 10 additions & 1 deletion

@@ -17,6 +17,7 @@
 
 package org.apache.fluss.client.table.writer;
 
+import org.apache.fluss.client.write.WriteFormat;
 import org.apache.fluss.client.write.WriteRecord;
 import org.apache.fluss.client.write.WriterClient;
 import org.apache.fluss.metadata.DataLakeFormat;
@@ -50,6 +51,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
     private final KeyEncoder bucketKeyEncoder;
 
     private final KvFormat kvFormat;
+    private final WriteFormat writeFormat;
     private final RowEncoder rowEncoder;
     private final FieldGetter[] fieldGetters;
     private final TableInfo tableInfo;
@@ -78,6 +80,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
                 : KeyEncoder.of(rowType, tableInfo.getBucketKeys(), lakeFormat);
 
         this.kvFormat = tableInfo.getTableConfig().getKvFormat();
+        this.writeFormat = WriteFormat.fromKvFormat(this.kvFormat);
         this.rowEncoder = RowEncoder.create(kvFormat, rowType);
         this.fieldGetters = InternalRow.createFieldGetters(rowType);
         this.tableInfo = tableInfo;
@@ -164,6 +167,7 @@ public CompletableFuture<UpsertResult> upsert(InternalRow row) {
                         encodeRow(row),
                         key,
                         bucketKey,
+                        writeFormat,
                         targetColumns);
         return send(record).thenApply(ignored -> UPSERT_SUCCESS);
     }
@@ -182,7 +186,12 @@ public CompletableFuture<DeleteResult> delete(InternalRow row) {
                 bucketKeyEncoder == primaryKeyEncoder ? key : bucketKeyEncoder.encodeKey(row);
         WriteRecord record =
                 WriteRecord.forDelete(
-                        tableInfo, getPhysicalPath(row), key, bucketKey, targetColumns);
+                        tableInfo,
+                        getPhysicalPath(row),
+                        key,
+                        bucketKey,
+                        writeFormat,
+                        targetColumns);
         return send(record).thenApply(ignored -> DELETE_SUCCESS);
     }
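
WriteFormat is introduced by this commit but its definition is not part of this excerpt. From its call sites (WriteFormat.fromKvFormat(kvFormat) here, and the indexed/compacted/arrow append branches in AppendWriterImpl), a plausible shape is sketched below; the real enum in org.apache.fluss.client.write may use different constants or validation:

// Hypothetical reconstruction, not the committed source. KvFormat is assumed
// to expose COMPACTED and INDEXED constants (org.apache.fluss.metadata.KvFormat).
public enum WriteFormat {
    INDEXED,
    COMPACTED,
    ARROW;

    // Maps the kv (primary-key table) storage format onto the format used for
    // upsert/delete write records.
    public static WriteFormat fromKvFormat(KvFormat kvFormat) {
        switch (kvFormat) {
            case COMPACTED:
                return COMPACTED;
            case INDEXED:
                return INDEXED;
            default:
                throw new IllegalArgumentException("Unsupported kv format: " + kvFormat);
        }
    }
}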

fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java (new file)

Lines changed: 141 additions & 0 deletions

@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.write;
+
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.memory.AbstractPagedOutputView;
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.MemoryLogRecordsRowBuilder;
+import org.apache.fluss.record.bytesview.BytesView;
+import org.apache.fluss.row.InternalRow;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Abstract base class that deduplicates the logic of row-based log write batches backed by
+ * in-memory builders. Concrete subclasses only need to provide a row-type validator/caster and a
+ * {@link MemoryLogRecordsRowBuilder} implementation.
+ */
+abstract class AbstractRowLogWriteBatch<R> extends WriteBatch {
+
+    private final AbstractPagedOutputView outputView;
+    private final MemoryLogRecordsRowBuilder<R> recordsBuilder;
+    private final String buildErrorMessage;
+
+    protected AbstractRowLogWriteBatch(
+            int bucketId,
+            PhysicalTablePath physicalTablePath,
+            long createdMs,
+            AbstractPagedOutputView outputView,
+            MemoryLogRecordsRowBuilder<R> recordsBuilder,
+            String buildErrorMessage) {
+        super(bucketId, physicalTablePath, createdMs);
+        this.outputView = outputView;
+        this.recordsBuilder = recordsBuilder;
+        this.buildErrorMessage = buildErrorMessage;
+    }
+
+    @Override
+    public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
+        checkNotNull(callback, "write callback must be not null");
+        InternalRow rowObj = writeRecord.getRow();
+        checkNotNull(rowObj, "row must not be null for log record");
+        checkArgument(writeRecord.getKey() == null, "key must be null for log record");
+        checkArgument(
+                writeRecord.getTargetColumns() == null,
+                "target columns must be null for log record");
+
+        R row = requireAndCastRow(rowObj);
+        if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
+            return false;
+        }
+        recordsBuilder.append(ChangeType.APPEND_ONLY, row);
+        recordCount++;
+        callbacks.add(callback);
+        return true;
+    }
+
+    protected abstract R requireAndCastRow(InternalRow row);
+
+    @Override
+    public boolean isLogBatch() {
+        return true;
+    }
+
+    @Override
+    public BytesView build() {
+        try {
+            return recordsBuilder.build();
+        } catch (IOException e) {
+            throw new FlussRuntimeException(buildErrorMessage, e);
+        }
+    }
+
+    @Override
+    public boolean isClosed() {
+        return recordsBuilder.isClosed();
+    }
+
+    @Override
+    public void close() throws Exception {
+        recordsBuilder.close();
+        reopened = false;
+    }
+
+    @Override
+    public List<MemorySegment> pooledMemorySegments() {
+        return outputView.allocatedPooledSegments();
+    }
+
+    @Override
+    public void setWriterState(long writerId, int batchSequence) {
+        recordsBuilder.setWriterState(writerId, batchSequence);
+    }
+
+    @Override
+    public long writerId() {
+        return recordsBuilder.writerId();
+    }
+
+    @Override
+    public int batchSequence() {
+        return recordsBuilder.batchSequence();
+    }
+
+    @Override
+    public void abortRecordAppends() {
+        recordsBuilder.abort();
+    }
+
+    @Override
+    public void resetWriterState(long writerId, int batchSequence) {
+        super.resetWriterState(writerId, batchSequence);
+        recordsBuilder.resetWriterState(writerId, batchSequence);
+    }
+
+    @Override
+    public int estimatedSizeInBytes() {
+        return recordsBuilder.getSizeInBytes();
+    }
+}
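
Note the contract of tryAppend above: it returns false rather than throwing when the builder has no room or the batch is already closed, leaving it to the caller to seal the batch and retry against a fresh one. A hedged sketch of that caller-side loop (the real logic lives in the client's record accumulator, which is not part of this excerpt; allocateNewBatch is a hypothetical helper):

// Caller-side handling of tryAppend's boolean result, sketched.
if (!batch.tryAppend(record, callback)) {
    batch.close();                       // full or closed: seal this batch for sending
    batch = allocateNewBatch(bucketId);  // hypothetical: fresh batch with a new output view
    if (!batch.tryAppend(record, callback)) {
        throw new IllegalStateException("record does not fit into an empty batch");
    }
}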
fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java (new file)

Lines changed: 64 additions & 0 deletions

@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.write;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.memory.AbstractPagedOutputView;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.rpc.messages.ProduceLogRequest;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * A batch of log records managed in COMPACTED format that is or will be sent to the server by a
+ * {@link ProduceLogRequest}.
+ *
+ * <p>This class is not thread safe and external synchronization must be used when modifying it.
+ */
+@NotThreadSafe
+@Internal
+public final class CompactedLogWriteBatch extends AbstractRowLogWriteBatch<CompactedRow> {
+
+    public CompactedLogWriteBatch(
+            int bucketId,
+            PhysicalTablePath physicalTablePath,
+            int schemaId,
+            int writeLimit,
+            AbstractPagedOutputView outputView,
+            long createdMs) {
+        super(
+                bucketId,
+                physicalTablePath,
+                createdMs,
+                outputView,
+                MemoryLogRecordsCompactedBuilder.builder(schemaId, writeLimit, outputView, true),
+                "Failed to build compacted log record batch.");
+    }
+
+    @Override
+    protected CompactedRow requireAndCastRow(InternalRow row) {
+        checkArgument(
+                row instanceof CompactedRow, "row must be CompactedRow for compacted log table");
+        return (CompactedRow) row;
+    }
+}
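
The requireAndCastRow check here is a safety net rather than a conversion point: on the append path, AppendWriterImpl.encodeCompactedRow (first file above) has already normalized every incoming InternalRow to a CompactedRow before the record reaches the batch, so the checkArgument should only fire if a caller bypasses the writer. From there the flow is: WriteRecord.forCompactedAppend wraps the row, this batch feeds it to MemoryLogRecordsCompactedBuilder for serialization, and the built bytes are presumably sent to the server in a ProduceLogRequest.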
