
Commit 949a29b

[lake/paimon] Support snapshot expiration for tables that are not write-only
1 parent 03c8602
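
This change routes tiering commits through Paimon's table-level commit
(FileStoreTable.newCommit(...), which returns a TableCommitImpl) instead of the
low-level FileStoreCommit. The table-level commit also performs table
maintenance after a successful commit, notably snapshot expiration driven by
the table's retention options, so tiered tables that are not write-only no
longer accumulate snapshots without bound. A minimal sketch of the before/after
commit path, assuming the Paimon APIs used in this diff (the helper class and
method are illustrative, not part of the commit):

    import org.apache.paimon.manifest.ManifestCommittable;
    import org.apache.paimon.table.FileStoreTable;
    import org.apache.paimon.table.sink.TableCommitImpl;

    final class CommitPathSketch {
        static void commit(FileStoreTable table, String user, ManifestCommittable committable)
                throws Exception {
            // Before: the low-level commit only persisted the new snapshot and
            // never triggered expiration:
            //   table.store().newCommit(user, table).commit(committable, false);

            // After: the table-level commit also expires snapshots according to
            // snapshot.num-retained.min/max, unless the table is write-only.
            try (TableCommitImpl tableCommit = table.newCommit(user)) {
                tableCommit.commit(committable);
            }
        }
    }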

2 files changed: +165 -68 lines changed


fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java

Lines changed: 11 additions & 14 deletions
@@ -32,9 +32,9 @@
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
-import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
@@ -55,7 +55,7 @@ public class PaimonLakeCommitter implements LakeCommitter<PaimonWriteResult, Pai
 
     private final Catalog paimonCatalog;
     private final FileStoreTable fileStoreTable;
-    private FileStoreCommit fileStoreCommit;
+    private TableCommitImpl tableCommit;
     private static final ThreadLocal<Long> currentCommitSnapshotId = new ThreadLocal<>();
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
@@ -82,29 +82,26 @@ public long commit(PaimonCommittable committable, Map<String, String> snapshotPr
         snapshotProperties.forEach(manifestCommittable::addProperty);
 
         try {
-            fileStoreCommit =
-                    fileStoreTable
-                            .store()
-                            .newCommit(FLUSS_LAKE_TIERING_COMMIT_USER, fileStoreTable);
-            fileStoreCommit.commit(manifestCommittable, false);
+            tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
+            tableCommit.commit(manifestCommittable);
+
             Long commitSnapshotId = currentCommitSnapshotId.get();
             currentCommitSnapshotId.remove();
 
             return checkNotNull(commitSnapshotId, "Paimon committed snapshot id must be non-null.");
         } catch (Throwable t) {
-            if (fileStoreCommit != null) {
+            if (tableCommit != null) {
                 // if any error happen while commit, abort the commit to clean committable
-                fileStoreCommit.abort(manifestCommittable.fileCommittables());
+                tableCommit.abort(manifestCommittable.fileCommittables());
             }
             throw new IOException(t);
         }
     }
 
     @Override
     public void abort(PaimonCommittable committable) throws IOException {
-        fileStoreCommit =
-                fileStoreTable.store().newCommit(FLUSS_LAKE_TIERING_COMMIT_USER, fileStoreTable);
-        fileStoreCommit.abort(committable.manifestCommittable().fileCommittables());
+        tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
+        tableCommit.abort(committable.manifestCommittable().fileCommittables());
     }
 
     @Nullable
@@ -191,8 +188,8 @@ private Snapshot getCommittedLatestSnapshotOfLake(String commitUser) throws IOEx
     @Override
     public void close() throws Exception {
         try {
-            if (fileStoreCommit != null) {
-                fileStoreCommit.close();
+            if (tableCommit != null) {
+                tableCommit.close();
             }
             if (paimonCatalog != null) {
                 paimonCatalog.close();
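
The expiration behavior is configured purely through table options; the test
below pins it down with a tight retention window. A sketch of those options
using Paimon's Schema builder (the column set is illustrative; the option keys
are the ones the test sets):

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.types.DataTypes;

    final class RetentionOptionsSketch {
        // With these options, every successful commit through TableCommitImpl
        // trims history to at most 2 snapshots; write-only=true would skip
        // expiration (and all other maintenance) entirely.
        static Schema schemaWithTightRetention() {
            return Schema.newBuilder()
                    .column("c1", DataTypes.INT())
                    .column("c2", DataTypes.STRING())
                    .option(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1")
                    .option(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "2")
                    .build();
        }
    }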

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java

Lines changed: 154 additions & 54 deletions
@@ -49,6 +49,7 @@
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.SnapshotManager;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -106,6 +107,18 @@ private static Stream<Arguments> tieringWriteArgs() {
                 Arguments.of(false, false));
     }
 
+    private static Stream<Arguments> snapshotExpireArgs() {
+        return Stream.of(
+                Arguments.of(true, true, true),
+                Arguments.of(true, true, false),
+                Arguments.of(true, false, true),
+                Arguments.of(true, false, false),
+                Arguments.of(false, true, true),
+                Arguments.of(false, true, false),
+                Arguments.of(false, false, true),
+                Arguments.of(false, false, false));
+    }
+
     @ParameterizedTest
     @MethodSource("tieringWriteArgs")
     void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) throws Exception {
@@ -118,7 +131,11 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) thr
                                 isPrimaryKeyTable ? "primary_key" : "log",
                                 isPartitioned ? "partitioned" : "non_partitioned"));
         createTable(
-                tablePath, isPrimaryKeyTable, isPartitioned, isPrimaryKeyTable ? bucketNum : null);
+                tablePath,
+                isPrimaryKeyTable,
+                isPartitioned,
+                isPrimaryKeyTable ? bucketNum : null,
+                Collections.emptyMap());
         TableDescriptor descriptor =
                 TableDescriptor.builder()
                         .schema(
@@ -132,12 +149,6 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) thr
                         .build();
         TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
 
-        List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
-        SimpleVersionedSerializer<PaimonWriteResult> writeResultSerializer =
-                paimonLakeTieringFactory.getWriteResultSerializer();
-        SimpleVersionedSerializer<PaimonCommittable> committableSerializer =
-                paimonLakeTieringFactory.getCommittableSerializer();
-
         try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter =
                 createLakeCommitter(tablePath)) {
             // should no any missing snapshot
@@ -155,53 +166,9 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) thr
                             }
                         }
                         : Collections.singletonMap(null, null);
-        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
-        // first, write data
-        for (int bucket = 0; bucket < bucketNum; bucket++) {
-            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
-                String partition = entry.getValue();
-                try (LakeWriter<PaimonWriteResult> lakeWriter =
-                        createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
-                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
-                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
-                            isPrimaryKeyTable
-                                    ? genPrimaryKeyTableRecords(partition, bucket)
-                                    : genLogTableRecords(partition, bucket, 10);
-                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
-                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
-                    recordsByBucket.put(partitionBucket, expectRecords);
-                    tableBucketOffsets.put(new TableBucket(0, entry.getKey(), bucket), 10L);
-                    for (LogRecord logRecord : writtenRecords) {
-                        lakeWriter.write(logRecord);
-                    }
-                    // serialize/deserialize writeResult
-                    PaimonWriteResult paimonWriteResult = lakeWriter.complete();
-                    byte[] serialized = writeResultSerializer.serialize(paimonWriteResult);
-                    paimonWriteResults.add(
-                            writeResultSerializer.deserialize(
-                                    writeResultSerializer.getVersion(), serialized));
-                }
-            }
-        }
 
-        // second, commit data
-        try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter =
-                createLakeCommitter(tablePath)) {
-            // serialize/deserialize committable
-            PaimonCommittable paimonCommittable = lakeCommitter.toCommittable(paimonWriteResults);
-            byte[] serialized = committableSerializer.serialize(paimonCommittable);
-            paimonCommittable =
-                    committableSerializer.deserialize(
-                            committableSerializer.getVersion(), serialized);
-            long snapshot =
-                    lakeCommitter.commit(
-                            paimonCommittable,
-                            toBucketOffsetsProperty(
-                                    tableBucketOffsets,
-                                    partitionIdAndName,
-                                    getPartitionKeys(tablePath)));
-            assertThat(snapshot).isEqualTo(1);
-        }
+        // firstly, write some data
+        writeData(tableInfo, recordsByBucket, partitionIdAndName);
 
         // then, check data
         for (int bucket = 0; bucket < 3; bucket++) {
@@ -401,6 +368,77 @@ void testThreePartitionTiering() throws Exception {
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("snapshotExpireArgs")
+    void testSnapshotExpiration(
+            boolean isPrimaryKeyTable, boolean isPartitioned, boolean isWriteOnly)
+            throws Exception {
+        int bucketNum = 3;
+        TablePath tablePath =
+                TablePath.of(
+                        "paimon",
+                        String.format(
+                                "test_tiering_snapshot_expiration_table_%s_%s",
+                                isPrimaryKeyTable ? "primary_key" : "log",
+                                isPartitioned ? "partitioned" : "non_partitioned"));
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "2");
+        if (isWriteOnly) {
+            options.put(CoreOptions.WRITE_ONLY.key(), "true");
+        }
+        createTable(
+                tablePath,
+                isPrimaryKeyTable,
+                isPartitioned,
+                isPrimaryKeyTable ? bucketNum : null,
+                options);
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                org.apache.fluss.metadata.Schema.newBuilder()
+                                        .column("c1", org.apache.fluss.types.DataTypes.INT())
+                                        .column("c2", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("c3", org.apache.fluss.types.DataTypes.STRING())
+                                        .build())
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
+        // Get the FileStoreTable to verify snapshots
+        FileStoreTable fileStoreTable =
+                (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+
+        Map<Long, String> partitionIdAndName =
+                isPartitioned
+                        ? new HashMap<Long, String>() {
+                            {
+                                put(1L, "p1");
+                                put(2L, "p2");
+                                put(3L, "p3");
+                            }
+                        }
+                        : Collections.singletonMap(null, null);
+
+        // Write some data to generate 2 snapshots
+        writeData(tableInfo, new HashMap<>(), partitionIdAndName);
+        writeData(tableInfo, new HashMap<>(), partitionIdAndName);
+        assertThat(snapshotManager.snapshotCount()).isEqualTo(2);
+
+        // write more data
+        for (int i = 0; i < 5; i++) {
+            writeData(tableInfo, new HashMap<>(), partitionIdAndName);
+            if (isWriteOnly) {
+                // if write-only table, snapshot should never be expired
+                assertThat(snapshotManager.snapshotCount()).isGreaterThan(2);
+            } else {
+                // if not write-only table, snapshot should be expired
+                assertThat(snapshotManager.snapshotCount()).isEqualTo(2);
+            }
+        }
+    }
+
     private void verifyLogTableRecordsMultiPartition(
             CloseableIterator<InternalRow> actualRecords,
             List<LogRecord> expectRecords,
@@ -726,7 +764,8 @@ private void createTable(
             TablePath tablePath,
             boolean isPrimaryTable,
             boolean isPartitioned,
-            @Nullable Integer numBuckets)
+            @Nullable Integer numBuckets,
+            Map<String, String> options)
             throws Exception {
         Schema.Builder builder =
                 Schema.newBuilder()
@@ -749,6 +788,7 @@ private void createTable(
         if (numBuckets != null) {
             builder.option(CoreOptions.BUCKET.key(), String.valueOf(numBuckets));
         }
+        builder.options(options);
         doCreatePaimonTable(tablePath, builder);
     }
 
@@ -804,4 +844,64 @@ private List<String> getPartitionKeys(TablePath tablePath) throws Exception {
         FileStoreTable fileStoreTable = (FileStoreTable) paimonCatalog.getTable(identifier);
         return fileStoreTable.partitionKeys();
     }
+
+    private void writeData(
+            TableInfo tableInfo,
+            Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket,
+            Map<Long, String> partitionIdAndName)
+            throws Exception {
+        TablePath tablePath = tableInfo.getTablePath();
+        int bucketNum = tableInfo.getNumBuckets();
+        boolean isPrimaryKeyTable = tableInfo.hasPrimaryKey();
+
+        List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<PaimonWriteResult> writeResultSerializer =
+                paimonLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<PaimonCommittable> committableSerializer =
+                paimonLakeTieringFactory.getCommittableSerializer();
+
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
+        // first, write data
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
+                try (LakeWriter<PaimonWriteResult> lakeWriter =
+                        createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
+                            isPrimaryKeyTable
+                                    ? genPrimaryKeyTableRecords(partition, bucket)
+                                    : genLogTableRecords(partition, bucket, 10);
+                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                    recordsByBucket.put(partitionBucket, expectRecords);
+                    tableBucketOffsets.put(new TableBucket(0, entry.getKey(), bucket), 10L);
+                    for (LogRecord logRecord : writtenRecords) {
+                        lakeWriter.write(logRecord);
+                    }
+                    // serialize/deserialize writeResult
+                    PaimonWriteResult paimonWriteResult = lakeWriter.complete();
+                    byte[] serialized = writeResultSerializer.serialize(paimonWriteResult);
+                    paimonWriteResults.add(
+                            writeResultSerializer.deserialize(
+                                    writeResultSerializer.getVersion(), serialized));
+                }
+            }
+        }
+
+        // second, commit data
+        try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter =
+                createLakeCommitter(tablePath)) {
+            // serialize/deserialize committable
+            PaimonCommittable paimonCommittable = lakeCommitter.toCommittable(paimonWriteResults);
+            byte[] serialized = committableSerializer.serialize(paimonCommittable);
+            paimonCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            lakeCommitter.commit(
+                    paimonCommittable,
+                    toBucketOffsetsProperty(
+                            tableBucketOffsets, partitionIdAndName, getPartitionKeys(tablePath)));
+        }
+    }
 }
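
A note on the test's assertions: they observe retained history through Paimon's
SnapshotManager. Besides snapshotCount(), the earliest/latest snapshot ids
describe the same retention window; a small illustrative helper (not part of
this commit):

    import org.apache.paimon.table.FileStoreTable;
    import org.apache.paimon.utils.SnapshotManager;

    final class RetentionWindowSketch {
        // With snapshot.num-retained.max=2 and expiration active, this stays
        // at 2 as new commits land; for a write-only table it keeps growing.
        static long retainedSnapshots(FileStoreTable table) {
            SnapshotManager snapshotManager = table.snapshotManager();
            Long earliest = snapshotManager.earliestSnapshotId(); // null before any snapshot
            Long latest = snapshotManager.latestSnapshotId();
            return (earliest == null || latest == null) ? 0 : latest - earliest + 1;
        }
    }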
