
Commit 058cf13

[lake/paimon] Support tiering paimon deletion vector enabled table (#1725)
1 parent 221db5e commit 058cf13
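
This change lets the tiering job write into Paimon primary-key tables that have deletion vectors enabled: the merge-tree writer now creates its Paimon TableWrite with an IOManager rooted at a configurable temporary directory (the not-yet-exposed table option fluss.tiering.io-tmpdir, defaulting to java.io.tmpdir), two precondition messages are corrected and enriched, and an integration test covers tiering into a deletion-vector-enabled table.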

6 files changed: +158 additions, -94 deletions

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 1 addition & 1 deletion
@@ -234,7 +234,7 @@ private Table getOrMoveToTable(TieringSplit split) {
         // instead of fail directly
         checkArgument(
                 currentTableInfo.getTableId() == split.getTableBucket().getTableId(),
-                "The current table id %s for table path % is different from the table id %s in TieringSplit split.",
+                "The current table id %s for table path %s is different from the table id %s in TieringSplit split.",
                 currentTableInfo.getTableId(),
                 tablePath,
                 split.getTableBucket().getTableId());
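
The fix adds the missing 's' to the second placeholder: the message is an %s-style template filled from the trailing arguments, so the bare % was not a valid placeholder and the table path could not be rendered into the message. A minimal sketch of the convention (hypothetical values; only the template comes from this commit):

    // Assuming the same Guava-style checkArgument the file already uses,
    // where each %s is substituted by the corresponding trailing argument:
    long currentTableId = 1L; // hypothetical
    long splitTableId = 2L; // hypothetical
    String tablePath = "db.t1"; // hypothetical
    checkArgument(
            currentTableId == splitTableId,
            "The current table id %s for table path %s is different from the table id %s in TieringSplit split.",
            currentTableId, tablePath, splitTableId);
    // -> fails with: "The current table id 1 for table path db.t1 is different
    //    from the table id 2 in TieringSplit split."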

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java

Lines changed: 4 additions & 1 deletion
@@ -59,7 +59,10 @@ public RecordWriter(

     CommitMessage complete() throws Exception {
         List<CommitMessage> commitMessages = tableWrite.prepareCommit();
-        checkState(commitMessages.size() == 1, "The size of CommitMessage must be 1.");
+        checkState(
+                commitMessages.size() == 1,
+                "The size of CommitMessage must be 1, but got %s.",
+                commitMessages);
         return commitMessages.get(0);
     }

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java

Lines changed: 16 additions & 1 deletion
@@ -22,20 +22,25 @@
 import org.apache.fluss.record.LogRecord;

 import org.apache.paimon.KeyValue;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.TableWriteImpl;

 import javax.annotation.Nullable;

 import java.util.List;
+import java.util.Map;

 import static org.apache.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;

 /** A {@link RecordWriter} to write to Paimon's primary-key table. */
 public class MergeTreeWriter extends RecordWriter<KeyValue> {

+    // the option key to configure the temporary directory used by fluss tiering
+    private static final String FLUSS_TIERING_TMP_DIR_KEY = "fluss.tiering.io-tmpdir";
+
     private final KeyValue keyValue = new KeyValue();

     private final RowKeyExtractor rowKeyExtractor;
@@ -55,8 +60,18 @@ public MergeTreeWriter(
     }

     private static TableWriteImpl<KeyValue> createTableWrite(FileStoreTable fileStoreTable) {
+        // we allow users to configure the temporary directory used by fluss tiering,
+        // since the default java.io.tmpdir may not be suitable.
+        // currently we don't expose the option; this is a workaround, and in the
+        // future we may expose it if needed
+        Map<String, String> props = fileStoreTable.options();
+        String tmpDir =
+                props.getOrDefault(FLUSS_TIERING_TMP_DIR_KEY, System.getProperty("java.io.tmpdir"));
         //noinspection unchecked
-        return (TableWriteImpl<KeyValue>) fileStoreTable.newWrite(FLUSS_LAKE_TIERING_COMMIT_USER);
+        return (TableWriteImpl<KeyValue>)
+                fileStoreTable
+                        .newWrite(FLUSS_LAKE_TIERING_COMMIT_USER)
+                        .withIOManager(IOManager.create(tmpDir));
     }

     @Override
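
Note that createTableWrite reads fluss.tiering.io-tmpdir from the Paimon table's options rather than from Fluss configuration, so although the option is deliberately not exposed yet, a suitably prepared table could still carry it. A hedged sketch of how it might be injected, assuming the same "paimon."-prefixed custom-property forwarding that the new test below relies on for deletion-vectors.enabled (the forwarding behavior and the path are assumptions, not part of this commit):

    import java.util.HashMap;
    import java.util.Map;

    // If "paimon."-prefixed custom properties land in the Paimon table's options
    // (as paimon.deletion-vectors.enabled does in the new test), the tiering
    // writer would pick the key up via fileStoreTable.options():
    Map<String, String> customProps = new HashMap<>();
    customProps.put("paimon.fluss.tiering.io-tmpdir", "/mnt/fast-disk/tiering-tmp"); // hypothetical path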

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java

Lines changed: 24 additions & 5 deletions
@@ -59,6 +59,7 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -329,8 +330,25 @@ protected long createPkTable(TablePath tablePath) throws Exception {
         return createPkTable(tablePath, 1);
     }

+    protected long createPkTable(
+            TablePath tablePath,
+            Map<String, String> tableProperties,
+            Map<String, String> tableCustomProperties)
+            throws Exception {
+        return createPkTable(tablePath, 1, tableProperties, tableCustomProperties);
+    }
+
     protected long createPkTable(TablePath tablePath, int bucketNum) throws Exception {
-        TableDescriptor table1Descriptor =
+        return createPkTable(tablePath, bucketNum, Collections.emptyMap(), Collections.emptyMap());
+    }
+
+    protected long createPkTable(
+            TablePath tablePath,
+            int bucketNum,
+            Map<String, String> tableProperties,
+            Map<String, String> tableCustomProperties)
+            throws Exception {
+        TableDescriptor.Builder tableDescriptor =
                 TableDescriptor.builder()
                         .schema(
                                 Schema.newBuilder()
@@ -340,9 +358,10 @@ protected long createPkTable(TablePath tablePath, int bucketNum) throws Exception
                                         .build())
                         .distributedBy(bucketNum)
                         .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
-                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500))
-                        .build();
-        return createTable(tablePath, table1Descriptor);
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+        tableDescriptor.customProperties(tableCustomProperties);
+        tableDescriptor.properties(tableProperties);
+        return createTable(tablePath, tableDescriptor.build());
     }

     protected void dropTable(TablePath tablePath) throws Exception {
@@ -422,7 +441,7 @@ protected void waitUntilBucketSynced(TableBucket tb) {
                 "bucket " + tb + "not synced");
     }

-    protected void checkDataInPaimonPrimayKeyTable(
+    protected void checkDataInPaimonPrimaryKeyTable(
             TablePath tablePath, List<InternalRow> expectedRows) throws Exception {
         Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
                 getPaimonRowCloseableIterator(tablePath);
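
For illustration, the new overload as exercised by the deletion-vector test added in this commit (the bucket count defaults to 1):

    long tableId =
            createPkTable(
                    TablePath.of(DEFAULT_DB, "pkTableWithDv"),
                    // Fluss table properties
                    Collections.singletonMap("table.datalake.auto-compaction", "true"),
                    // custom properties (here, a "paimon."-prefixed option)
                    Collections.singletonMap("paimon.deletion-vectors.enabled", "true"));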

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java

Lines changed: 111 additions & 84 deletions
@@ -92,97 +92,124 @@ void testTiering() throws Exception {
         // then start tiering job
         JobClient jobClient = buildTieringJob(execEnv);

-        // check the status of replica after synced
-        assertReplicaStatus(t1Bucket, 3);
-        // check data in paimon
-        checkDataInPaimonPrimayKeyTable(t1, rows);
-        // check snapshot property in paimon
-        Map<String, String> properties =
-                new HashMap<String, String>() {
-                    {
-                        put(
-                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
-                                "[{\"bucket_id\":0,\"log_offset\":3}]");
-                    }
-                };
-        checkSnapshotPropertyInPaimon(t1, properties);
-
-        // then, create another log table
-        TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
-        long t2Id = createLogTable(t2);
-        TableBucket t2Bucket = new TableBucket(t2Id, 0);
-        List<InternalRow> flussRows = new ArrayList<>();
-        // write records
-        for (int i = 0; i < 10; i++) {
-            rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
-            flussRows.addAll(rows);
+        try {
+            // check the status of replica after synced
+            assertReplicaStatus(t1Bucket, 3);
+            // check data in paimon
+            checkDataInPaimonPrimaryKeyTable(t1, rows);
+            // check snapshot property in paimon
+            Map<String, String> properties =
+                    new HashMap<String, String>() {
+                        {
+                            put(
+                                    FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                    "[{\"bucket_id\":0,\"log_offset\":3}]");
+                        }
+                    };
+            checkSnapshotPropertyInPaimon(t1, properties);
+
+            // then, create another log table
+            TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
+            long t2Id = createLogTable(t2);
+            TableBucket t2Bucket = new TableBucket(t2Id, 0);
+            List<InternalRow> flussRows = new ArrayList<>();
+            // write records
+            for (int i = 0; i < 10; i++) {
+                rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+                flussRows.addAll(rows);
+                // write records
+                writeRows(t2, rows, true);
+            }
+            // check the status of replica after synced;
+            // note: we can't update log start offset for unaware bucket mode log table
+            assertReplicaStatus(t2Bucket, 30);
+
+            // check data in paimon
+            checkDataInPaimonAppendOnlyTable(t2, flussRows, 0);
+
+            // then write data to the pk tables
             // write records
-            writeRows(t2, rows, true);
+            rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, "v333"));
+            // write records
+            writeRows(t1, rows, false);
+
+            // check the status of replica of t2 after synced
+            // not check start offset since we won't
+            // update start log offset for primary key table
+            assertReplicaStatus(t1Bucket, 9);
+
+            checkDataInPaimonPrimaryKeyTable(t1, rows);
+
+            // then create partitioned table and wait partitions are ready
+            TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, "partitionedTable");
+            Tuple2<Long, TableDescriptor> tableIdAndDescriptor =
+                    createPartitionedTable(partitionedTablePath);
+            Map<Long, String> partitionNameByIds = waitUntilPartitions(partitionedTablePath);
+
+            // now, write rows into partitioned table
+            TableDescriptor partitionedTableDescriptor = tableIdAndDescriptor.f1;
+            Map<String, List<InternalRow>> writtenRowsByPartition =
+                    writeRowsIntoPartitionedTable(
+                            partitionedTablePath, partitionedTableDescriptor, partitionNameByIds);
+            long tableId = tableIdAndDescriptor.f0;
+
+            // wait until synced to paimon
+            for (Long partitionId : partitionNameByIds.keySet()) {
+                TableBucket tableBucket = new TableBucket(tableId, partitionId, 0);
+                assertReplicaStatus(tableBucket, 3);
+            }
+
+            // now, let's check data in paimon per partition
+            // check data in paimon
+            String partitionCol = partitionedTableDescriptor.getPartitionKeys().get(0);
+            for (String partitionName : partitionNameByIds.values()) {
+                checkDataInPaimonAppendOnlyPartitionedTable(
+                        partitionedTablePath,
+                        Collections.singletonMap(partitionCol, partitionName),
+                        writtenRowsByPartition.get(partitionName),
+                        0);
+            }
+
+            properties =
+                    new HashMap<String, String>() {
+                        {
+                            put(
+                                    FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                    "["
+                                            + "{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
+                                            + "{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
+                                            + "]");
+                        }
+                    };
+            checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
+        } finally {
+            jobClient.cancel().get();
         }
-        // check the status of replica after synced;
-        // note: we can't update log start offset for unaware bucket mode log table
-        assertReplicaStatus(t2Bucket, 30);
-
-        // check data in paimon
-        checkDataInPaimonAppendOnlyTable(t2, flussRows, 0);
+    }

-        // then write data to the pk tables
-        // write records
-        rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, "v333"));
+    @Test
+    void testTieringToDvEnabledTable() throws Exception {
+        TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv");
+        long t1Id =
+                createPkTable(
+                        t1,
+                        Collections.singletonMap("table.datalake.auto-compaction", "true"),
+                        Collections.singletonMap("paimon.deletion-vectors.enabled", "true"));
         // write records
+        List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
         writeRows(t1, rows, false);
+        waitUntilSnapshot(t1Id, 1, 0);

-        // check the status of replica of t2 after synced
-        // not check start offset since we won't
-        // update start log offset for primary key table
-        assertReplicaStatus(t1Bucket, 9);
-
-        checkDataInPaimonPrimayKeyTable(t1, rows);
-
-        // then create partitioned table and wait partitions are ready
-        TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, "partitionedTable");
-        Tuple2<Long, TableDescriptor> tableIdAndDescriptor =
-                createPartitionedTable(partitionedTablePath);
-        Map<Long, String> partitionNameByIds = waitUntilPartitions(partitionedTablePath);
-
-        // now, write rows into partitioned table
-        TableDescriptor partitionedTableDescriptor = tableIdAndDescriptor.f1;
-        Map<String, List<InternalRow>> writtenRowsByPartition =
-                writeRowsIntoPartitionedTable(
-                        partitionedTablePath, partitionedTableDescriptor, partitionNameByIds);
-        long tableId = tableIdAndDescriptor.f0;
-
-        // wait until synced to paimon
-        for (Long partitionId : partitionNameByIds.keySet()) {
-            TableBucket tableBucket = new TableBucket(tableId, partitionId, 0);
-            assertReplicaStatus(tableBucket, 3);
-        }
-
-        // now, let's check data in paimon per partition
-        // check data in paimon
-        String partitionCol = partitionedTableDescriptor.getPartitionKeys().get(0);
-        for (String partitionName : partitionNameByIds.values()) {
-            checkDataInPaimonAppendOnlyPartitionedTable(
-                    partitionedTablePath,
-                    Collections.singletonMap(partitionCol, partitionName),
-                    writtenRowsByPartition.get(partitionName),
-                    0);
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            // check the status of replica after synced
+            assertReplicaStatus(new TableBucket(t1Id, 0), 3);
+            // check data in paimon
+            checkDataInPaimonPrimaryKeyTable(t1, rows);
+        } finally {
+            jobClient.cancel().get();
         }
-
-        properties =
-                new HashMap<String, String>() {
-                    {
-                        put(
-                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
-                                "["
-                                        + "{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
-                                        + "{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
-                                        + "]");
-                    }
-                };
-        checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
-
-        jobClient.cancel().get();
     }

     private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath partitionedTablePath)
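
In the new test, table.datalake.auto-compaction enables compaction during tiering, and the paimon.deletion-vectors.enabled custom property turns deletion vectors on for the underlying Paimon table; together they exercise the merge-tree write path that needs the IOManager added in MergeTreeWriter above. A hedged sketch of a descriptor carrying the same settings (the schema is assumed; the builder methods are the ones used elsewhere in this commit):

    TableDescriptor descriptor =
            TableDescriptor.builder()
                    .schema(schema) // assumed: any schema with a primary key
                    .distributedBy(1)
                    .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
                    .property("table.datalake.auto-compaction", "true")
                    .customProperties(
                            Collections.singletonMap("paimon.deletion-vectors.enabled", "true"))
                    .build();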

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java

Lines changed: 2 additions & 2 deletions
@@ -73,7 +73,7 @@ void testReCreateSameTable() throws Exception {
         // check the status of replica after synced
         assertReplicaStatus(t1Bucket, 3);
         // check data in paimon
-        checkDataInPaimonPrimayKeyTable(t1, rows);
+        checkDataInPaimonPrimaryKeyTable(t1, rows);

         // then drop the table
         dropTable(t1);
@@ -88,7 +88,7 @@ void testReCreateSameTable() throws Exception {
         // check the status of replica after synced
         assertReplicaStatus(t2Bucket, 2);
         // check data in paimon
-        checkDataInPaimonPrimayKeyTable(t1, newRows);
+        checkDataInPaimonPrimaryKeyTable(t1, newRows);

         // stop the tiering job
         jobClient.cancel().get();
