Skip to content

Commit 2817a92

Browse files
Add reset method for TimeoutExecutorFactory and implement unit tests
This commit introduces a new `reset` method in the `TimeoutExecutorFactory` class, which allows for resetting the factory state primarily for testing purposes. Additionally, a comprehensive suite of unit tests has been added to validate the functionality of the timeout executors, including their creation, behavior, and shutdown processes. These changes enhance the testability and reliability of the timeout management system in Kyuubi.
1 parent cc3415f commit 2817a92

2 files changed

Lines changed: 217 additions & 0 deletions

File tree

kyuubi-common/src/main/scala/org/apache/kyuubi/operation/timeout/TimeoutExecutorFactory.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.kyuubi.operation.timeout
1919

20+
import com.google.common.annotations.VisibleForTesting
21+
2022
import org.apache.kyuubi.Logging
2123
import org.apache.kyuubi.config.KyuubiConf
2224

@@ -62,4 +64,20 @@ object TimeoutExecutorFactory extends Logging {
6264
threadPoolExecutor.shutdown()
6365
}
6466
}
67+
68+
/**
69+
* Reset the factory state. This is mainly for testing purposes.
70+
* Should only be called when no operations are using the executors.
71+
*/
72+
@VisibleForTesting
73+
private[kyuubi] def reset(): Unit = {
74+
synchronized {
75+
if (threadPoolExecutor != null) {
76+
if (!threadPoolExecutor.isShutdown) {
77+
threadPoolExecutor.shutdown()
78+
}
79+
threadPoolExecutor = null
80+
}
81+
}
82+
}
6583
}
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.operation.timeout
19+
20+
import java.util.concurrent.{CountDownLatch, TimeUnit}
21+
22+
import org.apache.kyuubi.KyuubiFunSuite
23+
import org.apache.kyuubi.config.KyuubiConf
24+
25+
class TimeoutExecutorFactorySuite extends KyuubiFunSuite {
26+
27+
override def beforeEach(): Unit = {
28+
super.beforeEach()
29+
// Ensure clean state before each test
30+
TimeoutExecutorFactory.reset()
31+
}
32+
33+
override def afterEach(): Unit = {
34+
super.afterEach()
35+
// Clean up after each test
36+
TimeoutExecutorFactory.reset()
37+
}
38+
39+
test("get per-operation timeout executor by default") {
40+
val conf = new KyuubiConf()
41+
val executor = TimeoutExecutorFactory.getExecutor(conf)
42+
assert(executor.isInstanceOf[PerOperationTimeoutExecutor])
43+
// PerOperationTimeoutExecutor is not shutdown by default
44+
}
45+
46+
test("get per-operation timeout executor explicitly") {
47+
val conf = new KyuubiConf()
48+
conf.set(KyuubiConf.OPERATION_TIMEOUT_EXECUTOR_TYPE, "per-operation")
49+
val executor = TimeoutExecutorFactory.getExecutor(conf)
50+
assert(executor.isInstanceOf[PerOperationTimeoutExecutor])
51+
// PerOperationTimeoutExecutor is not shutdown by default
52+
}
53+
54+
test("get thread-pool timeout executor") {
55+
val conf = new KyuubiConf()
56+
conf.set(KyuubiConf.OPERATION_TIMEOUT_EXECUTOR_TYPE, "thread-pool")
57+
conf.set(KyuubiConf.OPERATION_TIMEOUT_POOL_SIZE, 2)
58+
conf.set(KyuubiConf.OPERATION_TIMEOUT_POOL_KEEPALIVE_TIME, 30000L)
59+
60+
val executor = TimeoutExecutorFactory.getExecutor(conf)
61+
assert(executor.isInstanceOf[ThreadPoolTimeoutExecutor])
62+
assert(!executor.isShutdown)
63+
}
64+
65+
test("thread-pool executor singleton behavior") {
66+
val conf = new KyuubiConf()
67+
conf.set(KyuubiConf.OPERATION_TIMEOUT_EXECUTOR_TYPE, "thread-pool")
68+
conf.set(KyuubiConf.OPERATION_TIMEOUT_POOL_SIZE, 2)
69+
70+
val executor1 = TimeoutExecutorFactory.getExecutor(conf)
71+
val executor2 = TimeoutExecutorFactory.getExecutor(conf)
72+
73+
// Should return the same instance for thread-pool type
74+
assert(executor1 eq executor2)
75+
assert(executor1.isInstanceOf[ThreadPoolTimeoutExecutor])
76+
}
77+
78+
test("per-operation executor creates new instances") {
79+
val conf = new KyuubiConf()
80+
conf.set(KyuubiConf.OPERATION_TIMEOUT_EXECUTOR_TYPE, "per-operation")
81+
82+
val executor1 = TimeoutExecutorFactory.getExecutor(conf)
83+
val executor2 = TimeoutExecutorFactory.getExecutor(conf)
84+
85+
// Should return different instances for per-operation type
86+
assert(executor1 ne executor2)
87+
assert(executor1.isInstanceOf[PerOperationTimeoutExecutor])
88+
assert(executor2.isInstanceOf[PerOperationTimeoutExecutor])
89+
}
90+
91+
test("invalid executor type throws exception") {
92+
val conf = new KyuubiConf()
93+
conf.set(KyuubiConf.OPERATION_TIMEOUT_EXECUTOR_TYPE, "unknown-type")
94+
95+
// Should throw IllegalArgumentException for invalid executor type
96+
intercept[IllegalArgumentException] {
97+
TimeoutExecutorFactory.getExecutor(conf)
98+
}
99+
}
100+
101+
test("timeout executor factory shutdown - thread-pool type") {
102+
val conf = new KyuubiConf()
103+
conf.set(KyuubiConf.OPERATION_TIMEOUT_EXECUTOR_TYPE, "thread-pool")
104+
conf.set(KyuubiConf.OPERATION_TIMEOUT_POOL_SIZE, 2)
105+
106+
// Get the thread-pool executor to initialize it
107+
val executor = TimeoutExecutorFactory.getExecutor(conf)
108+
assert(executor.isInstanceOf[ThreadPoolTimeoutExecutor])
109+
assert(!executor.isShutdown)
110+
111+
// Schedule a timeout action to verify the executor is working
112+
val latch = new CountDownLatch(1)
113+
val timeoutAction = new Runnable {
114+
override def run(): Unit = latch.countDown()
115+
}
116+
117+
val future = executor.scheduleTimeout(timeoutAction, 1)
118+
119+
// Wait for the timeout action to execute
120+
assert(latch.await(5, TimeUnit.SECONDS), "Timeout action should have executed")
121+
122+
// Now shutdown the factory
123+
TimeoutExecutorFactory.shutdown()
124+
125+
// Verify the executor is shutdown
126+
assert(
127+
executor.isShutdown,
128+
"ThreadPoolTimeoutExecutor should be shutdown after factory shutdown")
129+
}
130+
131+
test("timeout executor factory shutdown - per-operation type") {
132+
val conf = new KyuubiConf()
133+
conf.set(KyuubiConf.OPERATION_TIMEOUT_EXECUTOR_TYPE, "per-operation")
134+
135+
// Get a per-operation executor
136+
val executor = TimeoutExecutorFactory.getExecutor(conf)
137+
assert(executor.isInstanceOf[PerOperationTimeoutExecutor])
138+
139+
// For per-operation type, factory shutdown should be safe (no-op)
140+
TimeoutExecutorFactory.shutdown()
141+
142+
// Per-operation executors are independent, so factory shutdown doesn't affect them
143+
// (they manage their own lifecycle)
144+
}
145+
146+
test("timeout executor factory shutdown - multiple calls are safe") {
147+
val conf = new KyuubiConf()
148+
conf.set(KyuubiConf.OPERATION_TIMEOUT_EXECUTOR_TYPE, "thread-pool")
149+
conf.set(KyuubiConf.OPERATION_TIMEOUT_POOL_SIZE, 2)
150+
151+
// Initialize the thread-pool executor
152+
val executor = TimeoutExecutorFactory.getExecutor(conf)
153+
assert(!executor.isShutdown)
154+
155+
// Multiple shutdown calls should be safe
156+
TimeoutExecutorFactory.shutdown()
157+
assert(executor.isShutdown)
158+
159+
// Additional shutdown calls should not cause issues
160+
TimeoutExecutorFactory.shutdown()
161+
TimeoutExecutorFactory.shutdown()
162+
163+
assert(executor.isShutdown)
164+
}
165+
166+
test("timeout executor factory shutdown - no executor initialized") {
167+
// Calling shutdown when no executor has been initialized should be safe
168+
TimeoutExecutorFactory.shutdown()
169+
// No assertion needed, just verify no exception is thrown
170+
}
171+
172+
test("thread-pool executor functionality after factory creation") {
173+
val conf = new KyuubiConf()
174+
conf.set(KyuubiConf.OPERATION_TIMEOUT_EXECUTOR_TYPE, "thread-pool")
175+
conf.set(KyuubiConf.OPERATION_TIMEOUT_POOL_SIZE, 3)
176+
conf.set(KyuubiConf.OPERATION_TIMEOUT_POOL_KEEPALIVE_TIME, 60000L)
177+
178+
val executor = TimeoutExecutorFactory.getExecutor(conf)
179+
180+
// Test scheduling multiple timeout actions
181+
val latch = new CountDownLatch(2)
182+
val timeoutAction = new Runnable {
183+
override def run(): Unit = latch.countDown()
184+
}
185+
186+
val future1 = executor.scheduleTimeout(timeoutAction, 1)
187+
val future2 = executor.scheduleTimeout(timeoutAction, 1)
188+
189+
// Wait for both actions to execute
190+
assert(latch.await(5, TimeUnit.SECONDS), "Both timeout actions should have executed")
191+
192+
// Test cancellation (before shutdown)
193+
val future3 = executor.scheduleTimeout(timeoutAction, 10) // Long timeout
194+
executor.cancelTimeout(future3)
195+
assert(future3.isCancelled, "Future should be cancelled")
196+
197+
// The factory shutdown will be called in afterEach
198+
}
199+
}

0 commit comments

Comments
 (0)