Commit 6fa9fff

[FLINK-31215] Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators
1 parent ffaa3dd commit 6fa9fff

File tree

9 files changed: +820, -5 lines changed

Diff for: docs/layouts/shortcodes/generated/auto_scaler_configuration.html (+13, -1)

@@ -14,6 +14,12 @@
             <td>Duration</td>
             <td>Lag threshold which will prevent unnecessary scalings while removing the pending messages responsible for the lag.</td>
         </tr>
+        <tr>
+            <td><h5>job.autoscaler.bottleneck-propagation.allow-scale-down</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Allow vertices to scale down during bottleneck propagation.</td>
+        </tr>
         <tr>
             <td><h5>job.autoscaler.catch-up.duration</h5></td>
             <td style="word-wrap: break-word;">30 min</td>
@@ -116,6 +122,12 @@
             <td>Double</td>
             <td>Percentage threshold for switching to observed from busy time based true processing rate if the measurement is off by at least the configured fraction. For example 0.15 means we switch to observed if the busy time based computation is at least 15% higher during catchup.</td>
         </tr>
+        <tr>
+            <td><h5>job.autoscaler.processing.rate.backpropagation.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Enable backpropagation of processing rate during autoscaling to reduce resource usage.</td>
+        </tr>
         <tr>
             <td><h5>job.autoscaler.quota.cpu</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -210,7 +222,7 @@
             <td><h5>job.autoscaler.vertex.exclude.ids</h5></td>
             <td style="word-wrap: break-word;"></td>
             <td>List&lt;String&gt;</td>
-            <td>A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.</td>
+            <td>A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.</td>
         </tr>
         <tr>
             <td><h5>job.autoscaler.vertex.max-parallelism</h5></td>
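
For reference, both new options default to false. A minimal sketch of enabling them programmatically (the class name is hypothetical and the string keys simply mirror the table above; in practice they would normally be set in the Flink or operator configuration rather than in code):

import org.apache.flink.configuration.Configuration;

// Illustrative only; the keys are the ones documented in the generated table above.
public class EnableBackpropagationSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Turn on backpropagation of processing rate limits (default: false).
        conf.setString("job.autoscaler.processing.rate.backpropagation.enabled", "true");
        // Optionally also let upstream vertices scale down during bottleneck propagation.
        conf.setString("job.autoscaler.bottleneck-propagation.allow-scale-down", "true");
        System.out.println(conf);
    }
}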

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java (+93)

@@ -20,8 +20,10 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.ShipStrategy;
 import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
@@ -36,9 +38,11 @@
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;

+import static org.apache.flink.autoscaler.config.AutoScalerOptions.BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
@@ -49,6 +53,7 @@
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;

@@ -150,6 +155,94 @@ public int computeScaleTargetParallelism(
         return newParallelism;
     }

+    public boolean propagateBackpropScaleFactor(
+            Configuration conf,
+            JobVertexID vertex,
+            JobTopology topology,
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, Double> backpropScaleFactors,
+            List<String> excludedVertices) {
+
+        double averageTrueProcessingRate =
+                evaluatedMetrics
+                        .getVertexMetrics()
+                        .get(vertex)
+                        .get(TRUE_PROCESSING_RATE)
+                        .getAverage();
+        if (Double.isNaN(averageTrueProcessingRate)) {
+            return false;
+        }
+
+        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
+
+        double processingRateCapacity =
+                evaluatedMetrics.getVertexMetrics().get(vertex).get(TARGET_DATA_RATE).getAverage();
+
+        if (Double.isNaN(processingRateCapacity)) {
+            return false;
+        }
+
+        // if scale down is disabled, the adjusted scale factor cannot be less than the default
+        // factor
+        if (!conf.getBoolean(BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED)) {
+            double scaleFactor = processingRateCapacity / averageTrueProcessingRate;
+            scaleFactor = Math.max(scaleFactor, minScaleFactor);
+            minScaleFactor = Math.min(1.0, scaleFactor);
+        }
+
+        // we scaled processing rate capacity by upstream
+        double currentBackPropFactor = backpropScaleFactors.getOrDefault(vertex, 1.0);
+        processingRateCapacity *= currentBackPropFactor;
+
+        double targetScaleFactor = processingRateCapacity / averageTrueProcessingRate;
+
+        if (excludedVertices.contains(vertex.toHexString())) {
+            LOG.debug(
+                    "Vertex {} is excluded from scaling. Target scale factor is 1.0",
+                    vertex.toHexString());
+            targetScaleFactor = 1.0;
+        }
+
+        if (targetScaleFactor < minScaleFactor) {
+            LOG.debug(
+                    "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
+                    targetScaleFactor,
+                    vertex,
+                    minScaleFactor);
+            targetScaleFactor = minScaleFactor;
+        }
+        if (maxScaleFactor < targetScaleFactor) {
+            LOG.debug(
+                    "Computed scale factor of {} for {} is capped by maximum scale up factor to {}",
+                    targetScaleFactor,
+                    vertex,
+                    maxScaleFactor);
+            targetScaleFactor = maxScaleFactor;
+        }
+
+        // capacity the vertex can process with target scale factor
+        double limitedProcessingCapacity = targetScaleFactor * averageTrueProcessingRate;
+
+        double adjustedProcessingRateCapacity =
+                AutoScalerUtils.getInPlaceTargetProcessingCapacity(
+                        evaluatedMetrics, topology, vertex, backpropScaleFactors);
+        if (Double.isNaN(adjustedProcessingRateCapacity)) {
+            return false;
+        }
+
+        // if the capacity from the upstream vertices exceeds target processing rate ->
+        // backpropagate scale factor
+        if (limitedProcessingCapacity < adjustedProcessingRateCapacity) {
+            double adjustFactor = limitedProcessingCapacity / adjustedProcessingRateCapacity;
+            for (var input : topology.getVertexInfos().get(vertex).getInputs().keySet()) {
+                double factor = backpropScaleFactors.getOrDefault(input, 1.0);
+                backpropScaleFactors.put(input, factor * adjustFactor);
+            }
+        }
+        return true;
+    }
+
     private boolean blockScalingBasedOnPastActions(
             Context context,
             JobVertexID vertex,
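
To make the factor arithmetic of the new method concrete, here is a small, self-contained sketch in plain Java with made-up names and numbers (it does not use the autoscaler classes): a vertex whose demanded rate exceeds what its maximum scale-up allows caps its own target and pushes the surplus back to its inputs as a multiplicative factor, which is the same shape as the math in propagateBackpropScaleFactor above.

import java.util.HashMap;
import java.util.Map;

// Simplified illustration only; all values and vertex names are hypothetical.
public class BackpropFactorSketch {
    public static void main(String[] args) {
        double averageTrueProcessingRate = 1_000; // records/s the bottleneck handles today
        double processingRateCapacity = 3_000;    // target data rate demanded by its inputs
        double maxScaleFactor = 1 + 1.0;          // e.g. a max scale-up factor of 1.0 -> at most 2x

        // The vertex would need a 3x scale-up but is capped at 2x.
        double targetScaleFactor =
                Math.min(processingRateCapacity / averageTrueProcessingRate, maxScaleFactor);
        double limitedProcessingCapacity = targetScaleFactor * averageTrueProcessingRate; // 2000

        // Upstream could still deliver 3000 records/s, so the surplus is pushed back.
        double adjustedProcessingRateCapacity = 3_000;
        double adjustFactor = limitedProcessingCapacity / adjustedProcessingRateCapacity; // ~0.67

        // Each input accumulates the factor multiplicatively, as in backpropScaleFactors.
        Map<String, Double> backpropScaleFactors = new HashMap<>();
        for (String input : new String[] {"source", "map"}) {
            backpropScaleFactors.merge(input, adjustFactor, (a, b) -> a * b);
        }
        System.out.println(backpropScaleFactors); // both inputs get a factor of ~0.667
    }
}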

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java (+76)

@@ -49,6 +49,7 @@
 import java.util.SortedMap;

 import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
 import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_EXECUTION_DISABLED_REASON;
@@ -57,6 +58,7 @@
 import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;

 /** Class responsible for executing scaling decisions. */
@@ -221,8 +223,14 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
         }

         var out = new HashMap<JobVertexID, ScalingSummary>();
+
+        if (context.getConfiguration().getBoolean(PROCESSING_RATE_BACKPROPAGATION_ENABLED)) {
+            backpropagateProcessingRate(context, evaluatedMetrics, restartTime, jobTopology);
+        }
+
         var excludeVertexIdList =
                 context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+
         evaluatedMetrics
                 .getVertexMetrics()
                 .forEach(
@@ -255,6 +263,74 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
         return out;
     }

+    private void backpropagateProcessingRate(
+            Context context,
+            EvaluatedMetrics evaluatedMetrics,
+            Duration restartTime,
+            JobTopology jobTopology) {
+        var conf = context.getConfiguration();
+        var backpropScaleFactors = new HashMap<JobVertexID, Double>();
+        var excludeVertexIdList =
+                context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+        var vertexIterator =
+                jobTopology
+                        .getVerticesInTopologicalOrder()
+                        .listIterator(jobTopology.getVerticesInTopologicalOrder().size());
+
+        boolean canPropagate = true;
+
+        // backpropagate scale factors
+        while (canPropagate && vertexIterator.hasPrevious()) {
+            var vertex = vertexIterator.previous();
+            canPropagate =
+                    jobVertexScaler.propagateBackpropScaleFactor(
+                            conf,
+                            vertex,
+                            jobTopology,
+                            evaluatedMetrics,
+                            backpropScaleFactors,
+                            excludeVertexIdList);
+        }
+
+        if (!canPropagate) {
+            return;
+        }
+
+        // use an extra map to not lose precision
+        Map<JobVertexID, Double> adjustedDataRate = new HashMap<>();
+
+        // re-evaluating vertices capacity
+        // Target data rate metric is rewritten for parallelism evaluation
+        for (var vertex : jobTopology.getVerticesInTopologicalOrder()) {
+            double adjustedCapacity = 0.0;
+
+            if (jobTopology.isSource(vertex)) {
+                adjustedCapacity +=
+                        evaluatedMetrics
+                                .getVertexMetrics()
+                                .get(vertex)
+                                .get(TARGET_DATA_RATE)
+                                .getAverage()
+                                * backpropScaleFactors.getOrDefault(vertex, 1.0);
+            } else {
+                for (var input : jobTopology.getVertexInfos().get(vertex).getInputs().keySet()) {
+                    adjustedCapacity +=
+                            adjustedDataRate.get(input)
+                                    * jobTopology
+                                            .getVertexInfos()
+                                            .get(vertex)
+                                            .getInputRatios()
+                                            .get(input);
+                }
+            }
+            adjustedDataRate.put(vertex, adjustedCapacity);
+            evaluatedMetrics
+                    .getVertexMetrics()
+                    .get(vertex)
+                    .put(TARGET_DATA_RATE, EvaluatedScalingMetric.avg(adjustedCapacity));
+        }
+    }
+
     private boolean isJobUnderMemoryPressure(
             Context ctx, Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics) {
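
The second half of backpropagateProcessingRate rewrites TARGET_DATA_RATE top-down: source rates are multiplied by their backpropagated factors, and every downstream vertex re-derives its rate from its inputs through the stored input ratios. A tiny, self-contained illustration over a made-up source -> map -> sink chain (hypothetical names, numbers, and ratios; plain Java rather than the autoscaler types):

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only; edge ratios of 1.0 and 0.5 and a 0.67 source factor are assumptions.
public class AdjustedDataRateSketch {
    public static void main(String[] args) {
        Map<String, Double> backpropScaleFactors = Map.of("source", 0.67);
        double sourceTargetDataRate = 3_000;

        Map<String, Double> adjustedDataRate = new LinkedHashMap<>();
        // Sources: original target data rate scaled by the backpropagated factor.
        adjustedDataRate.put(
                "source", sourceTargetDataRate * backpropScaleFactors.getOrDefault("source", 1.0));
        // Non-sources: sum of adjusted input rates times the per-edge output ratio.
        adjustedDataRate.put("map", adjustedDataRate.get("source") * 1.0);
        adjustedDataRate.put("sink", adjustedDataRate.get("map") * 0.5);

        System.out.println(adjustedDataRate); // {source=2010.0, map=2010.0, sink=1005.0}
    }
}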

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java (+1)

@@ -346,6 +346,7 @@ private void computeTargetDataRate(
                 var inputTargetRate = inputEvaluatedMetrics.get(TARGET_DATA_RATE);
                 var outputRatio =
                         computeEdgeOutputRatio(inputVertex, vertex, topology, metricsHistory);
+                topology.get(vertex).getInputRatios().put(inputVertex, outputRatio);
                 LOG.debug(
                         "Computed output ratio for edge ({} -> {}) : {}",
                         inputVertex,

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java (+18, -1)

@@ -58,6 +58,23 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.");

+    public static final ConfigOption<Boolean> PROCESSING_RATE_BACKPROPAGATION_ENABLED =
+            autoScalerConfig("processing.rate.backpropagation.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withFallbackKeys(
+                            oldOperatorConfigKey("processing.rate.backpropagation.enabled"))
+                    .withDescription(
+                            "Enable backpropagation of processing rate during autoscaling to reduce resource usage.");
+
+    public static final ConfigOption<Boolean> BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED =
+            autoScalerConfig("bottleneck-propagation.allow-scale-down")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withFallbackKeys(
+                            oldOperatorConfigKey("bottleneck-propagation.allow-scale-down"))
+                    .withDescription("Allow vertices to scale down during bottleneck propagation.");
+
     public static final ConfigOption<Duration> METRICS_WINDOW =
             autoScalerConfig("metrics.window")
                     .durationType()
@@ -313,7 +330,7 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .defaultValues()
                     .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
                     .withDescription(
-                            "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+                            "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.");

     public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
             autoScalerConfig("scaling.event.interval")

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java (+5)

@@ -25,6 +25,7 @@
 import lombok.Data;
 import lombok.Setter;

+import java.util.HashMap;
 import java.util.Map;

 /** Job vertex information. */
@@ -36,6 +37,9 @@ public class VertexInfo {
     // All input vertices and the ship_strategy
     private final Map<JobVertexID, ShipStrategy> inputs;

+    // Output ratios from input vertices. Used for backpropagation
+    private final Map<JobVertexID, Double> inputRatios;
+
     private final SlotSharingGroupId slotSharingGroupId;

     // All output vertices and the ship_strategy
@@ -68,6 +72,7 @@ public VertexInfo(
         this.originalMaxParallelism = maxParallelism;
         this.finished = finished;
         this.ioMetrics = ioMetrics;
+        this.inputRatios = new HashMap<>();
     }

     @VisibleForTesting
