
Commit 0721d34

[FLINK-31215] Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators
1 parent ffaa3dd commit 0721d34
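
This commit makes the autoscaler walk the job graph from sinks to sources: when a vertex cannot keep up with its target rate (for example because it hit max parallelism, or is excluded from scaling), the shortfall is expressed as a scale factor below 1.0 and multiplied back onto every input edge, so upstream operators are no longer scaled up to produce records the bottleneck can never absorb. A minimal standalone sketch of the idea, with a made-up three-vertex chain and made-up rates (this is not the committed code, which operates on EvaluatedMetrics and JobTopology):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: a source -> map -> sink chain, visited in reverse topological order.
public class BackpropagationSketch {

    public static void main(String[] args) {
        List<String> reverseTopologicalOrder = List.of("sink", "map", "source");
        Map<String, List<String>> inputs =
                Map.of("sink", List.of("map"), "map", List.of("source"), "source", List.of());

        // Rate each vertex is currently asked to handle, and the most it can actually
        // reach once its own scale factor is capped (max parallelism, exclusion, ...).
        Map<String, Double> targetRate = Map.of("source", 1000.0, "map", 1000.0, "sink", 1000.0);
        Map<String, Double> reachableRate = Map.of("source", 1200.0, "map", 1100.0, "sink", 500.0);

        Map<String, Double> backpropFactors = new HashMap<>();
        for (String vertex : reverseTopologicalOrder) {
            double downstreamFactor = backpropFactors.getOrDefault(vertex, 1.0);
            // The most this vertex may process: its own cap, further reduced by downstream demand.
            double allowed =
                    Math.min(reachableRate.get(vertex), targetRate.get(vertex) * downstreamFactor);
            double planned = targetRate.get(vertex); // what upstream would deliver if unadjusted
            if (allowed < planned) {
                double adjust = allowed / planned; // < 1.0 at a bottleneck
                for (String input : inputs.get(vertex)) {
                    backpropFactors.merge(input, adjust, (a, b) -> a * b);
                }
            }
        }
        System.out.println(backpropFactors); // map=0.5, source=0.5 (iteration order may vary)
    }
}

Here the sink absorbs only 500 of the planned 1000 records/s, so both upstream vertices end up with a factor of 0.5 and their planned capacity is halved rather than wasted.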

9 files changed: +912 -5 lines changed

Diff for: docs/layouts/shortcodes/generated/auto_scaler_configuration.html (+13 -1)

@@ -14,6 +14,12 @@
         <td>Duration</td>
         <td>Lag threshold which will prevent unnecessary scalings while removing the pending messages responsible for the lag.</td>
     </tr>
+    <tr>
+        <td><h5>job.autoscaler.bottleneck-propagation.allow-scale-down</h5></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Allow vertices to scale down during bottleneck propagation.</td>
+    </tr>
     <tr>
         <td><h5>job.autoscaler.catch-up.duration</h5></td>
         <td style="word-wrap: break-word;">30 min</td>
@@ -116,6 +122,12 @@
         <td>Double</td>
         <td>Percentage threshold for switching to observed from busy time based true processing rate if the measurement is off by at least the configured fraction. For example 0.15 means we switch to observed if the busy time based computation is at least 15% higher during catchup.</td>
     </tr>
+    <tr>
+        <td><h5>job.autoscaler.processing.rate.backpropagation.enabled</h5></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Enable backpropagation of processing rate during autoscaling to reduce resource usage.</td>
+    </tr>
     <tr>
         <td><h5>job.autoscaler.quota.cpu</h5></td>
         <td style="word-wrap: break-word;">(none)</td>
@@ -210,7 +222,7 @@
         <td><h5>job.autoscaler.vertex.exclude.ids</h5></td>
         <td style="word-wrap: break-word;"></td>
         <td>List&lt;String&gt;</td>
-        <td>A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.</td>
+        <td>A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.</td>
     </tr>
     <tr>
         <td><h5>job.autoscaler.vertex.max-parallelism</h5></td>

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java (+121)

@@ -20,8 +20,10 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.ShipStrategy;
 import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
@@ -36,9 +38,11 @@
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;

+import static org.apache.flink.autoscaler.config.AutoScalerOptions.BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
@@ -49,6 +53,7 @@
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;

@@ -150,6 +155,122 @@ public int computeScaleTargetParallelism(
         return newParallelism;
     }

+    public boolean propagateBackpropScaleFactor(
+            Configuration conf,
+            JobVertexID vertex,
+            JobTopology topology,
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, Double> backpropScaleFactors,
+            List<String> excludedVertices) {
+
+        double averageTrueProcessingRate =
+                evaluatedMetrics
+                        .getVertexMetrics()
+                        .get(vertex)
+                        .get(TRUE_PROCESSING_RATE)
+                        .getAverage();
+
+        // true processing rate is not defined -> cannot propagate the bottleneck factor upstream
+        if (Double.isNaN(averageTrueProcessingRate)
+                || Double.isInfinite(averageTrueProcessingRate)) {
+            LOG.debug(
+                    "Unable to backpropagate bottleneck scale factor of vertex {}, average true processing rate is {}",
+                    vertex,
+                    averageTrueProcessingRate);
+            return false;
+        }
+
+        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
+
+        double processingRateCapacity =
+                evaluatedMetrics.getVertexMetrics().get(vertex).get(TARGET_DATA_RATE).getAverage();
+
+        if (Double.isNaN(processingRateCapacity)) {
+            LOG.debug(
+                    "Unable to backpropagate bottleneck scale factor of vertex {}, processing rate capacity is {}",
+                    vertex,
+                    processingRateCapacity);
+            return false;
+        }
+
+        // if scale down is disabled, backpropagation cannot push the vertex below the scale
+        // factor it already requires (capped at 1.0)
+        if (!conf.getBoolean(BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED)) {
+            double scaleFactor = processingRateCapacity / averageTrueProcessingRate;
+            scaleFactor = Math.max(scaleFactor, minScaleFactor);
+            minScaleFactor = Math.min(1.0, scaleFactor);
+        }
+
+        // scale the processing rate capacity by the factor already backpropagated from downstream
+        double currentBackPropFactor = backpropScaleFactors.getOrDefault(vertex, 1.0);
+        processingRateCapacity *= currentBackPropFactor;
+
+        double targetScaleFactor = processingRateCapacity / averageTrueProcessingRate;
+
+        if (excludedVertices.contains(vertex.toHexString())) {
+            LOG.debug(
+                    "Vertex {} is excluded from scaling. Target scale factor is 1.0",
+                    vertex.toHexString());
+            targetScaleFactor = 1.0;
+        }
+
+        if (targetScaleFactor < minScaleFactor) {
+            LOG.debug(
+                    "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
+                    targetScaleFactor,
+                    vertex,
+                    minScaleFactor);
+            targetScaleFactor = minScaleFactor;
+        }
+        if (maxScaleFactor < targetScaleFactor) {
+            LOG.debug(
+                    "Computed scale factor of {} for {} is capped by maximum scale up factor to {}",
+                    targetScaleFactor,
+                    vertex,
+                    maxScaleFactor);
+            targetScaleFactor = maxScaleFactor;
+        }
+
+        double maxVertexScaleFactor =
+                evaluatedMetrics.getVertexMetrics().get(vertex).get(MAX_PARALLELISM).getCurrent()
+                        / evaluatedMetrics
+                                .getVertexMetrics()
+                                .get(vertex)
+                                .get(PARALLELISM)
+                                .getCurrent();
+
+        // check if scaling violates the max parallelism cap
+        if (maxVertexScaleFactor < targetScaleFactor) {
+            targetScaleFactor = maxVertexScaleFactor;
+        }
+
+        double targetProcessingCapacity = targetScaleFactor * averageTrueProcessingRate;
+        double adjustedProcessingRateCapacity =
+                AutoScalerUtils.getInPlaceTargetProcessingCapacity(
+                        evaluatedMetrics, topology, vertex, backpropScaleFactors);
+        if (Double.isNaN(adjustedProcessingRateCapacity)) {
+            return false;
+        }
+
+        LOG.debug(
+                "Vertex {} has target capacity of {} and receives capacity {} from upstream",
+                vertex,
+                targetProcessingCapacity,
+                adjustedProcessingRateCapacity);
+
+        // if the capacity arriving from the upstream vertices exceeds the target processing rate
+        // -> backpropagate the scale factor to all inputs
+        if (targetProcessingCapacity < adjustedProcessingRateCapacity) {
+            double adjustFactor = targetProcessingCapacity / adjustedProcessingRateCapacity;
+            for (var input : topology.getVertexInfos().get(vertex).getInputs().keySet()) {
+                double factor = backpropScaleFactors.getOrDefault(input, 1.0);
+                backpropScaleFactors.put(input, factor * adjustFactor);
+            }
+        }
+        return true;
+    }
+
     private boolean blockScalingBasedOnPastActions(
             Context context,
             JobVertexID vertex,
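
The core of the method above is the clamping of the candidate scale factor. A standalone sketch of just that arithmetic, with illustrative constants (the committed code reads the bounds from MAX_SCALE_DOWN_FACTOR and MAX_SCALE_UP_FACTOR and the parallelism values from the PARALLELISM and MAX_PARALLELISM metrics; the class and method names here are invented):

// Invented helper class; it mirrors only the clamping arithmetic, not the real API.
public class ScaleFactorClampSketch {

    static double clampedScaleFactor(
            double targetDataRate, // TARGET_DATA_RATE average
            double trueProcessingRate, // TRUE_PROCESSING_RATE average
            double backpropFactor, // factor already propagated back from downstream
            double maxScaleDown, // e.g. 0.6
            double maxScaleUp, // e.g. 100.0
            double parallelism,
            double maxParallelism) {
        double minFactor = 1 - maxScaleDown;
        double maxFactor = 1 + maxScaleUp;
        double factor = (targetDataRate * backpropFactor) / trueProcessingRate;
        factor = Math.max(factor, minFactor); // cap scale-down
        factor = Math.min(factor, maxFactor); // cap scale-up
        return Math.min(factor, maxParallelism / parallelism); // parallelism headroom
    }

    public static void main(String[] args) {
        // The vertex only needs 10% of its current rate, but scale-down is capped at 60%,
        // so the factor bottoms out at 0.4.
        System.out.println(clampedScaleFactor(100.0, 1000.0, 1.0, 0.6, 100.0, 4.0, 128.0));
    }
}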

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java (+77)

@@ -49,6 +49,7 @@
 import java.util.SortedMap;

 import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
 import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_EXECUTION_DISABLED_REASON;
@@ -57,6 +58,7 @@
 import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;

 /** Class responsible for executing scaling decisions. */
@@ -221,8 +223,14 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
         }

         var out = new HashMap<JobVertexID, ScalingSummary>();
+
+        if (context.getConfiguration().getBoolean(PROCESSING_RATE_BACKPROPAGATION_ENABLED)) {
+            backpropagateProcessingRate(context, evaluatedMetrics, restartTime, jobTopology);
+        }
+
         var excludeVertexIdList =
                 context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+
         evaluatedMetrics
                 .getVertexMetrics()
                 .forEach(
@@ -255,6 +263,75 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
         return out;
     }

+    private void backpropagateProcessingRate(
+            Context context,
+            EvaluatedMetrics evaluatedMetrics,
+            Duration restartTime,
+            JobTopology jobTopology) {
+        var conf = context.getConfiguration();
+        var backpropScaleFactors = new HashMap<JobVertexID, Double>();
+        var excludeVertexIdList =
+                context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+        var vertexIterator =
+                jobTopology
+                        .getVerticesInTopologicalOrder()
+                        .listIterator(jobTopology.getVerticesInTopologicalOrder().size());
+
+        boolean canPropagate = true;
+
+        // backpropagate scale factors, walking the topology from sinks to sources
+        while (canPropagate && vertexIterator.hasPrevious()) {
+            var vertex = vertexIterator.previous();
+            canPropagate =
+                    jobVertexScaler.propagateBackpropScaleFactor(
+                            conf,
+                            vertex,
+                            jobTopology,
+                            evaluatedMetrics,
+                            backpropScaleFactors,
+                            excludeVertexIdList);
+        }

+        if (!canPropagate) {
+            LOG.debug("Cannot properly perform backpropagation because metrics are incomplete");
+            return;
+        }
+
+        // use an extra map so as not to lose precision
+        Map<JobVertexID, Double> adjustedDataRate = new HashMap<>();
+
+        // re-evaluate vertex capacities; the target data rate metric is rewritten
+        // for the subsequent parallelism evaluation
+        for (var vertex : jobTopology.getVerticesInTopologicalOrder()) {
+            double adjustedCapacity = 0.0;
+
+            if (jobTopology.isSource(vertex)) {
+                adjustedCapacity +=
+                        evaluatedMetrics
+                                        .getVertexMetrics()
+                                        .get(vertex)
+                                        .get(TARGET_DATA_RATE)
+                                        .getAverage()
+                                * backpropScaleFactors.getOrDefault(vertex, 1.0);
+            } else {
+                for (var input : jobTopology.getVertexInfos().get(vertex).getInputs().keySet()) {
+                    adjustedCapacity +=
+                            adjustedDataRate.get(input)
+                                    * jobTopology
+                                            .getVertexInfos()
+                                            .get(vertex)
+                                            .getInputRatios()
+                                            .get(input);
+                }
+            }
+            adjustedDataRate.put(vertex, adjustedCapacity);
+            evaluatedMetrics
+                    .getVertexMetrics()
+                    .get(vertex)
+                    .put(TARGET_DATA_RATE, EvaluatedScalingMetric.avg(adjustedCapacity));
+        }
+    }
+
     private boolean isJobUnderMemoryPressure(
             Context ctx, Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics) {
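
Once the reverse walk succeeds, the forward pass above rewrites TARGET_DATA_RATE: a source takes its original target rate times its backprop factor, and every other vertex sums its inputs' adjusted rates weighted by the recorded input ratios. A self-contained sketch with a made-up chain (factor 0.5 on the source, edge ratios 2.0 and 0.5; not the committed code):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy forward pass over source -> map -> sink; all numbers are invented.
public class AdjustedRateSketch {

    public static void main(String[] args) {
        List<String> topologicalOrder = List.of("source", "map", "sink");
        // Input ratios: output records per input record, keyed by (vertex, input edge).
        Map<String, Map<String, Double>> inputRatios =
                Map.of("map", Map.of("source", 2.0), "sink", Map.of("map", 0.5));
        Map<String, Double> sourceTargetRate = Map.of("source", 1000.0);
        Map<String, Double> backpropFactors = Map.of("source", 0.5);

        Map<String, Double> adjusted = new HashMap<>();
        for (String vertex : topologicalOrder) {
            double rate;
            if (inputRatios.get(vertex) == null) { // a source vertex
                rate = sourceTargetRate.get(vertex) * backpropFactors.getOrDefault(vertex, 1.0);
            } else { // propagate adjusted upstream rates through the edge ratios
                rate = 0.0;
                for (var edge : inputRatios.get(vertex).entrySet()) {
                    rate += adjusted.get(edge.getKey()) * edge.getValue();
                }
            }
            adjusted.put(vertex, rate);
        }
        // source=500.0, map=1000.0, sink=500.0 (iteration order may vary)
        System.out.println(adjusted);
    }
}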

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java (+1)

@@ -346,6 +346,7 @@ private void computeTargetDataRate(
                 var inputTargetRate = inputEvaluatedMetrics.get(TARGET_DATA_RATE);
                 var outputRatio =
                         computeEdgeOutputRatio(inputVertex, vertex, topology, metricsHistory);
+                topology.get(vertex).getInputRatios().put(inputVertex, outputRatio);
                 LOG.debug(
                         "Computed output ratio for edge ({} -> {}) : {}",
                         inputVertex,
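
This one-line addition stores the freshly computed edge output ratio on the vertex info so the backpropagation pass can translate rates across edges later. The ratio itself is simply records emitted on the edge per record processed by the input vertex; a toy illustration with invented numbers:

// Toy numbers only; the committed code derives these from collected metric history.
public class OutputRatioSketch {

    public static void main(String[] args) {
        double upstreamProcessingRate = 800.0; // records/s processed by the input vertex
        double edgeOutputRate = 1600.0; // records/s emitted along the edge
        double outputRatio = edgeOutputRate / upstreamProcessingRate; // 2.0

        // During backpropagation, an adjusted upstream rate is translated through the edge:
        double adjustedUpstreamRate = 500.0;
        System.out.println(adjustedUpstreamRate * outputRatio); // 1000.0
    }
}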

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java (+18 -1)

@@ -58,6 +58,23 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.");

+    public static final ConfigOption<Boolean> PROCESSING_RATE_BACKPROPAGATION_ENABLED =
+            autoScalerConfig("processing.rate.backpropagation.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withFallbackKeys(
+                            oldOperatorConfigKey("processing.rate.backpropagation.enabled"))
+                    .withDescription(
+                            "Enable backpropagation of processing rate during autoscaling to reduce resource usage.");
+
+    public static final ConfigOption<Boolean> BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED =
+            autoScalerConfig("bottleneck-propagation.allow-scale-down")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withFallbackKeys(
+                            oldOperatorConfigKey("bottleneck-propagation.allow-scale-down"))
+                    .withDescription("Allow vertices to scale down during bottleneck propagation.");
+
     public static final ConfigOption<Duration> METRICS_WINDOW =
             autoScalerConfig("metrics.window")
                     .durationType()
@@ -313,7 +330,7 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .defaultValues()
                     .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
                     .withDescription(
-                            "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+                            "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.");

     public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
             autoScalerConfig("scaling.event.interval")
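
Both new options default to false, so the feature is opt-in. A usage sketch for enabling them programmatically (assumes the flink-autoscaler module is on the classpath; the equivalent config-file keys are job.autoscaler.processing.rate.backpropagation.enabled and job.autoscaler.bottleneck-propagation.allow-scale-down, as documented in the HTML table above):

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;

public class EnableBackpropagationSketch {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Backpropagation is opt-in; both options default to false.
        conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED, true);
        // Optionally also let backpropagation shrink upstream vertices.
        conf.set(AutoScalerOptions.BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED, true);
        System.out.println(conf);
    }
}

Judging from the JobVertexScaler change above, the second option only matters when the first is enabled: with scale-down disallowed, backpropagation cannot push a vertex below the scale factor it already requires.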

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java (+5)

@@ -25,6 +25,7 @@
 import lombok.Data;
 import lombok.Setter;

+import java.util.HashMap;
 import java.util.Map;

 /** Job vertex information. */
@@ -36,6 +37,9 @@ public class VertexInfo {
     // All input vertices and the ship_strategy
     private final Map<JobVertexID, ShipStrategy> inputs;

+    // Output ratios from input vertices. Used for backpropagation
+    private final Map<JobVertexID, Double> inputRatios;
+
     private final SlotSharingGroupId slotSharingGroupId;

     // All output vertices and the ship_strategy
@@ -68,6 +72,7 @@ public VertexInfo(
         this.originalMaxParallelism = maxParallelism;
         this.finished = finished;
         this.ioMetrics = ioMetrics;
+        this.inputRatios = new HashMap<>();
     }

     @VisibleForTesting
