Skip to content

Commit d526174

Browse files
authored
[FLINK-34152] Add an option to scale memory when downscaling (#786)
This adds an option to increase heap memory when removing TaskManagers. The scaling is applied after adjusting the memory pools (heap, metaspace, network, managed) and only affects heap memory. The reason for adding this functionality is that the likelihood of running into memory constrained scenarios when downscaling is increased after applying memory tuning. As a precaution, we temporarily increase the memory up to the maximum allowed TaskManager memory.
1 parent b8f22bf commit d526174

File tree

9 files changed

+314
-21
lines changed

9 files changed

+314
-21
lines changed

Diff for: docs/layouts/shortcodes/generated/auto_scaler_configuration.html

+6
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@
8080
<td>Double</td>
8181
<td>Overhead to add to tuning decisions (0-1). This ensures spare capacity and allows the memory to grow beyond the dynamically computed limits, but never beyond the original memory limits.</td>
8282
</tr>
83+
<tr>
84+
<td><h5>job.autoscaler.memory.tuning.scale-down-compensation.enabled</h5></td>
85+
<td style="word-wrap: break-word;">true</td>
86+
<td>Boolean</td>
87+
<td>If this option is enabled and memory tuning is enabled, TaskManager memory will be increased when scaling down. This ensures that after applying memory tuning there is sufficient memory when running with fewer TaskManagers.</td>
88+
</tr>
8389
<tr>
8490
<td><h5>job.autoscaler.metrics.busy-time.aggregator</h5></td>
8591
<td style="word-wrap: break-word;">MAX</td>

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -122,15 +122,15 @@ public boolean scaleResource(
122122
}
123123

124124
var configOverrides =
125-
MemoryTuning.tuneTaskManagerHeapMemory(
125+
MemoryTuning.tuneTaskManagerMemory(
126126
context,
127127
evaluatedMetrics,
128128
jobTopology,
129129
scalingSummaries,
130130
autoScalerEventHandler);
131131

132132
if (scalingWouldExceedClusterResources(
133-
configOverrides.applyOverrides(conf),
133+
configOverrides.newConfigWithOverrides(conf),
134134
evaluatedMetrics,
135135
scalingSummaries,
136136
context)) {

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

+9
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,15 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
258258
.withDescription(
259259
"If enabled, the initial amount of memory specified for TaskManagers will be reduced/increased according to the observed needs.");
260260

261+
public static final ConfigOption<Boolean> MEMORY_SCALING_ENABLED =
262+
autoScalerConfig("memory.tuning.scale-down-compensation.enabled")
263+
.booleanType()
264+
.defaultValue(true)
265+
.withFallbackKeys(
266+
oldOperatorConfigKey("memory.tuning.scale-down-compensation.enabled"))
267+
.withDescription(
268+
"If this option is enabled and memory tuning is enabled, TaskManager memory will be increased when scaling down. This ensures that after applying memory tuning there is sufficient memory when running with fewer TaskManagers.");
269+
261270
public static final ConfigOption<Double> MEMORY_TUNING_OVERHEAD =
262271
autoScalerConfig("memory.tuning.overhead")
263272
.doubleType()

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/ConfigChanges.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public ConfigChanges addRemoval(ConfigOption<?> configOption) {
5656
return this;
5757
}
5858

59-
public Configuration applyOverrides(Configuration existingConfig) {
59+
public Configuration newConfigWithOverrides(Configuration existingConfig) {
6060
Configuration config = new Configuration(existingConfig);
6161
for (String key : removals) {
6262
config.removeKey(key);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.autoscaler.tuning;
19+
20+
import org.apache.flink.autoscaler.JobAutoScalerContext;
21+
import org.apache.flink.autoscaler.ScalingSummary;
22+
import org.apache.flink.autoscaler.config.AutoScalerOptions;
23+
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
24+
import org.apache.flink.autoscaler.metrics.ScalingMetric;
25+
import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
26+
import org.apache.flink.configuration.Configuration;
27+
import org.apache.flink.configuration.MemorySize;
28+
import org.apache.flink.configuration.TaskManagerOptions;
29+
import org.apache.flink.runtime.jobgraph.JobVertexID;
30+
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
import java.util.Map;
35+
36+
/**
37+
* Memory scaling ensures that memory is scaled alongside with the number of available TaskManagers.
38+
*
39+
* <p>When scaling down, TaskManagers are removed which can drastically limit the amount of
40+
* available memory. To mitigate this issue, we keep the total cluster memory constant, until we can
41+
* measure the actual needed memory usage.
42+
*
43+
* <p>When scaling up, i.e. adding more TaskManagers, we don't remove memory to ensure that we do
44+
* not run into memory-constrained scenarios. However, MemoryTuning will still be applied which can
45+
* result in a lower TaskManager memory baseline.
46+
*/
47+
public class MemoryScaling {
48+
49+
private static final Logger LOG = LoggerFactory.getLogger(MemoryScaling.class);
50+
51+
/**
52+
* Scales the amount of memory per TaskManager proportionally to the number of TaskManagers
53+
* removed/added.
54+
*/
55+
public static MemorySize applyMemoryScaling(
56+
MemorySize currentMemorySize,
57+
MemoryBudget memoryBudget,
58+
JobAutoScalerContext<?> context,
59+
Map<JobVertexID, ScalingSummary> scalingSummaries,
60+
EvaluatedMetrics evaluatedMetrics) {
61+
62+
if (!context.getConfiguration().get(AutoScalerOptions.MEMORY_SCALING_ENABLED)) {
63+
return currentMemorySize;
64+
}
65+
66+
double memScalingFactor =
67+
getMemoryScalingFactor(
68+
evaluatedMetrics, scalingSummaries, context.getConfiguration());
69+
70+
long additionalBytes =
71+
memoryBudget.budget(
72+
(long) (memScalingFactor * currentMemorySize.getBytes())
73+
- currentMemorySize.getBytes());
74+
75+
MemorySize scaledTotalMemory =
76+
new MemorySize(currentMemorySize.getBytes() + additionalBytes);
77+
78+
LOG.info(
79+
"Scaling factor: {}, Adjusting memory from {} to {}.",
80+
memScalingFactor,
81+
currentMemorySize,
82+
scaledTotalMemory);
83+
84+
return scaledTotalMemory;
85+
}
86+
87+
/**
88+
* Returns a factor for scaling the total amount of process memory when the number of
89+
* TaskManagers change.
90+
*/
91+
private static double getMemoryScalingFactor(
92+
EvaluatedMetrics evaluatedMetrics,
93+
Map<JobVertexID, ScalingSummary> scalingSummaries,
94+
Configuration config) {
95+
int numTaskSlotsUsed =
96+
(int)
97+
evaluatedMetrics
98+
.getGlobalMetrics()
99+
.get(ScalingMetric.NUM_TASK_SLOTS_USED)
100+
.getCurrent();
101+
int numTaskSlotsAfterRescale =
102+
ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(
103+
evaluatedMetrics.getVertexMetrics(), scalingSummaries, numTaskSlotsUsed);
104+
105+
int numTaskSlotsPerTM = config.get(TaskManagerOptions.NUM_TASK_SLOTS);
106+
107+
int numTMsBeforeRescale = (int) Math.ceil(numTaskSlotsUsed / (double) numTaskSlotsPerTM);
108+
int numTMsAfterRescale =
109+
(int) Math.ceil(numTaskSlotsAfterRescale / (double) numTaskSlotsPerTM);
110+
111+
// Only add memory, don't remove any
112+
return Math.max(1, numTMsBeforeRescale / (double) numTMsAfterRescale);
113+
}
114+
}

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java

+17-9
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,11 @@ public class MemoryTuning {
6868

6969
/**
7070
* Emits a Configuration which contains overrides for the current configuration. We are not
71-
* modifying the config directly, but we are emitting a new configuration which contains any
72-
* overrides. This config is persisted separately and applied by the autoscaler. That way we can
73-
* clear any applied overrides if auto-tuning is disabled.
71+
* modifying the config directly, but we are emitting ConfigChanges which contain any overrides
72+
* or removals. This config is persisted separately and applied by the autoscaler. That way we
73+
* can clear any applied overrides if auto-tuning is disabled.
7474
*/
75-
public static ConfigChanges tuneTaskManagerHeapMemory(
75+
public static ConfigChanges tuneTaskManagerMemory(
7676
JobAutoScalerContext<?> context,
7777
EvaluatedMetrics evaluatedMetrics,
7878
JobTopology jobTopology,
@@ -108,8 +108,9 @@ public static ConfigChanges tuneTaskManagerHeapMemory(
108108
LOG.warn("Spec TaskManager memory size could not be determined.");
109109
return EMPTY_CONFIG;
110110
}
111+
111112
MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes());
112-
// Add these current settings from the budget
113+
// Budget the original spec's memory settings which we do not modify
113114
memBudget.budget(memSpecs.getFlinkMemory().getFrameworkOffHeap().getBytes());
114115
memBudget.budget(memSpecs.getFlinkMemory().getTaskOffHeap().getBytes());
115116
memBudget.budget(memSpecs.getJvmOverheadSize().getBytes());
@@ -134,6 +135,10 @@ public static ConfigChanges tuneTaskManagerHeapMemory(
134135
specManagedSize,
135136
config,
136137
memBudget);
138+
// Rescale heap according to scaling decision after distributing all memory pools
139+
newHeapSize =
140+
MemoryScaling.applyMemoryScaling(
141+
newHeapSize, memBudget, context, scalingSummaries, evaluatedMetrics);
137142
LOG.info(
138143
"Optimized memory sizes: heap: {} managed: {}, network: {}, meta: {}",
139144
newHeapSize.toHumanReadableString(),
@@ -165,6 +170,7 @@ public static ConfigChanges tuneTaskManagerHeapMemory(
165170
// memory pools, there are no fractional variants for heap memory. Setting the absolute heap
166171
// memory options could cause invalid configuration states when users adapt the total amount
167172
// of memory. We also need to take care to remove any user-provided overrides for those.
173+
tuningConfig.addRemoval(TaskManagerOptions.TOTAL_FLINK_MEMORY);
168174
tuningConfig.addRemoval(TaskManagerOptions.TASK_HEAP_MEMORY);
169175
// Set default to zero because we already account for heap via task heap.
170176
tuningConfig.addOverride(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, MemorySize.ZERO);
@@ -233,7 +239,8 @@ private static MemorySize adjustManagedMemory(
233239
long maxManagedMemorySize = memBudget.budget(Long.MAX_VALUE);
234240
return new MemorySize(maxManagedMemorySize);
235241
} else {
236-
return managedMemoryConfigured;
242+
long managedMemorySize = memBudget.budget(managedMemoryConfigured.getBytes());
243+
return new MemorySize(managedMemorySize);
237244
}
238245
}
239246

@@ -322,9 +329,10 @@ static int calculateNetworkSegmentNumber(
322329

323330
private static MemorySize getUsage(
324331
ScalingMetric scalingMetric, Map<ScalingMetric, EvaluatedScalingMetric> globalMetrics) {
325-
MemorySize heapUsed = new MemorySize((long) globalMetrics.get(scalingMetric).getAverage());
326-
LOG.debug("{}: {}", scalingMetric, heapUsed);
327-
return heapUsed;
332+
MemorySize memoryUsed =
333+
new MemorySize((long) globalMetrics.get(scalingMetric).getAverage());
334+
LOG.debug("{}: {}", scalingMetric, memoryUsed);
335+
return memoryUsed;
328336
}
329337

330338
public static MemorySize getTotalMemory(Configuration config, JobAutoScalerContext<?> ctx) {

Diff for: flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -362,11 +362,11 @@ public void testMemoryTuning() throws Exception {
362362
TaskManagerOptions.JVM_METASPACE.key(),
363363
"360 mb",
364364
TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
365-
"0.134",
365+
"0.053",
366366
TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
367367
"0 bytes",
368368
TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
369-
"7864064 kb"));
369+
"20400832696 bytes"));
370370
}
371371

372372
@ParameterizedTest
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.autoscaler.tuning;
19+
20+
import org.apache.flink.api.common.JobID;
21+
import org.apache.flink.autoscaler.JobAutoScalerContext;
22+
import org.apache.flink.autoscaler.ScalingSummary;
23+
import org.apache.flink.autoscaler.TestingAutoscalerUtils;
24+
import org.apache.flink.autoscaler.config.AutoScalerOptions;
25+
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
26+
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
27+
import org.apache.flink.autoscaler.metrics.ScalingMetric;
28+
import org.apache.flink.configuration.MemorySize;
29+
import org.apache.flink.runtime.jobgraph.JobVertexID;
30+
31+
import org.junit.jupiter.api.BeforeEach;
32+
import org.junit.jupiter.api.Test;
33+
34+
import java.util.Map;
35+
36+
import static org.assertj.core.api.Assertions.assertThat;
37+
38+
class MemoryScalingTest {
39+
40+
JobAutoScalerContext<JobID> context = TestingAutoscalerUtils.createResourceAwareContext();
41+
42+
@BeforeEach
43+
void setup() {
44+
context.getConfiguration().set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true);
45+
}
46+
47+
@Test
48+
void testMemoryScalingDownscaling() {
49+
int currentParallelism = 20;
50+
int rescaleParallelism = 10;
51+
MemorySize currentMemory = MemorySize.parse("10 gb");
52+
MemoryBudget memoryBudget = new MemoryBudget(MemorySize.parse("30 gb").getBytes());
53+
54+
assertThat(
55+
runMemoryScaling(
56+
currentParallelism,
57+
rescaleParallelism,
58+
context,
59+
currentMemory,
60+
memoryBudget))
61+
.isEqualTo(MemorySize.parse("20 gb"));
62+
}
63+
64+
@Test
65+
void testMemoryScalingUpscaling() {
66+
int currentParallelism = 10;
67+
int rescaleParallelism = 20;
68+
MemorySize currentMemory = MemorySize.parse("10 gb");
69+
MemoryBudget memoryBudget = new MemoryBudget(MemorySize.parse("30 gb").getBytes());
70+
71+
assertThat(
72+
runMemoryScaling(
73+
currentParallelism,
74+
rescaleParallelism,
75+
context,
76+
currentMemory,
77+
memoryBudget))
78+
.isEqualTo(MemorySize.parse("10 gb"));
79+
}
80+
81+
@Test
82+
void testMemoryScalingDisabled() {
83+
context.getConfiguration().set(AutoScalerOptions.MEMORY_SCALING_ENABLED, false);
84+
MemorySize currentMemory = MemorySize.parse("10 gb");
85+
MemoryBudget memoryBudget = new MemoryBudget(MemorySize.parse("30 gb").getBytes());
86+
87+
assertThat(
88+
MemoryScaling.applyMemoryScaling(
89+
currentMemory, memoryBudget, context, Map.of(), null))
90+
.isEqualTo(currentMemory);
91+
}
92+
93+
private static MemorySize runMemoryScaling(
94+
int currentParallelism,
95+
int rescaleParallelism,
96+
JobAutoScalerContext<JobID> context,
97+
MemorySize currentMemory,
98+
MemoryBudget memoryBudget) {
99+
var globalMetrics =
100+
Map.of(
101+
ScalingMetric.NUM_TASK_SLOTS_USED,
102+
EvaluatedScalingMetric.of(currentParallelism));
103+
var jobVertex1 = new JobVertexID();
104+
var jobVertex2 = new JobVertexID();
105+
var vertexMetrics =
106+
Map.of(
107+
jobVertex1,
108+
Map.of(
109+
ScalingMetric.PARALLELISM,
110+
EvaluatedScalingMetric.of(currentParallelism)),
111+
jobVertex2,
112+
Map.of(
113+
ScalingMetric.PARALLELISM,
114+
EvaluatedScalingMetric.of(currentParallelism)));
115+
var metrics = new EvaluatedMetrics(vertexMetrics, globalMetrics);
116+
117+
Map<JobVertexID, ScalingSummary> scalingSummaries =
118+
Map.of(
119+
jobVertex1,
120+
new ScalingSummary(
121+
currentParallelism, rescaleParallelism, Map.of()),
122+
jobVertex2,
123+
new ScalingSummary(
124+
currentParallelism, rescaleParallelism, Map.of()));
125+
126+
return MemoryScaling.applyMemoryScaling(
127+
currentMemory, memoryBudget, context, scalingSummaries, metrics);
128+
}
129+
}

0 commit comments

Comments
 (0)