
Commit 7937996

[flink] Union read support log table in streaming mode (#1575)

1 parent faa0815 commit 7937996

File tree

18 files changed: +371 -76 lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 2 additions & 0 deletions
@@ -31,6 +31,7 @@
 import org.apache.fluss.exception.InvalidReplicationFactorException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.KvSnapshotNotExistException;
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
 import org.apache.fluss.exception.NonPrimaryKeyTableException;
 import org.apache.fluss.exception.PartitionAlreadyExistsException;
 import org.apache.fluss.exception.PartitionNotExistException;
@@ -383,6 +384,7 @@ CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
      *
      * <ul>
      *   <li>{@link TableNotExistException} if the table does not exist.
+     *   <li>{@link LakeTableSnapshotNotExistException} if no lake snapshot exists.
      * </ul>
      *
      * @param tablePath the table path of the table.
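
With the added javadoc entry, the future returned by getLatestLakeSnapshot is documented to complete exceptionally with LakeTableSnapshotNotExistException when the table has no lake snapshot yet. A minimal caller sketch (assuming the usual Admin and TablePath signatures; the helper class is illustrative, not part of this commit):

    import java.util.concurrent.ExecutionException;

    import org.apache.fluss.client.admin.Admin;
    import org.apache.fluss.client.metadata.LakeSnapshot;
    import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
    import org.apache.fluss.metadata.TablePath;

    class LakeSnapshotLookup {
        /** Returns the latest lake snapshot, or null if the table has none yet. */
        static LakeSnapshot latestOrNull(Admin admin, TablePath tablePath) throws Exception {
            try {
                return admin.getLatestLakeSnapshot(tablePath).get();
            } catch (ExecutionException e) {
                // CompletableFuture.get() delivers the documented exception as the cause
                if (e.getCause() instanceof LakeTableSnapshotNotExistException) {
                    return null;
                }
                throw e;
            }
        }
    }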

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java

Lines changed: 30 additions & 19 deletions
@@ -19,6 +19,7 @@
 
 import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.client.metadata.LakeSnapshot;
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
 import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
 import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
@@ -29,6 +30,7 @@
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.utils.ExceptionUtils;
 
 import javax.annotation.Nullable;
 
@@ -75,10 +77,23 @@ public LakeSplitGenerator(
         this.listPartitionSupplier = listPartitionSupplier;
     }
 
-    public List<SourceSplitBase> generateHybridLakeSplits() throws Exception {
-        // get the file store
-        LakeSnapshot lakeSnapshotInfo =
-                flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
+    /**
+     * Returns a list of hybrid lake snapshot {@link LakeSnapshotSplit}, {@link
+     * LakeSnapshotAndFlussLogSplit} and the corresponding Fluss {@link LogSplit} based on the lake
+     * snapshot. Returns null if no lake snapshot exists.
+     */
+    @Nullable
+    public List<SourceSplitBase> generateHybridLakeFlussSplits() throws Exception {
+        LakeSnapshot lakeSnapshotInfo;
+        try {
+            lakeSnapshotInfo = flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
+        } catch (Exception exception) {
+            if (ExceptionUtils.stripExecutionException(exception)
+                    instanceof LakeTableSnapshotNotExistException) {
+                return null;
+            }
+            throw exception;
+        }
 
         boolean isLogTable = !tableInfo.hasPrimaryKey();
         boolean isPartitioned = tableInfo.isPartitioned();
@@ -90,10 +105,7 @@ public List<SourceSplitBase> generateHybridLakeSplits() throws Exception {
                                         (LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId)
                                 .plan());
 
-        if (lakeSplits.isEmpty()) {
-            return Collections.emptyList();
-        }
-
+        Map<TableBucket, Long> tableBucketsOffset = lakeSnapshotInfo.getTableBucketsOffset();
         if (isPartitioned) {
             Set<PartitionInfo> partitionInfos = listPartitionSupplier.get();
             Map<Long, String> partitionNameById =
@@ -103,16 +115,13 @@
                                     PartitionInfo::getPartitionId,
                                     PartitionInfo::getPartitionName));
             return generatePartitionTableSplit(
-                    lakeSplits,
-                    isLogTable,
-                    lakeSnapshotInfo.getTableBucketsOffset(),
-                    partitionNameById);
+                    lakeSplits, isLogTable, tableBucketsOffset, partitionNameById);
         } else {
             Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
                     lakeSplits.values().iterator().next();
             // non-partitioned table
             return generateNoPartitionedTableSplit(
-                    nonPartitionLakeSplits, isLogTable, lakeSnapshotInfo.getTableBucketsOffset());
+                    nonPartitionLakeSplits, isLogTable, tableBucketsOffset);
         }
     }
 
@@ -134,8 +143,7 @@ private List<SourceSplitBase> generatePartitionTableSplit(
             Map<String, Map<Integer, List<LakeSplit>>> lakeSplits,
             boolean isLogTable,
             Map<TableBucket, Long> tableBucketSnapshotLogOffset,
-            Map<Long, String> partitionNameById)
-            throws Exception {
+            Map<Long, String> partitionNameById) {
         List<SourceSplitBase> splits = new ArrayList<>();
         Map<String, Long> flussPartitionIdByName =
                 partitionNameById.entrySet().stream()
@@ -189,7 +197,7 @@ private List<SourceSplitBase> generatePartitionTableSplit(
         // iterate remaining fluss splits
         for (Map.Entry<String, Long> partitionIdByNameEntry : flussPartitionIdByName.entrySet()) {
             String partitionName = partitionIdByNameEntry.getKey();
-            long partitionId = partitionIdByNameEntry.getValue();
+            Long partitionId = partitionIdByNameEntry.getValue();
             Map<Integer, Long> bucketEndOffset =
                     stoppingOffsetInitializer.getBucketOffsets(
                             partitionName,
@@ -224,7 +232,7 @@ private List<SourceSplitBase> generateSplit(
             TableBucket tableBucket =
                     new TableBucket(tableInfo.getTableId(), partitionId, bucket);
             Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
-            long stoppingOffset = bucketEndOffset.get(bucket);
+            Long stoppingOffset = bucketEndOffset.get(bucket);
             if (snapshotLogOffset == null) {
                 // no data committed to this bucket yet, scan from fluss log
                 splits.add(
@@ -248,7 +256,7 @@ private List<SourceSplitBase> generateSplit(
             TableBucket tableBucket =
                     new TableBucket(tableInfo.getTableId(), partitionId, bucket);
             Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
-            long stoppingOffset = bucketEndOffset.get(bucket);
+            Long stoppingOffset = bucketEndOffset.get(bucket);
             splits.add(
                     generateSplitForPrimaryKeyTableBucket(
                             lakeSplits != null ? lakeSplits.get(bucket) : null,
@@ -267,11 +275,14 @@ private List<SourceSplitBase> toLakeSnapshotSplits(
             @Nullable String partitionName,
             @Nullable Long partitionId) {
         List<SourceSplitBase> splits = new ArrayList<>();
+        // a bucket may carry several lake splits, so introduce an
+        // index to make each split id unique
+        int index = 0;
         for (LakeSplit lakeSplit :
                 lakeSplits.values().stream().flatMap(List::stream).collect(Collectors.toList())) {
             TableBucket tableBucket =
                     new TableBucket(tableInfo.getTableId(), partitionId, lakeSplit.bucket());
-            splits.add(new LakeSnapshotSplit(tableBucket, partitionName, lakeSplit));
+            splits.add(new LakeSnapshotSplit(tableBucket, partitionName, lakeSplit, index++));
         }
         return splits;
     }
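
The try/catch above works because flussAdmin.getLatestLakeSnapshot(...).get() surfaces failures wrapped in a java.util.concurrent.ExecutionException, so the instanceof test only matches once the cause chain is peeled. A rough sketch of what ExceptionUtils.stripExecutionException presumably does (illustrative; the real Fluss utility may differ):

    import java.util.concurrent.ExecutionException;

    // Illustrative helper: walk the cause chain while the outer layer is an
    // ExecutionException, then return the innermost real failure.
    static Throwable stripExecutionException(Throwable t) {
        while (t instanceof ExecutionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

Note that callers of generateHybridLakeFlussSplits() must now distinguish null (no lake snapshot committed yet) from an empty list (a snapshot exists but planning produced no lake splits).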

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java

Lines changed: 6 additions & 4 deletions
@@ -48,8 +48,9 @@ public LakeSplitSerializer(SimpleVersionedSerializer<LakeSplit> sourceSplitSerializer) {
 
     public void serialize(DataOutputSerializer out, SourceSplitBase split) throws IOException {
         if (split instanceof LakeSnapshotSplit) {
-            byte[] serializeBytes =
-                    sourceSplitSerializer.serialize(((LakeSnapshotSplit) split).getLakeSplit());
+            LakeSnapshotSplit lakeSplit = (LakeSnapshotSplit) split;
+            out.writeInt(lakeSplit.getSplitIndex());
+            byte[] serializeBytes = sourceSplitSerializer.serialize(lakeSplit.getLakeSplit());
             out.writeInt(serializeBytes.length);
             out.write(serializeBytes);
         } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
@@ -89,12 +90,13 @@ public SourceSplitBase deserialize(
             DataInputDeserializer input)
             throws IOException {
         if (splitKind == LAKE_SNAPSHOT_SPLIT_KIND) {
+            int splitIndex = input.readInt();
             byte[] serializeBytes = new byte[input.readInt()];
             input.read(serializeBytes);
-            LakeSplit fileStoreSourceSplit =
+            LakeSplit lakeSplit =
                     sourceSplitSerializer.deserialize(
                             sourceSplitSerializer.getVersion(), serializeBytes);
-            return new LakeSnapshotSplit(tableBucket, partition, fileStoreSourceSplit);
+            return new LakeSnapshotSplit(tableBucket, partition, lakeSplit, splitIndex);
         } else if (splitKind == LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
             List<LakeSplit> lakeSplits = null;
             if (input.readBoolean()) {
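
serialize now writes the split index before the length-prefixed lake-split payload, and deserialize reads the fields back in the same order; any asymmetry would shift every byte that follows. A minimal round-trip sketch of that wire order (assuming DataOutputSerializer and DataInputDeserializer are Flink's org.apache.flink.core.memory classes, as the method signatures suggest):

    import org.apache.flink.core.memory.DataInputDeserializer;
    import org.apache.flink.core.memory.DataOutputSerializer;

    static void roundTrip() throws Exception {
        byte[] payload = {1, 2, 3}; // stands in for sourceSplitSerializer.serialize(...)

        DataOutputSerializer out = new DataOutputSerializer(16);
        out.writeInt(7);              // splitIndex first
        out.writeInt(payload.length); // then the length prefix
        out.write(payload);           // then the payload bytes

        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        int splitIndex = in.readInt();        // reads must mirror the writes
        byte[] read = new byte[in.readInt()];
        in.read(read);
        assert splitIndex == 7 && read.length == payload.length;
    }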

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java

Lines changed: 19 additions & 0 deletions
@@ -107,4 +107,23 @@ public String splitId() {
     public List<LakeSplit> getLakeSplits() {
         return lakeSnapshotSplits;
     }
+
+    @Override
+    public String toString() {
+        return "LakeSnapshotAndFlussLogSplit{"
+                + "lakeSnapshotSplits="
+                + lakeSnapshotSplits
+                + ", recordOffset="
+                + recordOffset
+                + ", startingOffset="
+                + startingOffset
+                + ", stoppingOffset="
+                + stoppingOffset
+                + ", tableBucket="
+                + tableBucket
+                + ", partitionName='"
+                + partitionName
+                + '\''
+                + '}';
+    }
 }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java

Lines changed: 37 additions & 7 deletions
@@ -32,18 +32,25 @@ public class LakeSnapshotSplit extends SourceSplitBase {
 
     private final long recordsToSplit;
 
+    private final int splitIndex;
+
     public LakeSnapshotSplit(
-            TableBucket tableBucket, @Nullable String partitionName, LakeSplit lakeSplit) {
-        this(tableBucket, partitionName, lakeSplit, 0);
+            TableBucket tableBucket,
+            @Nullable String partitionName,
+            LakeSplit lakeSplit,
+            int splitIndex) {
+        this(tableBucket, partitionName, lakeSplit, splitIndex, 0);
     }
 
     public LakeSnapshotSplit(
             TableBucket tableBucket,
             @Nullable String partitionName,
             LakeSplit lakeSplit,
+            int splitIndex,
             long recordsToSplit) {
         super(tableBucket, partitionName);
         this.lakeSplit = lakeSplit;
+        this.splitIndex = splitIndex;
         this.recordsToSplit = recordsToSplit;
     }
 
@@ -55,14 +62,20 @@ public long getRecordsToSplit() {
         return recordsToSplit;
     }
 
+    public int getSplitIndex() {
+        return splitIndex;
+    }
+
     @Override
     public String splitId() {
         return toSplitId(
-                "lake-snapshot-",
-                new TableBucket(
-                        tableBucket.getTableId(),
-                        tableBucket.getPartitionId(),
-                        lakeSplit.bucket()));
+                        "lake-snapshot-",
+                        new TableBucket(
+                                tableBucket.getTableId(),
+                                tableBucket.getPartitionId(),
+                                lakeSplit.bucket()))
+                + "-"
+                + splitIndex;
     }
 
     @Override
@@ -74,4 +87,21 @@ public boolean isLakeSplit() {
     public byte splitKind() {
         return LAKE_SNAPSHOT_SPLIT_KIND;
     }
+
+    @Override
+    public String toString() {
+        return "LakeSnapshotSplit{"
+                + "lakeSplit="
+                + lakeSplit
+                + ", recordsToSplit="
+                + recordsToSplit
+                + ", splitIndex="
+                + splitIndex
+                + ", tableBucket="
+                + tableBucket
+                + ", partitionName='"
+                + partitionName
+                + '\''
+                + '}';
+    }
 }
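
Appending splitIndex to splitId matters because Flink requires split ids to be unique within a source, and a single table bucket can now contribute several lake splits whose ids were previously identical. A hypothetical illustration of the id shapes (the exact toSplitId formatting is defined elsewhere in Fluss):

    // Hypothetical id strings, for illustration only.
    String bucketId = "lake-snapshot-1-0";    // tableId 1, bucket 0
    String oldId1 = bucketId;                 // first lake split of the bucket
    String oldId2 = bucketId;                 // second lake split: identical -> clash
    String newId1 = bucketId + "-" + 0;       // "lake-snapshot-1-0-0"
    String newId2 = bucketId + "-" + 1;       // "lake-snapshot-1-0-1" -> unique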

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java

Lines changed: 1 addition & 0 deletions
@@ -43,6 +43,7 @@ public SourceSplitBase toSourceSplit() {
                 split.getTableBucket(),
                 split.getPartitionName(),
                 split.getLakeSplit(),
+                split.getSplitIndex(),
                 recordsToSplit);
     }
 }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java

Lines changed: 2 additions & 1 deletion
@@ -161,6 +161,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator
                 splitEnumeratorContext,
                 sourceEnumeratorState.getAssignedBuckets(),
                 sourceEnumeratorState.getAssignedPartitions(),
+                sourceEnumeratorState.getRemainingHybridLakeFlussSplits(),
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
                 streaming,
@@ -175,7 +176,7 @@ public SimpleVersionedSerializer<SourceSplitBase> getSplitSerializer() {
 
     @Override
     public SimpleVersionedSerializer<SourceEnumeratorState> getEnumeratorCheckpointSerializer() {
-        return FlussSourceEnumeratorStateSerializer.INSTANCE;
+        return new FlussSourceEnumeratorStateSerializer(lakeSource);
     }
 
     @Override
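
getEnumeratorCheckpointSerializer switches from a shared static INSTANCE to a constructed object because the enumerator state now carries the remaining hybrid lake/Fluss splits, and writing those out requires the lake source's own split serializer. A toy sketch of the pattern under assumed names (not the actual Fluss classes):

    import java.io.IOException;

    import org.apache.flink.core.io.SimpleVersionedSerializer;

    // Toy state type standing in for SourceEnumeratorState.
    final class ToyState {
        final String remainingSplits;
        ToyState(String remainingSplits) { this.remainingSplits = remainingSplits; }
    }

    // A serializer that needs a per-source collaborator cannot be a static
    // singleton: each source constructs it with its own delegate.
    final class ToyStateSerializer implements SimpleVersionedSerializer<ToyState> {
        private final SimpleVersionedSerializer<String> lakeSplitSerializer;

        ToyStateSerializer(SimpleVersionedSerializer<String> lakeSplitSerializer) {
            this.lakeSplitSerializer = lakeSplitSerializer;
        }

        @Override
        public int getVersion() {
            return 1;
        }

        @Override
        public byte[] serialize(ToyState state) throws IOException {
            // delegate the lake-split bytes, as the real serializer must
            return lakeSplitSerializer.serialize(state.remainingSplits);
        }

        @Override
        public ToyState deserialize(int version, byte[] bytes) throws IOException {
            return new ToyState(
                    lakeSplitSerializer.deserialize(lakeSplitSerializer.getVersion(), bytes));
        }
    }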
