33
33
import org .apache .flink .configuration .Configuration ;
34
34
import org .apache .flink .configuration .MemorySize ;
35
35
import org .apache .flink .configuration .TaskManagerOptions ;
36
+ import org .apache .flink .runtime .instance .SlotSharingGroupId ;
36
37
import org .apache .flink .runtime .jobgraph .JobVertexID ;
37
38
38
39
import org .slf4j .Logger ;
@@ -67,6 +68,9 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
67
68
public static final String HEAP_USAGE_MESSAGE =
68
69
"Heap Usage %s is above the allowed limit for scaling operations. Please adjust the available memory manually." ;
69
70
71
+ public static final String RESOURCE_QUOTA_REACHED_MESSAGE =
72
+ "Resource usage is above the allowed limit for scaling operations. Please adjust the resource quota manually." ;
73
+
70
74
private static final Logger LOG = LoggerFactory .getLogger (ScalingExecutor .class );
71
75
72
76
private final JobVertexScaler <KEY , Context > jobVertexScaler ;
@@ -129,8 +133,10 @@ public boolean scaleResource(
129
133
scalingSummaries ,
130
134
autoScalerEventHandler );
131
135
132
- if (scalingWouldExceedClusterResources (
133
- configOverrides .newConfigWithOverrides (conf ),
136
+ var memoryTuningEnabled = conf .get (AutoScalerOptions .MEMORY_TUNING_ENABLED );
137
+ if (scalingWouldExceedMaxResources (
138
+ memoryTuningEnabled ? configOverrides .newConfigWithOverrides (conf ) : conf ,
139
+ jobTopology ,
134
140
evaluatedMetrics ,
135
141
scalingSummaries ,
136
142
context )) {
@@ -280,6 +286,29 @@ private boolean isJobUnderMemoryPressure(
280
286
return false ;
281
287
}
282
288
289
+ private boolean scalingWouldExceedMaxResources (
290
+ Configuration tunedConfig ,
291
+ JobTopology jobTopology ,
292
+ EvaluatedMetrics evaluatedMetrics ,
293
+ Map <JobVertexID , ScalingSummary > scalingSummaries ,
294
+ Context ctx ) {
295
+ if (scalingWouldExceedClusterResources (
296
+ tunedConfig , evaluatedMetrics , scalingSummaries , ctx )) {
297
+ return true ;
298
+ }
299
+ if (scalingWouldExceedResourceQuota (tunedConfig , jobTopology , scalingSummaries , ctx )) {
300
+ autoScalerEventHandler .handleEvent (
301
+ ctx ,
302
+ AutoScalerEventHandler .Type .Warning ,
303
+ "ResourceQuotaReached" ,
304
+ RESOURCE_QUOTA_REACHED_MESSAGE ,
305
+ null ,
306
+ tunedConfig .get (SCALING_EVENT_INTERVAL ));
307
+ return true ;
308
+ }
309
+ return false ;
310
+ }
311
+
283
312
private boolean scalingWouldExceedClusterResources (
284
313
Configuration tunedConfig ,
285
314
EvaluatedMetrics evaluatedMetrics ,
@@ -306,7 +335,7 @@ private boolean scalingWouldExceedClusterResources(
306
335
ResourceCheckUtils .estimateNumTaskSlotsAfterRescale (
307
336
evaluatedMetrics .getVertexMetrics (), scalingSummaries , numTaskSlotsUsed );
308
337
309
- int taskSlotsPerTm = ctx . getConfiguration () .get (TaskManagerOptions .NUM_TASK_SLOTS );
338
+ int taskSlotsPerTm = tunedConfig .get (TaskManagerOptions .NUM_TASK_SLOTS );
310
339
311
340
int currentNumTms = (int ) Math .ceil (numTaskSlotsUsed / (double ) taskSlotsPerTm );
312
341
int newNumTms = (int ) Math .ceil (numTaskSlotsAfterRescale / (double ) taskSlotsPerTm );
@@ -315,6 +344,83 @@ private boolean scalingWouldExceedClusterResources(
315
344
currentNumTms , newNumTms , taskManagerCpu , taskManagerMemory );
316
345
}
317
346
347
+ protected static boolean scalingWouldExceedResourceQuota (
348
+ Configuration tunedConfig ,
349
+ JobTopology jobTopology ,
350
+ Map <JobVertexID , ScalingSummary > scalingSummaries ,
351
+ JobAutoScalerContext <?> ctx ) {
352
+
353
+ if (jobTopology == null || jobTopology .getSlotSharingGroupMapping ().isEmpty ()) {
354
+ return false ;
355
+ }
356
+
357
+ var cpuQuota = tunedConfig .getOptional (AutoScalerOptions .CPU_QUOTA );
358
+ var memoryQuota = tunedConfig .getOptional (AutoScalerOptions .MEMORY_QUOTA );
359
+ var tmMemory = MemoryTuning .getTotalMemory (tunedConfig , ctx );
360
+ var tmCpu = ctx .getTaskManagerCpu ().orElse (0. );
361
+
362
+ if (cpuQuota .isPresent () || memoryQuota .isPresent ()) {
363
+ var currentSlotSharingGroupMaxParallelisms = new HashMap <SlotSharingGroupId , Integer >();
364
+ var newSlotSharingGroupMaxParallelisms = new HashMap <SlotSharingGroupId , Integer >();
365
+ for (var e : jobTopology .getSlotSharingGroupMapping ().entrySet ()) {
366
+ int currentMaxParallelism =
367
+ e .getValue ().stream ()
368
+ .filter (scalingSummaries ::containsKey )
369
+ .mapToInt (v -> scalingSummaries .get (v ).getCurrentParallelism ())
370
+ .max ()
371
+ .orElse (0 );
372
+ currentSlotSharingGroupMaxParallelisms .put (e .getKey (), currentMaxParallelism );
373
+ int newMaxParallelism =
374
+ e .getValue ().stream ()
375
+ .filter (scalingSummaries ::containsKey )
376
+ .mapToInt (v -> scalingSummaries .get (v ).getNewParallelism ())
377
+ .max ()
378
+ .orElse (0 );
379
+ newSlotSharingGroupMaxParallelisms .put (e .getKey (), newMaxParallelism );
380
+ }
381
+
382
+ var numSlotsPerTm = tunedConfig .get (TaskManagerOptions .NUM_TASK_SLOTS );
383
+ var currentTotalSlots =
384
+ currentSlotSharingGroupMaxParallelisms .values ().stream ()
385
+ .mapToInt (Integer ::intValue )
386
+ .sum ();
387
+ var currentNumTms = currentTotalSlots / numSlotsPerTm ;
388
+ var newTotalSlots =
389
+ newSlotSharingGroupMaxParallelisms .values ().stream ()
390
+ .mapToInt (Integer ::intValue )
391
+ .sum ();
392
+ var newNumTms = newTotalSlots / numSlotsPerTm ;
393
+
394
+ if (newNumTms <= currentNumTms ) {
395
+ LOG .debug (
396
+ "Skipping quota check due to new resource allocation is less or equals than the current" );
397
+ return false ;
398
+ }
399
+
400
+ if (cpuQuota .isPresent ()) {
401
+ LOG .debug ("CPU resource quota is {}, checking limits" , cpuQuota .get ());
402
+ double totalCPU = tmCpu * newNumTms ;
403
+ if (totalCPU > cpuQuota .get ()) {
404
+ LOG .info ("CPU resource quota reached with value: {}" , totalCPU );
405
+ return true ;
406
+ }
407
+ }
408
+
409
+ if (memoryQuota .isPresent ()) {
410
+ LOG .debug ("Memory resource quota is {}, checking limits" , memoryQuota .get ());
411
+ long totalMemory = tmMemory .getBytes () * newNumTms ;
412
+ if (totalMemory > memoryQuota .get ().getBytes ()) {
413
+ LOG .info (
414
+ "Memory resource quota reached with value: {}" ,
415
+ new MemorySize (totalMemory ));
416
+ return true ;
417
+ }
418
+ }
419
+ }
420
+
421
+ return false ;
422
+ }
423
+
318
424
private static Map <String , String > getVertexParallelismOverrides (
319
425
Map <JobVertexID , Map <ScalingMetric , EvaluatedScalingMetric >> evaluatedMetrics ,
320
426
Map <JobVertexID , ScalingSummary > summaries ) {
0 commit comments