Commit baad900

[FLINK-34574] Add CPU and memory size autoscaler quota
1 parent 4293d58 commit baad900

18 files changed: +977, -46 lines

Diff for: docs/layouts/shortcodes/generated/auto_scaler_configuration.html (+12)

@@ -116,6 +116,18 @@
             <td>Double</td>
             <td>Percentage threshold for switching to observed from busy time based true processing rate if the measurement is off by at least the configured fraction. For example 0.15 means we switch to observed if the busy time based computation is at least 15% higher during catchup.</td>
         </tr>
+        <tr>
+            <td><h5>job.autoscaler.quota.cpu</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Double</td>
+            <td>Quota of the CPU count. When scaling would go beyond this number, the scaling is not going to happen.</td>
+        </tr>
+        <tr>
+            <td><h5>job.autoscaler.quota.memory</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>Quota of the memory size. When scaling would go beyond this number, the scaling is not going to happen.</td>
+        </tr>
         <tr>
             <td><h5>job.autoscaler.restart.time</h5></td>
             <td style="word-wrap: break-word;">5 min</td>
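For orientation, a minimal sketch of setting the two new options programmatically; the keys match the table above, the values are arbitrary examples, and the class name is made up (the option constants are added to AutoScalerOptions later in this commit):

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;

public class QuotaConfigExample {
    public static void main(String[] args) {
        // Equivalent to job.autoscaler.quota.cpu: 16 and
        // job.autoscaler.quota.memory: 64 gb in the Flink configuration.
        Configuration conf = new Configuration();
        conf.set(AutoScalerOptions.CPU_QUOTA, 16.0);
        conf.set(AutoScalerOptions.MEMORY_QUOTA, MemorySize.parse("64 gb"));
    }
}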

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java (+110, -3)

@@ -33,6 +33,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;

 import org.slf4j.Logger;
@@ -67,6 +68,9 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
     public static final String HEAP_USAGE_MESSAGE =
             "Heap Usage %s is above the allowed limit for scaling operations. Please adjust the available memory manually.";

+    public static final String RESOURCE_QUOTA_REACHED_MESSAGE =
+            "Resource usage is above the allowed limit for scaling operations. Please adjust the resource quota manually.";
+
     private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);

     private final JobVertexScaler<KEY, Context> jobVertexScaler;
@@ -129,8 +133,10 @@ public boolean scaleResource(
                 scalingSummaries,
                 autoScalerEventHandler);

-        if (scalingWouldExceedClusterResources(
-                configOverrides.newConfigWithOverrides(conf),
+        var memoryTuningEnabled = conf.get(AutoScalerOptions.MEMORY_TUNING_ENABLED);
+        if (scalingWouldExceedMaxResources(
+                memoryTuningEnabled ? configOverrides.newConfigWithOverrides(conf) : conf,
+                jobTopology,
                 evaluatedMetrics,
                 scalingSummaries,
                 context)) {
@@ -280,6 +286,30 @@ private boolean isJobUnderMemoryPressure(
         return false;
     }

+    @VisibleForTesting
+    protected boolean scalingWouldExceedMaxResources(
+            Configuration tunedConfig,
+            JobTopology jobTopology,
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            Context ctx) {
+        if (scalingWouldExceedClusterResources(
+                tunedConfig, evaluatedMetrics, scalingSummaries, ctx)) {
+            return true;
+        }
+        if (scalingWouldExceedResourceQuota(tunedConfig, jobTopology, scalingSummaries, ctx)) {
+            autoScalerEventHandler.handleEvent(
+                    ctx,
+                    AutoScalerEventHandler.Type.Warning,
+                    "ResourceQuotaReached",
+                    RESOURCE_QUOTA_REACHED_MESSAGE,
+                    null,
+                    tunedConfig.get(SCALING_EVENT_INTERVAL));
+            return true;
+        }
+        return false;
+    }
+
     private boolean scalingWouldExceedClusterResources(
             Configuration tunedConfig,
             EvaluatedMetrics evaluatedMetrics,
@@ -306,7 +336,7 @@ private boolean scalingWouldExceedClusterResources(
                 ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(
                         evaluatedMetrics.getVertexMetrics(), scalingSummaries, numTaskSlotsUsed);

-        int taskSlotsPerTm = ctx.getConfiguration().get(TaskManagerOptions.NUM_TASK_SLOTS);
+        int taskSlotsPerTm = tunedConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);

         int currentNumTms = (int) Math.ceil(numTaskSlotsUsed / (double) taskSlotsPerTm);
         int newNumTms = (int) Math.ceil(numTaskSlotsAfterRescale / (double) taskSlotsPerTm);
@@ -315,6 +345,83 @@
                 currentNumTms, newNumTms, taskManagerCpu, taskManagerMemory);
     }

+    protected static boolean scalingWouldExceedResourceQuota(
+            Configuration tunedConfig,
+            JobTopology jobTopology,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            JobAutoScalerContext<?> ctx) {
+
+        if (jobTopology == null || jobTopology.getSlotSharingGroupMapping().isEmpty()) {
+            return false;
+        }
+
+        var cpuQuota = tunedConfig.getOptional(AutoScalerOptions.CPU_QUOTA);
+        var memoryQuota = tunedConfig.getOptional(AutoScalerOptions.MEMORY_QUOTA);
+        var tmMemory = MemoryTuning.getTotalMemory(tunedConfig, ctx);
+        var tmCpu = ctx.getTaskManagerCpu().orElse(0.);
+
+        if (cpuQuota.isPresent() || memoryQuota.isPresent()) {
+            var currentSlotSharingGroupMaxParallelisms = new HashMap<SlotSharingGroupId, Integer>();
+            var newSlotSharingGroupMaxParallelisms = new HashMap<SlotSharingGroupId, Integer>();
+            for (var e : jobTopology.getSlotSharingGroupMapping().entrySet()) {
+                int currentMaxParallelism =
+                        e.getValue().stream()
+                                .filter(scalingSummaries::containsKey)
+                                .mapToInt(v -> scalingSummaries.get(v).getCurrentParallelism())
+                                .max()
+                                .orElse(0);
+                currentSlotSharingGroupMaxParallelisms.put(e.getKey(), currentMaxParallelism);
+                int newMaxParallelism =
+                        e.getValue().stream()
+                                .filter(scalingSummaries::containsKey)
+                                .mapToInt(v -> scalingSummaries.get(v).getNewParallelism())
+                                .max()
+                                .orElse(0);
+                newSlotSharingGroupMaxParallelisms.put(e.getKey(), newMaxParallelism);
+            }
+
+            var numSlotsPerTm = tunedConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
+            var currentTotalSlots =
+                    currentSlotSharingGroupMaxParallelisms.values().stream()
+                            .mapToInt(Integer::intValue)
+                            .sum();
+            var currentNumTms = currentTotalSlots / numSlotsPerTm;
+            var newTotalSlots =
+                    newSlotSharingGroupMaxParallelisms.values().stream()
+                            .mapToInt(Integer::intValue)
+                            .sum();
+            var newNumTms = newTotalSlots / numSlotsPerTm;
+
+            if (newNumTms <= currentNumTms) {
+                LOG.debug(
+                        "Skipping quota check because the new resource allocation is less than or equal to the current one");
+                return false;
+            }
+
+            if (cpuQuota.isPresent()) {
+                LOG.debug("CPU resource quota is {}, checking limits", cpuQuota.get());
+                double totalCPU = tmCpu * newNumTms;
+                if (totalCPU > cpuQuota.get()) {
+                    LOG.info("CPU resource quota reached with value: {}", totalCPU);
+                    return true;
+                }
+            }
+
+            if (memoryQuota.isPresent()) {
+                LOG.debug("Memory resource quota is {}, checking limits", memoryQuota.get());
+                long totalMemory = tmMemory.getBytes() * newNumTms;
+                if (totalMemory > memoryQuota.get().getBytes()) {
+                    LOG.info(
+                            "Memory resource quota reached with value: {}",
+                            new MemorySize(totalMemory));
+                    return true;
+                }
+            }
+        }

+        return false;
+    }
+
     private static Map<String, String> getVertexParallelismOverrides(
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
             Map<JobVertexID, ScalingSummary> summaries) {
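The quota check above reduces to simple arithmetic: take the maximum new parallelism per slot sharing group, sum those to get the slot count, divide by slots per TaskManager to get the TM count, then multiply by per-TM CPU and memory and compare against the quotas. A self-contained sketch of that arithmetic, with made-up numbers and class name:

public class QuotaMathSketch {
    public static void main(String[] args) {
        int[] newMaxParallelismPerSlotSharingGroup = {8, 4}; // two slot sharing groups
        int numSlotsPerTm = 2;
        double tmCpu = 1.0;            // CPUs per TaskManager
        long tmMemoryBytes = 4L << 30; // 4 GiB per TaskManager

        int newTotalSlots = 0;
        for (int p : newMaxParallelismPerSlotSharingGroup) {
            newTotalSlots += p;
        }
        int newNumTms = newTotalSlots / numSlotsPerTm; // 12 / 2 = 6 TaskManagers

        double totalCpu = tmCpu * newNumTms;           // 6.0 CPUs
        long totalMemory = tmMemoryBytes * newNumTms;  // 24 GiB

        // Scaling is vetoed when either total exceeds its configured quota.
        double cpuQuota = 4.0;
        System.out.println("CPU quota exceeded: " + (totalCpu > cpuQuota)); // true
    }
}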

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java (+12, -3)

@@ -210,7 +210,15 @@ protected JobTopology getJobTopology(
     @VisibleForTesting
     @SneakyThrows
     protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
-        Map<JobVertexID, Integer> maxParallelismMap =
+        var slotSharingGroupIdMap =
+                jobDetailsInfo.getJobVertexInfos().stream()
+                        .filter(e -> e.getSlotSharingGroupId() != null)
+                        .collect(
+                                Collectors.toMap(
+                                        JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID,
+                                        JobDetailsInfo.JobVertexDetailsInfo
+                                                ::getSlotSharingGroupId));
+        var maxParallelismMap =
                 jobDetailsInfo.getJobVertexInfos().stream()
                         .collect(
                                 Collectors.toMap(
@@ -235,7 +243,8 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
                                 d.getJobVertexID(), IOMetrics.from(d.getJobVertexMetrics()));
                     });

-        return JobTopology.fromJsonPlan(json, maxParallelismMap, metrics, finished);
+        return JobTopology.fromJsonPlan(
+                json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished);
     }

     private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology)
@@ -254,7 +263,7 @@ private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopol
                         "Updating source {} max parallelism based on available partitions to {}",
                         sourceVertex,
                         numPartitions);
-                topology.updateMaxParallelism(sourceVertex, (int) numPartitions);
+                topology.get(sourceVertex).updateMaxParallelism((int) numPartitions);
             }
         }
     }
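One subtlety above: Collectors.toMap throws a NullPointerException when a value is null, which is why vertices without a slot sharing group id are filtered out first. The same pitfall in isolation (names are made up):

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ToMapNullSketch {
    public static void main(String[] args) {
        Map<String, String> vertexToGroup = new HashMap<>();
        vertexToGroup.put("source", "ssg-1");
        vertexToGroup.put("sink", null); // no slot sharing group reported

        // Without this filter, Collectors.toMap would throw an NPE on the null value.
        Map<String, String> result =
                vertexToGroup.entrySet().stream()
                        .filter(e -> e.getValue() != null)
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        System.out.println(result); // {source=ssg-1}
    }
}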

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java (+17)

@@ -20,6 +20,7 @@
 import org.apache.flink.autoscaler.metrics.MetricAggregator;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;

 import java.time.Duration;
 import java.util.List;
@@ -327,4 +328,20 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .defaultValue(Duration.ofSeconds(10))
                     .withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout"))
                     .withDescription("The timeout for waiting the flink rest client to return.");
+
+    public static final ConfigOption<MemorySize> MEMORY_QUOTA =
+            autoScalerConfig("quota.memory")
+                    .memoryType()
+                    .noDefaultValue()
+                    .withFallbackKeys(oldOperatorConfigKey("quota.memory"))
+                    .withDescription(
+                            "Quota of the memory size. When scaling would go beyond this number, the scaling is not going to happen.");
+
+    public static final ConfigOption<Double> CPU_QUOTA =
+            autoScalerConfig("quota.cpu")
+                    .doubleType()
+                    .noDefaultValue()
+                    .withFallbackKeys(oldOperatorConfigKey("quota.cpu"))
+                    .withDescription(
+                            "Quota of the CPU count. When scaling would go beyond this number, the scaling is not going to happen.");
 }
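Both options are declared without defaults, so callers read them via getOptional and skip the quota check entirely when neither is set. A minimal sketch (class name and values are made up):

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;

public class QuotaReadSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // With no default value, an unset option yields Optional.empty(),
        // which is what lets scalingWouldExceedResourceQuota bail out early.
        System.out.println(conf.getOptional(AutoScalerOptions.CPU_QUOTA).isPresent()); // false

        conf.set(AutoScalerOptions.CPU_QUOTA, 8.0);
        System.out.println(conf.getOptional(AutoScalerOptions.CPU_QUOTA).get()); // 8.0
    }
}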

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java (+14, -4)

@@ -17,6 +17,7 @@

 package org.apache.flink.autoscaler.topology;

+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;

 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
@@ -48,6 +49,7 @@ public class JobTopology {
     private static final ObjectMapper objectMapper = new ObjectMapper();

     @Getter private final Map<JobVertexID, VertexInfo> vertexInfos;
+    @Getter private final Map<SlotSharingGroupId, Set<JobVertexID>> slotSharingGroupMapping;
     @Getter private final Set<JobVertexID> finishedVertices;
     @Getter private final List<JobVertexID> verticesInTopologicalOrder;

@@ -66,6 +68,7 @@ public JobTopology(Set<VertexInfo> vertexInfo) {
                 ImmutableMap.copyOf(
                         vertexInfo.stream().collect(Collectors.toMap(VertexInfo::getId, v -> v)));

+        Map<SlotSharingGroupId, Set<JobVertexID>> vertexSlotSharingGroupMapping = new HashMap<>();
         var finishedVertices = ImmutableSet.<JobVertexID>builder();

         vertexInfo.forEach(
@@ -79,12 +82,21 @@ public JobTopology(Set<VertexInfo> vertexInfo) {
                                     vertexOutputs
                                             .computeIfAbsent(inputId, id -> new HashMap<>())
                                             .put(vertexId, shipStrategy));
+
+                    var slotSharingGroupId = info.getSlotSharingGroupId();
+                    if (slotSharingGroupId != null) {
+                        vertexSlotSharingGroupMapping
+                                .computeIfAbsent(slotSharingGroupId, id -> new HashSet<>())
+                                .add(vertexId);
+                    }
+
                     if (info.isFinished()) {
                         finishedVertices.add(vertexId);
                     }
                 });
         vertexOutputs.forEach((v, outputs) -> vertexInfos.get(v).setOutputs(outputs));

+        this.slotSharingGroupMapping = ImmutableMap.copyOf(vertexSlotSharingGroupMapping);
         this.finishedVertices = finishedVertices.build();
         this.verticesInTopologicalOrder = returnVerticesInTopologicalOrder();
     }
@@ -97,10 +109,6 @@ public boolean isSource(JobVertexID jobVertexID) {
         return get(jobVertexID).getInputs().isEmpty();
     }

-    public void updateMaxParallelism(JobVertexID vertexID, int maxParallelism) {
-        get(vertexID).updateMaxParallelism(maxParallelism);
-    }
-
     private List<JobVertexID> returnVerticesInTopologicalOrder() {
         List<JobVertexID> sorted = new ArrayList<>(vertexInfos.size());

@@ -134,6 +142,7 @@ private List<JobVertexID> returnVerticesInTopologicalOrder() {

     public static JobTopology fromJsonPlan(
             String jsonPlan,
+            Map<JobVertexID, SlotSharingGroupId> slotSharingGroupIdMap,
             Map<JobVertexID, Integer> maxParallelismMap,
             Map<JobVertexID, IOMetrics> metrics,
             Set<JobVertexID> finishedVertices)
@@ -151,6 +160,7 @@ public static JobTopology fromJsonPlan(
             vertexInfo.add(
                     new VertexInfo(
                             vertexId,
+                            slotSharingGroupIdMap.get(vertexId),
                             inputs,
                             node.get("parallelism").asInt(),
                             maxParallelismMap.get(vertexId),
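The constructor builds the slot-sharing-group mapping with the classic computeIfAbsent grouping idiom. The same pattern in isolation, with plain strings standing in for SlotSharingGroupId and JobVertexID (all names are illustrative):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class GroupingSketch {
    public static void main(String[] args) {
        Map<String, Set<String>> groupToVertices = new HashMap<>();
        String[][] vertexToGroup = {{"source", "ssg-1"}, {"map", "ssg-1"}, {"sink", "ssg-2"}};
        for (String[] v : vertexToGroup) {
            // Create the group's set on first sight, then add the vertex to it.
            groupToVertices.computeIfAbsent(v[1], id -> new HashSet<>()).add(v[0]);
        }
        System.out.println(groupToVertices); // e.g. {ssg-1=[source, map], ssg-2=[sink]}
    }
}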

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java (+21, -2)

@@ -18,9 +18,12 @@
 package org.apache.flink.autoscaler.topology;

 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;

+import lombok.AccessLevel;
 import lombok.Data;
+import lombok.Setter;

 import java.util.Map;

@@ -33,11 +36,14 @@ public class VertexInfo {
     // All input vertices and the ship_strategy
     private final Map<JobVertexID, ShipStrategy> inputs;

+    private final SlotSharingGroupId slotSharingGroupId;
+
     // All output vertices and the ship_strategy
     private Map<JobVertexID, ShipStrategy> outputs;

     private final int parallelism;

+    @Setter(AccessLevel.NONE)
     private int maxParallelism;

     private final int originalMaxParallelism;
@@ -48,12 +54,14 @@ public class VertexInfo {

     public VertexInfo(
             JobVertexID id,
+            SlotSharingGroupId slotSharingGroupId,
             Map<JobVertexID, ShipStrategy> inputs,
             int parallelism,
             int maxParallelism,
             boolean finished,
             IOMetrics ioMetrics) {
         this.id = id;
+        this.slotSharingGroupId = slotSharingGroupId;
         this.inputs = inputs;
         this.parallelism = parallelism;
         this.maxParallelism = maxParallelism;
@@ -69,7 +77,18 @@ public VertexInfo(
             int parallelism,
             int maxParallelism,
             IOMetrics ioMetrics) {
-        this(id, inputs, parallelism, maxParallelism, false, ioMetrics);
+        this(id, null, inputs, parallelism, maxParallelism, false, ioMetrics);
+    }
+
+    @VisibleForTesting
+    public VertexInfo(
+            JobVertexID id,
+            Map<JobVertexID, ShipStrategy> inputs,
+            int parallelism,
+            int maxParallelism,
+            boolean finished,
+            IOMetrics ioMetrics) {
+        this(id, null, inputs, parallelism, maxParallelism, finished, ioMetrics);
     }

     @VisibleForTesting
@@ -82,6 +101,6 @@ public VertexInfo(
     }

     public void updateMaxParallelism(int maxParallelism) {
-        setMaxParallelism(Math.min(originalMaxParallelism, maxParallelism));
+        this.maxParallelism = Math.min(originalMaxParallelism, maxParallelism);
     }
 }
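Since VertexInfo is annotated with Lombok's @Data, a setMaxParallelism setter would normally be generated; @Setter(AccessLevel.NONE) suppresses it so the only write path is updateMaxParallelism, which clamps to the original value. A standalone sketch of that clamping behavior, without Lombok (class name is made up):

public class ClampSketch {
    private final int originalMaxParallelism = 128;
    private int maxParallelism = 128;

    // Mirrors VertexInfo.updateMaxParallelism: the value can be lowered,
    // but never raised above the vertex's original max parallelism.
    public void updateMaxParallelism(int maxParallelism) {
        this.maxParallelism = Math.min(originalMaxParallelism, maxParallelism);
    }

    public static void main(String[] args) {
        ClampSketch v = new ClampSketch();
        v.updateMaxParallelism(512); // clamped back down to 128
        System.out.println(v.maxParallelism); // 128
    }
}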
