Skip to content

Commit d78c6a1

Browse files
committed
[lake/source] Introduce TieringSource and TieringSourceEnumerator
1 parent 3b96888 commit d78c6a1

File tree

12 files changed

+2081
-2
lines changed

12 files changed

+2081
-2
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/metadata/LakeSnapshot.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,14 @@ public long getSnapshotId() {
4747
public Map<TableBucket, Long> getTableBucketsOffset() {
4848
return tableBucketsOffset;
4949
}
50+
51+
@Override
52+
public String toString() {
53+
return "LakeSnapshot{"
54+
+ "snapshotId="
55+
+ snapshotId
56+
+ ", tableBucketsOffset="
57+
+ tableBucketsOffset
58+
+ '}';
59+
}
5060
}

fluss-common/src/main/java/com/alibaba/fluss/metadata/TableBucket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ public TableBucket(long tableId, int bucket) {
5757

5858
public TableBucket(long tableId, @Nullable Long partitionId, int bucket) {
5959
this.tableId = tableId;
60-
this.bucket = bucket;
6160
this.partitionId = partitionId;
61+
this.bucket = bucket;
6262
}
6363

6464
public int getBucket() {
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.tiering.source;
18+
19+
import com.alibaba.fluss.config.Configuration;
20+
import com.alibaba.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator;
21+
import com.alibaba.fluss.flink.tiering.source.split.TieringSplit;
22+
import com.alibaba.fluss.flink.tiering.source.split.TieringSplitSerializer;
23+
import com.alibaba.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
24+
import com.alibaba.fluss.flink.tiering.source.state.TieringSourceEnumeratorStateSerializer;
25+
import com.alibaba.fluss.lakehouse.writer.LakeTieringFactory;
26+
27+
import org.apache.flink.api.connector.source.Boundedness;
28+
import org.apache.flink.api.connector.source.Source;
29+
import org.apache.flink.api.connector.source.SourceReader;
30+
import org.apache.flink.api.connector.source.SourceReaderContext;
31+
import org.apache.flink.api.connector.source.SplitEnumerator;
32+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
33+
import org.apache.flink.core.io.SimpleVersionedSerializer;
34+
35+
import static com.alibaba.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
36+
37+
/**
38+
* The flink source implementation for tiering data from Fluss to downstream lake.
39+
*
40+
* @param <WriteResult> the type of write lake result.
41+
*/
42+
public class TieringSource<WriteResult>
43+
implements Source<
44+
TableBucketWriteResult<WriteResult>, TieringSplit, TieringSourceEnumeratorState> {
45+
46+
private final Configuration flussConf;
47+
private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
48+
private final long pollTieringTableIntervalMs;
49+
50+
public TieringSource(
51+
Configuration flussConf,
52+
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
53+
long pollTieringTableIntervalMs) {
54+
this.flussConf = flussConf;
55+
this.lakeTieringFactory = lakeTieringFactory;
56+
this.pollTieringTableIntervalMs = pollTieringTableIntervalMs;
57+
}
58+
59+
@Override
60+
public Boundedness getBoundedness() {
61+
return Boundedness.CONTINUOUS_UNBOUNDED;
62+
}
63+
64+
@Override
65+
public SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> createEnumerator(
66+
SplitEnumeratorContext<TieringSplit> splitEnumeratorContext) throws Exception {
67+
return new TieringSourceEnumerator(
68+
flussConf, splitEnumeratorContext, pollTieringTableIntervalMs);
69+
}
70+
71+
@Override
72+
public SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> restoreEnumerator(
73+
SplitEnumeratorContext<TieringSplit> splitEnumeratorContext,
74+
TieringSourceEnumeratorState tieringSourceEnumeratorState)
75+
throws Exception {
76+
// stateless operator
77+
return new TieringSourceEnumerator(
78+
flussConf, splitEnumeratorContext, pollTieringTableIntervalMs);
79+
}
80+
81+
@Override
82+
public SimpleVersionedSerializer<TieringSplit> getSplitSerializer() {
83+
return TieringSplitSerializer.INSTANCE;
84+
}
85+
86+
@Override
87+
public SimpleVersionedSerializer<TieringSourceEnumeratorState>
88+
getEnumeratorCheckpointSerializer() {
89+
return TieringSourceEnumeratorStateSerializer.INSTANCE;
90+
}
91+
92+
@Override
93+
public SourceReader<TableBucketWriteResult<WriteResult>, TieringSplit> createReader(
94+
SourceReaderContext sourceReaderContext) throws Exception {
95+
return new TieringSourceReader<>(sourceReaderContext, flussConf, lakeTieringFactory);
96+
}
97+
98+
/** Builder for {@link TieringSource}. */
99+
public static class Builder<WriteResult> {
100+
101+
private final Configuration flussConf;
102+
private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
103+
private long pollTieringTableIntervalMs =
104+
POLL_TIERING_TABLE_INTERVAL.defaultValue().toMillis();
105+
106+
public Builder(
107+
Configuration flussConf, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
108+
this.flussConf = flussConf;
109+
this.lakeTieringFactory = lakeTieringFactory;
110+
}
111+
112+
public Builder<WriteResult> withPollTieringTableIntervalMs(long pollTieringTableInterval) {
113+
this.pollTieringTableIntervalMs = pollTieringTableInterval;
114+
return this;
115+
}
116+
117+
public TieringSource<WriteResult> build() {
118+
return new TieringSource<>(flussConf, lakeTieringFactory, pollTieringTableIntervalMs);
119+
}
120+
}
121+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.tiering.source;
18+
19+
import com.alibaba.fluss.config.ConfigOption;
20+
21+
import java.time.Duration;
22+
23+
import static com.alibaba.fluss.config.ConfigBuilder.key;
24+
25+
/** Configuration options for the {@link TieringSource}. */
26+
public class TieringSourceOptions {
27+
28+
public static final ConfigOption<Duration> POLL_TIERING_TABLE_INTERVAL =
29+
key("tiering.poll.table.interval")
30+
.durationType()
31+
.defaultValue(Duration.ofSeconds(30))
32+
.withDescription(
33+
"The fixed interval to request tiering table from Fluss cluster, by default 30 seconds.");
34+
}

0 commit comments

Comments
 (0)