diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html index 49ff71efa2..23b4fc81b8 100644 --- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html +++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html @@ -116,6 +116,18 @@ Double Percentage threshold for switching to observed from busy time based true processing rate if the measurement is off by at least the configured fraction. For example 0.15 means we switch to observed if the busy time based computation is at least 15% higher during catchup. + +
job.autoscaler.processing.rate.backpropagation.enabled
+ false + Boolean + Enable backpropagation of processing rate during autoscaling to reduce resources usage. + + +
job.autoscaler.processing.rate.backpropagation.impact
+ 0.0 + Double + How strong should backpropagated values affect scaling. 0 - means no effect, 1 - use backpropagated values. It is not recommended to set this factor greater than 0.8 +
job.autoscaler.quota.cpu
(none) @@ -210,7 +222,7 @@
job.autoscaler.vertex.exclude.ids
List<String> - A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented. + A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.
job.autoscaler.vertex.max-parallelism
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index 1d32b1aab6..c9178dc43b 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -20,8 +20,10 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.event.AutoScalerEventHandler; +import org.apache.flink.autoscaler.metrics.EvaluatedMetrics; import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.autoscaler.topology.ShipStrategy; import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.configuration.Configuration; @@ -37,6 +39,7 @@ import java.time.Instant; import java.time.ZoneId; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.SortedMap; @@ -52,8 +55,10 @@ import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTITIONS; import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH; +import static org.apache.flink.autoscaler.utils.AutoScalerUtils.getTargetDataRateFromUpstream; import static org.apache.flink.util.Preconditions.checkArgument; /** Component responsible for computing vertex parallelism based on the scaling metrics. 
*/
@@ -148,6 +153,79 @@ public static ParallelismChange noChange() {
         }
     }
 
+    /**
+     * Limits the data rate requested from the direct upstream vertices of {@code vertex} to what
+     * {@code vertex} can process after the largest scale-up allowed for it.
+     *
+     * <p>When the target data rate derived from upstream exceeds that capacity, the
+     * backpropagation rate of every direct input is multiplied by the ratio of capacity to target
+     * data rate. Nothing is changed for excluded vertices, for non-finite intermediate values, or
+     * when the vertex can process its whole input.
+     *
+     * @param conf autoscaler configuration, read for {@code MAX_SCALE_UP_FACTOR}
+     * @param vertex vertex whose capacity limit is propagated upstream
+     * @param topology job topology providing the inputs of the vertex
+     * @param evaluatedMetrics evaluated metrics of the job vertices
+     * @param backpropagationRate rate multipliers per vertex, missing entries count as 1.0;
+     *     updated in place for the direct inputs of {@code vertex}
+     * @param excludedVertices hex string ids of the vertices excluded from scaling
+     */
+    public void backpropagateRate(
+            Configuration conf,
+            JobVertexID vertex,
+            JobTopology topology,
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, Double> backpropagationRate,
+            List<String> excludedVertices) {
+
+        if (excludedVertices.contains(vertex.toHexString())) {
+            return;
+        }
+
+        var vertexMetrics = evaluatedMetrics.getVertexMetrics().get(vertex);
+
+        // vertex scale factor is limited by max scale factor
+        double scaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
+
+        // vertex scale factor is limited by max parallelism scale factor
+        scaleFactor =
+                Math.min(
+                        scaleFactor,
+                        vertexMetrics.get(MAX_PARALLELISM).getCurrent()
+                                / vertexMetrics.get(PARALLELISM).getCurrent());
+
+        // capacity after scaling, capped by the already adjusted target rate of the vertex
+        double maxProcessingRateAfterScale =
+                Math.min(
+                        vertexMetrics.get(TARGET_DATA_RATE).getAverage()
+                                * backpropagationRate.getOrDefault(vertex, 1.0),
+                        vertexMetrics.get(TRUE_PROCESSING_RATE).getAverage() * scaleFactor);
+
+        // evaluating partially updated target data rate from upstream
+        double targetDataRate =
+                getTargetDataRateFromUpstream(
+                        evaluatedMetrics, topology, vertex, backpropagationRate);
+
+        // if a finite value cannot be derived, assume full processing
+        if (!Double.isFinite(targetDataRate) || !Double.isFinite(maxProcessingRateAfterScale)) {
+            return;
+        }
+
+        // if the whole input stream can be processed, skip propagation; "<=" also covers idle
+        // vertices (0 / 0), which would otherwise write NaN into the rate map
+        if (targetDataRate <= maxProcessingRateAfterScale) {
+            return;
+        }
+
+        // propagation coefficient
+        double adjustmentRate = maxProcessingRateAfterScale / targetDataRate;
+
+        // update rate of direct upstream vertices, a missing rate counts as 1.0
+        for (var input : topology.getVertexInfos().get(vertex).getInputs().keySet()) {
+            backpropagationRate.merge(input, adjustmentRate, (rate, factor) -> rate * factor);
+        }
+    }
+
     public 
