1717 */
1818package org .apache .beam .runners .flink ;
1919
20+ import static org .apache .beam .sdk .util .Preconditions .checkStateNotNull ;
2021import static org .apache .beam .sdk .util .construction .resources .PipelineResources .detectClassPathResourcesToStage ;
2122
2223import java .util .UUID ;
3132import org .apache .beam .vendor .grpc .v1p69p0 .com .google .protobuf .Struct ;
3233import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Strings ;
3334import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .util .concurrent .ListeningExecutorService ;
34- import org .checkerframework .checker .nullness .qual .Nullable ;
3535import org .slf4j .Logger ;
3636import org .slf4j .LoggerFactory ;
3737
3838/** Job Invoker for the {@link FlinkRunner}. */
39- @ SuppressWarnings ({
40- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
41- })
4239public class FlinkJobInvoker extends JobInvoker {
4340 private static final Logger LOG = LoggerFactory .getLogger (FlinkJobInvoker .class );
4441
@@ -57,7 +54,7 @@ protected FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverCo
5754 protected JobInvocation invokeWithExecutor (
5855 RunnerApi .Pipeline pipeline ,
5956 Struct options ,
60- @ Nullable String retrievalToken ,
57+ String retrievalToken ,
6158 ListeningExecutorService executorService ) {
6259
6360 // TODO: How to make Java/Python agree on names of keys and their values?
@@ -74,20 +71,22 @@ protected JobInvocation invokeWithExecutor(
7471
7572 PortablePipelineOptions portableOptions = flinkOptions .as (PortablePipelineOptions .class );
7673
74+ ClassLoader thisClassLoader =
75+ checkStateNotNull (
76+ FlinkJobInvoker .class .getClassLoader (),
77+ "FlinkJobInvoker class loader is null - this means it was loaded by the bootstrap classloader, which should be impossible" );
78+
7779 PortablePipelineRunner pipelineRunner ;
7880 if (Strings .isNullOrEmpty (portableOptions .getOutputExecutablePath ())) {
7981 pipelineRunner =
8082 new FlinkPipelineRunner (
8183 flinkOptions ,
8284 serverConfig .getFlinkConfDir (),
83- detectClassPathResourcesToStage (
84- FlinkJobInvoker .class .getClassLoader (), flinkOptions ));
85+ detectClassPathResourcesToStage (thisClassLoader , flinkOptions ));
8586 } else {
8687 pipelineRunner = new PortablePipelineJarCreator (FlinkPipelineRunner .class );
8788 }
8889
89- flinkOptions .setRunner (null );
90-
9190 LOG .info ("Invoking job {} with pipeline runner {}" , invocationId , pipelineRunner );
9291 return createJobInvocation (
9392 invocationId , retrievalToken , executorService , pipeline , flinkOptions , pipelineRunner );
0 commit comments