Skip to content

Commit 20b243e

Browse files
MehulBatra and luoyuxia
authored
[lake/paimon] Support tiering multi partition paimon table (#1024)
--------- Co-authored-by: luoyuxia <[email protected]>
1 parent 0823598 commit 20b243e

File tree

8 files changed

+387
-81
lines changed

8 files changed

+387
-81
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ public void setFlussRecord(LogRecord logRecord) {
5050
@Override
5151
public int getFieldCount() {
5252
return
53-
// three system fields: bucket, offset, timestamp
53+
// business (including partitions) + system (three system fields: bucket, offset,
54+
// timestamp)
5455
originRowFieldCount + 3;
5556
}
5657

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeWriter.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.paimon.table.sink.CommitMessage;
2929

3030
import java.io.IOException;
31+
import java.util.List;
3132

3233
import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
3334

@@ -43,16 +44,20 @@ public PaimonLakeWriter(
4344
this.paimonCatalog = paimonCatalogProvider.get();
4445
FileStoreTable fileStoreTable = getTable(writerInitContext.tablePath());
4546

47+
List<String> partitionKeys = fileStoreTable.partitionKeys();
48+
4649
this.recordWriter =
4750
fileStoreTable.primaryKeys().isEmpty()
4851
? new AppendOnlyWriter(
4952
fileStoreTable,
5053
writerInitContext.tableBucket(),
51-
writerInitContext.partition())
54+
writerInitContext.partition(),
55+
partitionKeys)
5256
: new MergeTreeWriter(
5357
fileStoreTable,
5458
writerInitContext.tableBucket(),
55-
writerInitContext.partition());
59+
writerInitContext.partition(),
60+
partitionKeys);
5661
}
5762

5863
@Override

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/RecordWriter.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
import java.util.List;
2929

30-
import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimonBinaryRow;
30+
import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimonPartitionBinaryRow;
3131
import static com.alibaba.fluss.utils.Preconditions.checkState;
3232

