Skip to content

Commit cd673d0

Browse files
feat(kyuubi-common):增加操作超时监控线程池配置并实现相关功能
- 在 KyuubiConf 中添加 OPERATION_TIMEOUT_MONITOR_POOL_SIZE 配置项 - 在 SessionManager 中使用新配置初始化操作超时监控线程池 - 在 ThreadUtils 中新增 newDaemonScheduledThreadPool 方法 - 在 ThreadUtilsSuite 中添加新方法的测试用例
1 parent 1e72e4b commit cd673d0

4 files changed

Lines changed: 204 additions & 4 deletions

File tree

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,12 @@ object KyuubiConf {
336336
.version("1.0.0")
337337
.timeConf
338338
.createWithDefault(Duration.ofHours(3).toMillis)
339-
339+
val OPERATION_TIMEOUT_MONITOR_POOL_SIZE: ConfigEntry[Int] =
340+
buildConf("kyuubi.operation.timeout.monitor.pool.size")
341+
.doc("Number of threads in the operation timeout monitor thread pool")
342+
.version("1.8.0")
343+
.intConf
344+
.createWithDefault(10)
340345
val CREDENTIALS_RENEWAL_INTERVAL: ConfigEntry[Long] =
341346
buildConf("kyuubi.credentials.renewal.interval")
342347
.doc("How often Kyuubi renews one user's delegation tokens")

kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,8 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
288288
s"$name-exec-pool")
289289

290290
// Initialize timeout monitor pool
291-
operationTimeoutMonitor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
291+
operationTimeoutMonitor = ThreadUtils.newDaemonScheduledThreadPool(
292+
conf.get(OPERATION_TIMEOUT_MONITOR_POOL_SIZE),
292293
s"$name-operation-timeout-monitor-pool")
293294

294295
super.initialize(conf)

kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ object ThreadUtils extends Logging {
3737
executor
3838
}
3939

