Skip to content

Commit f4504f6

Browse files
committed
Harden multik8s capacity executor
1 parent cfcdddb commit f4504f6

2 files changed

Lines changed: 116 additions & 4 deletions

File tree

extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/AutoscalableThreadPoolExecutor.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.druid.java.util.emitter.EmittingLogger;
2727
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
2828

29+
import javax.annotation.Nullable;
2930
import java.util.List;
3031
import java.util.concurrent.LinkedBlockingQueue;
3132
import java.util.concurrent.ThreadPoolExecutor;
@@ -36,11 +37,12 @@ public class AutoscalableThreadPoolExecutor extends ThreadPoolExecutor
3637
{
3738
private static final EmittingLogger log = new EmittingLogger(AutoscalableThreadPoolExecutor.class);
3839

40+
@Nullable
3941
private final ConfigManager configManager;
4042
private final String listenerKey;
4143
private final Consumer<KubernetesTaskRunnerDynamicConfig> configListener;
4244

43-
public AutoscalableThreadPoolExecutor(int initialCapacity, ConfigManager configManager)
45+
public AutoscalableThreadPoolExecutor(int initialCapacity, @Nullable ConfigManager configManager)
4446
{
4547
super(
4648
initialCapacity,
@@ -56,7 +58,7 @@ public AutoscalableThreadPoolExecutor(int initialCapacity, ConfigManager configM
5658
this.configListener = this::onConfigurationChange;
5759

5860
// Monitor the configuration change
59-
if (!configManager.addListener(
61+
if (configManager != null && !configManager.addListener(
6062
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
6163
listenerKey,
6264
configListener)) {
@@ -80,6 +82,10 @@ public List<Runnable> shutdownNow()
8082

8183
private void removeConfigListener()
8284
{
85+
if (configManager == null) {
86+
return;
87+
}
88+
8389
if (!configManager.removeListener(
8490
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
8591
listenerKey,
@@ -91,8 +97,12 @@ private void removeConfigListener()
9197

9298
private void onConfigurationChange(KubernetesTaskRunnerDynamicConfig config)
9399
{
94-
int curCapacity = this.getCorePoolSize();
95-
int newCapacity = config.getCapacity();
100+
if (config.getCapacity() == null) {
101+
return;
102+
}
103+
104+
final int curCapacity = this.getCorePoolSize();
105+
final int newCapacity = config.getCapacity();
96106
if (newCapacity == curCapacity) {
97107
return;
98108
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.k8s.overlord;
21+
22+
import org.apache.druid.common.config.ConfigManager;
23+
import org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig;
24+
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
25+
import org.easymock.Capture;
26+
import org.easymock.EasyMock;
27+
import org.junit.jupiter.api.Assertions;
28+
import org.junit.jupiter.api.Test;
29+
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.function.Consumer;
32+
33+
public class AutoscalableThreadPoolExecutorTest
34+
{
35+
@Test
36+
public void testConstructorWithNullConfigManager()
37+
{
38+
final AutoscalableThreadPoolExecutor executor = new AutoscalableThreadPoolExecutor(2, null);
39+
40+
Assertions.assertEquals(2, executor.getCorePoolSize());
41+
executor.shutdownNow();
42+
}
43+
44+
@Test
45+
public void testDynamicConfigWithNullCapacityDoesNotChangePoolSize()
46+
{
47+
final ConfigManager configManager = EasyMock.createMock(ConfigManager.class);
48+
final Capture<Consumer<KubernetesTaskRunnerDynamicConfig>> listenerCapture = EasyMock.newCapture();
49+
50+
EasyMock.expect(configManager.addListener(
51+
EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY),
52+
EasyMock.anyString(),
53+
EasyMock.capture(listenerCapture)
54+
)).andReturn(true);
55+
EasyMock.expect(configManager.removeListener(
56+
EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY),
57+
EasyMock.anyString(),
58+
EasyMock.anyObject()
59+
)).andReturn(true).anyTimes();
60+
EasyMock.replay(configManager);
61+
62+
final AutoscalableThreadPoolExecutor executor = new AutoscalableThreadPoolExecutor(2, configManager);
63+
listenerCapture.getValue().accept(new DefaultKubernetesTaskRunnerDynamicConfig(null, null));
64+
65+
Assertions.assertEquals(2, executor.getCorePoolSize());
66+
Assertions.assertEquals(2, executor.getMaximumPoolSize());
67+
68+
executor.shutdownNow();
69+
EasyMock.verify(configManager);
70+
}
71+
72+
@Test
73+
public void testDynamicConfigWithCapacityChangesPoolSize() throws InterruptedException
74+
{
75+
final ConfigManager configManager = EasyMock.createMock(ConfigManager.class);
76+
final Capture<Consumer<KubernetesTaskRunnerDynamicConfig>> listenerCapture = EasyMock.newCapture();
77+
78+
EasyMock.expect(configManager.addListener(
79+
EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY),
80+
EasyMock.anyString(),
81+
EasyMock.capture(listenerCapture)
82+
)).andReturn(true);
83+
EasyMock.expect(configManager.removeListener(
84+
EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY),
85+
EasyMock.anyString(),
86+
EasyMock.anyObject()
87+
)).andReturn(true).anyTimes();
88+
EasyMock.replay(configManager);
89+
90+
final AutoscalableThreadPoolExecutor executor = new AutoscalableThreadPoolExecutor(2, configManager);
91+
listenerCapture.getValue().accept(new DefaultKubernetesTaskRunnerDynamicConfig(null, 4));
92+
93+
Assertions.assertEquals(4, executor.getCorePoolSize());
94+
Assertions.assertEquals(4, executor.getMaximumPoolSize());
95+
96+
executor.shutdown();
97+
Assertions.assertTrue(executor.isShutdown());
98+
Assertions.assertTrue(executor.getQueue().isEmpty());
99+
Assertions.assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
100+
EasyMock.verify(configManager);
101+
}
102+
}

0 commit comments

Comments
 (0)