@@ -187,32 +187,28 @@ protected void run(String[] args) throws Exception {
187
187
final CustomCommandLine activeCommandLine =
188
188
validateAndGetActiveCommandLine (checkNotNull (commandLine ));
189
189
190
- if (isDeploymentTargetApplication (activeCommandLine , commandLine )) {
190
+ final Configuration effectiveConfiguration =
191
+ getEffectiveConfiguration (activeCommandLine , commandLine );
192
+
193
+ if (isDeploymentTargetApplication (effectiveConfiguration )) {
191
194
final ApplicationDeployer deployer =
192
195
new ApplicationClusterDeployer (clusterClientServiceLoader );
193
196
194
197
final ProgramOptions programOptions ;
195
- final Configuration effectiveConfiguration ;
196
198
197
199
// No need to set a jarFile path for PyFlink job.
198
200
if (ProgramOptionsUtils .isPythonEntryPoint (commandLine )) {
199
201
programOptions = ProgramOptionsUtils .createPythonProgramOptions (commandLine );
200
- effectiveConfiguration =
201
- getEffectiveConfiguration (
202
- activeCommandLine ,
203
- commandLine ,
204
- programOptions ,
205
- Collections .emptyList ());
202
+ updateEffectiveConfiguration (
203
+ effectiveConfiguration , programOptions , Collections .emptyList ());
206
204
} else {
207
205
programOptions = new ProgramOptions (commandLine );
208
206
programOptions .validate ();
209
207
final URI uri = PackagedProgramUtils .resolveURI (programOptions .getJarFilePath ());
210
- effectiveConfiguration =
211
- getEffectiveConfiguration (
212
- activeCommandLine ,
213
- commandLine ,
214
- programOptions ,
215
- Collections .singletonList (uri .toString ()));
208
+ updateEffectiveConfiguration (
209
+ effectiveConfiguration ,
210
+ programOptions ,
211
+ Collections .singletonList (uri .toString ()));
216
212
}
217
213
218
214
final ApplicationConfiguration applicationConfiguration =
@@ -225,9 +221,7 @@ protected void run(String[] args) throws Exception {
225
221
226
222
final List <URL > jobJars = getJobJarAndDependencies (programOptions );
227
223
228
- final Configuration effectiveConfiguration =
229
- getEffectiveConfiguration (
230
- activeCommandLine , commandLine , programOptions , jobJars );
224
+ updateEffectiveConfiguration (effectiveConfiguration , programOptions , jobJars );
231
225
232
226
LOG .debug ("Effective executor configuration: {}" , effectiveConfiguration );
233
227
@@ -238,12 +232,7 @@ protected void run(String[] args) throws Exception {
238
232
}
239
233
}
240
234
241
- protected boolean isDeploymentTargetApplication (
242
- final CustomCommandLine activeCustomCommandLine , final CommandLine commandLine )
243
- throws FlinkException {
244
- final Configuration effectiveConfiguration =
245
- getEffectiveConfiguration (activeCustomCommandLine , commandLine );
246
-
235
+ protected boolean isDeploymentTargetApplication (final Configuration effectiveConfiguration ) {
247
236
final String executionTarget =
248
237
effectiveConfiguration
249
238
.getOptional (DeploymentOptions .TARGET )
@@ -300,15 +289,10 @@ private Configuration getEffectiveConfiguration(
300
289
return effectiveConfiguration ;
301
290
}
302
291
303
- private <T > Configuration getEffectiveConfiguration (
304
- final CustomCommandLine activeCustomCommandLine ,
305
- final CommandLine commandLine ,
292
+ private <T > void updateEffectiveConfiguration (
293
+ final Configuration effectiveConfiguration ,
306
294
final ProgramOptions programOptions ,
307
- final List <T > jobJars )
308
- throws FlinkException {
309
-
310
- final Configuration effectiveConfiguration =
311
- getEffectiveConfiguration (activeCustomCommandLine , commandLine );
295
+ final List <T > jobJars ) {
312
296
313
297
final ExecutionConfigAccessor executionParameters =
314
298
ExecutionConfigAccessor .fromProgramOptions (programOptions , jobJars );
@@ -318,7 +302,6 @@ private <T> Configuration getEffectiveConfiguration(
318
302
LOG .debug (
319
303
"Effective configuration after Flink conf, custom commandline, and program options: {}" ,
320
304
effectiveConfiguration );
321
- return effectiveConfiguration ;
322
305
}
323
306
324
307
/**
@@ -354,11 +337,12 @@ protected void info(String[] args) throws Exception {
354
337
validateAndGetActiveCommandLine (checkNotNull (commandLine ));
355
338
356
339
final Configuration effectiveConfiguration =
357
- getEffectiveConfiguration (
358
- activeCommandLine ,
359
- commandLine ,
360
- programOptions ,
361
- getJobJarAndDependencies (programOptions ));
340
+ getEffectiveConfiguration (activeCommandLine , commandLine );
341
+
342
+ updateEffectiveConfiguration (
343
+ effectiveConfiguration ,
344
+ programOptions ,
345
+ getJobJarAndDependencies (programOptions ));
362
346
363
347
program = buildProgram (programOptions , effectiveConfiguration );
364
348
0 commit comments