Skip to content

Commit e75f628

Browse files
committed
[flink] Union read supports reading partition data for partitions that exist in the lake but not in Fluss
1 parent 3815e9e commit e75f628

File tree

21 files changed

+1209
-88
lines changed

21 files changed

+1209
-88
lines changed
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.source;
19+
20+
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
21+
import org.apache.fluss.metadata.PartitionInfo;
22+
import org.apache.fluss.predicate.Predicate;
23+
import org.apache.fluss.utils.CloseableIterator;
24+
25+
import java.io.ByteArrayInputStream;
26+
import java.io.ByteArrayOutputStream;
27+
import java.io.DataInputStream;
28+
import java.io.DataOutputStream;
29+
import java.io.IOException;
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
33+
/** A testing implementation of {@link LakeSource}. */
34+
public class TestingLakeSource implements LakeSource<LakeSplit> {
35+
36+
private final int bucketNum;
37+
private final List<PartitionInfo> partitionInfos;
38+
39+
public TestingLakeSource() {
40+
this.bucketNum = 0;
41+
this.partitionInfos = null;
42+
}
43+
44+
public TestingLakeSource(int bucketNum, List<PartitionInfo> partitionInfos) {
45+
this.bucketNum = bucketNum;
46+
this.partitionInfos = partitionInfos;
47+
}
48+
49+
@Override
50+
public void withProject(int[][] project) {}
51+
52+
@Override
53+
public void withLimit(int limit) {}
54+
55+
@Override
56+
public FilterPushDownResult withFilters(List<Predicate> predicates) {
57+
return null;
58+
}
59+
60+
@Override
61+
public Planner<LakeSplit> createPlanner(PlannerContext context) throws IOException {
62+
return new TestingPlanner(bucketNum, partitionInfos);
63+
}
64+
65+
@Override
66+
public RecordReader createRecordReader(ReaderContext<LakeSplit> context) throws IOException {
67+
return CloseableIterator::emptyIterator;
68+
}
69+
70+
@Override
71+
public SimpleVersionedSerializer<LakeSplit> getSplitSerializer() {
72+
return new SimpleVersionedSerializer<LakeSplit>() {
73+
74+
@Override
75+
public int getVersion() {
76+
return 0;
77+
}
78+
79+
@Override
80+
public byte[] serialize(LakeSplit split) throws IOException {
81+
if (split instanceof TestingLakeSplit) {
82+
TestingLakeSplit testingSplit = (TestingLakeSplit) split;
83+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
84+
try (DataOutputStream dos = new DataOutputStream(baos)) {
85+
// Serialize bucket
86+
dos.writeInt(testingSplit.bucket());
87+
88+
// Serialize partition list
89+
List<String> partition = testingSplit.partition();
90+
if (partition == null) {
91+
dos.writeInt(-1);
92+
} else {
93+
dos.writeInt(partition.size());
94+
for (String part : partition) {
95+
// Write a boolean flag to indicate if the string is null
96+
dos.writeBoolean(part != null);
97+
if (part != null) {
98+
dos.writeUTF(part);
99+
}
100+
}
101+
}
102+
}
103+
return baos.toByteArray();
104+
}
105+
throw new IOException("Unsupported split type: " + split.getClass().getName());
106+
}
107+
108+
@Override
109+
public LakeSplit deserialize(int version, byte[] serialized) throws IOException {
110+
if (version != 0) {
111+
throw new IOException("Unsupported version: " + version);
112+
}
113+
114+
try (DataInputStream dis =
115+
new DataInputStream(new ByteArrayInputStream(serialized))) {
116+
// Deserialize bucket
117+
int bucket = dis.readInt();
118+
119+
// Deserialize partition list
120+
int partitionSize = dis.readInt();
121+
List<String> partition;
122+
if (partitionSize < 0) {
123+
partition = null;
124+
} else {
125+
partition = new ArrayList<>(partitionSize);
126+
for (int i = 0; i < partitionSize; i++) {
127+
// Read boolean flag to determine if the string is null
128+
boolean isNotNull = dis.readBoolean();
129+
String part = isNotNull ? dis.readUTF() : null;
130+
partition.add(part);
131+
}
132+
}
133+
134+
return new TestingLakeSplit(bucket, partition);
135+
}
136+
}
137+
};
138+
}
139+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.source;
19+
20+
import java.util.List;
21+
22+
/** A testing implementation of {@link LakeSplit}. */
23+
public class TestingLakeSplit implements LakeSplit {
24+
25+
private final int bucket;
26+
private final List<String> partition;
27+
28+
public TestingLakeSplit(int bucket, List<String> partition) {
29+
this.bucket = bucket;
30+
this.partition = partition;
31+
}
32+
33+
@Override
34+
public String toString() {
35+
return "TestingLakeSplit{" + "bucket=" + bucket + ", partition=" + partition + '}';
36+
}
37+
38+
@Override
39+
public int bucket() {
40+
return bucket;
41+
}
42+
43+
@Override
44+
public List<String> partition() {
45+
return partition;
46+
}
47+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.source;
19+
20+
import org.apache.fluss.metadata.PartitionInfo;
21+
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
26+
/** A testing implementation of {@link Planner}. */
27+
public class TestingPlanner implements Planner<LakeSplit> {
28+
29+
private final int bucketNum;
30+
private final List<PartitionInfo> partitionInfos;
31+
32+
public TestingPlanner(int bucketNum, List<PartitionInfo> partitionInfos) {
33+
this.bucketNum = bucketNum;
34+
this.partitionInfos = partitionInfos;
35+
}
36+
37+
@Override
38+
public List<LakeSplit> plan() throws IOException {
39+
List<LakeSplit> splits = new ArrayList<>();
40+
41+
for (PartitionInfo partitionInfo : partitionInfos) {
42+
for (int i = 0; i < bucketNum; i++) {
43+
splits.add(
44+
new TestingLakeSplit(
45+
i, partitionInfo.getResolvedPartitionSpec().getPartitionValues()));
46+
}
47+
}
48+
49+
return splits;
50+
}
51+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ private List<SourceSplitBase> generatePartitionTableSplit(
187187
lakeSplitsOfPartition,
188188
partitionName,
189189
// now, we can't get partition id for the partition only
190-
// in lake, set them to a arbitrary partition id, but
190+
// in lake, set them to an arbitrary partition id, but
191191
// make sure different partition have different partition id
192192
// to enable different partition can be distributed to different
193193
// tasks
@@ -235,7 +235,7 @@ private List<SourceSplitBase> generateSplit(
235235
Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
236236
Long stoppingOffset = bucketEndOffset.get(bucket);
237237
if (snapshotLogOffset == null) {
238-
// no any data commit to this bucket, scan from fluss log
238+
// no data commit to this bucket, scan from fluss log
239239
splits.add(
240240
new LogSplit(
241241
tableBucket, partitionName, EARLIEST_OFFSET, stoppingOffset));

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252

5353
/** Flink source for Fluss. */
5454
public class FlinkSource<OUT>
55-
implements Source<OUT, SourceSplitBase, SourceEnumeratorState>, ResultTypeQueryable {
55+
implements Source<OUT, SourceSplitBase, SourceEnumeratorState>, ResultTypeQueryable<OUT> {
5656
private static final long serialVersionUID = 1L;
5757

5858
private final Configuration flussConf;
@@ -155,6 +155,8 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator
155155
splitEnumeratorContext,
156156
sourceEnumeratorState.getAssignedBuckets(),
157157
sourceEnumeratorState.getAssignedPartitions(),
158+
sourceEnumeratorState.getAssignedLakeBuckets(),
159+
sourceEnumeratorState.getAssignedLakePartitions(),
158160
sourceEnumeratorState.getRemainingHybridLakeFlussSplits(),
159161
offsetsInitializer,
160162
scanPartitionDiscoveryIntervalMs,

0 commit comments

Comments
 (0)