Skip to content

Commit 0964d86

Browse files
authored
[flink] Union read supports reading partition data that exists in the lake but is expired in Fluss (apache#2197)
1 parent 8d85f67 commit 0964d86

File tree

13 files changed

+846
-49
lines changed

13 files changed

+846
-49
lines changed
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.source;
19+
20+
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
21+
import org.apache.fluss.metadata.PartitionInfo;
22+
import org.apache.fluss.predicate.Predicate;
23+
import org.apache.fluss.utils.CloseableIterator;
24+
25+
import java.io.ByteArrayInputStream;
26+
import java.io.ByteArrayOutputStream;
27+
import java.io.DataInputStream;
28+
import java.io.DataOutputStream;
29+
import java.io.IOException;
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
33+
/** A testing implementation of {@link LakeSource}. */
34+
public class TestingLakeSource implements LakeSource<LakeSplit> {
35+
36+
// bucket num of source table
37+
private final int bucketNum;
38+
39+
// partition infos of partitions contain lake splits
40+
private final List<PartitionInfo> partitionInfos;
41+
42+
public TestingLakeSource() {
43+
this.bucketNum = 0;
44+
this.partitionInfos = null;
45+
}
46+
47+
public TestingLakeSource(int bucketNum, List<PartitionInfo> partitionInfos) {
48+
this.bucketNum = bucketNum;
49+
this.partitionInfos = partitionInfos;
50+
}
51+
52+
@Override
53+
public void withProject(int[][] project) {}
54+
55+
@Override
56+
public void withLimit(int limit) {}
57+
58+
@Override
59+
public FilterPushDownResult withFilters(List<Predicate> predicates) {
60+
return null;
61+
}
62+
63+
@Override
64+
public Planner<LakeSplit> createPlanner(PlannerContext context) throws IOException {
65+
return new TestingPlanner(bucketNum, partitionInfos);
66+
}
67+
68+
@Override
69+
public RecordReader createRecordReader(ReaderContext<LakeSplit> context) throws IOException {
70+
return CloseableIterator::emptyIterator;
71+
}
72+
73+
@Override
74+
public SimpleVersionedSerializer<LakeSplit> getSplitSerializer() {
75+
return new SimpleVersionedSerializer<LakeSplit>() {
76+
77+
@Override
78+
public int getVersion() {
79+
return 0;
80+
}
81+
82+
@Override
83+
public byte[] serialize(LakeSplit split) throws IOException {
84+
if (split instanceof TestingLakeSplit) {
85+
TestingLakeSplit testingSplit = (TestingLakeSplit) split;
86+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
87+
try (DataOutputStream dos = new DataOutputStream(baos)) {
88+
// Serialize bucket
89+
dos.writeInt(testingSplit.bucket());
90+
91+
// Serialize partition list
92+
List<String> partition = testingSplit.partition();
93+
if (partition == null) {
94+
dos.writeInt(-1);
95+
} else {
96+
dos.writeInt(partition.size());
97+
for (String part : partition) {
98+
// Write a boolean flag to indicate if the string is null
99+
dos.writeBoolean(part != null);
100+
if (part != null) {
101+
dos.writeUTF(part);
102+
}
103+
}
104+
}
105+
}
106+
return baos.toByteArray();
107+
}
108+
throw new IOException("Unsupported split type: " + split.getClass().getName());
109+
}
110+
111+
@Override
112+
public LakeSplit deserialize(int version, byte[] serialized) throws IOException {
113+
if (version != 0) {
114+
throw new IOException("Unsupported version: " + version);
115+
}
116+
117+
try (DataInputStream dis =
118+
new DataInputStream(new ByteArrayInputStream(serialized))) {
119+
// Deserialize bucket
120+
int bucket = dis.readInt();
121+
122+
// Deserialize partition list
123+
int partitionSize = dis.readInt();
124+
List<String> partition;
125+
if (partitionSize < 0) {
126+
partition = null;
127+
} else {
128+
partition = new ArrayList<>(partitionSize);
129+
for (int i = 0; i < partitionSize; i++) {
130+
// Read boolean flag to determine if the string is null
131+
boolean isNotNull = dis.readBoolean();
132+
String part = isNotNull ? dis.readUTF() : null;
133+
partition.add(part);
134+
}
135+
}
136+
137+
return new TestingLakeSplit(bucket, partition);
138+
}
139+
}
140+
};
141+
}
142+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.source;
19+
20+
import java.util.List;
21+
22+
/** A testing implementation of {@link LakeSplit}. */
23+
public class TestingLakeSplit implements LakeSplit {
24+
25+
private final int bucket;
26+
private final List<String> partition;
27+
28+
public TestingLakeSplit(int bucket, List<String> partition) {
29+
this.bucket = bucket;
30+
this.partition = partition;
31+
}
32+
33+
@Override
34+
public String toString() {
35+
return "TestingLakeSplit{" + "bucket=" + bucket + ", partition=" + partition + '}';
36+
}
37+
38+
@Override
39+
public int bucket() {
40+
return bucket;
41+
}
42+
43+
@Override
44+
public List<String> partition() {
45+
return partition;
46+
}
47+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.source;
19+
20+
import org.apache.fluss.metadata.PartitionInfo;
21+
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
26+
/** A testing implementation of {@link Planner}. */
27+
public class TestingPlanner implements Planner<LakeSplit> {
28+
29+
private final int bucketNum;
30+
private final List<PartitionInfo> partitionInfos;
31+
32+
public TestingPlanner(int bucketNum, List<PartitionInfo> partitionInfos) {
33+
this.bucketNum = bucketNum;
34+
this.partitionInfos = partitionInfos;
35+
}
36+
37+
@Override
38+
public List<LakeSplit> plan() throws IOException {
39+
List<LakeSplit> splits = new ArrayList<>();
40+
41+
for (PartitionInfo partitionInfo : partitionInfos) {
42+
for (int i = 0; i < bucketNum; i++) {
43+
splits.add(
44+
new TestingLakeSplit(
45+
i, partitionInfo.getResolvedPartitionSpec().getPartitionValues()));
46+
}
47+
}
48+
49+
return splits;
50+
}
51+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.fluss.config.ConfigOptions;
2525
import org.apache.fluss.config.Configuration;
2626
import org.apache.fluss.flink.lake.LakeSplitGenerator;
27+
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
28+
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
2729
import org.apache.fluss.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl;
2830
import org.apache.fluss.flink.source.enumerator.initializer.NoStoppingOffsetsInitializer;
2931
import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
@@ -83,7 +85,8 @@
8385
* <p>The enumerator is responsible for:
8486
*
8587
* <ul>
86-
* <li>Get the all splits(snapshot split + log split) for a table of Fluss to be read.
88+
* <li>Get all the splits (lake split + kv snapshot split + log split) for a table of Fluss to be
89+
* read.
8790
* <li>Assign the splits to readers with the guarantee that the splits belong to the same bucket
8891
* will be assigned to same reader.
8992
* </ul>
@@ -110,10 +113,15 @@ public class FlinkSourceEnumerator
110113
*
111114
* <p>It's mainly used to help enumerator to broadcast the partition removed event to the
112115
* readers when partitions are dropped.
116+
*
117+
* <p>If an assigned partition exists only in the lake and has already expired in Fluss, it will
118+
* remain here indefinitely and will not be removed. However, considering that only a small
119+
* number of such lake-only partitions might exist during the initial startup, and they consume
120+
* minimal memory, this issue is being ignored for now.
113121
*/
114122
private final Map<Long, String> assignedPartitions;
115123

116-
/** buckets that have been assigned to readers. */
124+
/** Buckets that have been assigned to readers. */
117125
private final Set<TableBucket> assignedTableBuckets;
118126

119127
@Nullable private List<SourceSplitBase> pendingHybridLakeFlussSplits;
@@ -222,12 +230,12 @@ public FlinkSourceEnumerator(
222230
this.context = checkNotNull(context);
223231
this.pendingSplitAssignment = new HashMap<>();
224232
this.assignedTableBuckets = new HashSet<>(assignedTableBuckets);
225-
this.startingOffsetsInitializer = startingOffsetsInitializer;
226233
this.assignedPartitions = new HashMap<>(assignedPartitions);
227234
this.pendingHybridLakeFlussSplits =
228235
pendingHybridLakeFlussSplits == null
229236
? null
230237
: new LinkedList<>(pendingHybridLakeFlussSplits);
238+
this.startingOffsetsInitializer = startingOffsetsInitializer;
231239
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
232240
this.streaming = streaming;
233241
this.partitionFilters = partitionFilters;
@@ -258,6 +266,10 @@ public void start() {
258266
// we'll need to consider lake splits
259267
List<SourceSplitBase> hybridLakeFlussSplits = generateHybridLakeFlussSplits();
260268
if (hybridLakeFlussSplits != null) {
269+
LOG.info(
270+
"Generated {} hybrid lake splits for table {}.",
271+
hybridLakeFlussSplits.size(),
272+
tablePath);
261273
// handle hybrid lake fluss splits firstly
262274
handleSplitsAdd(hybridLakeFlussSplits, null);
263275
}
@@ -554,7 +566,8 @@ private List<SourceSplitBase> getSnapshotAndLogSplits(
554566
// hybrid snapshot log split;
555567
OptionalLong logOffset = snapshots.getLogOffset(bucketId);
556568
checkState(
557-
logOffset.isPresent(), "Log offset should be present if snapshot id is.");
569+
logOffset.isPresent(),
570+
"Log offset should be present if snapshot id is present.");
558571
splits.add(
559572
new HybridSnapshotLogSplit(
560573
tb, partitionName, snapshotId.getAsLong(), logOffset.getAsLong()));
@@ -616,6 +629,7 @@ private List<SourceSplitBase> generateHybridLakeFlussSplits() {
616629
// should be restored from checkpoint, shouldn't
617630
// list splits again
618631
if (pendingHybridLakeFlussSplits != null) {
632+
LOG.info("Still have pending lake fluss splits, shouldn't list splits again.");
619633
return pendingHybridLakeFlussSplits;
620634
}
621635
try {
@@ -664,9 +678,28 @@ private void handlePartitionsRemoved(Collection<Partition> removedPartitionInfo)
664678
pendingSplitAssignment.forEach(
665679
(reader, splits) ->
666680
splits.removeIf(
667-
split ->
668-
removedPartitionsMap.containsKey(
669-
split.getTableBucket().getPartitionId())));
681+
split -> {
682+
// Never remove LakeSnapshotSplit, because during union reads,
683+
// data from the lake must still be read even if the partition
684+
// has already expired in Fluss.
685+
if (split instanceof LakeSnapshotSplit) {
686+
return false;
687+
}
688+
689+
// Similar to LakeSnapshotSplit, if it contains any lake split,
690+
// never remove it; otherwise, it can be removed when the Fluss
691+
// partition expires.
692+
if (split instanceof LakeSnapshotAndFlussLogSplit) {
693+
LakeSnapshotAndFlussLogSplit hybridSplit =
694+
(LakeSnapshotAndFlussLogSplit) split;
695+
if (!hybridSplit.isLakeSplitFinished()) {
696+
return false;
697+
}
698+
}
699+
700+
return removedPartitionsMap.containsKey(
701+
split.getTableBucket().getPartitionId());
702+
}));
670703

671704
// send partition removed event to all readers
672705
PartitionsRemovedEvent event = new PartitionsRemovedEvent(removedPartitionsMap);
@@ -863,6 +896,11 @@ Map<Long, String> getAssignedPartitions() {
863896
return assignedPartitions;
864897
}
865898

899+
@VisibleForTesting
900+
Map<Integer, List<SourceSplitBase>> getPendingSplitAssignment() {
901+
return pendingSplitAssignment;
902+
}
903+
866904
@Override
867905
public void addSplitsBack(List<SourceSplitBase> splits, int subtaskId) {
868906
LOG.debug("Flink Source Enumerator adds splits back: {}", splits);

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.api.connector.source.SourceEvent;
2121

2222
import java.util.Map;
23+
import java.util.Objects;
2324

2425
/**
2526
* A source event, sent from the enumerator to the readers, indicating that partitions were removed.
@@ -41,6 +42,20 @@ public Map<Long, String> getRemovedPartitions() {
4142
return removedPartitions;
4243
}
4344

45+
@Override
46+
public boolean equals(Object o) {
47+
if (o == null || getClass() != o.getClass()) {
48+
return false;
49+
}
50+
PartitionsRemovedEvent that = (PartitionsRemovedEvent) o;
51+
return Objects.equals(removedPartitions, that.removedPartitions);
52+
}
53+
54+
@Override
55+
public int hashCode() {
56+
return Objects.hashCode(removedPartitions);
57+
}
58+
4459
@Override
4560
public String toString() {
4661
return "PartitionsRemovedEvent{" + "removedPartitions=" + removedPartitions + '}';

0 commit comments

Comments
 (0)