Skip to content

Commit 960b9d7

Browse files
luoyuxiawuchong
authored andcommitted
[hotfix] Always use full restart strategy for tiering service to avoid table can't be committed successfully (#1103)
1 parent d37016b commit 960b9d7

File tree

2 files changed

+19
-1
lines changed

2 files changed

+19
-1
lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSourceReader.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.flink.api.connector.source.SourceReaderContext;
2727
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
2828

29+
import java.util.Collections;
30+
import java.util.List;
2931
import java.util.Map;
3032

3133
/** A {@link SourceReader} that read records from Fluss and write to lake. */
@@ -61,6 +63,12 @@ protected void onSplitFinished(Map<String, TieringSplitState> finishedSplitIds)
6163
context.sendSplitRequest();
6264
}
6365

66+
@Override
67+
public List<TieringSplit> snapshotState(long checkpointId) {
68+
// we return empty list to make source reader be stateless
69+
return Collections.emptyList();
70+
}
71+
6472
@Override
6573
protected TieringSplitState initializedState(TieringSplit split) {
6674
if (split.isTieringSnapshotSplit()) {

fluss-flink/fluss-flink-tiering/src/main/java/com/alibaba/fluss/flink/tiering/FlussLakeTieringEntrypoint.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
import com.alibaba.fluss.config.Configuration;
2121

2222
import org.apache.flink.api.java.utils.MultipleParameterTool;
23+
import org.apache.flink.configuration.JobManagerOptions;
2324
import org.apache.flink.core.execution.JobClient;
2425
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2526

2627
import java.util.Map;
2728

2829
import static com.alibaba.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX;
2930
import static com.alibaba.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
31+
import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME;
3032

3133
/** The entrypoint for Flink to tiering fluss data to Paimon. */
3234
public class FlussLakeTieringEntrypoint {
@@ -62,9 +64,17 @@ public static void main(String[] args) throws Exception {
6264
extractAndRemovePrefix(
6365
paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake));
6466

67+
// now, we must use full restart strategy if any task is failed,
68+
// since committer is stateless, if tiering committer is failover, committer
69+
// will lost the collected committable, and will never collect all committable to do commit
70+
// todo: support region failover
71+
org.apache.flink.configuration.Configuration flinkConfig =
72+
new org.apache.flink.configuration.Configuration();
73+
flinkConfig.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, FULL_RESTART_STRATEGY_NAME);
74+
6575
// build tiering source
6676
final StreamExecutionEnvironment execEnv =
67-
StreamExecutionEnvironment.getExecutionEnvironment();
77+
StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);
6878

6979
// build lake tiering job
7080
JobClient jobClient =

0 commit comments

Comments
 (0)