3333
/** A base class for writers that write {@link LogRecord}s to Paimon. */
@@ -39,10 +39,13 @@ public abstract class RecordWriter<T> implements AutoCloseable {
3939
protected final FlussRecordAsPaimonRow flussRecordAsPaimonRow;
4040

4141
public RecordWriter(
42-
TableWriteImpl<T> tableWrite, TableBucket tableBucket, @Nullable String partition) {
42+
TableWriteImpl<T> tableWrite,
43+
TableBucket tableBucket,
44+
@Nullable String partition,
45+
List<String> partitionKeys) {
4346
this.tableWrite = tableWrite;
4447
this.bucket = tableBucket.getBucket();
45-
this.partition = toPaimonBinaryRow(partition);
48+
this.partition = toPaimonPartitionBinaryRow(partitionKeys, partition);
4649
this.flussRecordAsPaimonRow = new FlussRecordAsPaimonRow(tableBucket.getBucket());
4750
}
4851

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,26 @@
2626

2727
import javax.annotation.Nullable;
2828

29+
import java.util.List;
30+
2931
import static com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
3032

3133
/** A {@link RecordWriter} to write to Paimon's append-only table. */
3234
public class AppendOnlyWriter extends RecordWriter<InternalRow> {
3335

3436
public AppendOnlyWriter(
35-
FileStoreTable fileStoreTable, TableBucket tableBucket, @Nullable String partition) {
37+
FileStoreTable fileStoreTable,
38+
TableBucket tableBucket,
39+
@Nullable String partition,
40+
List<String> partitionKeys) {
3641
//noinspection unchecked
3742
super(
3843
(TableWriteImpl<InternalRow>)
3944
// todo: set ioManager to support write-buffer-spillable
4045
fileStoreTable.newWrite(FLUSS_LAKE_TIERING_COMMIT_USER),
4146
tableBucket,
42-
partition);
47+
partition,
48+
partitionKeys); // Pass to parent
4349
}
4450

4551
@Override

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
import javax.annotation.Nullable;
2929

30+
import java.util.List;
31+
3032
import static com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
3133
import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
3234

@@ -38,8 +40,11 @@ public class MergeTreeWriter extends RecordWriter<KeyValue> {
3840
private final RowKeyExtractor rowKeyExtractor;
3941

4042
public MergeTreeWriter(
41-
FileStoreTable fileStoreTable, TableBucket tableBucket, @Nullable String partition) {
42-
super(createTableWrite(fileStoreTable), tableBucket, partition);
43+
FileStoreTable fileStoreTable,
44+
TableBucket tableBucket,
45+
@Nullable String partition,
46+
List<String> partitionKeys) {
47+
super(createTableWrite(fileStoreTable), tableBucket, partition, partitionKeys);
4348
this.rowKeyExtractor = fileStoreTable.createRowKeyExtractor();
4449
}
4550

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.alibaba.fluss.lake.paimon.utils;
1818

19+
import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
1920
import com.alibaba.fluss.metadata.TablePath;
2021
import com.alibaba.fluss.record.ChangeType;
2122

@@ -27,6 +28,8 @@
2728

2829
import javax.annotation.Nullable;
2930

31+
import java.util.List;
32+
3033
/** Utils for conversion between Paimon and Fluss. */
3134
public class PaimonConversions {
3235

@@ -50,14 +53,28 @@ public static Identifier toPaimon(TablePath tablePath) {
5053
return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
5154
}
5255

53-
public static BinaryRow toPaimonBinaryRow(@Nullable String value) {
54-
if (value == null) {
56+
public static BinaryRow toPaimonPartitionBinaryRow(
57+
List<String> partitionKeys, @Nullable String partitionName) {
58+
if (partitionName == null || partitionKeys.isEmpty()) {
5559
return BinaryRow.EMPTY_ROW;
5660
}
57-
BinaryRow binaryRow = new BinaryRow(1);
58-
BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
59-
writer.writeString(0, BinaryString.fromString(value));
61+
62+
// Reuse Fluss's ResolvedPartitionSpec to parse the partition name into one value per key
63+
ResolvedPartitionSpec resolvedPartitionSpec =
64+
ResolvedPartitionSpec.fromPartitionName(partitionKeys, partitionName);
65+
66+
BinaryRow partitionBinaryRow = new BinaryRow(partitionKeys.size());
67+
BinaryRowWriter writer = new BinaryRowWriter(partitionBinaryRow);
68+
69+
List<String> partitionValues = resolvedPartitionSpec.getPartitionValues();
70+
for (int i = 0; i < partitionKeys.size(); i++) {
71+
// TODO: Currently, partition columns must be of String type, so we can always use
72+
// `BinaryString.fromString` to convert to Paimon's data structure. Revisit here when
73+
// #489 is finished.
74+
writer.writeString(i, BinaryString.fromString(partitionValues.get(i)));
75+
}
76+
6077
writer.complete();
61-
return binaryRow;
78+
return partitionBinaryRow;
6279
}
6380
}

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ void testLogTableRecordAllTypes() {
8383
.isEqualTo(1698235273182L);
8484
assertThat(flussRecordAsPaimonRow.getBinary(12)).isEqualTo(new byte[] {1, 2, 3, 4});
8585
assertThat(flussRecordAsPaimonRow.isNullAt(13)).isTrue();
86-
// verify FlussRecordAsPaimonRow system columns
86+
87+
// verify FlussRecordAsPaimonRow system columns (no partition fields, so indices stay the same)
8788
assertThat(flussRecordAsPaimonRow.getInt(14)).isEqualTo(bucket);
8889
assertThat(flussRecordAsPaimonRow.getLong(15)).isEqualTo(logOffset);
8990
assertThat(flussRecordAsPaimonRow.getLong(16)).isEqualTo(timeStamp);
@@ -92,11 +93,7 @@ void testLogTableRecordAllTypes() {
9293
assertThat(flussRecordAsPaimonRow.getRowKind()).isEqualTo(RowKind.INSERT);
9394

9495
assertThat(flussRecordAsPaimonRow.getFieldCount())
95-
.isEqualTo(
96-
14
97-
+
98-
// 3 is for system columns
99-
3);
96+
.isEqualTo(14 + 3); // business + system = 14 + 0 + 3 = 17
10097
}
10198

10299
@Test

0 commit comments

Comments
 (0)