Skip to content

Commit 85675ee

Browse files
author
huyuanfeng
committed
[FLINK-37126] Add Validator for Autoscaler
1 parent 4c2c90c commit 85675ee

File tree

5 files changed

+240
-60
lines changed

5 files changed

+240
-60
lines changed

Diff for: flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.flink.autoscaler.JobAutoScaler;
2222
import org.apache.flink.autoscaler.JobAutoScalerContext;
2323
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
24+
import org.apache.flink.autoscaler.validation.AutoscalerValidator;
25+
import org.apache.flink.autoscaler.validation.DefaultAutoscalerValidator;
2426
import org.apache.flink.configuration.Configuration;
2527
import org.apache.flink.configuration.UnmodifiableConfiguration;
2628
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -41,6 +43,7 @@
4143
import java.util.HashSet;
4244
import java.util.List;
4345
import java.util.Map;
46+
import java.util.Optional;
4447
import java.util.Set;
4548
import java.util.concurrent.CompletableFuture;
4649
import java.util.concurrent.ConcurrentHashMap;
@@ -51,6 +54,7 @@
5154
import java.util.function.Function;
5255
import java.util.stream.Collectors;
5356

57+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
5458
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL;
5559
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_PARALLELISM;
5660

@@ -69,6 +73,7 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
6973
private final ScheduledExecutorService scheduledExecutorService;
7074
private final ExecutorService scalingThreadPool;
7175
private final UnmodifiableConfiguration baseConf;
76+
private final AutoscalerValidator autoscalerValidator;
7277

