|
20 | 20 | import com.alibaba.fluss.config.Configuration; |
21 | 21 |
|
22 | 22 | import org.apache.flink.api.java.utils.MultipleParameterTool; |
| 23 | +import org.apache.flink.configuration.JobManagerOptions; |
23 | 24 | import org.apache.flink.core.execution.JobClient; |
24 | 25 | import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
25 | 26 |
|
26 | 27 | import java.util.Map; |
27 | 28 |
|
28 | 29 | import static com.alibaba.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX; |
29 | 30 | import static com.alibaba.fluss.utils.PropertiesUtils.extractAndRemovePrefix; |
| 31 | +import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME; |
30 | 32 |
|
31 | 33 | /** The entrypoint for Flink to tiering fluss data to Paimon. */ |
32 | 34 | public class FlussLakeTieringEntrypoint { |
@@ -62,9 +64,17 @@ public static void main(String[] args) throws Exception { |
62 | 64 | extractAndRemovePrefix( |
63 | 65 | paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake)); |
64 | 66 |
|
| 67 | + // now, we must use full restart strategy if any task is failed, |
| 68 | + // since committer is stateless, if tiering committer is failover, committer |
| 69 | + // will lost the collected committable, and will never collect all committable to do commit |
| 70 | + // todo: support region failover |
| 71 | + org.apache.flink.configuration.Configuration flinkConfig = |
| 72 | + new org.apache.flink.configuration.Configuration(); |
| 73 | + flinkConfig.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, FULL_RESTART_STRATEGY_NAME); |
| 74 | + |
65 | 75 | // build tiering source |
66 | 76 | final StreamExecutionEnvironment execEnv = |
67 | | - StreamExecutionEnvironment.getExecutionEnvironment(); |
| 77 | + StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig); |
68 | 78 |
|
69 | 79 | // build lake tiering job |
70 | 80 | JobClient jobClient = |
|
0 commit comments