Skip to content

Commit 24edc0d

Browse files
author
huyuanfeng
committed
[FLINK-37126] Add Validator for Autoscaler
1 parent 4c2c90c commit 24edc0d

File tree

5 files changed

+254
-60
lines changed

5 files changed

+254
-60
lines changed

Diff for: flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.flink.autoscaler.JobAutoScaler;
2222
import org.apache.flink.autoscaler.JobAutoScalerContext;
2323
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
24+
import org.apache.flink.autoscaler.validation.AutoscalerValidator;
25+
import org.apache.flink.autoscaler.validation.DefaultAutoscalerValidator;
2426
import org.apache.flink.configuration.Configuration;
2527
import org.apache.flink.configuration.UnmodifiableConfiguration;
2628
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -41,6 +43,7 @@
4143
import java.util.HashSet;
4244
import java.util.List;
4345
import java.util.Map;
46+
import java.util.Optional;
4447
import java.util.Set;
4548
import java.util.concurrent.CompletableFuture;
4649
import java.util.concurrent.ConcurrentHashMap;
@@ -51,6 +54,7 @@
5154
import java.util.function.Function;
5255
import java.util.stream.Collectors;
5356

57+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
5458
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL;
5559
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_PARALLELISM;
5660

@@ -69,6 +73,7 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
6973
private final ScheduledExecutorService scheduledExecutorService;
7074
private final ExecutorService scalingThreadPool;
7175
private final UnmodifiableConfiguration baseConf;
76+
private final AutoscalerValidator autoscalerValidator;
7277

