Skip to content

Commit 0756f94

Browse files
author
fengfei02
committed
[FLINK-34566] Pass a FixedThreadPool to set reconciliation parallelism correctly
1 parent b8f22bf commit 0756f94

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
137137
overrider.withExecutorService(Executors.newCachedThreadPool());
138138
} else {
139139
LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
140-
overrider.withConcurrentReconciliationThreads(parallelism);
140+
overrider.withExecutorService(Executors.newFixedThreadPool(parallelism));
141141
}
142142

143143
if (operatorConf.isJosdkMetricsEnabled()) {

Diff for: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,22 @@ public void testConfigurationPassedToJOSDK() {
7070

7171
var configService = testOperator.getOperator().getConfigurationService();
7272

73-
// Test parallelism being passed
73+
// Test parallelism being passed expectedly
7474
var executorService = configService.getExecutorService();
7575
Assertions.assertInstanceOf(ThreadPoolExecutor.class, executorService);
7676
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
77+
for (int i = 0; i < testParallelism * 2; i++) {
78+
threadPoolExecutor.execute(
79+
() -> {
80+
try {
81+
Thread.sleep(1000);
82+
} catch (InterruptedException e) {
83+
e.printStackTrace();
84+
}
85+
});
86+
}
7787
Assertions.assertEquals(threadPoolExecutor.getMaximumPoolSize(), testParallelism);
88+
Assertions.assertEquals(threadPoolExecutor.getPoolSize(), testParallelism);
7889

7990
// Test label selector being passed
8091
// We have a label selector for each controller

0 commit comments

Comments
 (0)