Commit d029050

[lake] Introduce Union read lake base interfaces (#1480)

1 parent 74ab1f1 commit d029050

13 files changed: +400 −0 lines changed

fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/LakeStorage.java

Lines changed: 12 additions & 0 deletions

@@ -18,7 +18,9 @@
 package com.alibaba.fluss.lake.lakestorage;

 import com.alibaba.fluss.annotation.PublicEvolving;
+import com.alibaba.fluss.lake.source.LakeSource;
 import com.alibaba.fluss.lake.writer.LakeTieringFactory;
+import com.alibaba.fluss.metadata.TablePath;

 /**
  * The LakeStorage interface defines how to implement lakehouse storage system such as Paimon and
@@ -38,4 +40,14 @@ public interface LakeStorage {

     /** Create lake catalog. */
     LakeCatalog createLakeCatalog();
+
+    /**
+     * Creates a lake source instance for reading lakehouse data from the specified table path. The
+     * lake source provides capabilities for split planning and record reading, enabling efficient
+     * distributed processing of lakehouse data.
+     *
+     * @param tablePath the logical path identifying the table in the lakehouse storage
+     * @return a configured lake source instance for the specified table
+     */
+    LakeSource<?> createLakeSource(TablePath tablePath);
 }
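
The new method gives engines a single entry point into lake reading. As a hedged illustration (engine-side code, not part of this commit), a caller holding a LakeStorage plugin instance might obtain a source as below; the TablePath.of(database, table) factory and the table name are assumptions for the sketch:

// Illustrative only: obtain a LakeSource for a table from a storage plugin.
static LakeSource<?> openSource(LakeStorage storage) {
    // TablePath.of(...) is assumed; any way of building a TablePath works here.
    return storage.createLakeSource(TablePath.of("fluss", "orders"));
}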

fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/PluginLakeStorageWrapper.java

Lines changed: 6 additions & 0 deletions

@@ -19,6 +19,7 @@

 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.exception.TableAlreadyExistException;
+import com.alibaba.fluss.lake.source.LakeSource;
 import com.alibaba.fluss.lake.writer.LakeTieringFactory;
 import com.alibaba.fluss.metadata.TableDescriptor;
 import com.alibaba.fluss.metadata.TablePath;
@@ -116,5 +117,10 @@ public LakeCatalog createLakeCatalog() {
                 return new ClassLoaderFixingLakeCatalog(inner.createLakeCatalog(), loader);
             }
         }
+
+        @Override
+        public LakeSource<?> createLakeSource(TablePath tablePath) {
+            return inner.createLakeSource(tablePath);
+        }
     }
 }
fluss-common/src/main/java/com/alibaba/fluss/lake/source/LakeSource.java

Lines changed: 149 additions & 0 deletions

@@ -0,0 +1,149 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.lake.source;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
import com.alibaba.fluss.predicate.Predicate;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;

/**
 * A generic interface for lake data sources that defines how to plan splits and read data. Any
 * data lake format that supports reading data tiered to the lake as Fluss records should
 * implement this interface.
 *
 * <p>This interface provides methods for projection, filtering, and limiting, enabling query
 * engines to push these operations down to the lake source. Implementations must ensure that
 * split planning and record reading properly account for these pushed-down operations during
 * execution.
 *
 * @param <Split> The type of data split, which must extend {@link LakeSplit}
 * @since 0.8
 */
@PublicEvolving
public interface LakeSource<Split extends LakeSplit> extends Serializable {

    /**
     * Applies column projection to the data source. It provides the field index paths that
     * should be used for a projection. The indices are 0-based and support fields within
     * (possibly nested) structures.
     *
     * <p>For nested fields, given the SQL {@code CREATE TABLE t (i INT, r ROW<d DOUBLE, b
     * BOOLEAN>, s STRING); SELECT s, r.d FROM t;}, the projection will be {@code [[2], [1, 0]]}.
     */
    void withProject(int[][] project);

    /** Applies a row limit to the lake source. */
    void withLimit(int limit);

    /** Applies filters to the lake source. */
    FilterPushDownResult withFilters(List<Predicate> predicates);

    /**
     * Creates a planner that plans the splits to be read.
     *
     * @param context The planning context providing necessary planning information
     * @return A planner instance for this lake source
     * @throws IOException if an error occurs during planner creation
     */
    Planner<Split> createPlanner(PlannerContext context) throws IOException;

    /**
     * Creates a record reader for reading data from the lake source for the specified split.
     *
     * @param context The reader context containing the split to be read
     * @return A record reader instance for the given split
     * @throws IOException if an error occurs during reader creation
     */
    RecordReader createRecordReader(ReaderContext<Split> context) throws IOException;

    /**
     * Returns the serializer for the data split, used to transfer split information in a
     * distributed environment.
     *
     * @return The serializer for the split
     */
    SimpleVersionedSerializer<Split> getSplitSerializer();

    /**
     * Context interface for planners, providing the snapshot id of the table in the data lake to
     * plan splits for.
     */
    interface PlannerContext extends Serializable {
        long snapshotId();
    }

    /**
     * Context interface for record readers, providing access to the lake split being read.
     *
     * @param <Split> The type of lake split
     */
    interface ReaderContext<Split extends LakeSplit> extends Serializable {
        Split lakeSplit();
    }

    /**
     * Represents the result of a filter push-down operation on a lake source, indicating which
     * predicates were accepted by the source and which remain to be evaluated.
     *
     * @since 0.8
     */
    @PublicEvolving
    final class FilterPushDownResult {
        private final List<Predicate> acceptedPredicates;
        private final List<Predicate> remainingPredicates;

        private FilterPushDownResult(
                List<Predicate> acceptedPredicates, List<Predicate> remainingPredicates) {
            this.acceptedPredicates = acceptedPredicates;
            this.remainingPredicates = remainingPredicates;
        }

        /**
         * Creates a new FilterPushDownResult instance.
         *
         * @param acceptedPredicates The accepted predicates
         * @param remainingPredicates The remaining predicates
         * @return A new FilterPushDownResult instance
         */
        public static FilterPushDownResult of(
                List<Predicate> acceptedPredicates, List<Predicate> remainingPredicates) {
            return new FilterPushDownResult(acceptedPredicates, remainingPredicates);
        }

        /**
         * Returns the predicates that were accepted by the source.
         *
         * @return The list of accepted predicates
         */
        public List<Predicate> acceptedPredicates() {
            return acceptedPredicates;
        }

        /**
         * Returns the predicates that remain to be evaluated.
         *
         * @return The list of remaining predicates
         */
        public List<Predicate> remainingPredicates() {
            return remainingPredicates;
        }
    }
}
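
Taken together, LakeSource defines a plan/read lifecycle with pushdowns applied up front. The fragment below is a hedged engine-side sketch of that lifecycle under stated assumptions: the contexts are written as lambdas (PlannerContext and ReaderContext each declare a single abstract method), the snapshot id 42 and the process(...) helper are purely illustrative, and CloseableIterator is assumed to extend Iterator and AutoCloseable.

// Hedged usage sketch, not part of this commit.
static <S extends LakeSplit> void scan(LakeSource<S> source, List<Predicate> filters)
        throws IOException {
    // Push down the projection from the withProject javadoc example
    // (SELECT s, r.d FROM t), plus a row limit and the filters.
    source.withProject(new int[][] {{2}, {1, 0}});
    source.withLimit(1000);
    LakeSource.FilterPushDownResult pushDown = source.withFilters(filters);
    List<Predicate> remaining = pushDown.remainingPredicates(); // engine must still apply these

    // Contexts as lambdas; snapshot id 42 is illustrative.
    Planner<S> planner = source.createPlanner(() -> 42L);
    for (S split : planner.plan()) {
        try (CloseableIterator<LogRecord> records =
                source.createRecordReader(() -> split).read()) {
            records.forEachRemaining(record -> process(record, remaining)); // hypothetical helper
        }
    }
}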
fluss-common/src/main/java/com/alibaba/fluss/lake/source/LakeSplit.java

Lines changed: 58 additions & 0 deletions

@@ -0,0 +1,58 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.lake.source;

import com.alibaba.fluss.annotation.PublicEvolving;

import java.util.List;

/**
 * Represents a logical partition or segment of data in the data lake.
 *
 * @since 0.8
 */
@PublicEvolving
public interface LakeSplit {

    /**
     * Returns the bucket id for this data split. Any data split in the lake must belong to a
     * Fluss bucket. The bucket id is used to aggregate splits that belong to the same Fluss
     * bucket into the same reader for bucket-aware tables, such as primary key tables and log
     * tables with pre-defined bucket keys. For tables that are not bucket-aware, it is also
     * feasible to return -1 for all data splits.
     *
     * @return the bucket id
     */
    int bucket();

    /**
     * Returns the hierarchical partition values for this split, or an empty list if the split
     * doesn't belong to a specific partition (i.e., the table is non-partitioned).
     *
     * <p>The returned list represents the complete partition path, with each element
     * corresponding to one level of the partitioning hierarchy in order. For example, in a table
     * partitioned by {@code dt=20250101/hr=12}, this method would return {@code ["20250101",
     * "12"]}.
     *
     * <p>The list size should match the table's partition column count, and each element's
     * position corresponds to the declared partition column order. Values should be in their
     * string-represented form as they would appear in the filesystem path.
     *
     * @return the resolved partition values, or an empty list if this split doesn't belong to a
     *     specific partition (non-partitioned table)
     */
    List<String> partition();
}
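
To ground the contract, here is a hedged sketch of a split for the dt=20250101/hr=12 example above; the class name, its constructor, and the partition values are hypothetical, not from this commit.

import java.util.Arrays;
import java.util.List;

// Hypothetical split implementation, for illustration only.
class ExampleLakeSplit implements LakeSplit {
    private final int bucket;

    ExampleLakeSplit(int bucket) {
        this.bucket = bucket;
    }

    @Override
    public int bucket() {
        return bucket; // return -1 here for tables that are not bucket-aware
    }

    @Override
    public List<String> partition() {
        // One value per partition column, in declared order: dt=20250101/hr=12.
        return Arrays.asList("20250101", "12");
    }
}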
fluss-common/src/main/java/com/alibaba/fluss/lake/source/Planner.java

Lines changed: 45 additions & 0 deletions

@@ -0,0 +1,45 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.lake.source;

import com.alibaba.fluss.annotation.PublicEvolving;

import java.io.IOException;
import java.util.List;

/**
 * A planner interface for generating readable splits for lake data sources.
 *
 * <p>Implementations of this interface are responsible for determining how to divide the data into
 * manageable splits that can be read in parallel. The planning should consider the pushed-down
 * optimizations (filters, limits, etc.) from {@link LakeSource}.
 *
 * @param <Split> the type of data split this planner generates, must extend {@link LakeSplit}
 * @since 0.8
 */
@PublicEvolving
public interface Planner<Split extends LakeSplit> {

    /**
     * Plans and generates a list of data splits that can be read in parallel.
     *
     * @return the list of readable data splits
     * @throws IOException if an I/O error occurs
     */
    List<Split> plan() throws IOException;
}
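
As a hedged companion to the hypothetical ExampleLakeSplit above, a planner could simply enumerate one split per Fluss bucket; a real planner would instead consult lake metadata at the context's snapshot id.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical planner, for illustration only: one split per bucket.
class ExamplePlanner implements Planner<ExampleLakeSplit> {
    private final int numBuckets;

    ExamplePlanner(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    @Override
    public List<ExampleLakeSplit> plan() throws IOException {
        List<ExampleLakeSplit> splits = new ArrayList<>();
        for (int bucket = 0; bucket < numBuckets; bucket++) {
            splits.add(new ExampleLakeSplit(bucket));
        }
        return splits;
    }
}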
fluss-common/src/main/java/com/alibaba/fluss/lake/source/RecordReader.java

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.lake.source;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.utils.CloseableIterator;

import java.io.IOException;

/**
 * An interface for reading records from a {@link LakeSplit}.
 *
 * <p>Implementations of this interface provide iterator-style access to records, allowing
 * efficient sequential reading of potentially large datasets without loading all data into memory
 * at once. The reading should consider the pushed-down optimizations (projection, filters, limits,
 * etc.) from {@link LakeSource}.
 *
 * @since 0.8
 */
@PublicEvolving
public interface RecordReader {

    /**
     * Reads a {@link LakeSplit} into a closeable iterator.
     *
     * @return the closeable iterator of records
     * @throws IOException if an I/O error occurs
     */
    CloseableIterator<LogRecord> read() throws IOException;
}
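
An implementer-side sketch follows, hedged on one assumption: that CloseableIterator is an interface combining Iterator with a close() method. It serves already-materialized records; a real reader would decode split files lazily and release file handles in close().

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

// Hypothetical reader over in-memory records, for illustration only.
class ExampleRecordReader implements RecordReader {
    private final List<LogRecord> records; // e.g. decoded from one split's files

    ExampleRecordReader(List<LogRecord> records) {
        this.records = records;
    }

    @Override
    public CloseableIterator<LogRecord> read() throws IOException {
        Iterator<LogRecord> it = records.iterator();
        return new CloseableIterator<LogRecord>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public LogRecord next() {
                return it.next();
            }

            @Override
            public void close() {
                // Nothing to release for an in-memory list.
            }
        };
    }
}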
fluss-common/src/main/java/com/alibaba/fluss/lake/source/SortedRecordReader.java

Lines changed: 49 additions & 0 deletions

@@ -0,0 +1,49 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.lake.source;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.row.InternalRow;

import java.util.Comparator;

/**
 * A specialized {@link RecordReader} that produces records in a defined sorted order.
 *
 * <p>Extends the basic record reading capability with sorting semantics, ensuring that records are
 * returned according to a specified ordering.
 *
 * <p>Implementations must guarantee that the {@link #read()} method returns records in the order
 * defined by the comparator from {@link #order()}.
 *
 * <p>Note: This is mainly used for union reads of primary key tables, since records from the lake
 * and from Fluss are sort-merged. Lake readers for primary key tables should implement this
 * interface so that union reads achieve better performance.
 *
 * @since 0.8
 */
@PublicEvolving
public interface SortedRecordReader extends RecordReader {

    /**
     * Returns the comparator that defines the sort order of the records.
     *
     * @return a non-null comparator defining the sort order of the records
     */
    Comparator<InternalRow> order();
}
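
For a primary key table whose key is a single INT at field position 0, the order might look like the hedged sketch below; it reuses the hypothetical ExampleRecordReader from earlier and assumes InternalRow exposes getInt(int pos).

import java.util.Comparator;
import java.util.List;

// Hypothetical sorted reader, for illustration only. The records passed in
// must already be sorted consistently with order().
class ExampleSortedRecordReader extends ExampleRecordReader implements SortedRecordReader {

    ExampleSortedRecordReader(List<LogRecord> recordsSortedByKey) {
        super(recordsSortedByKey); // read() must return records in key order
    }

    @Override
    public Comparator<InternalRow> order() {
        // Compare by the key column; assumes InternalRow#getInt(int pos).
        return Comparator.comparingInt(row -> row.getInt(0));
    }
}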