7378
/**
7479
* Maintain a set of job keys that during scaling, it should be accessed at {@link
@@ -104,6 +109,7 @@ public StandaloneAutoscalerExecutor(
104109
parallelism, new ExecutorThreadFactory("autoscaler-standalone-scaling"));
105110
this.scalingJobKeys = new HashSet<>();
106111
this.baseConf = new UnmodifiableConfiguration(conf);
112+
this.autoscalerValidator = new DefaultAutoscalerValidator();
107113
}
108114

109115
public void start() {
@@ -189,7 +195,19 @@ private void cleanupStoppedJob(Collection<Context> jobList) {
189195
protected void scalingSingleJob(Context jobContext) {
190196
try {
191197
MDC.put("job.key", jobContext.getJobKey().toString());
192-
autoScaler.scale(jobContext);
198+
Optional<String> validationError =
199+
autoscalerValidator.validateAutoscalerOptions(jobContext.getConfiguration());
200+
if (validationError.isPresent()) {
201+
eventHandler.handleEvent(
202+
jobContext,
203+
AutoScalerEventHandler.Type.Warning,
204+
"AutoScaler Options Validation",
205+
validationError.get(),
206+
null,
207+
baseConf.get(SCALING_EVENT_INTERVAL));
208+
} else {
209+
autoScaler.scale(jobContext);
210+
}
193211
} catch (Throwable e) {
194212
LOG.error("Error while scaling job", e);
195213
eventHandler.handleException(jobContext, AUTOSCALER_ERROR, e);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.autoscaler.standalone;
19+
20+
import org.apache.flink.api.common.JobID;
21+
import org.apache.flink.api.common.JobStatus;
22+
import org.apache.flink.autoscaler.JobAutoScaler;
23+
import org.apache.flink.autoscaler.JobAutoScalerContext;
24+
import org.apache.flink.autoscaler.config.AutoScalerOptions;
25+
import org.apache.flink.autoscaler.event.TestingEventCollector;
26+
import org.apache.flink.configuration.Configuration;
27+
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.Test;
30+
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.ConcurrentHashMap;
35+
36+
import static org.assertj.core.api.Assertions.assertThat;
37+
38+
class StandaloneAutoscalerValidatorTest {
39+
private List<JobAutoScalerContext<JobID>> jobList;
40+
private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector;
41+
private ConcurrentHashMap<JobID, Integer> scaleCounter;
42+
private Configuration correctConfiguration;
43+
private Configuration invalidConfiguration;
44+
45+
@BeforeEach
46+
void setUp() {
47+
jobList = new ArrayList<>();
48+
eventCollector = new TestingEventCollector<>();
49+
scaleCounter = new ConcurrentHashMap<>();
50+
51+
correctConfiguration = new Configuration();
52+
correctConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
53+
54+
invalidConfiguration = new Configuration();
55+
invalidConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
56+
invalidConfiguration.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, -1.0);
57+
}
58+
59+
@Test
60+
void testAutoScalerWithInvalidConfig() {
61+
JobAutoScalerContext<JobID> validJob = createJobAutoScalerContext(correctConfiguration);
62+
JobAutoScalerContext<JobID> invalidJob = createJobAutoScalerContext(invalidConfiguration);
63+
64+
jobList.add(validJob);
65+
jobList.add(invalidJob);
66+
67+
try (StandaloneAutoscalerExecutor<JobID, JobAutoScalerContext<JobID>> autoscalerExecutor =
68+
new StandaloneAutoscalerExecutor<>(
69+
new Configuration(),
70+
baseConf -> jobList,
71+
eventCollector,
72+
new JobAutoScaler<>() {
73+
@Override
74+
public void scale(JobAutoScalerContext<JobID> context) {
75+
scaleCounter.merge(context.getJobKey(), 1, Integer::sum);
76+
}
77+
78+
@Override
79+
public void cleanup(JobAutoScalerContext<JobID> context) {
80+
// No cleanup required for the test
81+
}
82+
})) {
83+
84+
List<CompletableFuture<Void>> scaledFutures = autoscalerExecutor.scaling();
85+
86+
// Verification triggers two scaling tasks
87+
assertThat(scaledFutures).hasSize(2);
88+
89+
// Only legally configured tasks are scaled
90+
assertThat(scaleCounter).hasSize(1).containsKey(validJob.getJobKey());
91+
92+
// Verification Event Collector captures an event
93+
assertThat(eventCollector.events).hasSize(1);
94+
assertThat(eventCollector.events)
95+
.allMatch(event -> event.getContext().equals(invalidJob));
96+
}
97+
}
98+
99+
private JobAutoScalerContext<JobID> createJobAutoScalerContext(Configuration configuration) {
100+
JobID jobID = new JobID();
101+
return new JobAutoScalerContext<>(
102+
jobID, jobID, JobStatus.RUNNING, configuration, null, null);
103+
}
104+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.autoscaler.validation;
20+
21+
import org.apache.flink.configuration.Configuration;
22+
23+
import java.util.Optional;
24+
25+
/** Validator for Autoscaler. */
26+
public interface AutoscalerValidator {
27+
28+
/**
29+
* Validate autoscaler config and return optional error.
30+
*
31+
* @param flinkConf autoscaler config
32+
* @return Optional error string, should be present iff validation resulted in an error
33+
*/
34+
Optional<String> validateAutoscalerOptions(Configuration flinkConf);
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.autoscaler.validation;
20+
21+
import org.apache.flink.autoscaler.config.AutoScalerOptions;
22+
import org.apache.flink.autoscaler.utils.CalendarUtils;
23+
import org.apache.flink.configuration.ConfigOption;
24+
import org.apache.flink.configuration.Configuration;
25+
26+
import java.util.Optional;
27+
28+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
29+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
30+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
31+
32+
/** Default implementation of {@link AutoscalerValidator}. */
33+
public class DefaultAutoscalerValidator implements AutoscalerValidator {
34+
35+
public Optional<String> validateAutoscalerOptions(Configuration flinkConf) {
36+
37+
if (!flinkConf.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
38+
return Optional.empty();
39+
}
40+
return firstPresent(
41+
validateNumber(flinkConf, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
42+
validateNumber(flinkConf, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
43+
validateNumber(flinkConf, UTILIZATION_TARGET, 0.0d, 1.0d),
44+
validateNumber(flinkConf, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
45+
validateNumber(flinkConf, UTILIZATION_MAX, flinkConf.get(UTILIZATION_TARGET), 1.0d),
46+
validateNumber(flinkConf, UTILIZATION_MIN, 0.0d, flinkConf.get(UTILIZATION_TARGET)),
47+
CalendarUtils.validateExcludedPeriods(flinkConf));
48+
}
49+
50+
@SafeVarargs
51+
private static Optional<String> firstPresent(Optional<String>... errOpts) {
52+
for (Optional<String> opt : errOpts) {
53+
if (opt.isPresent()) {
54+
return opt;
55+
}
56+
}
57+
return Optional.empty();
58+
}
59+
60+
private static <T extends Number> Optional<String> validateNumber(
61+
Configuration flinkConfiguration,
62+
ConfigOption<T> autoScalerConfig,
63+
Double min,
64+
Double max) {
65+
try {
66+
var configValue = flinkConfiguration.get(autoScalerConfig);
67+
if (configValue != null) {
68+
double value = configValue.doubleValue();
69+
if ((min != null && value < min) || (max != null && value > max)) {
70+
return Optional.of(
71+
String.format(
72+
"The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
73+
autoScalerConfig.key(),
74+
min != null ? min.toString() : "-Infinity",
75+
max != null ? max.toString() : "+Infinity"));
76+
}
77+
}
78+
return Optional.empty();
79+
} catch (IllegalArgumentException e) {
80+
return Optional.of(
81+
String.format(
82+
"Invalid value in the autoscaler config %s", autoScalerConfig.key()));
83+
}
84+
}
85+
86+
private static <T extends Number> Optional<String> validateNumber(
87+
Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min) {
88+
return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
89+
}
90+
}

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java

+6-59
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717

1818
package org.apache.flink.kubernetes.operator.validation;
1919

20-
import org.apache.flink.autoscaler.config.AutoScalerOptions;
21-
import org.apache.flink.autoscaler.utils.CalendarUtils;
20+
import org.apache.flink.autoscaler.validation.AutoscalerValidator;
21+
import org.apache.flink.autoscaler.validation.DefaultAutoscalerValidator;
2222
import org.apache.flink.configuration.CheckpointingOptions;
23-
import org.apache.flink.configuration.ConfigOption;
2423
import org.apache.flink.configuration.Configuration;
2524
import org.apache.flink.configuration.HighAvailabilityOptions;
2625
import org.apache.flink.configuration.JobManagerOptions;
@@ -65,10 +64,6 @@
6564
import java.util.regex.Matcher;
6665
import java.util.regex.Pattern;
6766

68-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
69-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
70-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
71-
7267
/** Default validator implementation for {@link FlinkDeployment}. */
7368
public class DefaultValidator implements FlinkResourceValidator {
7469

@@ -87,9 +82,11 @@ public class DefaultValidator implements FlinkResourceValidator {
8782
Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME);
8883

8984
private final FlinkConfigManager configManager;
85+
private final AutoscalerValidator autoscalerValidator;
9086

9187
public DefaultValidator(FlinkConfigManager configManager) {
9288
this.configManager = configManager;
89+
this.autoscalerValidator = new DefaultAutoscalerValidator();
9390
}
9491

9592
@Override
@@ -597,62 +594,12 @@ private Optional<String> validateServiceAccount(String serviceAccount) {
597594
return Optional.empty();
598595
}
599596

600-
public static Optional<String> validateAutoScalerFlinkConfiguration(
597+
public Optional<String> validateAutoScalerFlinkConfiguration(
601598
Map<String, String> effectiveConfig) {
602599
if (effectiveConfig == null) {
603600
return Optional.empty();
604601
}
605602
Configuration flinkConfiguration = Configuration.fromMap(effectiveConfig);
606-
if (!flinkConfiguration.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
607-
return Optional.empty();
608-
}
609-
return firstPresent(
610-
validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
611-
validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
612-
validateNumber(flinkConfiguration, UTILIZATION_TARGET, 0.0d, 1.0d),
613-
validateNumber(
614-
flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
615-
validateNumber(
616-
flinkConfiguration,
617-
UTILIZATION_MAX,
618-
flinkConfiguration.get(UTILIZATION_TARGET),
619-
1.0d),
620-
validateNumber(
621-
flinkConfiguration,
622-
UTILIZATION_MIN,
623-
0.0d,
624-
flinkConfiguration.get(UTILIZATION_TARGET)),
625-
CalendarUtils.validateExcludedPeriods(flinkConfiguration));
626-
}
627-
628-
private static <T extends Number> Optional<String> validateNumber(
629-
Configuration flinkConfiguration,
630-
ConfigOption<T> autoScalerConfig,
631-
Double min,
632-
Double max) {
633-
try {
634-
var configValue = flinkConfiguration.get(autoScalerConfig);
635-
if (configValue != null) {
636-
double value = configValue.doubleValue();
637-
if ((min != null && value < min) || (max != null && value > max)) {
638-
return Optional.of(
639-
String.format(
640-
"The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
641-
autoScalerConfig.key(),
642-
min != null ? min.toString() : "-Infinity",
643-
max != null ? max.toString() : "+Infinity"));
644-
}
645-
}
646-
return Optional.empty();
647-
} catch (IllegalArgumentException e) {
648-
return Optional.of(
649-
String.format(
650-
"Invalid value in the autoscaler config %s", autoScalerConfig.key()));
651-
}
652-
}
653-
654-
private static <T extends Number> Optional<String> validateNumber(
655-
Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min) {
656-
return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
603+
return autoscalerValidator.validateAutoscalerOptions(flinkConfiguration);
657604
}
658605
}

0 commit comments

Comments
 (0)