diff --git a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java new file mode 100644 index 0000000000..be97b7048a --- /dev/null +++ b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.api.common.functions.RuntimeContext; + +/** + * An adapter for Flink {@link RuntimeContext} class. The {@link RuntimeContext} class added the + * `getJobInfo` and `getTaskInfo` methods in version 1.19 and deprecated many methods, such as + * `getAttemptNumber`. + * + *

TODO: remove this class when no longer support flink 1.18. + */ +public class RuntimeContextAdapter { + + public static int getAttemptNumber(RuntimeContext runtimeContext) { + return runtimeContext.getAttemptNumber(); + } +} diff --git a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java new file mode 100644 index 0000000000..49f7038a46 --- /dev/null +++ b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import java.util.function.Supplier; + +/** + * Adapter for {@link StreamOperatorParameters} because the constructor is compatibility in flink + * 1.18 and 1.19. However, this constructor only used in test. + * + *

TODO: remove this class when no longer support flink 1.18 and 1.19. + */ +public class StreamOperatorParametersAdapter { + + public static StreamOperatorParameters create( + StreamTask containingTask, + StreamConfig config, + Output> output, + Supplier processingTimeServiceFactory, + OperatorEventDispatcher operatorEventDispatcher, + MailboxExecutor mailboxExecutor) { + return new StreamOperatorParameters<>( + containingTask, + config, + output, + processingTimeServiceFactory, + operatorEventDispatcher); + } +} diff --git a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/committer/Flink118TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/committer/Flink118TieringCommitOperatorTest.java new file mode 100644 index 0000000000..b3c04505e5 --- /dev/null +++ b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/committer/Flink118TieringCommitOperatorTest.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.committer; + +/** + * UT for {@link TieringCommitOperator}. Test the compatibility of the `getAttemptNumber` method in + * flink 1.18. + */ +public class Flink118TieringCommitOperatorTest extends TieringCommitOperatorTest {} diff --git a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java new file mode 100644 index 0000000000..49f7038a46 --- /dev/null +++ b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import java.util.function.Supplier; + +/** + * Adapter for {@link StreamOperatorParameters} because the constructor is compatibility in flink + * 1.18 and 1.19. However, this constructor only used in test. + * + *

TODO: remove this class when no longer support flink 1.18 and 1.19. + */ +public class StreamOperatorParametersAdapter { + + public static StreamOperatorParameters create( + StreamTask containingTask, + StreamConfig config, + Output> output, + Supplier processingTimeServiceFactory, + OperatorEventDispatcher operatorEventDispatcher, + MailboxExecutor mailboxExecutor) { + return new StreamOperatorParameters<>( + containingTask, + config, + output, + processingTimeServiceFactory, + operatorEventDispatcher); + } +} diff --git a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/committer/Flink119TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/committer/Flink119TieringCommitOperatorTest.java new file mode 100644 index 0000000000..7fd1a20d0b --- /dev/null +++ b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/committer/Flink119TieringCommitOperatorTest.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.committer; + +/** + * UT for {@link TieringCommitOperator}. Test the compatibility of the `getAttemptNumber` method in + * flink 1.19. + */ +public class Flink119TieringCommitOperatorTest extends TieringCommitOperatorTest {} diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/committer/Flink22TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/committer/Flink22TieringCommitOperatorTest.java new file mode 100644 index 0000000000..25cbfcc3f6 --- /dev/null +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/committer/Flink22TieringCommitOperatorTest.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.committer; + +/** + * UT for {@link TieringCommitOperator}. Test the compatibility of the `getAttemptNumber` method in + * flink 2.2. + */ +public class Flink22TieringCommitOperatorTest extends TieringCommitOperatorTest {} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java new file mode 100644 index 0000000000..f635fbeb61 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.api.common.functions.RuntimeContext; + +/** + * An adapter for Flink {@link RuntimeContext} class. The {@link RuntimeContext} class added the + * `getJobInfo` and `getTaskInfo` methods in version 1.19 and deprecated many methods, such as + * `getAttemptNumber`. + * + *

TODO: remove this class when no longer support flink 1.18. + */ +public class RuntimeContextAdapter { + + public static int getAttemptNumber(RuntimeContext runtimeContext) { + return runtimeContext.getTaskInfo().getAttemptNumber(); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java index 9d60c8899c..8954ed6f58 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java @@ -23,6 +23,7 @@ import org.apache.fluss.client.metadata.LakeSnapshot; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.LakeTableSnapshotNotExistException; +import org.apache.fluss.flink.adapter.RuntimeContextAdapter; import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; import org.apache.fluss.flink.tiering.event.TieringFailOverEvent; @@ -132,7 +133,7 @@ public void setup( StreamConfig config, Output>> output) { super.setup(containingTask, config, output); - int attemptNumber = getRuntimeContext().getAttemptNumber(); + int attemptNumber = RuntimeContextAdapter.getAttemptNumber(getRuntimeContext()); if (attemptNumber > 0) { LOG.info("Send TieringFailoverEvent, current attempt number: {}", attemptNumber); // attempt number is greater than zero, the job must failover diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java new file mode 100644 index 0000000000..e0989cb1ea --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/StreamOperatorParametersAdapter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import java.util.function.Supplier; + +/** + * Adapter for {@link StreamOperatorParameters} because the constructor is compatibility in flink + * 1.18 and 1.19. However, this constructor only used in test. + * + *

TODO: remove this class when no longer support flink 1.18 and 1.19. + */ +public class StreamOperatorParametersAdapter { + + public static StreamOperatorParameters create( + StreamTask containingTask, + StreamConfig config, + Output> output, + Supplier processingTimeServiceFactory, + OperatorEventDispatcher operatorEventDispatcher, + MailboxExecutor mailboxExecutor) { + return new StreamOperatorParameters<>( + containingTask, + config, + output, + processingTimeServiceFactory, + operatorEventDispatcher, + mailboxExecutor); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java index 0be879e9fb..618d454453 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java @@ -19,6 +19,7 @@ import org.apache.fluss.client.metadata.LakeSnapshot; import org.apache.fluss.exception.LakeTableSnapshotNotExistException; +import org.apache.fluss.flink.adapter.StreamOperatorParametersAdapter; import org.apache.fluss.flink.tiering.TestingLakeTieringFactory; import org.apache.fluss.flink.tiering.TestingWriteResult; import org.apache.fluss.flink.tiering.event.FailedTieringEvent; @@ -76,7 +77,7 @@ void beforeEach() throws Exception { MockOperatorEventDispatcher mockOperatorEventDispatcher = new MockOperatorEventDispatcher(mockOperatorEventGateway); parameters = - new StreamOperatorParameters<>( + StreamOperatorParametersAdapter.create( new SourceOperatorStreamTask(new DummyEnvironment()), new MockStreamConfig(new Configuration(), 1), new MockOutput<>(new ArrayList<>()),