40+
def newDaemonScheduledThreadPool(
41+
poolSize: Int,
42+
threadName: String): ScheduledExecutorService = {
43+
val threadFactory = new NamedThreadFactory(threadName, daemon = true)
44+
val executor = new ScheduledThreadPoolExecutor(poolSize, threadFactory)
45+
executor.setRemoveOnCancelPolicy(true)
46+
executor
47+
}
48+
4049
def newDaemonQueuedThreadPool(
4150
poolSize: Int,
4251
poolQueueSize: Int,

kyuubi-common/src/test/scala/org/apache/kyuubi/util/ThreadUtilsSuite.scala

Lines changed: 187 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,20 @@
1717

1818
package org.apache.kyuubi.util
1919

20-
import java.util.concurrent.TimeUnit
20+
import java.lang.Thread.sleep
21+
import java.util.concurrent._
22+
23+
import scala.collection.concurrent.TrieMap
24+
25+
import org.scalatest.time.{Millis, Seconds, Span}
2126

2227
import org.apache.kyuubi.KyuubiFunSuite
2328

2429
class ThreadUtilsSuite extends KyuubiFunSuite {
25-
30+
// Configure Eventually patience for retries/waits
31+
implicit override val patienceConfig: PatienceConfig = PatienceConfig(
32+
timeout = scaled(Span(5, Seconds)),
33+
interval = scaled(Span(10, Millis)))
2634
test("New daemon single thread scheduled executor for shutdown") {
2735
val service = ThreadUtils.newDaemonSingleThreadScheduledExecutor("ThreadUtilsTest")
2836
@volatile var threadName = ""
@@ -61,4 +69,181 @@ class ThreadUtilsSuite extends KyuubiFunSuite {
6169
service.awaitTermination(10, TimeUnit.SECONDS)
6270
assert(threadName startsWith "")
6371
}
72+
73+
// Helper function for cleanup
74+
private def shutdownAndAwaitTermination(
75+
service: ExecutorService,
76+
timeoutSeconds: Long = 5): Unit = {
77+
service.shutdown() // Disable new tasks from being submitted
78+
try {
79+
// Wait a while for existing tasks to terminate
80+
if (!service.awaitTermination(timeoutSeconds / 2, TimeUnit.SECONDS)) {
81+
service.shutdownNow() // Cancel currently executing tasks
82+
// Wait a while for tasks to respond to being cancelled
83+
if (!service.awaitTermination(timeoutSeconds / 2, TimeUnit.SECONDS)) {
84+
throw new IllegalStateException(
85+
s"Thread pool did not terminate within $timeoutSeconds seconds")
86+
}
87+
}
88+
} catch {
89+
case _: InterruptedException =>
90+
// (Re-)Cancel if current thread also interrupted
91+
service.shutdownNow()
92+
// Preserve interrupt status
93+
Thread.currentThread().interrupt()
94+
}
95+
}
96+
test("newDaemonScheduledThreadPool - thread naming and daemon status") {
97+
val poolSize = 2
98+
val threadNamePrefix = "test-pool-thread"
99+
val executor = ThreadUtils.newDaemonScheduledThreadPool(poolSize, threadNamePrefix)
100+
val latch = new CountDownLatch(poolSize)
101+
val threadNames = TrieMap[String, Boolean]() // Thread-safe map
102+
try {
103+
for (_ <- 0 until poolSize) {
104+
executor.submit(new Runnable {
105+
override def run(): Unit = {
106+
val currentThread = Thread.currentThread()
107+
threadNames.put(currentThread.getName, currentThread.isDaemon)
108+
latch.countDown()
109+
}
110+
})
111+
}
112+
// Wait for tasks to complete
113+
assert(latch.await(5, TimeUnit.SECONDS), "Tasks did not complete in time")
114+
// Verify thread names and daemon status
115+
assert(threadNames.size === poolSize)
116+
threadNames.foreach { case (name, isDaemon) =>
117+
assert(
118+
name.startsWith(threadNamePrefix),
119+
s"Thread name '$name' should start with '$threadNamePrefix'")
120+
assert(isDaemon, s"Thread '$name' should be a daemon thread")
121+
}
122+
} finally {
123+
shutdownAndAwaitTermination(executor)
124+
}
125+
}
126+
test("newDaemonScheduledThreadPool - schedule and execute tasks") {
127+
val executor = ThreadUtils.newDaemonScheduledThreadPool(1, "test-schedule")
128+
val taskRan = new java.util.concurrent.atomic.AtomicBoolean(false)
129+
val latch = new CountDownLatch(1)
130+
try {
131+
val future: ScheduledFuture[_] = executor.schedule(
132+
new Runnable {
133+
override def run(): Unit = {
134+
taskRan.set(true)
135+
latch.countDown()
136+
}
137+
},
138+
50,
139+
TimeUnit.MILLISECONDS
140+
) // Schedule with a small delay
141+
// Wait for the task to execute
142+
assert(latch.await(2, TimeUnit.SECONDS), "Scheduled task did not run in time")
143+
assert(taskRan.get(), "Scheduled task flag should be true")
144+
assert(future.isDone, "Future should be done after task completion")
145+
} finally {
146+
shutdownAndAwaitTermination(executor)
147+
}
148+
}
149+
test("newDaemonScheduledThreadPool - removeOnCancelPolicy works for scheduled tasks") {
150+
// We need the specific ScheduledThreadPoolExecutor type to access the queue
151+
val executor = ThreadUtils.newDaemonScheduledThreadPool(1, "test-cancel")
152+
.asInstanceOf[ScheduledThreadPoolExecutor]
153+
val taskRan = new java.util.concurrent.atomic.AtomicBoolean(false)
154+
try {
155+
// Schedule a task far enough in the future that we can cancel it
156+
val future: ScheduledFuture[_] = executor.schedule(
157+
new Runnable {
158+
override def run(): Unit = {
159+
taskRan.set(true)
160+
}
161+
},
162+
5,
163+
TimeUnit.SECONDS
164+
) // Long delay
165+
// Verify the task is in the queue initially
166+
eventually {
167+
assert(executor.getQueue.size() === 1, "Task should be in the queue initially")
168+
}
169+
// Cancel the task
170+
val cancelled = future.cancel(false) // false = don't interrupt if running (it shouldn't be)
171+
assert(cancelled, "Future.cancel() should return true")
172+
// Verify the task is removed from the queue due to the policy
173+
eventually {
174+
assert(
175+
executor.getQueue.isEmpty,
176+
"Task should be removed from the queue after cancellation")
177+
}
178+
// Wait a bit and verify the task never ran
179+
Thread.sleep(100) // Give some time just in case
180+
assert(!taskRan.get(), "Cancelled task should not have run")
181+
} finally {
182+
shutdownAndAwaitTermination(executor)
183+
// Final check after shutdown
184+
assert(executor.getQueue.isEmpty, "Queue should be empty after shutdown")
185+
}
186+
}
187+
test("newDaemonScheduledThreadPool - shutdown rejects new tasks") {
188+
val executor = ThreadUtils.newDaemonScheduledThreadPool(1, "test-shutdown")
189+
try {
190+
// Submit one task to ensure the pool is active
191+
val future = executor.submit(new Runnable { override def run(): Unit = Thread.sleep(50) })
192+
future.get(1, TimeUnit.SECONDS) // Wait for it to finish
193+
executor.shutdown()
194+
assert(executor.isShutdown, "Executor should be shutdown")
195+
// Try submitting after shutdown
196+
assertThrows[RejectedExecutionException] {
197+
executor.submit(new Runnable { override def run(): Unit = sleep(5) })
198+
}
199+
assertThrows[RejectedExecutionException] {
200+
executor.schedule(
201+
new Runnable { override def run(): Unit = sleep(5) },
202+
10,
203+
TimeUnit.MILLISECONDS)
204+
}
205+
} finally {
206+
// Ensure termination even if already shut down
207+
shutdownAndAwaitTermination(executor)
208+
assert(executor.isTerminated, "Executor should be terminated")
209+
}
210+
}
211+
test("newDaemonScheduledThreadPool - concurrent execution with poolSize > 1") {
212+
val poolSize = 3
213+
val taskCount = 5
214+
val executor = ThreadUtils.newDaemonScheduledThreadPool(poolSize, "test-concurrent")
215+
val latch = new CountDownLatch(taskCount)
216+
val runningThreads = TrieMap[String, Long]() // Thread Name -> Start Time
217+
val executionTimes = TrieMap[Int, Long]() // Task Index -> Completion Time
218+
try {
219+
val startTime = System.nanoTime()
220+
for (i <- 0 until taskCount) {
221+
executor.submit(new Runnable {
222+
override def run(): Unit = {
223+
val threadName = Thread.currentThread().getName
224+
runningThreads.put(threadName, System.nanoTime())
225+
try {
226+
// Simulate work
227+
Thread.sleep(100)
228+
} finally {
229+
executionTimes.put(i, System.nanoTime() - startTime)
230+
latch.countDown()
231+
}
232+
}
233+
})
234+
}
235+
// Wait for all tasks to complete
236+
assert(latch.await(5, TimeUnit.SECONDS), s"All $taskCount tasks did not complete in time")
237+
// Verify that multiple threads were used (likely up to poolSize)
238+
assert(
239+
runningThreads.size > 1,
240+
s"Expected more than 1 thread to be used, but found ${runningThreads.size}")
241+
assert(
242+
runningThreads.size <= poolSize,
243+
s"Used ${runningThreads.size} threads, which should not exceed poolSize $poolSize")
244+
} finally {
245+
shutdownAndAwaitTermination(executor)
246+
}
247+
}
248+
64249
}

0 commit comments

Comments
 (0)