Commit ea9fc78

Add CPU and memory size autoscaler quota
1 parent b8f22bf commit ea9fc78

24 files changed (+936, -36 lines)

Diff for: .github/workflows/ci.yml (+1, -1)

@@ -44,7 +44,7 @@ jobs:
       - name: Build with Maven
         run: |
           set -o pipefail; mvn clean install javadoc:javadoc -Pgenerate-docs | tee ./mvn.log; set +o pipefail
-          if [[ $(cat ./mvn.log | grep -E -v '(flink-runtime-.*.jar, flink-kubernetes-operator-.*.jar)|(flink-kubernetes-operator-.*.jar, flink-runtime-.*.jar) define 3 overlapping classes' | grep -c "overlapping classes" -) -gt 0 ]];then
+          if [[ $(cat ./mvn.log | grep -E -v '(flink-runtime-.*.jar, flink-autoscaler-.*.jar) define 2 overlapping classes|(flink-kubernetes-operator-.*.jar, flink-runtime-.*.jar) define 3 overlapping classes' | grep -c "overlapping classes" -) -gt 0 ]];then
             echo "Found overlapping classes: "
             cat ./mvn.log | grep "overlapping classes"
             exit 1

Diff for: docs/layouts/shortcodes/generated/auto_scaler_configuration.html (+12)

@@ -110,6 +110,18 @@
             <td>Double</td>
             <td>Percentage threshold for switching to observed from busy time based true processing rate if the measurement is off by at least the configured fraction. For example 0.15 means we switch to observed if the busy time based computation is at least 15% higher during catchup.</td>
         </tr>
+        <tr>
+            <td><h5>job.autoscaler.quota.cpu</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Double</td>
+            <td>Quota of the CPU count. When scaling would go beyond this number, the scaling is not going to happen.</td>
+        </tr>
+        <tr>
+            <td><h5>job.autoscaler.quota.memory</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>Quota of the memory size. When scaling would go beyond this number, the scaling is not going to happen.</td>
+        </tr>
         <tr>
             <td><h5>job.autoscaler.restart.time</h5></td>
             <td style="word-wrap: break-word;">5 min</td>

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java (+94)

@@ -33,6 +33,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.slf4j.Logger;
@@ -67,6 +68,9 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
     public static final String HEAP_USAGE_MESSAGE =
             "Heap Usage %s is above the allowed limit for scaling operations. Please adjust the available memory manually.";
 
+    public static final String RESOURCE_QUOTA_REACHED_MESSAGE =
+            "Resource usage is above the allowed limit for scaling operations. Please adjust the resource quota manually.";
+
     private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);
 
     private final JobVertexScaler<KEY, Context> jobVertexScaler;
@@ -115,6 +119,17 @@ public boolean scaleResource(
             return false;
         }
 
+        if (resourceQuotaReached(conf, evaluatedMetrics, scalingSummaries, context)) {
+            autoScalerEventHandler.handleEvent(
+                    context,
+                    AutoScalerEventHandler.Type.Warning,
+                    "ResourceQuotaReached",
+                    RESOURCE_QUOTA_REACHED_MESSAGE,
+                    null,
+                    conf.get(SCALING_EVENT_INTERVAL));
+            return false;
+        }
+
         updateRecommendedParallelism(evaluatedMetrics.getVertexMetrics(), scalingSummaries);
 
         if (checkIfBlockedAndTriggerScalingEvent(context, scalingSummaries, conf, now)) {
@@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget(
         return true;
     }
 
+    protected static boolean resourceQuotaReached(
+            Configuration conf,
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            JobAutoScalerContext<?> ctx) {
+
+        if (evaluatedMetrics.getJobTopology() == null
+                || evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().isEmpty()) {
+            return false;
+        }
+
+        var cpuQuota = conf.getOptional(AutoScalerOptions.CPU_QUOTA);
+        var memoryQuota = conf.getOptional(AutoScalerOptions.MEMORY_QUOTA);
+        var tmMemory = ctx.getTaskManagerMemory();
+        var tmCpu = ctx.getTaskManagerCpu();
+
+        if (cpuQuota.isPresent() || memoryQuota.isPresent()) {
+            var currentSlotSharingGroupMaxParallelisms = new HashMap<SlotSharingGroupId, Integer>();
+            var newSlotSharingGroupMaxParallelisms = new HashMap<SlotSharingGroupId, Integer>();
+            for (var e :
+                    evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().entrySet()) {
+                int currentMaxParallelism =
+                        e.getValue().stream()
+                                .filter(scalingSummaries::containsKey)
+                                .mapToInt(v -> scalingSummaries.get(v).getCurrentParallelism())
+                                .max()
+                                .orElse(0);
+                currentSlotSharingGroupMaxParallelisms.put(e.getKey(), currentMaxParallelism);
+                int newMaxParallelism =
+                        e.getValue().stream()
+                                .filter(scalingSummaries::containsKey)
+                                .mapToInt(v -> scalingSummaries.get(v).getNewParallelism())
+                                .max()
+                                .orElse(0);
+                newSlotSharingGroupMaxParallelisms.put(e.getKey(), newMaxParallelism);
+            }
+
+            var numSlotsPerTm = conf.get(TaskManagerOptions.NUM_TASK_SLOTS);
+            var currentTotalSlots =
+                    currentSlotSharingGroupMaxParallelisms.values().stream()
+                            .mapToInt(Integer::intValue)
+                            .sum();
+            var currentNumTms = currentTotalSlots / numSlotsPerTm;
+            var newTotalSlots =
+                    newSlotSharingGroupMaxParallelisms.values().stream()
+                            .mapToInt(Integer::intValue)
+                            .sum();
+            var newNumTms = newTotalSlots / numSlotsPerTm;
+
+            if (newNumTms <= currentNumTms) {
+                LOG.debug(
+                        "Skipping quota check because the new resource allocation is less than or equal to the current one");
+                return false;
+            }
+
+            if (cpuQuota.isPresent() && tmCpu.isPresent()) {
+                LOG.debug("CPU resource quota is {}, checking limits", cpuQuota.get());
+                double totalCPU = tmCpu.get() * newNumTms;
+                if (totalCPU > cpuQuota.get()) {
+                    LOG.info("CPU resource quota reached with value: {}", totalCPU);
+                    return true;
+                }
+            }
+
+            if (memoryQuota.isPresent() && tmMemory.isPresent()) {
+                LOG.debug("Memory resource quota is {}, checking limits", memoryQuota.get());
+                long totalMemory = tmMemory.get().getBytes() * newNumTms;
+                if (totalMemory > memoryQuota.get().getBytes()) {
+                    LOG.info(
+                            "Memory resource quota reached with value: {}",
+                            new MemorySize(totalMemory));
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
     @VisibleForTesting
     Map<JobVertexID, ScalingSummary> computeScalingSummary(
             Context context,
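
To make the arithmetic in resourceQuotaReached easier to follow, here is a small, self-contained sketch of the same idea with hypothetical names and plain Java types in place of Flink's ID and MemorySize classes: sum the proposed max parallelism per slot sharing group to get the slot count, derive the TaskManager count by integer division with the slots per TaskManager, and compare the resulting totals against the quotas.

import java.util.Map;

/** Hypothetical, self-contained sketch of the quota arithmetic in the diff above. */
public class QuotaCheckSketch {

    /** Returns true if the proposed parallelisms would exceed either quota. */
    static boolean quotaReached(
            Map<String, Integer> newMaxParallelismPerSlotSharingGroup,
            int numSlotsPerTm,
            double cpuPerTm,
            long memoryBytesPerTm,
            double cpuQuota,
            long memoryQuotaBytes) {
        // Each slot sharing group needs as many slots as its max parallelism.
        int totalSlots =
                newMaxParallelismPerSlotSharingGroup.values().stream()
                        .mapToInt(Integer::intValue)
                        .sum();
        // Integer division, mirroring the diff above.
        int numTms = totalSlots / numSlotsPerTm;
        return cpuPerTm * numTms > cpuQuota || memoryBytesPerTm * numTms > memoryQuotaBytes;
    }

    public static void main(String[] args) {
        // Two slot sharing groups scaled to max parallelism 8 and 4, with 4 slots per TM.
        boolean reached =
                quotaReached(
                        Map.of("groupA", 8, "groupB", 4),
                        4,
                        2.0,                        // 2 CPUs per TM
                        4L * 1024 * 1024 * 1024,    // 4 GiB per TM
                        4.0,                        // CPU quota
                        64L * 1024 * 1024 * 1024);  // memory quota
        System.out.println("Quota reached: " + reached);
    }
}

In the example, 8 + 4 = 12 slots with 4 slots per TaskManager yields 3 TaskManagers, so 6 CPUs exceed the CPU quota of 4 while 12 GiB stays well under the 64 GiB memory quota, and the check reports the quota as reached.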

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java (+12, -3)

@@ -210,7 +210,15 @@ protected JobTopology getJobTopology(
     @VisibleForTesting
     @SneakyThrows
     protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
-        Map<JobVertexID, Integer> maxParallelismMap =
+        var slotSharingGroupIdMap =
+                jobDetailsInfo.getJobVertexInfos().stream()
+                        .filter(e -> e.getSlotSharingGroupId() != null)
+                        .collect(
+                                Collectors.toMap(
+                                        JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID,
+                                        JobDetailsInfo.JobVertexDetailsInfo
+                                                ::getSlotSharingGroupId));
+        var maxParallelismMap =
                 jobDetailsInfo.getJobVertexInfos().stream()
                         .collect(
                                 Collectors.toMap(
@@ -235,7 +243,8 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
                             d.getJobVertexID(), IOMetrics.from(d.getJobVertexMetrics()));
                 });
 
-        return JobTopology.fromJsonPlan(json, maxParallelismMap, metrics, finished);
+        return JobTopology.fromJsonPlan(
+                json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished);
     }
 
     private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology)
@@ -254,7 +263,7 @@ private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopol
                     "Updating source {} max parallelism based on available partitions to {}",
                     sourceVertex,
                     numPartitions);
-            topology.updateMaxParallelism(sourceVertex, (int) numPartitions);
+            topology.get(sourceVertex).setMaxParallelism((int) numPartitions);
         }
     }
 }

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java (+2, -1)

@@ -91,7 +91,8 @@ public EvaluatedMetrics evaluate(
         }
 
         var globalMetrics = evaluateGlobalMetrics(metricsHistory);
-        return new EvaluatedMetrics(scalingOutput, globalMetrics);
+        return new EvaluatedMetrics(
+                collectedMetrics.getJobTopology(), scalingOutput, globalMetrics);
     }
 
     @VisibleForTesting

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java (+17)

@@ -20,6 +20,7 @@
 import org.apache.flink.autoscaler.metrics.MetricAggregator;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
 
 import java.time.Duration;
 import java.util.List;
@@ -319,4 +320,20 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .defaultValue(Duration.ofSeconds(10))
                     .withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout"))
                     .withDescription("The timeout for waiting the flink rest client to return.");
+
+    public static final ConfigOption<MemorySize> MEMORY_QUOTA =
+            autoScalerConfig("quota.memory")
+                    .memoryType()
+                    .noDefaultValue()
+                    .withFallbackKeys(oldOperatorConfigKey("quota.memory"))
+                    .withDescription(
+                            "Quota of the memory size. When scaling would go beyond this number, the scaling is not going to happen.");
+
+    public static final ConfigOption<Double> CPU_QUOTA =
+            autoScalerConfig("quota.cpu")
+                    .doubleType()
+                    .noDefaultValue()
+                    .withFallbackKeys(oldOperatorConfigKey("quota.cpu"))
+                    .withDescription(
+                            "Quota of the CPU count. When scaling would go beyond this number, the scaling is not going to happen.");
 }
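
A minimal, hypothetical sketch of setting and reading these options back, mirroring the getOptional calls in ScalingExecutor above (the class name and values are illustrative):

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;

public class QuotaOptionsSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(AutoScalerOptions.CPU_QUOTA, 8.0);
        conf.set(AutoScalerOptions.MEMORY_QUOTA, MemorySize.parse("32 gb"));

        // Both options have no default value, so Optional-based access is the natural pattern.
        conf.getOptional(AutoScalerOptions.CPU_QUOTA)
                .ifPresent(q -> System.out.println("CPU quota: " + q));
        conf.getOptional(AutoScalerOptions.MEMORY_QUOTA)
                .ifPresent(q -> System.out.println("Memory quota: " + q));
    }
}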

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedMetrics.java (+2)

@@ -17,6 +17,7 @@
 
 package org.apache.flink.autoscaler.metrics;
 
+import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import lombok.AllArgsConstructor;
@@ -30,6 +31,7 @@
 @NoArgsConstructor
 @AllArgsConstructor
 public class EvaluatedMetrics {
+    private JobTopology jobTopology;
     private Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> vertexMetrics;
     private Map<ScalingMetric, EvaluatedScalingMetric> globalMetrics;
 }
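
Because EvaluatedMetrics uses Lombok's @AllArgsConstructor, adding jobTopology as the first field changes the generated all-args constructor to take the topology first, which is why the constructor call in ScalingMetricEvaluator above now passes collectedMetrics.getJobTopology() as the first argument. A hypothetical sketch of the equivalent hand-written constructor:

public EvaluatedMetrics(
        JobTopology jobTopology,
        Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> vertexMetrics,
        Map<ScalingMetric, EvaluatedScalingMetric> globalMetrics) {
    this.jobTopology = jobTopology;
    this.vertexMetrics = vertexMetrics;
    this.globalMetrics = globalMetrics;
}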

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java (+14, -4)

@@ -17,6 +17,7 @@
 
 package org.apache.flink.autoscaler.topology;
 
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
@@ -48,6 +49,7 @@ public class JobTopology {
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
     @Getter private final Map<JobVertexID, VertexInfo> vertexInfos;
+    @Getter private final Map<SlotSharingGroupId, Set<JobVertexID>> slotSharingGroupMapping;
     @Getter private final Set<JobVertexID> finishedVertices;
     @Getter private final List<JobVertexID> verticesInTopologicalOrder;
 
@@ -66,6 +68,7 @@ public JobTopology(Set<VertexInfo> vertexInfo) {
                 ImmutableMap.copyOf(
                         vertexInfo.stream().collect(Collectors.toMap(VertexInfo::getId, v -> v)));
 
+        Map<SlotSharingGroupId, Set<JobVertexID>> vertexSlotSharingGroupMapping = new HashMap<>();
        var finishedVertices = ImmutableSet.<JobVertexID>builder();
 
         vertexInfo.forEach(
@@ -79,12 +82,21 @@ public JobTopology(Set<VertexInfo> vertexInfo) {
                             vertexOutputs
                                     .computeIfAbsent(inputId, id -> new HashMap<>())
                                     .put(vertexId, shipStrategy));
+
+                    var slotSharingGroupId = info.getSlotSharingGroupId();
+                    if (slotSharingGroupId != null) {
+                        vertexSlotSharingGroupMapping
+                                .computeIfAbsent(slotSharingGroupId, id -> new HashSet<>())
+                                .add(vertexId);
+                    }
+
                     if (info.isFinished()) {
                         finishedVertices.add(vertexId);
                     }
                 });
         vertexOutputs.forEach((v, outputs) -> vertexInfos.get(v).setOutputs(outputs));
 
+        this.slotSharingGroupMapping = ImmutableMap.copyOf(vertexSlotSharingGroupMapping);
         this.finishedVertices = finishedVertices.build();
         this.verticesInTopologicalOrder = returnVerticesInTopologicalOrder();
     }
@@ -97,10 +109,6 @@ public boolean isSource(JobVertexID jobVertexID) {
         return get(jobVertexID).getInputs().isEmpty();
     }
 
-    public void updateMaxParallelism(JobVertexID vertexID, int maxParallelism) {
-        get(vertexID).updateMaxParallelism(maxParallelism);
-    }
-
     private List<JobVertexID> returnVerticesInTopologicalOrder() {
         List<JobVertexID> sorted = new ArrayList<>(vertexInfos.size());
 
@@ -134,6 +142,7 @@ private List<JobVertexID> returnVerticesInTopologicalOrder() {
 
     public static JobTopology fromJsonPlan(
             String jsonPlan,
+            Map<JobVertexID, SlotSharingGroupId> slotSharingGroupIdMap,
             Map<JobVertexID, Integer> maxParallelismMap,
             Map<JobVertexID, IOMetrics> metrics,
             Set<JobVertexID> finishedVertices)
@@ -151,6 +160,7 @@ public static JobTopology fromJsonPlan(
             vertexInfo.add(
                     new VertexInfo(
                             vertexId,
+                            slotSharingGroupIdMap.get(vertexId),
                             inputs,
                             node.get("parallelism").asInt(),
                             maxParallelismMap.get(vertexId),
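
The new slotSharingGroupMapping built in the constructor above simply inverts the vertex-to-group assignment. A hypothetical, self-contained sketch of the same computeIfAbsent grouping pattern, using plain strings in place of Flink's ID classes:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SlotSharingGroupingSketch {
    public static void main(String[] args) {
        // vertexId -> slotSharingGroupId (illustrative values).
        Map<String, String> vertexToGroup =
                Map.of("source", "ssg-1", "map", "ssg-1", "sink", "ssg-2");

        // Invert the mapping: slotSharingGroupId -> set of vertexIds.
        Map<String, Set<String>> groupToVertices = new HashMap<>();
        vertexToGroup.forEach(
                (vertexId, groupId) ->
                        groupToVertices
                                .computeIfAbsent(groupId, id -> new HashSet<>())
                                .add(vertexId));

        // Prints something like {ssg-1=[source, map], ssg-2=[sink]}.
        System.out.println(groupToVertices);
    }
}

ScalingExecutor later iterates this mapping to compute the per-group maximum parallelism when checking the resource quota.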

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java (+17, -5)

@@ -18,6 +18,7 @@
 package org.apache.flink.autoscaler.topology;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import lombok.Data;
@@ -33,6 +34,8 @@ public class VertexInfo {
     // All input vertices and the ship_strategy
     private final Map<JobVertexID, ShipStrategy> inputs;
 
+    private final SlotSharingGroupId slotSharingGroupId;
+
     // All output vertices and the ship_strategy
     private Map<JobVertexID, ShipStrategy> outputs;
 
@@ -48,12 +51,14 @@ public class VertexInfo {
 
     public VertexInfo(
             JobVertexID id,
+            SlotSharingGroupId slotSharingGroupId,
             Map<JobVertexID, ShipStrategy> inputs,
             int parallelism,
             int maxParallelism,
             boolean finished,
             IOMetrics ioMetrics) {
         this.id = id;
+        this.slotSharingGroupId = slotSharingGroupId;
         this.inputs = inputs;
         this.parallelism = parallelism;
         this.maxParallelism = maxParallelism;
@@ -69,19 +74,26 @@ public VertexInfo(
             int parallelism,
             int maxParallelism,
             IOMetrics ioMetrics) {
-        this(id, inputs, parallelism, maxParallelism, false, ioMetrics);
+        this(id, null, inputs, parallelism, maxParallelism, false, ioMetrics);
     }
 
     @VisibleForTesting
     public VertexInfo(
             JobVertexID id,
             Map<JobVertexID, ShipStrategy> inputs,
             int parallelism,
-            int maxParallelism) {
-        this(id, inputs, parallelism, maxParallelism, null);
+            int maxParallelism,
+            boolean finished,
+            IOMetrics ioMetrics) {
+        this(id, null, inputs, parallelism, maxParallelism, finished, ioMetrics);
     }
 
-    public void updateMaxParallelism(int maxParallelism) {
-        setMaxParallelism(Math.min(originalMaxParallelism, maxParallelism));
+    @VisibleForTesting
+    public VertexInfo(
+            JobVertexID id,
+            Map<JobVertexID, ShipStrategy> inputs,
+            int parallelism,
+            int maxParallelism) {
+        this(id, inputs, parallelism, maxParallelism, null);
     }
 }
