Commit cc0479a
[FLINK-31215] Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators
1 parent d8568ae commit cc0479a

File tree: 9 files changed, +845 -5 lines changed

Diff for: docs/layouts/shortcodes/generated/auto_scaler_configuration.html

+13-1
@@ -116,6 +116,18 @@
     <td>Double</td>
     <td>Percentage threshold for switching to observed from busy time based true processing rate if the measurement is off by at least the configured fraction. For example 0.15 means we switch to observed if the busy time based computation is at least 15% higher during catchup.</td>
 </tr>
+<tr>
+    <td><h5>job.autoscaler.processing.rate.backpropagation.enabled</h5></td>
+    <td style="word-wrap: break-word;">false</td>
+    <td>Boolean</td>
+    <td>Enable backpropagation of processing rate during autoscaling to reduce resource usage.</td>
+</tr>
+<tr>
+    <td><h5>job.autoscaler.processing.rate.backpropagation.impact</h5></td>
+    <td style="word-wrap: break-word;">0.0</td>
+    <td>Double</td>
+    <td>How strongly backpropagated values should affect scaling: 0 means no effect, 1 means use the backpropagated values fully.</td>
+</tr>
 <tr>
     <td><h5>job.autoscaler.quota.cpu</h5></td>
     <td style="word-wrap: break-word;">(none)</td>
@@ -210,7 +222,7 @@
     <td><h5>job.autoscaler.vertex.exclude.ids</h5></td>
     <td style="word-wrap: break-word;"></td>
     <td>List&lt;String&gt;</td>
-    <td>A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.</td>
+    <td>A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.</td>
 </tr>
 <tr>
     <td><h5>job.autoscaler.vertex.max-parallelism</h5></td>
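
The two options above are plain job configuration keys. A minimal sketch of enabling them through Flink's Configuration API; only the keys themselves come from the commit, the 0.5 value is purely illustrative:

import org.apache.flink.configuration.Configuration;

// Minimal sketch: turn backpropagation on and let it account for half of the
// evaluated target rate. 0.5 is an arbitrary illustrative value.
Configuration conf = new Configuration();
conf.setString("job.autoscaler.processing.rate.backpropagation.enabled", "true");
conf.setString("job.autoscaler.processing.rate.backpropagation.impact", "0.5");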

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

+114
@@ -20,8 +20,10 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.ShipStrategy;
 import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
@@ -36,6 +38,7 @@
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 
@@ -49,6 +52,7 @@
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
 
@@ -150,6 +154,116 @@ public int computeScaleTargetParallelism(
         return newParallelism;
     }
 
+    public void propagateBackpropScaleFactor(
+            Configuration conf,
+            JobVertexID vertex,
+            JobTopology topology,
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, Double> realDataRates,
+            List<String> excludedVertices) {
+
+        double targetDataRate =
+                evaluatedMetrics.getVertexMetrics().get(vertex).get(TARGET_DATA_RATE).getCurrent();
+
+        // we cannot adjust the data rate of the vertex if its data rate is undefined or the
+        // vertex is excluded from scaling
+        if (Double.isNaN(targetDataRate) || excludedVertices.contains(vertex.toString())) {
+            LOG.debug(
+                    "Vertex {} is excluded from scaling or its target data rate is undefined. Excluding it from backpropagation",
+                    vertex.toHexString());
+            realDataRates.put(vertex, targetDataRate);
+            return;
+        }
+
+        // how much the target data rate should be lowered
+        double dataRateDecrease = 0.0;
+
+        // check how much the data rate of the vertex should be lowered depending on downstream
+        for (var downstream : topology.getVertexInfos().get(vertex).getOutputs().keySet()) {
+            double downstreamTargetDataRate =
+                    evaluatedMetrics
+                            .getVertexMetrics()
+                            .get(downstream)
+                            .get(TARGET_DATA_RATE)
+                            .getCurrent();
+            double downstreamRealDataRate = realDataRates.getOrDefault(downstream, Double.NaN);
+            int upstreamVertices = topology.getVertexInfos().get(downstream).getInputs().size();
+
+            double outputRatio =
+                    topology.getVertexInfos()
+                            .get(downstream)
+                            .getInputRatios()
+                            .getOrDefault(vertex, Double.NaN);
+
+            // skip if the real data rate cannot be updated by the downstream vertex
+            if (Double.isNaN(downstreamRealDataRate) || Double.isInfinite(downstreamRealDataRate)) {
+                continue;
+            }
+            if (Double.isNaN(downstreamTargetDataRate)
+                    || Double.isInfinite(downstreamTargetDataRate)) {
+                continue;
+            }
+            if (Double.isNaN(outputRatio) || Double.isInfinite(outputRatio)) {
+                continue;
+            }
+
+            // distribute the downstream's data rate delta over all of its upstream vertices
+            double downstreamDataRateDelta =
+                    (downstreamTargetDataRate - downstreamRealDataRate) / upstreamVertices;
+
+            dataRateDecrease = Math.max(dataRateDecrease, downstreamDataRateDelta / outputRatio);
+        }
+
+        LOG.debug(
+                "Data rate of {} is decreased by {} from downstream",
+                vertex.toHexString(),
+                dataRateDecrease);
+
+        if (dataRateDecrease > targetDataRate) {
+            LOG.warn(
+                    "Required data rate decrease {} for vertex {} exceeds the target data rate {} of the vertex.",
+                    dataRateDecrease,
+                    vertex.toHexString(),
+                    targetDataRate);
+            dataRateDecrease = targetDataRate;
+        }
+
+        targetDataRate -= dataRateDecrease;
+
+        // check if the target data rate should be lowered even further due to scaling limitations
+        double averageTrueProcessingRate =
+                evaluatedMetrics
+                        .getVertexMetrics()
+                        .get(vertex)
+                        .get(TRUE_PROCESSING_RATE)
+                        .getAverage();
+
+        if (Double.isNaN(averageTrueProcessingRate)) {
+            LOG.info(
+                    "True processing rate of {} is undefined, using the target data rate adjusted by downstream",
+                    vertex);
+            realDataRates.put(vertex, targetDataRate);
+            return;
+        }
+
+        // determine the upper limit for scaling of the vertex
+        double parallelism =
+                evaluatedMetrics.getVertexMetrics().get(vertex).get(PARALLELISM).getCurrent();
+        double maxParallelism =
+                evaluatedMetrics.getVertexMetrics().get(vertex).get(MAX_PARALLELISM).getCurrent();
+        double maxScaleFactor =
+                Math.min(1 + conf.get(MAX_SCALE_UP_FACTOR), maxParallelism / parallelism);
+
+        double maxDataProcessingRate = maxScaleFactor * averageTrueProcessingRate;
+        targetDataRate = Math.min(targetDataRate, maxDataProcessingRate);
+
+        LOG.debug(
+                "Real data rate of {} after backpropagation is {}",
+                vertex.toHexString(),
+                targetDataRate);
+        realDataRates.put(vertex, targetDataRate);
+    }
+
     private boolean blockScalingBasedOnPastActions(
             Context context,
             JobVertexID vertex,
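
To make the arithmetic above concrete, here is an illustrative walk-through with hypothetical numbers; the variable names mirror the method, but the values do not come from the commit:

// A downstream vertex wants 1000 rec/s but can realistically absorb only
// 600 rec/s, and it has two upstream inputs, so each input is asked to shed
// half of the 400 rec/s shortfall.
double downstreamTargetDataRate = 1000.0;
double downstreamRealDataRate = 600.0;
int upstreamVertices = 2;
double outputRatio = 0.5; // downstream records produced per upstream record

double downstreamDataRateDelta =
        (downstreamTargetDataRate - downstreamRealDataRate) / upstreamVertices; // 200.0
// With an output ratio of 0.5, removing 200 downstream rec/s requires slowing
// this upstream vertex down by 400 rec/s.
double dataRateDecrease = downstreamDataRateDelta / outputRatio; // 400.0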

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java

+76
@@ -35,6 +35,7 @@
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +50,7 @@
 import java.util.SortedMap;
 
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
 import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_EXECUTION_DISABLED_REASON;
@@ -57,6 +59,7 @@
 import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Class responsible for executing scaling decisions. */
@@ -221,8 +224,14 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
         }
 
         var out = new HashMap<JobVertexID, ScalingSummary>();
+
+        if (context.getConfiguration().get(PROCESSING_RATE_BACKPROPAGATION_ENABLED)) {
+            backpropagateProcessingRate(context, evaluatedMetrics, restartTime, jobTopology);
+        }
+
         var excludeVertexIdList =
                 context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+
         evaluatedMetrics
                 .getVertexMetrics()
                 .forEach(
@@ -255,6 +264,73 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
         return out;
     }
 
+    private void backpropagateProcessingRate(
+            Context context,
+            EvaluatedMetrics evaluatedMetrics,
+            Duration restartTime,
+            JobTopology jobTopology) {
+        var conf = context.getConfiguration();
+        double backpropagationImpact =
+                conf.get(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_IMPACT);
+        Preconditions.checkState(
+                0 <= backpropagationImpact && backpropagationImpact <= 1.0,
+                "Backpropagation impact should be in range [0, 1]");
+        var realDataRates = new HashMap<JobVertexID, Double>();
+        var excludeVertexIdList =
+                context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+        var vertexIterator =
+                jobTopology
+                        .getVerticesInTopologicalOrder()
+                        .listIterator(jobTopology.getVerticesInTopologicalOrder().size());
+
+        // backpropagate scale factors, walking the topology from sinks to sources
+        while (vertexIterator.hasPrevious()) {
+            var vertex = vertexIterator.previous();
+            jobVertexScaler.propagateBackpropScaleFactor(
+                    conf,
+                    vertex,
+                    jobTopology,
+                    evaluatedMetrics,
+                    realDataRates,
+                    excludeVertexIdList);
+        }
+
+        // use an extra map to avoid losing precision
+        Map<JobVertexID, Double> adjustedDataRate = new HashMap<>();
+
+        // re-evaluate vertex capacities; the target data rate metric is rewritten for the
+        // parallelism evaluation
+        for (var vertex : jobTopology.getVerticesInTopologicalOrder()) {
+            double adjustedCapacity = 0.0;
+
+            if (jobTopology.isSource(vertex)) {
+                adjustedCapacity +=
+                        evaluatedMetrics
+                                        .getVertexMetrics()
+                                        .get(vertex)
+                                        .get(TARGET_DATA_RATE)
+                                        .getAverage()
+                                * (1.0 - backpropagationImpact)
+                                + backpropagationImpact * realDataRates.get(vertex);
+            } else {
+                for (var input : jobTopology.getVertexInfos().get(vertex).getInputs().keySet()) {
+                    adjustedCapacity +=
+                            adjustedDataRate.get(input)
+                                    * jobTopology
+                                            .getVertexInfos()
+                                            .get(vertex)
+                                            .getInputRatios()
+                                            .get(input);
+                }
+            }
+            adjustedDataRate.put(vertex, adjustedCapacity);
+            evaluatedMetrics
+                    .getVertexMetrics()
+                    .get(vertex)
+                    .put(TARGET_DATA_RATE, EvaluatedScalingMetric.avg(adjustedCapacity));
+        }
+    }
+
     private boolean isJobUnderMemoryPressure(
             Context ctx, Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics) {
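
For source vertices, the rewritten TARGET_DATA_RATE above is a linear blend of the originally evaluated rate and the backpropagated one, weighted by the impact option. An illustrative computation with hypothetical values:

// With an impact of 0.5, a source originally targeted at 1000 rec/s whose
// backpropagated achievable rate is 600 rec/s is rewritten to 800 rec/s.
// Impact 0.0 would keep 1000; impact 1.0 would use 600 outright.
double backpropagationImpact = 0.5;
double originalTargetRate = 1000.0;
double backpropagatedRate = 600.0;

double adjusted =
        originalTargetRate * (1.0 - backpropagationImpact)
                + backpropagationImpact * backpropagatedRate; // 800.0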

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java

+1
@@ -346,6 +346,7 @@ private void computeTargetDataRate(
             var inputTargetRate = inputEvaluatedMetrics.get(TARGET_DATA_RATE);
             var outputRatio =
                     computeEdgeOutputRatio(inputVertex, vertex, topology, metricsHistory);
+            topology.get(vertex).getInputRatios().put(inputVertex, outputRatio);
             LOG.debug(
                     "Computed output ratio for edge ({} -> {}) : {}",
                     inputVertex,
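
The single added line stores the edge's output ratio on the downstream vertex so that backpropagation can later divide a downstream shortfall by it. A hedged illustration of what the ratio expresses, with hypothetical rates, assuming (as the rest of the change suggests) that it is records arriving downstream per record processed upstream:

// Hypothetical rates on one edge (inputVertex -> vertex):
double upstreamRate = 500.0;    // rec/s processed by inputVertex
double downstreamRate = 1000.0; // rec/s arriving at vertex over this edge
double outputRatio = downstreamRate / upstreamRate; // 2.0, e.g. a flatMap fan-out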

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

+19-1
@@ -58,6 +58,24 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.");
 
+    public static final ConfigOption<Boolean> PROCESSING_RATE_BACKPROPAGATION_ENABLED =
+            autoScalerConfig("processing.rate.backpropagation.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withFallbackKeys(
+                            oldOperatorConfigKey("processing.rate.backpropagation.enabled"))
+                    .withDescription(
+                            "Enable backpropagation of processing rate during autoscaling to reduce resource usage.");
+
+    public static final ConfigOption<Double> PROCESSING_RATE_BACKPROPAGATION_IMPACT =
+            autoScalerConfig("processing.rate.backpropagation.impact")
+                    .doubleType()
+                    .defaultValue(0.0)
+                    .withFallbackKeys(
+                            oldOperatorConfigKey("processing.rate.backpropagation.impact"))
+                    .withDescription(
+                            "How strongly backpropagated values should affect scaling: 0 means no effect, 1 means use the backpropagated values fully. It is not recommended to set this factor greater than 0.8.");
+
     public static final ConfigOption<Duration> METRICS_WINDOW =
             autoScalerConfig("metrics.window")
                     .durationType()
@@ -313,7 +331,7 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .defaultValues()
                     .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
                     .withDescription(
-                            "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+                            "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.");
 
     public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
             autoScalerConfig("scaling.event.interval")

Diff for: flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java

+5
@@ -25,6 +25,7 @@
 import lombok.Data;
 import lombok.Setter;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /** Job vertex information. */
@@ -36,6 +37,9 @@ public class VertexInfo {
     // All input vertices and the ship_strategy
     private final Map<JobVertexID, ShipStrategy> inputs;
 
+    // Output ratios from input vertices. Used for backpropagation
+    private final Map<JobVertexID, Double> inputRatios;
+
     private final SlotSharingGroupId slotSharingGroupId;
 
     // All output vertices and the ship_strategy
@@ -68,6 +72,7 @@ public VertexInfo(
         this.originalMaxParallelism = maxParallelism;
         this.finished = finished;
         this.ioMetrics = ioMetrics;
+        this.inputRatios = new HashMap<>();
     }
 
     @VisibleForTesting
