|
20 | 20 | import org.apache.flink.annotation.VisibleForTesting;
|
21 | 21 | import org.apache.flink.autoscaler.config.AutoScalerOptions;
|
22 | 22 | import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
|
| 23 | +import org.apache.flink.autoscaler.metrics.EvaluatedMetrics; |
23 | 24 | import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
|
24 | 25 | import org.apache.flink.autoscaler.metrics.ScalingMetric;
|
| 26 | +import org.apache.flink.autoscaler.topology.JobTopology; |
25 | 27 | import org.apache.flink.autoscaler.topology.ShipStrategy;
|
26 | 28 | import org.apache.flink.autoscaler.utils.AutoScalerUtils;
|
27 | 29 | import org.apache.flink.configuration.Configuration;
|
|
36 | 38 | import java.time.Instant;
|
37 | 39 | import java.time.ZoneId;
|
38 | 40 | import java.util.Collection;
|
| 41 | +import java.util.List; |
39 | 42 | import java.util.Map;
|
40 | 43 | import java.util.SortedMap;
|
41 | 44 |
|
|
49 | 52 | import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
|
50 | 53 | import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
|
51 | 54 | import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
|
| 55 | +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; |
52 | 56 | import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
|
53 | 57 | import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
|
54 | 58 |
|
@@ -150,6 +154,116 @@ public int computeScaleTargetParallelism(
|
150 | 154 | return newParallelism;
|
151 | 155 | }
|
152 | 156 |
|
| 157 | + public void propagateBackpropScaleFactor( |
| 158 | + Configuration conf, |
| 159 | + JobVertexID vertex, |
| 160 | + JobTopology topology, |
| 161 | + EvaluatedMetrics evaluatedMetrics, |
| 162 | + Map<JobVertexID, Double> realDataRates, |
| 163 | + List<String> excludedVertices) { |
| 164 | + |
| 165 | + double targetDataRate = |
| 166 | + evaluatedMetrics.getVertexMetrics().get(vertex).get(TARGET_DATA_RATE).getCurrent(); |
| 167 | + |
| 168 | + // we cannot adjust data rate of the vertex if data rate is undefined or the vertex is |
| 169 | + // excluded from scaling |
| 170 | + if (Double.isNaN(targetDataRate) || excludedVertices.contains(vertex.toString())) { |
| 171 | + LOG.debug( |
| 172 | + "Vertex {} is excluded from scaling or it's target data rate is undefined. Excluding it from backpropagation", |
| 173 | + vertex.toHexString()); |
| 174 | + realDataRates.put(vertex, targetDataRate); |
| 175 | + return; |
| 176 | + } |
| 177 | + |
| 178 | + // how target data rate should be lowered |
| 179 | + double dataRateDecrease = 0.0; |
| 180 | + |
| 181 | + // check how the data rate of the vertex should be lowered depending on downstream |
| 182 | + for (var downstream : topology.getVertexInfos().get(vertex).getOutputs().keySet()) { |
| 183 | + double downstreamTargetDataRate = |
| 184 | + evaluatedMetrics |
| 185 | + .getVertexMetrics() |
| 186 | + .get(downstream) |
| 187 | + .get(TARGET_DATA_RATE) |
| 188 | + .getCurrent(); |
| 189 | + double downstreamRealDataRate = realDataRates.getOrDefault(downstream, Double.NaN); |
| 190 | + int upstreamVertices = topology.getVertexInfos().get(downstream).getInputs().size(); |
| 191 | + |
| 192 | + double outputRatio = |
| 193 | + topology.getVertexInfos() |
| 194 | + .get(downstream) |
| 195 | + .getInputRatios() |
| 196 | + .getOrDefault(vertex, Double.NaN); |
| 197 | + |
| 198 | + // if real data rate cannot be updated by the downstream vertex |
| 199 | + if (Double.isNaN(downstreamRealDataRate) || Double.isInfinite(downstreamRealDataRate)) { |
| 200 | + continue; |
| 201 | + } |
| 202 | + if (Double.isNaN(downstreamTargetDataRate) |
| 203 | + || Double.isInfinite(downstreamTargetDataRate)) { |
| 204 | + continue; |
| 205 | + } |
| 206 | + if (Double.isNaN(outputRatio) || Double.isInfinite(outputRatio)) { |
| 207 | + continue; |
| 208 | + } |
| 209 | + |
| 210 | + // distribute downstream's data rate delta over all it's upstream vertices |
| 211 | + double downstreamDataRateDelta = |
| 212 | + (downstreamTargetDataRate - downstreamRealDataRate) / upstreamVertices; |
| 213 | + |
| 214 | + dataRateDecrease = Math.max(dataRateDecrease, downstreamDataRateDelta / outputRatio); |
| 215 | + } |
| 216 | + |
| 217 | + LOG.debug( |
| 218 | + "Data rate of {} is decreased by {} from downstream", |
| 219 | + vertex.toHexString(), |
| 220 | + dataRateDecrease); |
| 221 | + |
| 222 | + if (dataRateDecrease > targetDataRate) { |
| 223 | + LOG.warn( |
| 224 | + "Required data rate decrease {} for vertex {} exceeds target data of the vertex {}.", |
| 225 | + dataRateDecrease, |
| 226 | + vertex.toHexString(), |
| 227 | + targetDataRate); |
| 228 | + dataRateDecrease = targetDataRate; |
| 229 | + } |
| 230 | + |
| 231 | + targetDataRate -= dataRateDecrease; |
| 232 | + |
| 233 | + // check, if target data rate should be lowered even more due to scaling limitations |
| 234 | + double averageTrueProcessingRate = |
| 235 | + evaluatedMetrics |
| 236 | + .getVertexMetrics() |
| 237 | + .get(vertex) |
| 238 | + .get(TRUE_PROCESSING_RATE) |
| 239 | + .getAverage(); |
| 240 | + |
| 241 | + if (Double.isNaN(averageTrueProcessingRate)) { |
| 242 | + LOG.info( |
| 243 | + "True Processing Rate of {} is undefined, use target data rate adjusted by downstream", |
| 244 | + vertex); |
| 245 | + realDataRates.put(vertex, targetDataRate); |
| 246 | + return; |
| 247 | + } |
| 248 | + |
| 249 | + // determine upper limit for scaling of the vertex |
| 250 | + double parallelism = |
| 251 | + evaluatedMetrics.getVertexMetrics().get(vertex).get(PARALLELISM).getCurrent(); |
| 252 | + double maxParallelism = |
| 253 | + evaluatedMetrics.getVertexMetrics().get(vertex).get(MAX_PARALLELISM).getCurrent(); |
| 254 | + double maxScaleFactor = |
| 255 | + Math.min(1 + conf.get(MAX_SCALE_UP_FACTOR), maxParallelism / parallelism); |
| 256 | + |
| 257 | + double maxDataProcessingRate = maxScaleFactor * averageTrueProcessingRate; |
| 258 | + targetDataRate = Math.min(targetDataRate, maxDataProcessingRate); |
| 259 | + |
| 260 | + LOG.debug( |
| 261 | + "Real data rate of {} after backpropagation is {}", |
| 262 | + vertex.toHexString(), |
| 263 | + targetDataRate); |
| 264 | + realDataRates.put(vertex, targetDataRate); |
| 265 | + } |
| 266 | + |
153 | 267 | private boolean blockScalingBasedOnPastActions(
|
154 | 268 | Context context,
|
155 | 269 | JobVertexID vertex,
|
|
0 commit comments