
Commit eb3b5f8

[flink] Union read decouple with paimon for pk table (#1543)
1 parent 8317cd9 commit eb3b5f8

File tree

12 files changed (+1276, -140 lines)


fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java

Lines changed: 5 additions & 0 deletions
@@ -18,6 +18,7 @@
 package com.alibaba.fluss.flink.lake;
 
 import com.alibaba.fluss.client.table.scanner.ScanRecord;
+import com.alibaba.fluss.flink.lake.state.LakeSnapshotAndFlussLogSplitState;
 import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
 import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
 import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplitState;
@@ -52,6 +53,10 @@ public void emitRecord(
         } else if (splitState instanceof LakeSnapshotSplitState) {
             ((LakeSnapshotSplitState) splitState).setRecordsToSkip(recordAndPos.readRecordsCount());
             sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
+        } else if (splitState instanceof LakeSnapshotAndFlussLogSplitState) {
+            ((LakeSnapshotAndFlussLogSplitState) splitState)
+                    .setRecordsToSkip(recordAndPos.readRecordsCount());
+            sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
         } else {
             throw new UnsupportedOperationException(
                     "Unknown split state type: " + splitState.getClass());

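The new branch above relies on LakeSnapshotAndFlussLogSplitState, which this commit adds under com.alibaba.fluss.flink.lake.state but which is not shown in this excerpt. A minimal sketch of what such a state object needs, assuming it only tracks how many records were already emitted; apart from setRecordsToSkip(long), every member below is an assumption made for illustration:

// Sketch only: the real class is part of this commit but not shown here.
// Only setRecordsToSkip(long) is confirmed by the emitRecord() call site above.
public class LakeSnapshotAndFlussLogSplitStateSketch {

    private final LakeSnapshotAndFlussLogSplit split; // the split being read
    private long recordsToSkip;                       // records already emitted downstream

    public LakeSnapshotAndFlussLogSplitStateSketch(LakeSnapshotAndFlussLogSplit split) {
        this.split = split;
        this.recordsToSkip = split.getRecordsToSkip();
    }

    public void setRecordsToSkip(long recordsToSkip) {
        this.recordsToSkip = recordsToSkip;
    }

    public long getRecordsToSkip() {
        return recordsToSkip;
    }
}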
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java

Lines changed: 14 additions & 105 deletions
@@ -19,8 +19,8 @@
 
 import com.alibaba.fluss.client.admin.Admin;
 import com.alibaba.fluss.client.metadata.LakeSnapshot;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
 import com.alibaba.fluss.flink.source.split.LogSplit;
 import com.alibaba.fluss.flink.source.split.SourceSplitBase;
@@ -30,17 +30,6 @@
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableInfo;
 
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.InnerTableScan;
-
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -53,8 +42,6 @@
 import java.util.stream.IntStream;
 
 import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
-import static com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties;
-import static com.alibaba.fluss.utils.Preconditions.checkState;
 
 /** A generator for lake splits. */
 public class LakeSplitGenerator {
@@ -86,10 +73,6 @@ public List<SourceSplitBase> generateHybridLakeSplits() throws Exception {
         // get the file store
         LakeSnapshot lakeSnapshotInfo =
                 flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
-        FileStoreTable fileStoreTable =
-                getTable(
-                        lakeSnapshotInfo.getSnapshotId(),
-                        extractLakeCatalogProperties(tableInfo.getProperties()));
 
         boolean isLogTable = !tableInfo.hasPrimaryKey();
         boolean isPartitioned = tableInfo.isPartitioned();
@@ -113,17 +96,13 @@ public List<SourceSplitBase> generateHybridLakeSplits() throws Exception {
                     lakeSplits,
                     isLogTable,
                     lakeSnapshotInfo.getTableBucketsOffset(),
-                    partitionNameById,
-                    fileStoreTable);
+                    partitionNameById);
         } else {
             Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
                     lakeSplits.values().iterator().next();
             // non-partitioned table
             return generateNoPartitionedTableSplit(
-                    nonPartitionLakeSplits,
-                    isLogTable,
-                    lakeSnapshotInfo.getTableBucketsOffset(),
-                    fileStoreTable);
+                    nonPartitionLakeSplits, isLogTable, lakeSnapshotInfo.getTableBucketsOffset());
         }
     }
 
@@ -145,8 +124,7 @@ private List<SourceSplitBase> generatePartitionTableSplit(
             Map<String, Map<Integer, List<LakeSplit>>> lakeSplits,
             boolean isLogTable,
             Map<TableBucket, Long> tableBucketSnapshotLogOffset,
-            Map<Long, String> partitionNameById,
-            @Nullable FileStoreTable fileStoreTable)
+            Map<Long, String> partitionNameById)
             throws Exception {
         List<SourceSplitBase> splits = new ArrayList<>();
         Map<String, Long> flussPartitionIdByName =
@@ -181,8 +159,7 @@ private List<SourceSplitBase> generatePartitionTableSplit(
                                 partitionName,
                                 isLogTable,
                                 tableBucketSnapshotLogOffset,
-                                bucketEndOffset,
-                                fileStoreTable));
+                                bucketEndOffset));
 
             } else {
                 // only lake data
@@ -216,8 +193,7 @@ private List<SourceSplitBase> generatePartitionTableSplit(
                             isLogTable,
                             // pass empty map since we won't read lake splits
                             Collections.emptyMap(),
-                            bucketEndOffset,
-                            fileStoreTable));
+                            bucketEndOffset));
         }
         return splits;
     }
@@ -228,8 +204,7 @@ private List<SourceSplitBase> generateSplit(
             @Nullable String partitionName,
             boolean isLogTable,
             Map<TableBucket, Long> tableBucketSnapshotLogOffset,
-            Map<Integer, Long> bucketEndOffset,
-            @Nullable FileStoreTable fileStoreTable) {
+            Map<Integer, Long> bucketEndOffset) {
         List<SourceSplitBase> splits = new ArrayList<>();
         if (isLogTable) {
             if (lakeSplits != null) {
@@ -264,12 +239,9 @@ private List<SourceSplitBase> generateSplit(
                         new TableBucket(tableInfo.getTableId(), partitionId, bucket);
                 Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
                 long stoppingOffset = bucketEndOffset.get(bucket);
-                FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
-
                 splits.add(
                         generateSplitForPrimaryKeyTableBucket(
-                                fileStoreTable,
-                                splitGenerator,
+                                lakeSplits != null ? lakeSplits.get(bucket) : null,
                                 tableBucket,
                                 partitionName,
                                 snapshotLogOffset,
@@ -295,83 +267,26 @@ private List<SourceSplitBase> toLakeSnapshotSplits(
     }
 
     private SourceSplitBase generateSplitForPrimaryKeyTableBucket(
-            FileStoreTable fileStoreTable,
-            FileStoreSourceSplitGenerator splitGenerator,
+            @Nullable List<LakeSplit> lakeSplits,
             TableBucket tableBucket,
             @Nullable String partitionName,
             @Nullable Long snapshotLogOffset,
             long stoppingOffset) {
-
         // no snapshot data for this bucket or no a corresponding log offset in this bucket,
         // can only scan from change log
         if (snapshotLogOffset == null || snapshotLogOffset < 0) {
-            return new PaimonSnapshotAndFlussLogSplit(
+            return new LakeSnapshotAndFlussLogSplit(
                     tableBucket, partitionName, null, EARLIEST_OFFSET, stoppingOffset);
         }
 
-        // then, generate a split contains
-        // snapshot and change log so that we can merge change log and snapshot
-        // to get the full data
-        fileStoreTable =
-                fileStoreTable.copy(
-                        Collections.singletonMap(
-                                CoreOptions.SOURCE_SPLIT_TARGET_SIZE.key(),
-                                // we set a max size to make sure only one splits
-                                MemorySize.MAX_VALUE.toString()));
-        InnerTableScan tableScan =
-                fileStoreTable.newScan().withBucketFilter((b) -> b == tableBucket.getBucket());
-
-        if (partitionName != null) {
-            tableScan =
-                    tableScan.withPartitionFilter(getPartitionSpec(fileStoreTable, partitionName));
-        }
-
-        List<FileStoreSourceSplit> fileStoreSourceSplits =
-                splitGenerator.createSplits(tableScan.plan());
-
-        checkState(fileStoreSourceSplits.size() == 1, "Splits for primary key table must be 1.");
-        FileStoreSourceSplit fileStoreSourceSplit = fileStoreSourceSplits.get(0);
-        return new PaimonSnapshotAndFlussLogSplit(
-                tableBucket,
-                partitionName,
-                fileStoreSourceSplit,
-                snapshotLogOffset,
-                stoppingOffset);
-    }
-
-    private Map<String, String> getPartitionSpec(
-            FileStoreTable fileStoreTable, String partitionName) {
-        List<String> partitionKeys = fileStoreTable.partitionKeys();
-        checkState(
-                partitionKeys.size() == 1,
-                "Must only one partition key for paimon table %, but got %s, the partition keys are: ",
-                tableInfo.getTablePath(),
-                partitionKeys.size(),
-                partitionKeys);
-        return Collections.singletonMap(partitionKeys.get(0), partitionName);
-    }
-
-    private FileStoreTable getTable(long snapshotId, Map<String, String> catalogProperties)
-            throws Exception {
-        try (Catalog catalog =
-                FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(catalogProperties))) {
-            return (FileStoreTable)
-                    catalog.getTable(
-                                    Identifier.create(
-                                            tableInfo.getTablePath().getDatabaseName(),
-                                            tableInfo.getTablePath().getTableName()))
-                            .copy(
-                                    Collections.singletonMap(
-                                            CoreOptions.SCAN_SNAPSHOT_ID.key(),
-                                            String.valueOf(snapshotId)));
-        }
+        return new LakeSnapshotAndFlussLogSplit(
+                tableBucket, partitionName, lakeSplits, snapshotLogOffset, stoppingOffset);
     }
 
     private List<SourceSplitBase> generateNoPartitionedTableSplit(
             Map<Integer, List<LakeSplit>> lakeSplits,
             boolean isLogTable,
-            Map<TableBucket, Long> tableBucketSnapshotLogOffset,
-            FileStoreTable fileStoreTable) {
+            Map<TableBucket, Long> tableBucketSnapshotLogOffset) {
         // iterate all bucket
         // assume bucket is from 0 to bucket count
         Map<Integer, Long> bucketEndOffset =
@@ -380,12 +295,6 @@ private List<SourceSplitBase> generateNoPartitionedTableSplit(
                         IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()),
                         bucketOffsetsRetriever);
         return generateSplit(
-                lakeSplits,
-                null,
-                null,
-                isLogTable,
-                tableBucketSnapshotLogOffset,
-                bucketEndOffset,
-                fileStoreTable);
+                lakeSplits, null, null, isLogTable, tableBucketSnapshotLogOffset, bucketEndOffset);
     }
 }

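With the Paimon catalog lookup, table scan, and FileStoreSourceSplitGenerator removed, a primary-key bucket now carries the lake splits that were already planned by the lake source plus a Fluss log range, and the reader later merges the two so that newer change-log entries win over snapshot rows. A toy, self-contained sketch of that union-read idea (simplified String key/value types, not Fluss APIs; deletes are omitted for brevity):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of union read for one primary-key bucket: start from the lake
// snapshot, then apply change-log records in offset order so the latest write per
// key wins. Real readers work on Fluss rows, not strings.
final class UnionReadSketch {
    static Map<String, String> unionRead(
            Map<String, String> snapshotRows, // rows read from the lake splits
            List<Map.Entry<String, String>> changeLogRecords) { // Fluss log, offset order
        Map<String, String> merged = new LinkedHashMap<>(snapshotRows);
        for (Map.Entry<String, String> change : changeLogRecords) {
            merged.put(change.getKey(), change.getValue()); // upsert by primary key
        }
        return merged;
    }
}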
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java

Lines changed: 13 additions & 2 deletions
@@ -18,7 +18,9 @@
 package com.alibaba.fluss.flink.lake;
 
 import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.flink.lake.reader.LakeSnapshotAndLogSplitScanner;
 import com.alibaba.fluss.flink.lake.reader.LakeSnapshotScanner;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
 import com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotAndLogSplitScanner;
 import com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotScanner;
@@ -73,7 +75,8 @@ public void addSplit(SourceSplitBase split, Queue<SourceSplitBase> boundedSplits
             boundedSplits.add(split);
         } else if (split instanceof LakeSnapshotSplit) {
             boundedSplits.add(split);
-            // TODO support primary key table in https://github.com/apache/fluss/issues/1434
+        } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
+            boundedSplits.add(split);
         } else {
             throw new UnsupportedOperationException(
                     String.format("The split type of %s is not supported.", split.getClass()));
@@ -112,7 +115,15 @@ public BoundedSplitReader getBoundedSplitScanner(SourceSplitBase split) {
                     new LakeSnapshotScanner(lakeSource, lakeSnapshotSplit);
             return new BoundedSplitReader(
                     lakeSnapshotScanner, lakeSnapshotSplit.getRecordsToSplit());
-            // TODO support primary key table in https://github.com/apache/fluss/issues/1434
+        } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
+            LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
+                    (LakeSnapshotAndFlussLogSplit) split;
+            LakeSnapshotAndLogSplitScanner lakeSnapshotAndLogSplitScanner =
+                    new LakeSnapshotAndLogSplitScanner(
+                            table, lakeSource, lakeSnapshotAndFlussLogSplit, projectedFields);
+            return new BoundedSplitReader(
+                    lakeSnapshotAndLogSplitScanner,
+                    lakeSnapshotAndFlussLogSplit.getRecordsToSkip());
         } else {
             throw new UnsupportedOperationException(
                     String.format("The split type of %s is not supported.", split.getClass()));

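The new branch above hands BoundedSplitReader the scanner together with the split's recordsToSkip, which is how a restored job avoids re-emitting records it had already produced before a failover. A small sketch of that skip-on-restore contract, using a plain Iterator as a stand-in for the scanner (simplified types, not the Fluss reader):

import java.util.Iterator;

// Sketch of the recovery contract implied by getRecordsToSkip(): when a bounded
// split is re-read after a restart, the first recordsToSkip records are dropped
// so downstream output is not duplicated.
final class SkipOnRestoreSketch {
    static <T> Iterator<T> resume(Iterator<T> scanner, long recordsToSkip) {
        for (long i = 0; i < recordsToSkip && scanner.hasNext(); i++) {
            scanner.next(); // already emitted before the failover
        }
        return scanner;
    }
}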
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java

Lines changed: 31 additions & 32 deletions
@@ -17,8 +17,8 @@
 
 package com.alibaba.fluss.flink.lake;
 
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.flink.source.split.LogSplit;
 import com.alibaba.fluss.flink.source.split.SourceSplitBase;
 import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
@@ -27,15 +27,15 @@
 
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.flink.source.FileStoreSourceSplitSerializer;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
+import static com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit.LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
 import static com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit.LAKE_SNAPSHOT_SPLIT_KIND;
-import static com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit.PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
 
 /** A serializer for lake split. */
 public class LakeSplitSerializer {
@@ -52,32 +52,30 @@ public void serialize(DataOutputSerializer out, SourceSplitBase split) throws IO
                     sourceSplitSerializer.serialize(((LakeSnapshotSplit) split).getLakeSplit());
             out.writeInt(serializeBytes.length);
             out.write(serializeBytes);
-        } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
-            // TODO support primary key table in https://github.com/apache/fluss/issues/1434
-            FileStoreSourceSplitSerializer fileStoreSourceSplitSerializer =
-                    new FileStoreSourceSplitSerializer();
+        } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
             // writing file store source split
-            PaimonSnapshotAndFlussLogSplit paimonSnapshotAndFlussLogSplit =
-                    ((PaimonSnapshotAndFlussLogSplit) split);
-            FileStoreSourceSplit fileStoreSourceSplit =
-                    paimonSnapshotAndFlussLogSplit.getSnapshotSplit();
-            if (fileStoreSourceSplit == null) {
+            LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
+                    ((LakeSnapshotAndFlussLogSplit) split);
+            List<LakeSplit> lakeSplits = lakeSnapshotAndFlussLogSplit.getLakeSplits();
+            if (lakeSplits == null) {
                 // no snapshot data for the bucket
                 out.writeBoolean(false);
             } else {
                 out.writeBoolean(true);
-                byte[] serializeBytes =
-                        fileStoreSourceSplitSerializer.serialize(fileStoreSourceSplit);
-                out.writeInt(serializeBytes.length);
-                out.write(serializeBytes);
+                out.writeInt(lakeSplits.size());
+                for (LakeSplit lakeSplit : lakeSplits) {
+                    byte[] serializeBytes = sourceSplitSerializer.serialize(lakeSplit);
+                    out.writeInt(serializeBytes.length);
+                    out.write(serializeBytes);
+                }
             }
             // writing starting/stopping offset
-            out.writeLong(paimonSnapshotAndFlussLogSplit.getStartingOffset());
+            out.writeLong(lakeSnapshotAndFlussLogSplit.getStartingOffset());
             out.writeLong(
-                    paimonSnapshotAndFlussLogSplit
+                    lakeSnapshotAndFlussLogSplit
                             .getStoppingOffset()
                             .orElse(LogSplit.NO_STOPPING_OFFSET));
-            out.writeLong(paimonSnapshotAndFlussLogSplit.getRecordsToSkip());
+            out.writeLong(lakeSnapshotAndFlussLogSplit.getRecordsToSkip());
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported split type: " + split.getClass().getName());
@@ -97,25 +95,26 @@ public SourceSplitBase deserialize(
                     sourceSplitSerializer.deserialize(
                             sourceSplitSerializer.getVersion(), serializeBytes);
             return new LakeSnapshotSplit(tableBucket, partition, fileStoreSourceSplit);
-            // TODO support primary key table in https://github.com/apache/fluss/issues/1434
-        } else if (splitKind == PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
-            FileStoreSourceSplitSerializer fileStoreSourceSplitSerializer =
-                    new FileStoreSourceSplitSerializer();
-            FileStoreSourceSplit fileStoreSourceSplit = null;
+        } else if (splitKind == LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
+            List<LakeSplit> lakeSplits = null;
             if (input.readBoolean()) {
-                byte[] serializeBytes = new byte[input.readInt()];
-                input.read(serializeBytes);
-                fileStoreSourceSplit =
-                        fileStoreSourceSplitSerializer.deserialize(
-                                fileStoreSourceSplitSerializer.getVersion(), serializeBytes);
+                int lakeSplitSize = input.readInt();
+                lakeSplits = new ArrayList<>(lakeSplitSize);
+                for (int i = 0; i < lakeSplitSize; i++) {
+                    byte[] serializeBytes = new byte[input.readInt()];
+                    input.read(serializeBytes);
+                    lakeSplits.add(
+                            sourceSplitSerializer.deserialize(
                                    sourceSplitSerializer.getVersion(), serializeBytes));
+                }
             }
             long startingOffset = input.readLong();
             long stoppingOffset = input.readLong();
             long recordsToSkip = input.readLong();
-            return new PaimonSnapshotAndFlussLogSplit(
+            return new LakeSnapshotAndFlussLogSplit(
                     tableBucket,
                     partition,
-                    fileStoreSourceSplit,
+                    lakeSplits,
                     startingOffset,
                     stoppingOffset,
                     recordsToSkip);
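Taken together, serialize and deserialize above define the wire layout for the new split kind: a presence flag for snapshot data, an optional count of length-prefixed lake-split payloads, then the starting offset, stopping offset, and records-to-skip. A standalone sketch of the write side using the same Flink serializer class; the lakeSplitBytes argument stands in for the bytes the lake source's SimpleVersionedSerializer would produce:

import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;
import java.util.List;

// Standalone sketch that mirrors the field order written by LakeSplitSerializer
// for the snapshot+log split kind; it adds nothing beyond what the diff shows.
final class WireFormatSketch {
    static byte[] write(
            List<byte[]> lakeSplitBytes, // null when the bucket has no snapshot data
            long startingOffset,
            long stoppingOffset,
            long recordsToSkip)
            throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(256);
        out.writeBoolean(lakeSplitBytes != null); // presence flag for snapshot data
        if (lakeSplitBytes != null) {
            out.writeInt(lakeSplitBytes.size()); // number of lake splits
            for (byte[] bytes : lakeSplitBytes) {
                out.writeInt(bytes.length); // length-prefixed split payload
                out.write(bytes);
            }
        }
        out.writeLong(startingOffset); // Fluss log starting offset
        out.writeLong(stoppingOffset); // stopping offset, or NO_STOPPING_OFFSET
        out.writeLong(recordsToSkip); // records already emitted before the checkpoint
        return out.getCopyOfBuffer();
    }
}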