Skip to content

Commit a313d4f

Browse files
committed
[flink] Union read support read partition data for the partition exists in the lake but not in Fluss
1 parent 3815e9e commit a313d4f

File tree

13 files changed

+786
-48
lines changed

13 files changed

+786
-48
lines changed
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.source;
19+
20+
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
21+
import org.apache.fluss.metadata.PartitionInfo;
22+
import org.apache.fluss.predicate.Predicate;
23+
import org.apache.fluss.utils.CloseableIterator;
24+
25+
import java.io.ByteArrayInputStream;
26+
import java.io.ByteArrayOutputStream;
27+
import java.io.DataInputStream;
28+
import java.io.DataOutputStream;
29+
import java.io.IOException;
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
33+
/** A testing implementation of {@link LakeSource}. */
34+
public class TestingLakeSource implements LakeSource<LakeSplit> {
35+
36+
private final int bucketNum;
37+
private final List<PartitionInfo> partitionInfos;
38+
39+
public TestingLakeSource() {
40+
this.bucketNum = 0;
41+
this.partitionInfos = null;
42+
}
43+
44+
public TestingLakeSource(int bucketNum, List<PartitionInfo> partitionInfos) {
45+
this.bucketNum = bucketNum;
46+
this.partitionInfos = partitionInfos;
47+
}
48+
49+
@Override
50+
public void withProject(int[][] project) {}
51+
52+
@Override
53+
public void withLimit(int limit) {}
54+
55+
@Override
56+
public FilterPushDownResult withFilters(List<Predicate> predicates) {
57+
return null;
58+
}
59+
60+
@Override
61+
public Planner<LakeSplit> createPlanner(PlannerContext context) throws IOException {
62+
return new TestingPlanner(bucketNum, partitionInfos);
63+
}
64+
65+
@Override
66+
public RecordReader createRecordReader(ReaderContext<LakeSplit> context) throws IOException {
67+
return CloseableIterator::emptyIterator;
68+
}
69+
70+
@Override
71+
public SimpleVersionedSerializer<LakeSplit> getSplitSerializer() {
72+
return new SimpleVersionedSerializer<LakeSplit>() {
73+
74+
@Override
75+
public int getVersion() {
76+
return 0;
77+
}
78+
79+
@Override
80+
public byte[] serialize(LakeSplit split) throws IOException {
81+
if (split instanceof TestingLakeSplit) {
82+
TestingLakeSplit testingSplit = (TestingLakeSplit) split;
83+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
84+
try (DataOutputStream dos = new DataOutputStream(baos)) {
85+
// Serialize bucket
86+
dos.writeInt(testingSplit.bucket());
87+
88+
// Serialize partition list
89+
List<String> partition = testingSplit.partition();
90+
if (partition == null) {
91+
dos.writeInt(-1);
92+
} else {
93+
dos.writeInt(partition.size());
94+
for (String part : partition) {
95+
// Write a boolean flag to indicate if the string is null
96+
dos.writeBoolean(part != null);
97+
if (part != null) {
98+
dos.writeUTF(part);
99+
}
100+
}
101+
}
102+
}
103+
return baos.toByteArray();
104+
}
105+
throw new IOException("Unsupported split type: " + split.getClass().getName());
106+
}
107+
108+
@Override
109+
public LakeSplit deserialize(int version, byte[] serialized) throws IOException {
110+
if (version != 0) {
111+
throw new IOException("Unsupported version: " + version);
112+
}
113+
114+
try (DataInputStream dis =
115+
new DataInputStream(new ByteArrayInputStream(serialized))) {
116+
// Deserialize bucket
117+
int bucket = dis.readInt();
118+
119+
// Deserialize partition list
120+
int partitionSize = dis.readInt();
121+
List<String> partition;
122+
if (partitionSize < 0) {
123+
partition = null;
124+
} else {
125+
partition = new ArrayList<>(partitionSize);
126+
for (int i = 0; i < partitionSize; i++) {
127+
// Read boolean flag to determine if the string is null
128+
boolean isNotNull = dis.readBoolean();
129+
String part = isNotNull ? dis.readUTF() : null;
130+
partition.add(part);
131+
}
132+
}
133+
134+
return new TestingLakeSplit(bucket, partition);
135+
}
136+
}
137+
};
138+
}
139+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.source;
19+
20+
import java.util.List;
21+
22+
/** A testing implementation of {@link LakeSplit}. */
23+
public class TestingLakeSplit implements LakeSplit {
24+
25+
private final int bucket;
26+
private final List<String> partition;
27+
28+
public TestingLakeSplit(int bucket, List<String> partition) {
29+
this.bucket = bucket;
30+
this.partition = partition;
31+
}
32+
33+
@Override
34+
public String toString() {
35+
return "TestingLakeSplit{" + "bucket=" + bucket + ", partition=" + partition + '}';
36+
}
37+
38+
@Override
39+
public int bucket() {
40+
return bucket;
41+
}
42+
43+
@Override
44+
public List<String> partition() {
45+
return partition;
46+
}
47+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.source;
19+
20+
import org.apache.fluss.metadata.PartitionInfo;
21+
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
26+
/** A testing implementation of {@link Planner}. */
27+
public class TestingPlanner implements Planner<LakeSplit> {
28+
29+
private final int bucketNum;
30+
private final List<PartitionInfo> partitionInfos;
31+
32+
public TestingPlanner(int bucketNum, List<PartitionInfo> partitionInfos) {
33+
this.bucketNum = bucketNum;
34+
this.partitionInfos = partitionInfos;
35+
}
36+
37+
@Override
38+
public List<LakeSplit> plan() throws IOException {
39+
List<LakeSplit> splits = new ArrayList<>();
40+
41+
for (PartitionInfo partitionInfo : partitionInfos) {
42+
for (int i = 0; i < bucketNum; i++) {
43+
splits.add(
44+
new TestingLakeSplit(
45+
i, partitionInfo.getResolvedPartitionSpec().getPartitionValues()));
46+
}
47+
}
48+
49+
return splits;
50+
}
51+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.fluss.config.ConfigOptions;
2525
import org.apache.fluss.config.Configuration;
2626
import org.apache.fluss.flink.lake.LakeSplitGenerator;
27+
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
2728
import org.apache.fluss.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl;
2829
import org.apache.fluss.flink.source.enumerator.initializer.NoStoppingOffsetsInitializer;
2930
import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
@@ -83,7 +84,8 @@
8384
* <p>The enumerator is responsible for:
8485
*
8586
* <ul>
86-
* <li>Get the all splits(snapshot split + log split) for a table of Fluss to be read.
87+
* <li>Get all the splits (lake split + snapshot split + log split) for a table of Fluss to be
88+
* read.
8789
* <li>Assign the splits to readers with the guarantee that the splits belong to the same bucket
8890
* will be assigned to same reader.
8991
* </ul>
@@ -110,10 +112,15 @@ public class FlinkSourceEnumerator
110112
*
111113
* <p>It's mainly used to help enumerator to broadcast the partition removed event to the
112114
* readers when partitions are dropped.
115+
*
116+
* <p>If an assigned partition exists only in the lake and has already expired in Fluss, it will
117+
* remain here indefinitely and will not be removed. However, considering that only a small
118+
* number of such lake-only partitions might exist during the initial startup, and they consume
119+
* minimal memory, this issue is being ignored for now.
113120
*/
114121
private final Map<Long, String> assignedPartitions;
115122

116-
/** buckets that have been assigned to readers. */
123+
/** Buckets that have been assigned to readers. */
117124
private final Set<TableBucket> assignedTableBuckets;
118125

119126
@Nullable private List<SourceSplitBase> pendingHybridLakeFlussSplits;
@@ -222,12 +229,12 @@ public FlinkSourceEnumerator(
222229
this.context = checkNotNull(context);
223230
this.pendingSplitAssignment = new HashMap<>();
224231
this.assignedTableBuckets = new HashSet<>(assignedTableBuckets);
225-
this.startingOffsetsInitializer = startingOffsetsInitializer;
226232
this.assignedPartitions = new HashMap<>(assignedPartitions);
227233
this.pendingHybridLakeFlussSplits =
228234
pendingHybridLakeFlussSplits == null
229235
? null
230236
: new LinkedList<>(pendingHybridLakeFlussSplits);
237+
this.startingOffsetsInitializer = startingOffsetsInitializer;
231238
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
232239
this.streaming = streaming;
233240
this.partitionFilters = partitionFilters;
@@ -258,6 +265,10 @@ public void start() {
258265
// we'll need to consider lake splits
259266
List<SourceSplitBase> hybridLakeFlussSplits = generateHybridLakeFlussSplits();
260267
if (hybridLakeFlussSplits != null) {
268+
LOG.info(
269+
"Generated {} hybrid lake splits for table {}.",
270+
hybridLakeFlussSplits.size(),
271+
tablePath);
261272
// handle hybrid lake fluss splits firstly
262273
handleSplitsAdd(hybridLakeFlussSplits, null);
263274
}
@@ -554,7 +565,8 @@ private List<SourceSplitBase> getSnapshotAndLogSplits(
554565
// hybrid snapshot log split;
555566
OptionalLong logOffset = snapshots.getLogOffset(bucketId);
556567
checkState(
557-
logOffset.isPresent(), "Log offset should be present if snapshot id is.");
568+
logOffset.isPresent(),
569+
"Log offset should be present if snapshot id is present.");
558570
splits.add(
559571
new HybridSnapshotLogSplit(
560572
tb, partitionName, snapshotId.getAsLong(), logOffset.getAsLong()));
@@ -616,6 +628,7 @@ private List<SourceSplitBase> generateHybridLakeFlussSplits() {
616628
// should be restored from checkpoint, shouldn't
617629
// list splits again
618630
if (pendingHybridLakeFlussSplits != null) {
631+
LOG.info("Still have pending lake fluss splits, shouldn't list splits again.");
619632
return pendingHybridLakeFlussSplits;
620633
}
621634
try {
@@ -664,9 +677,16 @@ private void handlePartitionsRemoved(Collection<Partition> removedPartitionInfo)
664677
pendingSplitAssignment.forEach(
665678
(reader, splits) ->
666679
splits.removeIf(
667-
split ->
668-
removedPartitionsMap.containsKey(
669-
split.getTableBucket().getPartitionId())));
680+
split -> {
681+
// Never remove LakeSnapshotSplit, because during union reads,
682+
// data from the lake partition must still be read even if the
683+
// partition has already expired in Fluss.
684+
if (split instanceof LakeSnapshotSplit) {
685+
return false;
686+
}
687+
return removedPartitionsMap.containsKey(
688+
split.getTableBucket().getPartitionId());
689+
}));
670690

671691
// send partition removed event to all readers
672692
PartitionsRemovedEvent event = new PartitionsRemovedEvent(removedPartitionsMap);

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.api.connector.source.SourceEvent;
2121

2222
import java.util.Map;
23+
import java.util.Objects;
2324

2425
/**
2526
* A source event, sent from the enumerator to the readers, indicating that partitions were removed.
@@ -41,6 +42,20 @@ public Map<Long, String> getRemovedPartitions() {
4142
return removedPartitions;
4243
}
4344

45+
@Override
46+
public boolean equals(Object o) {
47+
if (o == null || getClass() != o.getClass()) {
48+
return false;
49+
}
50+
PartitionsRemovedEvent that = (PartitionsRemovedEvent) o;
51+
return Objects.equals(removedPartitions, that.removedPartitions);
52+
}
53+
54+
@Override
55+
public int hashCode() {
56+
return Objects.hashCode(removedPartitions);
57+
}
58+
4459
@Override
4560
public String toString() {
4661
return "PartitionsRemovedEvent{" + "removedPartitions=" + removedPartitions + '}';

0 commit comments

Comments
 (0)