7378
/**
7479
* Maintain a set of job keys that during scaling, it should be accessed at {@link
@@ -104,6 +109,7 @@ public StandaloneAutoscalerExecutor(
104109
parallelism, new ExecutorThreadFactory("autoscaler-standalone-scaling"));
105110
this.scalingJobKeys = new HashSet<>();
106111
this.baseConf = new UnmodifiableConfiguration(conf);
112+
this.autoscalerValidator = new DefaultAutoscalerValidator();
107113
}
108114

109115
public void start() {
@@ -189,7 +195,19 @@ private void cleanupStoppedJob(Collection<Context> jobList) {
189195
protected void scalingSingleJob(Context jobContext) {
190196
try {
191197
MDC.put("job.key", jobContext.getJobKey().toString());
192-
autoScaler.scale(jobContext);
198+
Optional<String> validationError =
199+
autoscalerValidator.validateAutoscalerOptions(jobContext.getConfiguration());
200+
if (validationError.isPresent()) {
201+
eventHandler.handleEvent(
202+
jobContext,
203+
AutoScalerEventHandler.Type.Warning,
204+
"AutoScaler Options Validation",
205+
validationError.get(),
206+
null,
207+
baseConf.get(SCALING_EVENT_INTERVAL));
208+
} else {
209+
autoScaler.scale(jobContext);
210+
}
193211
} catch (Throwable e) {
194212
LOG.error("Error while scaling job", e);
195213
eventHandler.handleException(jobContext, AUTOSCALER_ERROR, e);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.autoscaler.standalone;
19+
20+
import org.apache.flink.api.common.JobID;
21+
import org.apache.flink.api.common.JobStatus;
22+
import org.apache.flink.autoscaler.JobAutoScaler;
23+
import org.apache.flink.autoscaler.JobAutoScalerContext;
24+
import org.apache.flink.autoscaler.config.AutoScalerOptions;
25+
import org.apache.flink.autoscaler.event.TestingEventCollector;
26+
import org.apache.flink.configuration.Configuration;
27+
28+
import org.junit.jupiter.api.Test;
29+
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
import java.util.concurrent.CompletableFuture;
33+
import java.util.concurrent.ConcurrentHashMap;
34+
35+
import static org.assertj.core.api.Assertions.assertThat;
36+
37+
class StandaloneAutoscalerValidatorTest {
38+
@Test
39+
public void testAutoScalerWithInvalidConfig() {
40+
var jobList = new ArrayList<JobAutoScalerContext<JobID>>();
41+
var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
42+
43+
Configuration correctConfiguration = new Configuration();
44+
correctConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
45+
Configuration invalidConfiguration = new Configuration();
46+
invalidConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
47+
invalidConfiguration.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, -1.);
48+
49+
var correctConfigurationJob = createJobAutoScalerContextWithConf(correctConfiguration);
50+
var illegalConfigurationJob = createJobAutoScalerContextWithConf(invalidConfiguration);
51+
var scaleCounter = new ConcurrentHashMap<JobID, Integer>();
52+
53+
try (var autoscalerExecutor =
54+
new StandaloneAutoscalerExecutor<>(
55+
new Configuration(),
56+
baseConf -> jobList,
57+
eventCollector,
58+
new JobAutoScaler<>() {
59+
@Override
60+
public void scale(JobAutoScalerContext<JobID> context) {
61+
scaleCounter.put(
62+
context.getJobKey(),
63+
scaleCounter.getOrDefault(context.getJobKey(), 0) + 1);
64+
}
65+
66+
@Override
67+
public void cleanup(JobAutoScalerContext<JobID> context) {
68+
// do nothing
69+
}
70+
})) {
71+
jobList.add(correctConfigurationJob);
72+
jobList.add(illegalConfigurationJob);
73+
List<CompletableFuture<Void>> scaledFutures = autoscalerExecutor.scaling();
74+
75+
assertThat(scaledFutures).hasSize(2);
76+
assertThat(scaleCounter).containsOnlyKeys(correctConfigurationJob.getJobKey());
77+
78+
assertThat(eventCollector.events).size().isEqualTo(1);
79+
assertThat(eventCollector.events)
80+
.allMatch(event -> event.getContext().equals(illegalConfigurationJob));
81+
}
82+
}
83+
84+
private static JobAutoScalerContext<JobID> createJobAutoScalerContextWithConf(
85+
Configuration configuration) {
86+
var jobID = new JobID();
87+
return new JobAutoScalerContext<>(
88+
jobID, jobID, JobStatus.RUNNING, configuration, null, null);
89+
}
90+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.autoscaler.validation;
20+
21+
import org.apache.flink.configuration.Configuration;
22+
23+
import java.util.Optional;
24+
25+
/** Validator for Autoscaler. */
26+
public interface AutoscalerValidator {
27+
28+
/**
29+
* Validate autoscaler config and return optional error.
30+
*
31+
* @param flinkConf autoscaler config
32+
* @return Optional error string, should be present iff validation resulted in an error
33+
*/
34+
Optional<String> validateAutoscalerOptions(Configuration flinkConf);
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.autoscaler.validation;
20+
21+
import org.apache.flink.autoscaler.config.AutoScalerOptions;
22+
import org.apache.flink.autoscaler.utils.CalendarUtils;
23+
import org.apache.flink.configuration.ConfigOption;
24+
import org.apache.flink.configuration.Configuration;
25+
26+
import java.util.Optional;
27+
28+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
29+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
30+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
31+
32+
/** Default implementation of {@link AutoscalerValidator}. */
33+
public class DefaultAutoscalerValidator implements AutoscalerValidator {
34+
35+
public Optional<String> validateAutoscalerOptions(Configuration flinkConf) {
36+
37+
if (!flinkConf.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
38+
return Optional.empty();
39+
}
40+
return firstPresent(
41+
validateNumber(flinkConf, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
42+
validateNumber(flinkConf, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
43+
validateNumber(flinkConf, UTILIZATION_TARGET, 0.0d, 1.0d),
44+
validateNumber(flinkConf, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
45+
validateNumber(flinkConf, UTILIZATION_MAX, flinkConf.get(UTILIZATION_TARGET), 1.0d),
46+
validateNumber(flinkConf, UTILIZATION_MIN, 0.0d, flinkConf.get(UTILIZATION_TARGET)),
47+
CalendarUtils.validateExcludedPeriods(flinkConf));
48+
}
49+
50+
@SafeVarargs
51+
private static Optional<String> firstPresent(Optional<String>... errOpts) {
52+
for (Optional<String> opt : errOpts) {
53+
if (opt.isPresent()) {
54+
return opt;
55+
}
56+
}
57+
return Optional.empty();
58+
}
59+
60+
private static <T extends Number> Optional<String> validateNumber(
61+
Configuration flinkConfiguration,
62+
ConfigOption<T> autoScalerConfig,
63+
Double min,
64+
Double max) {
65+
try {
66+
var configValue = flinkConfiguration.get(autoScalerConfig);
67+
if (configValue != null) {
68+
double value = configValue.doubleValue();
69+
if ((min != null && value < min) || (max != null && value > max)) {
70+
return Optional.of(
71+
String.format(
72+
"The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
73+
autoScalerConfig.key(),
74+
min != null ? min.toString() : "-Infinity",
75+
max != null ? max.toString() : "+Infinity"));
76+
}
77+
}
78+
return Optional.empty();
79+
} catch (IllegalArgumentException e) {
80+
return Optional.of(
81+
String.format(
82+
"Invalid value in the autoscaler config %s", autoScalerConfig.key()));
83+
}
84+
}
85+
86+
private static <T extends Number> Optional<String> validateNumber(
87+
Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min) {
88+
return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
89+
}
90+
}

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java

+6-59
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717

1818
package org.apache.flink.kubernetes.operator.validation;
1919

20-
import org.apache.flink.autoscaler.config.AutoScalerOptions;
21-
import org.apache.flink.autoscaler.utils.CalendarUtils;
20+
import org.apache.flink.autoscaler.validation.AutoscalerValidator;
21+
import org.apache.flink.autoscaler.validation.DefaultAutoscalerValidator;
2222
import org.apache.flink.configuration.CheckpointingOptions;
23-
import org.apache.flink.configuration.ConfigOption;
2423
import org.apache.flink.configuration.Configuration;
2524
import org.apache.flink.configuration.HighAvailabilityOptions;
2625
import org.apache.flink.configuration.JobManagerOptions;
@@ -65,10 +64,6 @@
6564
import java.util.regex.Matcher;
6665
import java.util.regex.Pattern;
6766

68-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
69-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
70-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
71-
7267
/** Default validator implementation for {@link FlinkDeployment}. */
7368
public class DefaultValidator implements FlinkResourceValidator {
7469

@@ -87,9 +82,11 @@ public class DefaultValidator implements FlinkResourceValidator {
8782
Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME);
8883

8984
private final FlinkConfigManager configManager;
85+
private final AutoscalerValidator autoscalerValidator;
9086

9187
public DefaultValidator(FlinkConfigManager configManager) {
9288
this.configManager = configManager;
89+
this.autoscalerValidator = new DefaultAutoscalerValidator();
9390
}
9491

9592
@Override
@@ -597,62 +594,12 @@ private Optional<String> validateServiceAccount(String serviceAccount) {
597594
return Optional.empty();
598595
}
599596

600-
public static Optional<String> validateAutoScalerFlinkConfiguration(
597+
public Optional<String> validateAutoScalerFlinkConfiguration(
601598
Map<String, String> effectiveConfig) {
602599
if (effectiveConfig == null) {
603600
return Optional.empty();
604601
}
605602
Configuration flinkConfiguration = Configuration.fromMap(effectiveConfig);
606-
if (!flinkConfiguration.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
607-
return Optional.empty();
608-
}
609-
return firstPresent(
610-
validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
611-
validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
612-
validateNumber(flinkConfiguration, UTILIZATION_TARGET, 0.0d, 1.0d),
613-
validateNumber(
614-
flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
615-
validateNumber(
616-
flinkConfiguration,
617-
UTILIZATION_MAX,
618-
flinkConfiguration.get(UTILIZATION_TARGET),
619-
1.0d),
620-
validateNumber(
621-
flinkConfiguration,
622-
UTILIZATION_MIN,
623-
0.0d,
624-
flinkConfiguration.get(UTILIZATION_TARGET)),
625-
CalendarUtils.validateExcludedPeriods(flinkConfiguration));
626-
}
627-
628-
private static <T extends Number> Optional<String> validateNumber(
629-
Configuration flinkConfiguration,
630-
ConfigOption<T> autoScalerConfig,
631-
Double min,
632-
Double max) {
633-
try {
634-
var configValue = flinkConfiguration.get(autoScalerConfig);
635-
if (configValue != null) {
636-
double value = configValue.doubleValue();
637-
if ((min != null && value < min) || (max != null && value > max)) {
638-
return Optional.of(
639-
String.format(
640-
"The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
641-
autoScalerConfig.key(),
642-
min != null ? min.toString() : "-Infinity",
643-
max != null ? max.toString() : "+Infinity"));
644-
}
645-
}
646-
return Optional.empty();
647-
} catch (IllegalArgumentException e) {
648-
return Optional.of(
649-
String.format(
650-
"Invalid value in the autoscaler config %s", autoScalerConfig.key()));
651-
}
652-
}
653-
654-
private static <T extends Number> Optional<String> validateNumber(
655-
Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min) {
656-
return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
603+
return autoscalerValidator.validateAutoscalerOptions(flinkConfiguration);
657604
}
658605
}

0 commit comments

Comments
 (0)