ParallelismChange computeScaleTargetParallelism( Context context, JobVertexID vertex, diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java index 02e5ad4f15..3462aa5436 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java @@ -36,6 +36,8 @@ import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.shaded.curator5.com.google.common.base.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +55,7 @@ import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.NO_CHANGE; import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.REQUIRED_CHANGE; import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL; import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_EXECUTION_DISABLED_REASON; @@ -61,6 +64,7 @@ import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore; import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; /** Class responsible for executing scaling decisions. 
*/
@@ -238,6 +242,11 @@ Map computeScalingSummary(
 
         var excludeVertexIdList =
                 context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+
+        if (context.getConfiguration().get(PROCESSING_RATE_BACKPROPAGATION_ENABLED)) {
+            backpropagateProcessingRate(context, evaluatedMetrics, jobTopology);
+        }
+
         evaluatedMetrics
                 .getVertexMetrics()
                 .forEach(
@@ -284,6 +293,76 @@ Map computeScalingSummary(
         return out;
     }
 
+    /**
+     * Rewrites the {@code TARGET_DATA_RATE} of every vertex so that upstream vertices are not
+     * asked to deliver more data than their downstream vertices can process.
+     *
+     * <p>The vertices are first visited in reverse strong topological order to collect the
+     * backpropagation rates. The target data rates are then recomputed in topological order:
+     * sources interpolate between the original and the adjusted rate according to the configured
+     * impact, all other vertices sum the recomputed rates of their inputs weighted by the
+     * recorded input ratios.
+     *
+     * @param context job context providing the autoscaler configuration
+     * @param evaluatedMetrics evaluated metrics, updated in place
+     * @param jobTopology topology of the job
+     * @throws IllegalStateException if the configured impact is outside of [0, 1]
+     */
+    private void backpropagateProcessingRate(
+            Context context, EvaluatedMetrics evaluatedMetrics, JobTopology jobTopology) {
+        var conf = context.getConfiguration();
+        double impact = conf.get(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_IMPACT);
+        Preconditions.checkState(
+                0 <= impact && impact <= 1.0, "Backpropagation impact should be in range [0, 1]");
+        var propagationRate = new HashMap<JobVertexID, Double>();
+        var excludeVertexIdList = conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+        var strongOrder = jobTopology.getStrongTopologicalOrder();
+        var vertexIterator = strongOrder.listIterator(strongOrder.size());
+
+        // backpropagate scale factors, walking from the last vertex of the order to the first
+        while (vertexIterator.hasPrevious()) {
+            jobVertexScaler.backpropagateRate(
+                    conf,
+                    vertexIterator.previous(),
+                    jobTopology,
+                    evaluatedMetrics,
+                    propagationRate,
+                    excludeVertexIdList);
+        }
+
+        // use an extra map to not lose precision
+        Map<JobVertexID, Double> adjustedDataRate = new HashMap<>();
+
+        // re-evaluating vertices capacity
+        // Target data rate metric is rewritten for parallelism evaluation
+        for (var vertex : jobTopology.getVerticesInTopologicalOrder()) {
+            var vertexMetrics = evaluatedMetrics.getVertexMetrics().get(vertex);
+            double newTargetDataRate = 0.0;
+
+            if (jobTopology.isSource(vertex)) {
+                double targetDataRate = vertexMetrics.get(TARGET_DATA_RATE).getAverage();
+
+                // linear interpolation between adjusted value and initial
+                newTargetDataRate =
+                        targetDataRate
+                                * (impact * propagationRate.getOrDefault(vertex, 1.0)
+                                        + 1.0
+                                        - impact);
+            } else {
+                var vertexInfo = jobTopology.getVertexInfos().get(vertex);
+                for (var input : vertexInfo.getInputs().keySet()) {
+                    // an edge without a recorded ratio counts as 1.0, matching
+                    // AutoScalerUtils#getTargetDataRateFromUpstream
+                    newTargetDataRate +=
+                            adjustedDataRate.get(input)
+                                    * vertexInfo.getInputRatios().getOrDefault(input, 1.0);
+                }
+            }
+            adjustedDataRate.put(vertex, newTargetDataRate);
+            vertexMetrics.put(TARGET_DATA_RATE, EvaluatedScalingMetric.avg(newTargetDataRate));
+        }
+    }
+
     private boolean isJobUnderMemoryPressure(
             Context ctx, Map evaluatedMetrics) {
 
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index 5bbc09a3e2..9ff5ec70b4 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -352,6 +352,7 @@ private void computeTargetDataRate(
                 var inputTargetRate = inputEvaluatedMetrics.get(TARGET_DATA_RATE);
                 var outputRatio =
                         computeEdgeOutputRatio(inputVertex, vertex, topology, metricsHistory);
+                topology.get(vertex).getInputRatios().put(inputVertex, outputRatio);
                 LOG.debug(
                         "Computed output ratio for edge ({} -> {}) : {}",
                         inputVertex,
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index e1ea6a8695..d6bc73d884 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -58,6 +58,24 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Enable vertex scaling execution by the autoscaler. 
If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs."); + public static final ConfigOption PROCESSING_RATE_BACKPROPAGATION_ENABLED = + autoScalerConfig("processing.rate.backpropagation.enabled") + .booleanType() + .defaultValue(false) + .withFallbackKeys( + oldOperatorConfigKey("processing.rate.backpropagation.enabled")) + .withDescription( + "Enable backpropagation of processing rate during autoscaling to reduce resources usage."); + + public static final ConfigOption PROCESSING_RATE_BACKPROPAGATION_IMPACT = + autoScalerConfig("processing.rate.backpropagation.impact") + .doubleType() + .defaultValue(0.0) + .withFallbackKeys( + oldOperatorConfigKey("processing.rate.backpropagation.impact")) + .withDescription( + "How strong should backpropagated values affect scaling. 0 - means no effect, 1 - use backpropagated values. It is not recommended to set this factor greater than 0.8"); + public static final ConfigOption METRICS_WINDOW = autoScalerConfig("metrics.window") .durationType() @@ -320,7 +338,7 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .defaultValues() .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids")) .withDescription( - "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. 
Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); + "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling."); public static final ConfigOption SCALING_EVENT_INTERVAL = autoScalerConfig("scaling.event.interval") diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java index 851db6b056..104db529a6 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.stream.Collectors; /** Structure representing information about the jobgraph that is relevant for scaling. */ @@ -52,6 +53,7 @@ public class JobTopology { @Getter private final Map> slotSharingGroupMapping; @Getter private final Set finishedVertices; @Getter private final List verticesInTopologicalOrder; + @Getter private final List strongTopologicalOrder; public JobTopology(Collection vertexInfo) { this(new HashSet<>(vertexInfo)); @@ -99,6 +101,7 @@ public JobTopology(Set vertexInfo) { this.slotSharingGroupMapping = ImmutableMap.copyOf(vertexSlotSharingGroupMapping); this.finishedVertices = finishedVertices.build(); this.verticesInTopologicalOrder = returnVerticesInTopologicalOrder(); + this.strongTopologicalOrder = returnStrongTopologicalOrder(); } public VertexInfo get(JobVertexID jvi) { @@ -112,9 +115,9 @@ public boolean isSource(JobVertexID jobVertexID) { private List returnVerticesInTopologicalOrder() { List sorted = new ArrayList<>(vertexInfos.size()); - Map> remainingInputs = new HashMap<>(vertexInfos.size()); + Map> remainingInputs = new HashMap<>(vertexInfos.size()); vertexInfos.forEach( - 
(id, v) -> remainingInputs.put(id, new ArrayList<>(v.getInputs().keySet())));
+                (id, v) -> remainingInputs.put(id, new HashSet<>(v.getInputs().keySet())));
 
         while (!remainingInputs.isEmpty()) {
             List verticesWithZeroIndegree = new ArrayList<>();
@@ -140,6 +143,61 @@ private List returnVerticesInTopologicalOrder() {
         return sorted;
     }
 
+    /**
+     * Computes the strong topological order: a topological order in which the vertices are
+     * additionally sorted by their distance to the most distant source.
+     *
+     * @return vertices in the enhanced topological order
+     */
+    public List<JobVertexID> returnStrongTopologicalOrder() {
+        List<JobVertexID> result = new ArrayList<>(vertexInfos.size());
+
+        // inputs of every vertex that have not been moved into a layer yet
+        Map<JobVertexID, Set<JobVertexID>> pendingInputs = new HashMap<>(vertexInfos.size());
+        for (var entry : vertexInfos.entrySet()) {
+            var inputs = entry.getValue().getInputs().keySet();
+            pendingInputs.put(entry.getKey(), new HashSet<>(inputs));
+        }
+
+        Map<JobVertexID, Integer> distanceToSource = new HashMap<>(vertexInfos.size());
+        TreeMap<Integer, List<JobVertexID>> layers = new TreeMap<>();
+
+        while (!pendingInputs.isEmpty()) {
+            // group the vertices without pending inputs by their current distance
+            List<JobVertexID> ready = new ArrayList<>();
+            for (var entry : pendingInputs.entrySet()) {
+                if (!entry.getValue().isEmpty()) {
+                    continue;
+                }
+                var vertex = entry.getKey();
+                int distance = distanceToSource.getOrDefault(vertex, 0);
+                layers.computeIfAbsent(distance, d -> new ArrayList<>()).add(vertex);
+                ready.add(vertex);
+            }
+
+            // detach the grouped vertices from the remaining graph
+            for (var vertex : ready) {
+                pendingInputs.remove(vertex);
+                for (var output : vertexInfos.get(vertex).getOutputs().keySet()) {
+                    pendingInputs.get(output).remove(vertex);
+                }
+            }
+
+            // emit the layer with the smallest distance and advance its outputs
+            List<JobVertexID> layer = layers.pollFirstEntry().getValue();
+            for (var vertex : layer) {
+                int nextDistance = distanceToSource.getOrDefault(vertex, 0) + 1;
+                for (var output : vertexInfos.get(vertex).getOutputs().keySet()) {
+                    pendingInputs.get(output).remove(vertex);
+                    distanceToSource.merge(output, nextDistance, Integer::max);
+                }
+            }
+
+            result.addAll(layer);
+        }
+        return result;
+    }
+
     public static JobTopology fromJsonPlan(
             String jsonPlan,
             Map slotSharingGroupIdMap,
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java index 6016d70ecc..3c6106a213 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java @@ -25,6 +25,7 @@ import lombok.Data; import lombok.Setter; +import java.util.HashMap; import java.util.Map; /** Job vertex information. */ @@ -33,6 +34,9 @@ public class VertexInfo { private final JobVertexID id; + // Output ratios from input vertices. Used for backpropagation + private final Map inputRatios; + // All input vertices and the ship_strategy private final Map inputs; @@ -67,6 +71,7 @@ public VertexInfo( this.maxParallelism = maxParallelism; this.finished = finished; this.ioMetrics = ioMetrics; + this.inputRatios = new HashMap<>(); } @VisibleForTesting diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java index 411ab9b20d..4120d8c388 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java @@ -18,8 +18,10 @@ package org.apache.flink.autoscaler.utils; import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedMetrics; import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -94,4 +96,41 @@ public static boolean excludeVerticesFromScaling( conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new ArrayList<>(excludedIds)); return 
anyAdded;
     }
+
+    /**
+     * Computes the target data rate a vertex receives from upstream, applying the
+     * backpropagation rates collected so far.
+     *
+     * <p>For a source this is its own average target data rate multiplied by its
+     * backpropagation rate. For any other vertex it is the sum over its inputs of the input's
+     * average target data rate, multiplied by the input's backpropagation rate and by the ratio
+     * recorded for that edge. Missing rates and ratios count as 1.0.
+     *
+     * @param evaluatedMetrics evaluated metrics of the job vertices
+     * @param jobTopology topology of the job
+     * @param vertex vertex to compute the rate for
+     * @param backpropagationRate rate multipliers per vertex
+     * @return target data rate derived from upstream
+     */
+    public static double getTargetDataRateFromUpstream(
+            EvaluatedMetrics evaluatedMetrics,
+            JobTopology jobTopology,
+            JobVertexID vertex,
+            Map<JobVertexID, Double> backpropagationRate) {
+        var metricsByVertex = evaluatedMetrics.getVertexMetrics();
+        if (jobTopology.isSource(vertex)) {
+            double ownRate = metricsByVertex.get(vertex).get(TARGET_DATA_RATE).getAverage();
+            return ownRate * backpropagationRate.getOrDefault(vertex, 1.0);
+        }
+
+        var vertexInfo = jobTopology.getVertexInfos().get(vertex);
+        double total = 0.0;
+        for (var input : vertexInfo.getInputs().keySet()) {
+            double contribution = metricsByVertex.get(input).get(TARGET_DATA_RATE).getAverage();
+            contribution *= backpropagationRate.getOrDefault(input, 1.0);
+            contribution *= vertexInfo.getInputRatios().getOrDefault(input, 1.0);
+            total += contribution;
+        }
+        return total;
+    }
 }
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index 102586a3ef..ac8608c5d2 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -181,6 +181,10 @@ public void testEndToEnd() throws Exception {
         assertTrue(collectedMetrics.isFullyCollected());
 
         var evaluation = evaluator.evaluate(conf, collectedMetrics, restartTime);
+        assertEquals(Map.of(map, 2.0), topology.getVertexInfos().get(sink).getInputRatios());
+        assertEquals(
+                Map.of(source1, 2.0, source2, 2.0),
+                topology.getVertexInfos().get(map).getInputRatios());
         scalingExecutor.scaleResource(
                 context,
                 evaluation,
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorBackpropagationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorBackpropagationTest.java
new file 
mode 100644 index 0000000000..9b83406e93 --- /dev/null +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorBackpropagationTest.java @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.event.TestingEventCollector; +import org.apache.flink.autoscaler.metrics.EvaluatedMetrics; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL; +import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Tests for backpropagation in {@link ScalingExecutor}. */ +public class ScalingExecutorBackpropagationTest { + + private static final int MAX_PARALLELISM = 720; + + private JobAutoScalerContext context; + private TestingEventCollector> eventCollector; + private ScalingExecutor> scalingExecutor; + + private InMemoryAutoScalerStateStore> stateStore; + + private static final Map dummyGlobalMetrics = + Map.of( + ScalingMetric.GC_PRESSURE, EvaluatedScalingMetric.of(Double.NaN), + ScalingMetric.HEAP_MAX_USAGE_RATIO, EvaluatedScalingMetric.of(Double.NaN)); + + @BeforeEach + public void setup() { + eventCollector = new TestingEventCollector<>(); + context = createDefaultJobAutoScalerContext(); + stateStore = new InMemoryAutoScalerStateStore<>(); + + scalingExecutor = + new ScalingExecutor<>(eventCollector, stateStore) { + @Override + protected boolean scalingWouldExceedMaxResources( + Configuration tunedConfig, + JobTopology jobTopology, + EvaluatedMetrics evaluatedMetrics, + Map scalingSummaries, + JobAutoScalerContext ctx) { + return super.scalingWouldExceedMaxResources( + tunedConfig, jobTopology, evaluatedMetrics, scalingSummaries, ctx); + } + }; + Configuration conf = context.getConfiguration(); + conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO); + conf.set(AutoScalerOptions.SCALING_ENABLED, true); + conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.); + conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 1.0); + conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO); + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 
1.0); + conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED, true); + } + + @Test + public void testMetricsNotUpdateOnNoPropagation() throws Exception { + var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a"; + var source = JobVertexID.fromHexString(sourceHexString); + var filterOperatorHexString = "869fb403873411306404e9f2e4438c0e"; + var filterOperator = JobVertexID.fromHexString(filterOperatorHexString); + var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7"; + var sink = JobVertexID.fromHexString(sinkHexString); + + JobTopology jobTopology = + new JobTopology( + new VertexInfo(source, Map.of(), 10, 20, false, null), + new VertexInfo(filterOperator, Map.of(source, HASH), 10, 20, false, null), + new VertexInfo(sink, Map.of(filterOperator, HASH), 10, 20, false, null)); + + jobTopology.get(filterOperator).getInputRatios().put(source, 1.0); + jobTopology.get(sink).getInputRatios().put(filterOperator, 1.0); + + var conf = context.getConfiguration(); + conf.set(SCALE_DOWN_INTERVAL, Duration.ofMillis(0)); + var metrics = + new EvaluatedMetrics( + Map.of( + source, + evaluated(10, 50, 100), + filterOperator, + evaluated(10, 50, 100), + sink, + evaluated(10, 50, 100)), + dummyGlobalMetrics); + + // there is no bottlenecks, so scaling should not change target data rate + var now = Instant.now(); + assertTrue( + scalingExecutor.scaleResource( + context, + metrics, + new HashMap<>(), + new ScalingTracking(), + now, + jobTopology, + new DelayedScaleDown())); + assertEquals( + 50.0, + metrics.getVertexMetrics() + .get(source) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 50.0, + metrics.getVertexMetrics() + .get(filterOperator) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 50.0, + metrics.getVertexMetrics() + .get(sink) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + + metrics = + new EvaluatedMetrics( + Map.of( + source, + evaluated(10, 100, 100), + filterOperator, + evaluated(10, 200, 
100), + sink, + evaluated(10, 400, 100)), + dummyGlobalMetrics); + jobTopology.get(filterOperator).getInputRatios().put(source, 2.0); + jobTopology.get(sink).getInputRatios().put(filterOperator, 2.0); + assertTrue( + scalingExecutor.scaleResource( + context, + metrics, + new HashMap<>(), + new ScalingTracking(), + now, + jobTopology, + new DelayedScaleDown())); + assertEquals( + 100, + metrics.getVertexMetrics() + .get(source) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 200.0, + metrics.getVertexMetrics() + .get(filterOperator) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 400.0, + metrics.getVertexMetrics() + .get(sink) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + } + + @Test + public void testMetricsUpdateOnPropagation() throws Exception { + var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a"; + var source = JobVertexID.fromHexString(sourceHexString); + var filterOperatorHexString = "869fb403873411306404e9f2e4438c0e"; + var filterOperator = JobVertexID.fromHexString(filterOperatorHexString); + var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7"; + var sink = JobVertexID.fromHexString(sinkHexString); + + JobTopology jobTopology = + new JobTopology( + new VertexInfo(source, Map.of(), 10, 20, false, null), + new VertexInfo(filterOperator, Map.of(source, HASH), 10, 20, false, null), + new VertexInfo(sink, Map.of(filterOperator, HASH), 10, 20, false, null)); + + var conf = context.getConfiguration(); + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0); + conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 1.0); + var metrics = + new EvaluatedMetrics( + Map.of( + source, + evaluated(10, 100, 100), + filterOperator, + evaluated(10, 400, 100), + sink, + evaluated(10, 400, 100)), + dummyGlobalMetrics); + jobTopology.get(filterOperator).getInputRatios().put(source, 4.0); + jobTopology.get(sink).getInputRatios().put(filterOperator, 1.0); + 
conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_IMPACT, 1.0); + var now = Instant.now(); + assertTrue( + scalingExecutor.scaleResource( + context, + metrics, + new HashMap<>(), + new ScalingTracking(), + now, + jobTopology, + new DelayedScaleDown())); + assertEquals( + 50.0, + metrics.getVertexMetrics() + .get(source) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 200.0, + metrics.getVertexMetrics() + .get(filterOperator) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 200.0, + metrics.getVertexMetrics() + .get(sink) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + } + + @Test + public void testMetricsUpdateOnPropagationMaxParallelismLimit() throws Exception { + var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a"; + var source = JobVertexID.fromHexString(sourceHexString); + var filterOperatorHexString = "869fb403873411306404e9f2e4438c0e"; + var filterOperator = JobVertexID.fromHexString(filterOperatorHexString); + var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7"; + var sink = JobVertexID.fromHexString(sinkHexString); + + JobTopology jobTopology = + new JobTopology( + new VertexInfo(source, Map.of(), 10, 20, false, null), + new VertexInfo(filterOperator, Map.of(source, HASH), 10, 20, false, null), + new VertexInfo(sink, Map.of(filterOperator, HASH), 10, 10, false, null)); + + var conf = context.getConfiguration(); + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0); + var metrics = + new EvaluatedMetrics( + Map.of( + source, + evaluated(10, 100, 100), + filterOperator, + evaluated(10, 400, 100), + sink, + evaluated(10, 400, 100)), + dummyGlobalMetrics); + jobTopology.get(filterOperator).getInputRatios().put(source, 4.0); + jobTopology.get(sink).getInputRatios().put(filterOperator, 1.0); + conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_IMPACT, 1.0); + + metrics.getVertexMetrics() + .get(sink) + .put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(10)); + + 
var now = Instant.now(); + assertFalse( + scalingExecutor.scaleResource( + context, + metrics, + new HashMap<>(), + new ScalingTracking(), + now, + jobTopology, + new DelayedScaleDown())); + assertEquals( + 25.0, + metrics.getVertexMetrics() + .get(source) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 100.0, + metrics.getVertexMetrics() + .get(filterOperator) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 100.0, + metrics.getVertexMetrics() + .get(sink) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + } + + @Test + public void testMetricsUpdateOnPropagationExcludedVertices() throws Exception { + var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a"; + var source = JobVertexID.fromHexString(sourceHexString); + var filterOperatorHexString = "869fb403873411306404e9f2e4438c0e"; + var filterOperator = JobVertexID.fromHexString(filterOperatorHexString); + var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7"; + var sink = JobVertexID.fromHexString(sinkHexString); + + JobTopology jobTopology = + new JobTopology( + new VertexInfo(source, Map.of(), 10, 20, false, null), + new VertexInfo(filterOperator, Map.of(source, HASH), 10, 20, false, null), + new VertexInfo(sink, Map.of(filterOperator, HASH), 10, 20, false, null)); + + var conf = context.getConfiguration(); + conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of(sinkHexString)); + var metrics = + new EvaluatedMetrics( + Map.of( + source, + evaluated(10, 100, 100), + filterOperator, + evaluated(10, 400, 100), + sink, + evaluated(10, 400, 100)), + dummyGlobalMetrics); + jobTopology.get(filterOperator).getInputRatios().put(source, 4.0); + jobTopology.get(sink).getInputRatios().put(filterOperator, 1.0); + var now = Instant.now(); + conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_IMPACT, 1.0); + assertTrue( + scalingExecutor.scaleResource( + context, + metrics, + new HashMap<>(), + new ScalingTracking(), + now, + jobTopology, + new 
DelayedScaleDown())); + assertEquals( + 50.0, + metrics.getVertexMetrics() + .get(source) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 200.0, + metrics.getVertexMetrics() + .get(filterOperator) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 200.0, + metrics.getVertexMetrics() + .get(sink) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + } + + @Test + public void testDisconnectedJobMetricsUpdate() throws Exception { + var source1HexString = "0bfd135746ac8efb3cce668b12e16d3a"; + var source1 = JobVertexID.fromHexString(source1HexString); + var sink1HexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7"; + var sink1 = JobVertexID.fromHexString(sink1HexString); + var source2HexString = "74b8b4bd0762e2177f8d71481f79f07c"; + var source2 = JobVertexID.fromHexString(source2HexString); + var sink2HexString = "c044d7d1304e32489deb16b4e0b080ef"; + var sink2 = JobVertexID.fromHexString(sink2HexString); + + JobTopology jobTopology = + new JobTopology( + new VertexInfo(source1, Map.of(), 10, 20, false, null), + new VertexInfo(source2, Map.of(), 10, 20, false, null), + new VertexInfo(sink1, Map.of(source1, HASH), 10, 20, false, null), + new VertexInfo(sink2, Map.of(source2, HASH), 10, 20, false, null)); + + var conf = context.getConfiguration(); + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0); + conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 1.0); + conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED, true); + var metrics = + new EvaluatedMetrics( + Map.of( + source1, + evaluated(10, 100, 100), + source2, + evaluated(10, 200, 100), + sink1, + evaluated(10, 800, 100), + sink2, + evaluated(10, 500, 100)), + dummyGlobalMetrics); + jobTopology.get(sink1).getInputRatios().put(source1, 8.0); + jobTopology.get(sink2).getInputRatios().put(source2, 2.5); + var now = Instant.now(); + conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_IMPACT, 1.0); + assertTrue( + scalingExecutor.scaleResource( + 
context, + metrics, + new HashMap<>(), + new ScalingTracking(), + now, + jobTopology, + new DelayedScaleDown())); + assertEquals( + 25, + metrics.getVertexMetrics() + .get(source1) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 80.0, + metrics.getVertexMetrics() + .get(source2) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 200.0, + metrics.getVertexMetrics() + .get(sink1) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 200.0, + metrics.getVertexMetrics() + .get(sink2) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + } + + @Test + public void testComplexJobMetricsUpdate() throws Exception { + var source1HexString = "0bfd135746ac8efb3cce668b12e16d3a"; + var source1 = JobVertexID.fromHexString(source1HexString); + var op1HexString = "b155797987c1d39a2c13f083476bce0d"; + var op1 = JobVertexID.fromHexString(op1HexString); + var sink1HexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7"; + var sink1 = JobVertexID.fromHexString(sink1HexString); + + var source2HexString = "74b8b4bd0762e2177f8d71481f79f07c"; + var source2 = JobVertexID.fromHexString(source2HexString); + var op2HexString = "958f423134cae6985a59e2d16bde444e"; + var op2 = JobVertexID.fromHexString(op2HexString); + var sink2HexString = "c044d7d1304e32489deb16b4e0b080ef"; + var sink2 = JobVertexID.fromHexString(sink2HexString); + + JobTopology jobTopology = + new JobTopology( + new VertexInfo(source1, Map.of(), 10, 20, false, null), + new VertexInfo(source2, Map.of(), 10, 20, false, null), + new VertexInfo( + op1, Map.of(source1, HASH, source2, HASH), 10, 20, false, null), + new VertexInfo( + op2, Map.of(source1, HASH, source2, HASH), 10, 20, false, null), + new VertexInfo(sink1, Map.of(op1, HASH, op2, HASH), 10, 20, false, null), + new VertexInfo(sink2, Map.of(op1, HASH, op2, HASH), 10, 20, false, null)); + + var conf = context.getConfiguration(); + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0); + 
conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 1.0); + var metrics = + new EvaluatedMetrics( + Map.of( + source1, + evaluated(10, 100, 100), + source2, + evaluated(10, 100, 100), + op1, + evaluated(10, 200, 100), + op2, + evaluated(10, 200, 100), + sink1, + evaluated(10, 600, 100), + sink2, + evaluated(10, 500, 100)), + dummyGlobalMetrics); + jobTopology.get(op1).getInputRatios().put(source1, 1.0); + jobTopology.get(op1).getInputRatios().put(source2, 1.0); + jobTopology.get(op2).getInputRatios().put(source1, 1.0); + jobTopology.get(op2).getInputRatios().put(source2, 1.0); + + jobTopology.get(sink1).getInputRatios().put(op1, 1.0); + jobTopology.get(sink1).getInputRatios().put(op2, 2.0); + jobTopology.get(sink2).getInputRatios().put(op1, 2.0); + jobTopology.get(sink2).getInputRatios().put(op2, 0.5); + + var now = Instant.now(); + conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_IMPACT, 0.8); + assertTrue( + scalingExecutor.scaleResource( + context, + metrics, + new HashMap<>(), + new ScalingTracking(), + now, + jobTopology, + new DelayedScaleDown())); + assertEquals( + 46.667, + metrics.getVertexMetrics() + .get(source1) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 46.667, + metrics.getVertexMetrics() + .get(source2) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 93.333, + metrics.getVertexMetrics() + .get(op1) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 93.333, + metrics.getVertexMetrics() + .get(op2) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + + // since aggressive scaling down is disabled, the vertex is still a bottleneck + assertEquals( + 280.0, + metrics.getVertexMetrics() + .get(sink1) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + assertEquals( + 233.333, + metrics.getVertexMetrics() + .get(sink2) + .get(ScalingMetric.TARGET_DATA_RATE) + .getAverage()); + } + + private Map evaluated( + int parallelism, double target, double 
trueProcessingRate) { + var metrics = new HashMap(); + metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism)); + metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(MAX_PARALLELISM)); + metrics.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(target, target)); + metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(0.0)); + metrics.put( + ScalingMetric.TRUE_PROCESSING_RATE, + new EvaluatedScalingMetric(trueProcessingRate, trueProcessingRate)); + metrics.put(ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(150)); + + var restartTime = context.getConfiguration().get(AutoScalerOptions.RESTART_TIME); + ScalingMetricEvaluator.computeProcessingRateThresholds( + metrics, context.getConfiguration(), false, restartTime); + return metrics; + } +}