17
17
*/
18
18
package org .apache .beam .runners .flink ;
19
19
20
+ import static org .apache .beam .sdk .util .Preconditions .checkStateNotNull ;
20
21
import static org .apache .beam .sdk .util .construction .resources .PipelineResources .detectClassPathResourcesToStage ;
21
22
22
23
import java .util .UUID ;
31
32
import org .apache .beam .vendor .grpc .v1p60p1 .com .google .protobuf .Struct ;
32
33
import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Strings ;
33
34
import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .util .concurrent .ListeningExecutorService ;
34
- import org .checkerframework .checker .nullness .qual .Nullable ;
35
35
import org .slf4j .Logger ;
36
36
import org .slf4j .LoggerFactory ;
37
37
38
38
/** Job Invoker for the {@link FlinkRunner}. */
39
- @ SuppressWarnings ({
40
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
41
- })
42
39
public class FlinkJobInvoker extends JobInvoker {
43
40
private static final Logger LOG = LoggerFactory .getLogger (FlinkJobInvoker .class );
44
41
@@ -57,7 +54,7 @@ protected FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverCo
57
54
protected JobInvocation invokeWithExecutor (
58
55
RunnerApi .Pipeline pipeline ,
59
56
Struct options ,
60
- @ Nullable String retrievalToken ,
57
+ String retrievalToken ,
61
58
ListeningExecutorService executorService ) {
62
59
63
60
// TODO: How to make Java/Python agree on names of keys and their values?
@@ -74,20 +71,22 @@ protected JobInvocation invokeWithExecutor(
74
71
75
72
PortablePipelineOptions portableOptions = flinkOptions .as (PortablePipelineOptions .class );
76
73
74
+ ClassLoader thisClassLoader =
75
+ checkStateNotNull (
76
+ FlinkJobInvoker .class .getClassLoader (),
77
+ "FlinkJobInvoker class loader is null - this means it was loaded by the bootstrap classloader, which should be impossible" );
78
+
77
79
PortablePipelineRunner pipelineRunner ;
78
80
if (Strings .isNullOrEmpty (portableOptions .getOutputExecutablePath ())) {
79
81
pipelineRunner =
80
82
new FlinkPipelineRunner (
81
83
flinkOptions ,
82
84
serverConfig .getFlinkConfDir (),
83
- detectClassPathResourcesToStage (
84
- FlinkJobInvoker .class .getClassLoader (), flinkOptions ));
85
+ detectClassPathResourcesToStage (thisClassLoader , flinkOptions ));
85
86
} else {
86
87
pipelineRunner = new PortablePipelineJarCreator (FlinkPipelineRunner .class );
87
88
}
88
89
89
- flinkOptions .setRunner (null );
90
-
91
90
LOG .info ("Invoking job {} with pipeline runner {}" , invocationId , pipelineRunner );
92
91
return createJobInvocation (
93
92
invocationId , retrievalToken , executorService , pipeline , flinkOptions , pipelineRunner );
0 commit comments