
Commit 8d93f56

[lake/flink] Introduce Fluss Lake tiering to chain all flink operators for tiering (#1062)
1 parent: d96b62b

File tree

22 files changed: +1690 −36 lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/initializer/BucketOffsetsRetrieverImpl.java

Lines changed: 3 additions & 1 deletion
@@ -22,6 +22,7 @@
 import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer.BucketOffsetsRetriever;
 import com.alibaba.fluss.metadata.TablePath;
 
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import javax.annotation.Nullable;
@@ -81,7 +82,8 @@ private Map<Integer, Long> listOffsets(
                 "Interrupted while listing offsets for table buckets: " + buckets, e);
         } catch (ExecutionException e) {
             throw new FlinkRuntimeException(
-                    "Failed to list offsets for table buckets: " + buckets + " due to", e);
+                    "Failed to list offsets for table buckets: " + buckets + " due to",
+                    ExceptionUtils.stripExecutionException(e));
         }
     }
 }
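
Note: Flink's ExceptionUtils.stripExecutionException unwraps the ExecutionException layers that CompletableFuture.get() adds, so the FlinkRuntimeException above now carries the real root cause rather than a wrapper. A minimal standalone sketch of the difference (JDK 9+ for failedFuture; the failed future stands in for the Fluss admin call):

import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class StripExecutionExceptionDemo {
    public static void main(String[] args) {
        // Stand-in for the listOffsets future; fails with the "real" cause.
        CompletableFuture<Long> future =
                CompletableFuture.failedFuture(new IllegalStateException("leader not available"));
        try {
            future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // Before this commit the cause chain started at the ExecutionException
            // wrapper; stripExecutionException returns the IllegalStateException itself.
            throw new FlinkRuntimeException(
                    "Failed to list offsets due to", ExceptionUtils.stripExecutionException(e));
        }
    }
}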

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/LakeTieringJobBuilder.java

Lines changed: 112 additions & 0 deletions
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.tiering;
+
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.flink.tiering.committer.CommittableMessageTypeInfo;
+import com.alibaba.fluss.flink.tiering.committer.TieringCommitOperatorFactory;
+import com.alibaba.fluss.flink.tiering.source.TableBucketWriteResultTypeInfo;
+import com.alibaba.fluss.flink.tiering.source.TieringSource;
+import com.alibaba.fluss.lakehouse.lakestorage.LakeStorage;
+import com.alibaba.fluss.lakehouse.lakestorage.LakeStoragePlugin;
+import com.alibaba.fluss.lakehouse.lakestorage.LakeStoragePluginSetUp;
+import com.alibaba.fluss.lakehouse.writer.LakeTieringFactory;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+
+import java.util.Collections;
+
+import static com.alibaba.fluss.flink.tiering.source.TieringSource.TIERING_SOURCE_TRANSFORMATION_UID;
+import static com.alibaba.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/** The builder to build Flink lake tiering job. */
+public class LakeTieringJobBuilder {
+
+    private final StreamExecutionEnvironment env;
+    private final Configuration flussConfig;
+    private final Configuration dataLakeConfig;
+    private final String dataLakeFormat;
+
+    private LakeTieringJobBuilder(
+            StreamExecutionEnvironment env,
+            Configuration flussConfig,
+            Configuration dataLakeConfig,
+            String dataLakeFormat) {
+        this.env = checkNotNull(env);
+        this.flussConfig = checkNotNull(flussConfig);
+        this.dataLakeConfig = checkNotNull(dataLakeConfig);
+        this.dataLakeFormat = checkNotNull(dataLakeFormat);
+    }
+
+    public static LakeTieringJobBuilder newBuilder(
+            StreamExecutionEnvironment env,
+            Configuration flussConfig,
+            Configuration dataLakeConfig,
+            String dataLakeFormat) {
+        return new LakeTieringJobBuilder(env, flussConfig, dataLakeConfig, dataLakeFormat);
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public JobClient build() throws Exception {
+        // get the lake storage plugin
+        LakeStoragePlugin lakeStoragePlugin =
+                LakeStoragePluginSetUp.fromConfiguration(
+                        Configuration.fromMap(
+                                Collections.singletonMap(
+                                        ConfigOptions.DATALAKE_FORMAT.key(), dataLakeFormat)),
+                        null);
+        // create lake storage from configurations
+        LakeStorage lakeStorage = checkNotNull(lakeStoragePlugin).createLakeStorage(dataLakeConfig);
+
+        LakeTieringFactory lakeTieringFactory = lakeStorage.createLakeTieringFactory();
+
+        // build tiering source
+        TieringSource.Builder<?> tieringSourceBuilder =
+                new TieringSource.Builder<>(flussConfig, lakeTieringFactory);
+        if (flussConfig.get(POLL_TIERING_TABLE_INTERVAL) != null) {
+            tieringSourceBuilder.withPollTieringTableIntervalMs(
+                    flussConfig.get(POLL_TIERING_TABLE_INTERVAL).toMillis());
+        }
+        TieringSource<?> tieringSource = tieringSourceBuilder.build();
+        DataStreamSource<?> source =
+                env.fromSource(
+                        tieringSource,
+                        WatermarkStrategy.noWatermarks(),
+                        "TieringSource",
+                        TableBucketWriteResultTypeInfo.of(
+                                () -> lakeTieringFactory.getWriteResultSerializer()));
+
+        source.getTransformation().setUid(TIERING_SOURCE_TRANSFORMATION_UID);
+
+        source.transform(
+                        "TieringCommitter",
+                        CommittableMessageTypeInfo.of(
+                                () -> lakeTieringFactory.getCommitableSerializer()),
+                        new TieringCommitOperatorFactory(flussConfig, lakeTieringFactory))
+                .setParallelism(1)
+                .setMaxParallelism(1)
+                .sinkTo(new DiscardingSink());
+
+        return env.executeAsync();
+    }
+}
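
The builder above chains the whole tiering pipeline — TieringSource, then a single-parallelism TieringCommitter, then a DiscardingSink — and submits it with executeAsync(). A hedged sketch of how an entrypoint might drive it; the bootstrap address, warehouse key, and the "paimon" format name are illustrative assumptions, and the setString calls assume Fluss's Configuration mirrors Flink's API:

import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.flink.tiering.LakeTieringJobBuilder;

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TieringJobEntrypoint {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connection to the Fluss cluster (address is an example value).
        Configuration flussConfig = new Configuration();
        flussConfig.setString("bootstrap.servers", "localhost:9123");

        // Settings handed to the LakeStorage plugin; the keys depend on the
        // chosen lake format and are assumptions here.
        Configuration dataLakeConfig = new Configuration();
        dataLakeConfig.setString("warehouse", "/tmp/lake-warehouse");

        // "paimon" is used as an example datalake format name.
        JobClient jobClient =
                LakeTieringJobBuilder.newBuilder(env, flussConfig, dataLakeConfig, "paimon")
                        .build();
        System.out.println("Tiering job submitted: " + jobClient.getJobID());
    }
}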

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/CommittableMessageTypeInfo.java

Lines changed: 4 additions & 4 deletions
@@ -80,13 +80,13 @@ public boolean isKeyType() {
     @Override
     public TypeSerializer<CommittableMessage<Committable>> createSerializer(
             ExecutionConfig executionConfig) {
-        // no copy, so that data from writer is directly going into upstream operator while chaining
-        SimpleVersionedSerializer<Committable> committableSerializer =
-                committableSerializerFactory.get();
         return new SimpleVersionedSerializerTypeSerializerProxy<CommittableMessage<Committable>>(
                 () ->
                         new org.apache.flink.core.io.SimpleVersionedSerializer<
                                 CommittableMessage<Committable>>() {
+                            private final SimpleVersionedSerializer<Committable>
+                                    committableSerializer = committableSerializerFactory.get();
+
                             @Override
                             public int getVersion() {
                                 return committableSerializer.getVersion();
@@ -123,7 +123,7 @@ public CommittableMessage<Committable> copy(
 
     @Override
     public String toString() {
-        return "LakeCommittableTypeInfo";
+        return "CommittableMessageTypeInfo";
     }
 
     @Override

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 13 additions & 3 deletions
@@ -17,6 +17,7 @@
 package com.alibaba.fluss.flink.tiering.committer;
 
 import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.flink.tiering.event.FinishTieringEvent;
 import com.alibaba.fluss.flink.tiering.source.TableBucketWriteResult;
 import com.alibaba.fluss.flink.tiering.source.TieringSource;
 import com.alibaba.fluss.lakehouse.committer.LakeCommitter;
@@ -25,6 +26,8 @@
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TablePath;
 
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
@@ -67,6 +70,9 @@ public class TieringCommitOperator<WriteResult, Committable>
     private final LakeTieringFactory<WriteResult, Committable> lakeTieringFactory;
     private final FlussTableLakeSnapshotCommitter flussTableLakeSnapshotCommitter;
 
+    // gateway to send event to flink source coordinator
+    private final OperatorEventGateway operatorEventGateway;
+
     // tableid -> write results
     private final Map<Long, List<TableBucketWriteResult<WriteResult>>>
             collectedTableBucketWriteResults;
@@ -82,6 +88,10 @@ public TieringCommitOperator(
                 parameters.getContainingTask(),
                 parameters.getStreamConfig(),
                 parameters.getOutput());
+        operatorEventGateway =
+                parameters
+                        .getOperatorEventDispatcher()
+                        .getOperatorEventGateway(TieringSource.TIERING_SOURCE_OPERATOR_UID);
     }
 
     @Override
@@ -106,9 +116,9 @@ public void processElement(StreamRecord<TableBucketWriteResult<WriteResult>> str
                 commitWriteResults(
                         tableId, tableBucketWriteResult.tablePath(), committableWriteResults);
                 collectedTableBucketWriteResults.remove(tableId);
-                // todo: uncomment it in next pr // notify that the table id has been finished tier
-                // operatorEventGateway.sendEventToCoordinator(
-                //         new SourceEventWrapper(new FinishTieringEvent(tableId)));
+                // notify that the table id has been finished tier
+                operatorEventGateway.sendEventToCoordinator(
+                        new SourceEventWrapper(new FinishTieringEvent(tableId)));
                 // only emit when committable is not-null
                 if (committable != null) {
                     output.collect(new StreamRecord<>(new CommittableMessage<>(committable)));
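
The gateway obtained in the constructor is addressed to the source's fixed OperatorID, so the committer can notify the TieringSource coordinator once a table's write results are committed. The coordinator side is not part of this diff; the runnable sketch below only demonstrates the wrapping mechanics, with a stand-in event class because FinishTieringEvent's accessors are not shown in this commit:

import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;

public class SourceEventRoundTripDemo {
    // Stand-in for FinishTieringEvent: a SourceEvent carrying the finished table id.
    static class FinishTieringEvent implements SourceEvent {
        final long tableId;

        FinishTieringEvent(long tableId) {
            this.tableId = tableId;
        }
    }

    public static void main(String[] args) {
        // The committer wraps the SourceEvent so it can travel as an OperatorEvent
        // to the coordinator identified by TIERING_SOURCE_OPERATOR_UID.
        SourceEventWrapper wrapper = new SourceEventWrapper(new FinishTieringEvent(42L));
        // Flink unwraps it on the coordinator and hands the SourceEvent to
        // SplitEnumerator#handleSourceEvent(subtaskId, sourceEvent).
        FinishTieringEvent event = (FinishTieringEvent) wrapper.getSourceEvent();
        System.out.println("Table finished tiering: " + event.tableId);
    }
}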

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java

Lines changed: 5 additions & 1 deletion
@@ -17,15 +17,19 @@
 package com.alibaba.fluss.flink.tiering.committer;
 
 import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.flink.tiering.source.TableBucketWriteResult;
 import com.alibaba.fluss.lakehouse.writer.LakeTieringFactory;
 
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 
 /** The factory to create {@link TieringCommitOperator}. */
 public class TieringCommitOperatorFactory<WriteResult, Committable>
-        extends AbstractStreamOperatorFactory<CommittableMessage<Committable>> {
+        extends AbstractStreamOperatorFactory<CommittableMessage<Committable>>
+        implements OneInputStreamOperatorFactory<
+                TableBucketWriteResult<WriteResult>, CommittableMessage<Committable>> {
 
     private final Configuration flussConfig;
     private final LakeTieringFactory<WriteResult, Committable> lakeTieringFactory;

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSource.java

Lines changed: 20 additions & 0 deletions
@@ -23,6 +23,9 @@
 import com.alibaba.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
 import com.alibaba.fluss.flink.tiering.source.state.TieringSourceEnumeratorStateSerializer;
 import com.alibaba.fluss.lakehouse.writer.LakeTieringFactory;
+import com.alibaba.fluss.shaded.guava32.com.google.common.hash.HashFunction;
+import com.alibaba.fluss.shaded.guava32.com.google.common.hash.Hasher;
+import com.alibaba.fluss.shaded.guava32.com.google.common.hash.Hashing;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
@@ -31,6 +34,10 @@
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.StreamGraphHasherV2;
+
+import java.nio.charset.StandardCharsets;
 
 import static com.alibaba.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
 
@@ -43,6 +50,11 @@ public class TieringSource<WriteResult>
         implements Source<
                 TableBucketWriteResult<WriteResult>, TieringSplit, TieringSourceEnumeratorState> {
 
+    public static final String TIERING_SOURCE_TRANSFORMATION_UID =
+            "$$fluss_tiering_source_operator$$";
+    public static final OperatorID TIERING_SOURCE_OPERATOR_UID =
+            new OperatorID(generateOperatorHash());
+
     private final Configuration flussConf;
     private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
     private final long pollTieringTableIntervalMs;
@@ -95,6 +107,14 @@ public SourceReader<TableBucketWriteResult<WriteResult>, TieringSplit> createRea
         return new TieringSourceReader<>(sourceReaderContext, flussConf, lakeTieringFactory);
     }
 
+    /** This follows the operator uid hash generation logic of flink {@link StreamGraphHasherV2}. */
+    private static byte[] generateOperatorHash() {
+        final HashFunction hashFunction = Hashing.murmur3_128(0);
+        Hasher hasher = hashFunction.newHasher();
+        hasher.putString(TIERING_SOURCE_TRANSFORMATION_UID, StandardCharsets.UTF_8);
+        return hasher.hash().asBytes();
+    }
+
     /** Builder for {@link TieringSource}. */
     public static class Builder<WriteResult> {
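
The fixed uid is what lets TieringCommitOperator (above) obtain an OperatorEventGateway to the source coordinator without a handle to the stream graph: as the Javadoc notes, Flink's StreamGraphHasherV2 hashes a user-supplied uid with murmur3_128 seed 0, so recomputing that hash yields the same OperatorID. A small sketch of the correspondence, using plain Guava in place of Fluss's shaded guava32 copy:

import com.google.common.hash.Hashing;

import org.apache.flink.runtime.jobgraph.OperatorID;

import java.nio.charset.StandardCharsets;

public class OperatorIdFromUidDemo {
    public static void main(String[] args) {
        // Same inputs as generateOperatorHash(): murmur3_128 with seed 0 over the uid.
        byte[] hash =
                Hashing.murmur3_128(0)
                        .newHasher()
                        .putString("$$fluss_tiering_source_operator$$", StandardCharsets.UTF_8)
                        .hash()
                        .asBytes();
        // murmur3_128 produces exactly the 16 bytes an OperatorID is built from.
        OperatorID operatorId = new OperatorID(hash);
        System.out.println("Deterministic OperatorID: " + operatorId);
    }
}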

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSourceOptions.java

Lines changed: 2 additions & 0 deletions
@@ -25,6 +25,8 @@
 /** Configuration options for the {@link TieringSource}. */
 public class TieringSourceOptions {
 
+    public static final String DATA_LAKE_CONFIG_PREFIX = "datalake.";
+
     public static final ConfigOption<Duration> POLL_TIERING_TABLE_INTERVAL =
             key("tiering.poll.table.interval")
                     .durationType()
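
LakeTieringJobBuilder only overrides the source's poll interval when this option is present in the Fluss configuration; otherwise TieringSource keeps its default. A small sketch of setting it, assuming Fluss's Configuration exposes a Flink-style setString and Flink-style duration parsing:

import com.alibaba.fluss.config.Configuration;

public class TieringConfigExample {
    public static void main(String[] args) {
        Configuration flussConfig = new Configuration();
        // Check for newly tierable tables every 30 seconds; read back as a
        // Duration through POLL_TIERING_TABLE_INTERVAL in LakeTieringJobBuilder.
        flussConfig.setString("tiering.poll.table.interval", "30s");
    }
}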

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 13 additions & 8 deletions
@@ -88,7 +88,7 @@ public class TieringSplitReader<WriteResult>
     @Nullable private Integer currentTableNumberOfSplits;
 
     // map from table bucket to split id
-    private final Map<TableBucket, String> currentTableSplitsByBucket;
+    private final Map<TableBucket, TieringSplit> currentTableSplitsByBucket;
     private final Map<TableBucket, Long> currentTableStoppingOffsets;
     private final Set<TieringLogSplit> currentTableEmptyLogSplits;
 
@@ -169,7 +169,7 @@ public void handleSplitsChanges(SplitsChange<TieringSplit> splitsChange) {
     }
 
     private void addSplitToCurrentTable(TieringSplit split) {
-        this.currentTableSplitsByBucket.put(split.getTableBucket(), split.splitId());
+        this.currentTableSplitsByBucket.put(split.getTableBucket(), split);
         if (split.isTieringSnapshotSplit()) {
            this.currentPendingSnapshotSplits.add((TieringSnapshotSplit) split);
         } else if (split.isTieringLogSplit()) {
@@ -259,7 +259,9 @@ private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forLogRecords(
             if (stoppingOffset == null) {
                 continue;
             }
-            LakeWriter<WriteResult> lakeWriter = getOrCreateLakeWriter(bucket);
+            LakeWriter<WriteResult> lakeWriter =
+                    getOrCreateLakeWriter(
+                            bucket, currentTableSplitsByBucket.get(bucket).getPartitionName());
             for (ScanRecord record : bucketScanRecords) {
                 // if record is less than stopping offset
                 if (record.logOffset() < stoppingOffset) {
@@ -278,7 +280,7 @@ private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forLogRecords(
             }
             // put write result of the bucket
             writeResults.put(bucket, completeLakeWriter(bucket, stoppingOffset));
-            String currentSplitId = currentTableSplitsByBucket.remove(bucket);
+            String currentSplitId = currentTableSplitsByBucket.remove(bucket).splitId();
             // put split of the bucket
             finishedSplitIds.put(bucket, currentSplitId);
             LOG.info("Split {} has been finished.", currentSplitId);
@@ -292,12 +294,13 @@ private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forLogRecords(
         return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds);
     }
 
-    private LakeWriter<WriteResult> getOrCreateLakeWriter(TableBucket bucket) throws IOException {
+    private LakeWriter<WriteResult> getOrCreateLakeWriter(
+            TableBucket bucket, @Nullable String partitionName) throws IOException {
         LakeWriter<WriteResult> lakeWriter = lakeWriters.get(bucket);
         if (lakeWriter == null) {
             lakeWriter =
                     lakeTieringFactory.createLakeWriter(
-                            new TieringWriterInitContext(currentTablePath, bucket));
+                            new TieringWriterInitContext(currentTablePath, bucket, partitionName));
             lakeWriters.put(bucket, lakeWriter);
         }
         return lakeWriter;
@@ -345,7 +348,7 @@ private void mayFinishCurrentTable() throws IOException {
     private TableBucketWriteResultWithSplitIds finishCurrentSnapshotSplit() throws IOException {
         TableBucket tableBucket = currentSnapshotSplit.getTableBucket();
         long logEndOffset = currentSnapshotSplit.getLogOffsetOfSnapshot();
-        String splitId = currentTableSplitsByBucket.remove(tableBucket);
+        String splitId = currentTableSplitsByBucket.remove(tableBucket).splitId();
         TableBucketWriteResult<WriteResult> writeResult =
                 completeLakeWriter(tableBucket, logEndOffset);
         closeCurrentSnapshotSplit();
@@ -357,7 +360,9 @@ private TableBucketWriteResultWithSplitIds finishCurrentSnapshotSplit() throws I
 
     private TableBucketWriteResultWithSplitIds forSnapshotSplitRecords(
             TableBucket bucket, CloseableIterator<RecordAndPos> recordIterator) throws IOException {
-        LakeWriter<WriteResult> lakeWriter = getOrCreateLakeWriter(bucket);
+        LakeWriter<WriteResult> lakeWriter =
+                getOrCreateLakeWriter(
+                        bucket, checkNotNull(currentSnapshotSplit).getPartitionName());
         while (recordIterator.hasNext()) {
             ScanRecord scanRecord = recordIterator.next().record();
             lakeWriter.write(scanRecord);
