@@ -22,6 +22,7 @@ import (
22
22
"reflect"
23
23
"strings"
24
24
25
+ "github.com/hashicorp/go-version"
25
26
"k8s.io/apimachinery/pkg/api/resource"
26
27
27
28
corev1 "k8s.io/api/core/v1"
@@ -47,6 +48,15 @@ func (v *Validator) ValidateCreate(cluster *FlinkCluster) error {
47
48
if err != nil {
48
49
return err
49
50
}
51
+
52
+ var flinkVersion * version.Version
53
+ if len (cluster .Spec .FlinkVersion ) != 0 {
54
+ flinkVersion , err = version .NewVersion (cluster .Spec .FlinkVersion )
55
+ if err != nil {
56
+ return err
57
+ }
58
+ }
59
+
50
60
err = v .validateHadoopConfig (cluster .Spec .HadoopConfig )
51
61
if err != nil {
52
62
return err
@@ -59,11 +69,11 @@ func (v *Validator) ValidateCreate(cluster *FlinkCluster) error {
59
69
if err != nil {
60
70
return err
61
71
}
62
- err = v .validateJobManager (& cluster .Spec .JobManager )
72
+ err = v .validateJobManager (flinkVersion , & cluster .Spec .JobManager )
63
73
if err != nil {
64
74
return err
65
75
}
66
- err = v .validateTaskManager (& cluster .Spec .TaskManager )
76
+ err = v .validateTaskManager (flinkVersion , & cluster .Spec .TaskManager )
67
77
if err != nil {
68
78
return err
69
79
}
@@ -282,7 +292,7 @@ func (v *Validator) validateImage(imageSpec *ImageSpec) error {
282
292
return nil
283
293
}
284
294
285
- func (v * Validator ) validateJobManager (jmSpec * JobManagerSpec ) error {
295
+ func (v * Validator ) validateJobManager (flinkVersion * version. Version , jmSpec * JobManagerSpec ) error {
286
296
var err error
287
297
288
298
// Replicas.
@@ -330,22 +340,39 @@ func (v *Validator) validateJobManager(jmSpec *JobManagerSpec) error {
330
340
return err
331
341
}
332
342
333
- // MemoryOffHeapRatio
334
- err = v . validateMemoryOffHeapRatio ( jmSpec .MemoryOffHeapRatio , "jobmanager" )
335
- if err != nil {
336
- return err
337
- }
343
+ if flinkVersion == nil || flinkVersion . LessThan ( v10 ) {
344
+ if jmSpec .MemoryProcessRatio != nil {
345
+ return fmt . Errorf ( "MemoryProcessRatio config cannot be used with flinkVersion < 1.11', use " +
346
+ "memoryOffHeapRatio instead" )
347
+ }
338
348
339
- // MemoryOffHeapMin
340
- err = v .validateMemoryOffHeapMin (& jmSpec .MemoryOffHeapMin , jmSpec .Resources .Limits .Memory (), "jobmanager" )
341
- if err != nil {
342
- return err
349
+ // MemoryOffHeapRatio
350
+ err = v .validateRatio (jmSpec .MemoryOffHeapRatio , "jobmanager" , "memoryOffHeapRatio" )
351
+ if err != nil {
352
+ return err
353
+ }
354
+
355
+ // MemoryOffHeapMin
356
+ err = v .validateMemoryOffHeapMin (& jmSpec .MemoryOffHeapMin , jmSpec .Resources .Limits .Memory (), "jobmanager" )
357
+ if err != nil {
358
+ return err
359
+ }
360
+ } else {
361
+ if jmSpec .MemoryOffHeapRatio != nil || ! jmSpec .MemoryOffHeapMin .IsZero () {
362
+ return fmt .Errorf ("MemoryOffHeapRatio or MemoryOffHeapMin config cannot be used with flinkVersion >= 1.11'; " +
363
+ "use memoryProcessRatio istead" )
364
+ }
365
+ // MemoryProcessRatio
366
+ err = v .validateRatio (jmSpec .MemoryProcessRatio , "jobmanager" , "memoryProcessRatio" )
367
+ if err != nil {
368
+ return err
369
+ }
343
370
}
344
371
345
372
return nil
346
373
}
347
374
348
- func (v * Validator ) validateTaskManager (tmSpec * TaskManagerSpec ) error {
375
+ func (v * Validator ) validateTaskManager (flinkVersion * version. Version , tmSpec * TaskManagerSpec ) error {
349
376
// Replicas.
350
377
if tmSpec .Replicas < 1 {
351
378
return fmt .Errorf ("invalid TaskManager replicas, it must >= 1" )
@@ -376,16 +403,33 @@ func (v *Validator) validateTaskManager(tmSpec *TaskManagerSpec) error {
376
403
return err
377
404
}
378
405
379
- // MemoryOffHeapRatio
380
- err = v . validateMemoryOffHeapRatio ( tmSpec .MemoryOffHeapRatio , "taskmanager" )
381
- if err != nil {
382
- return err
383
- }
406
+ if flinkVersion == nil || flinkVersion . LessThan ( v10 ) {
407
+ if tmSpec .MemoryProcessRatio != nil {
408
+ return fmt . Errorf ( "MemoryProcessRatio config cannot be used with flinkVersion < 1.11', use " +
409
+ "memoryOffHeapRatio instead" )
410
+ }
384
411
385
- // MemoryOffHeapMin
386
- err = v .validateMemoryOffHeapMin (& tmSpec .MemoryOffHeapMin , tmSpec .Resources .Limits .Memory (), "taskmanager" )
387
- if err != nil {
388
- return err
412
+ // MemoryOffHeapRatio
413
+ err = v .validateRatio (tmSpec .MemoryOffHeapRatio , "taskmanager" , "memoryOffHeapRatio" )
414
+ if err != nil {
415
+ return err
416
+ }
417
+
418
+ // MemoryOffHeapMin
419
+ err = v .validateMemoryOffHeapMin (& tmSpec .MemoryOffHeapMin , tmSpec .Resources .Limits .Memory (), "taskmanager" )
420
+ if err != nil {
421
+ return err
422
+ }
423
+ } else {
424
+ if tmSpec .MemoryOffHeapRatio != nil || ! tmSpec .MemoryOffHeapMin .IsZero () {
425
+ return fmt .Errorf ("MemoryOffHeapRatio or MemoryOffHeapMin config cannot be used with flinkVersion >= 1.11'; " +
426
+ "use memoryProcessRatio istead" )
427
+ }
428
+ // MemoryProcessRatio
429
+ err = v .validateRatio (tmSpec .MemoryProcessRatio , "taskmanager" , "memoryProcessRatio" )
430
+ if err != nil {
431
+ return err
432
+ }
389
433
}
390
434
391
435
return nil
@@ -487,10 +531,9 @@ func (v *Validator) validateCleanupAction(
487
531
return nil
488
532
}
489
533
490
- func (v * Validator ) validateMemoryOffHeapRatio (
491
- offHeapRatio * int32 , component string ) error {
492
- if offHeapRatio == nil || * offHeapRatio > 100 || * offHeapRatio < 0 {
493
- return fmt .Errorf ("invalid %v memoryOffHeapRatio, it must be between 0 and 100" , component )
534
+ func (v * Validator ) validateRatio (ratio * int32 , component , property string ) error {
535
+ if ratio == nil || * ratio > 100 || * ratio < 0 {
536
+ return fmt .Errorf ("invalid %v %v, it must be between 0 and 100" , component , property )
494
537
}
495
538
return nil
496
539
}
0 commit comments