Skip to content

Commit c44f448

Browse files
committed
Add operation timeout management with configurable executors
This commit introduces a new timeout management system for operations in Kyuubi. It includes the following features: - New configuration options for operation timeout executor type, pool size, and keep-alive time. - Implementation of two timeout executor strategies: `PerOperationTimeoutExecutor` for dedicated threads per operation and `ThreadPoolTimeoutExecutor` for shared thread pools. - Refactoring of the `AbstractOperation` class to utilize the new timeout executors. - Updates to the `KyuubiOperation` class to ensure proper cleanup of timeout monitors. These changes enhance the flexibility and efficiency of operation timeout handling in Kyuubi.
1 parent 22d6d0f commit c44f448

7 files changed

Lines changed: 313 additions & 8 deletions

File tree

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2212,6 +2212,33 @@ object KyuubiConf {
22122212
.checkValue(_ >= 1000, "must >= 1s if set")
22132213
.createOptional
22142214

2215+
// Selects the operation timeout strategy: a shared pool ('thread-pool') or a dedicated
// thread per operation ('per-operation'). Only these two literal values are accepted.
val OPERATION_TIMEOUT_EXECUTOR_TYPE: ConfigEntry[String] =
  buildConf("kyuubi.operation.timeout.executor.type")
    .doc("The type of timeout executor to use for operation timeout management. Use " +
      "'thread-pool' for a shared thread pool or 'per-operation' to create a new " +
      "thread for each operation.")
    .version("1.11.0")
    .stringConf
    .checkValues(Set("thread-pool", "per-operation"))
    .createWithDefault("per-operation")
2224+
2225+
// Number of threads in the shared timeout pool. Validated up front so that a zero or
// negative value is rejected when the configuration is read instead of surfacing later
// as a failure (or a thread-less pool) when the shared executor is first created.
val OPERATION_TIMEOUT_POOL_SIZE: ConfigEntry[Int] =
  buildConf("kyuubi.operation.timeout.pool.size")
    .doc("The number of threads in the shared timeout executor pool. " +
      "This is only used when 'kyuubi.operation.timeout.executor.type' is 'thread-pool'.")
    .version("1.11.0")
    .intConf
    .checkValue(_ > 0, "must be positive number")
    .createWithDefault(8)
2232+
2233+
// Keep-alive (milliseconds) for idle threads of the shared timeout pool. A negative
// duration is never meaningful, so it is rejected when the configuration is read.
val OPERATION_TIMEOUT_POOL_KEEPALIVE_TIME: ConfigEntry[Long] =
  buildConf("kyuubi.operation.timeout.pool.keepalive.time")
    .doc("The keep-alive time in milliseconds for idle threads in the shared timeout " +
      "executor pool. This is only used when 'kyuubi.operation.timeout.executor.type' " +
      "is 'thread-pool'.")
    .version("1.11.0")
    .longConf
    .checkValue(_ >= 0, "must be non-negative")
    .createWithDefault(60000L)
2241+
22152242
val OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED: ConfigEntry[Boolean] =
22162243
buildConf("kyuubi.operation.query.timeout.monitor.enabled")
22172244
.doc("Whether to monitor timeout query timeout check on server side.")

kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.kyuubi.operation
1919

2020
import java.io.IOException
21-
import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
21+
import java.util.concurrent.{Future, ScheduledFuture}
2222
import java.util.concurrent.locks.ReentrantLock
2323

2424
import scala.collection.JavaConverters._
@@ -30,9 +30,10 @@ import org.apache.kyuubi.config.KyuubiConf.OPERATION_IDLE_TIMEOUT
3030
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
3131
import org.apache.kyuubi.operation.OperationState._
3232
import org.apache.kyuubi.operation.log.OperationLog
33+
import org.apache.kyuubi.operation.timeout.TimeoutExecutor
34+
import org.apache.kyuubi.operation.timeout.TimeoutExecutorFactory
3335
import org.apache.kyuubi.session.Session
3436
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TStatus, TStatusCode}
35-
import org.apache.kyuubi.util.ThreadUtils
3637

3738
abstract class AbstractOperation(session: Session) extends Operation with Logging {
3839

@@ -45,7 +46,8 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
4546

4647
final private[kyuubi] val statementId = handle.identifier.toString
4748

48-
private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
49+
private var timeoutExecutor: Option[TimeoutExecutor] = None
50+
private var timeoutFuture: Option[ScheduledFuture[_]] = None
4951

5052
private val lock: ReentrantLock = new ReentrantLock()
5153

@@ -60,8 +62,7 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
6062

6163
protected def addTimeoutMonitor(queryTimeout: Long): Unit = {
6264
if (queryTimeout > 0) {
63-
val timeoutExecutor =
64-
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
65+
val executor = TimeoutExecutorFactory.getExecutor(session.sessionManager.getConf)
6566
val action: Runnable = () =>
6667
// Clients less than version 2.1 have no HIVE-4924 Patch,
6768
// no queryTimeout parameter and no TIMEOUT status.
@@ -74,13 +75,19 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
7475
} else {
7576
cleanup(OperationState.TIMEOUT)
7677
}
77-
timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
78-
statementTimeoutCleaner = Some(timeoutExecutor)
78+
79+
val future = executor.scheduleTimeout(action, queryTimeout)
80+
timeoutExecutor = Some(executor)
81+
timeoutFuture = Some(future)
7982
}
8083
}
8184

8285
protected def shutdownTimeoutMonitor(): Unit = {
83-
statementTimeoutCleaner.foreach(_.shutdown())
86+
timeoutFuture.foreach { future =>
87+
timeoutExecutor.foreach(_.cancelTimeout(future))
88+
}
89+
timeoutExecutor = None
90+
timeoutFuture = None
8491
}
8592

8693
override def getOperationLog: Option[OperationLog] = None
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.operation.timeout
19+
20+
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
21+
22+
import org.apache.kyuubi.Logging
23+
import org.apache.kyuubi.util.ThreadUtils
24+
25+
/**
 * Per-operation timeout executor: every scheduled timeout gets its own single-thread
 * scheduler obtained from `ThreadUtils.newDaemonSingleThreadScheduledExecutor`.
 *
 * An instance tracks at most one scheduler and one pending timeout at a time. All state
 * transitions are guarded by the instance monitor because a timeout is typically
 * scheduled by one thread and cancelled or shut down by another (including the timeout
 * thread itself). The monitor is never held while the timeout action runs.
 */
class PerOperationTimeoutExecutor extends TimeoutExecutor with Logging {

  // Both fields are guarded by `this`.
  private var executor: Option[ScheduledExecutorService] = None
  private var scheduledFuture: Option[ScheduledFuture[_]] = None

  /**
   * Schedule `timeoutAction` to run once after `timeoutSeconds` seconds on a freshly
   * created scheduler thread.
   *
   * @param timeoutAction the action to execute when the timeout elapses
   * @param timeoutSeconds the delay, in seconds, before the action runs
   * @return the future of the scheduled action, usable with [[cancelTimeout]]
   */
  override def scheduleTimeout(
      timeoutAction: Runnable,
      timeoutSeconds: Long): ScheduledFuture[_] = synchronized {
    // Re-scheduling on the same instance must not orphan the previous scheduler thread.
    shutdown()

    val timeoutExecutor =
      ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
    executor = Some(timeoutExecutor)

    val wrappedAction: Runnable = () => {
      try {
        timeoutAction.run()
      } catch {
        case e: Exception =>
          error("Error during timeout action execution", e)
      } finally {
        // The task is one-shot: release the dedicated thread as soon as it has fired
        // rather than keeping it alive until somebody calls shutdown().
        timeoutExecutor.shutdown()
      }
    }

    val future = timeoutExecutor.schedule(wrappedAction, timeoutSeconds, TimeUnit.SECONDS)
    scheduledFuture = Some(future)
    future
  }

  /**
   * Cancel a pending timeout and release the scheduler thread.
   *
   * Both the handle supplied by the caller and the handle tracked internally are
   * cancelled (without interrupting a running action); cancelling an already completed
   * or cancelled future is a no-op.
   *
   * @param future the future returned by [[scheduleTimeout]]; `null` is tolerated
   */
  override def cancelTimeout(future: ScheduledFuture[_]): Unit = synchronized {
    Option(future).foreach(_.cancel(false))
    scheduledFuture.foreach(_.cancel(false))
    shutdown()
  }

  /** Shut down the current scheduler, if any, and forget the tracked timeout. */
  override def shutdown(): Unit = synchronized {
    executor.foreach(_.shutdown())
    executor = None
    scheduledFuture = None
  }

  /**
   * @return true when no scheduler is currently held by this instance, or the held
   *         scheduler has been shut down
   */
  override def isShutdown: Boolean = synchronized {
    executor.forall(_.isShutdown)
  }
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.operation.timeout
19+
20+
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}
21+
22+
import org.apache.kyuubi.Logging
23+
import org.apache.kyuubi.util.{NamedThreadFactory, ThreadUtils}
24+
25+
/**
 * Thread pool based timeout executor that shares one scheduled pool between all
 * operation timeouts.
 *
 * Cancelled timeouts are removed from the pool's queue immediately
 * (`setRemoveOnCancelPolicy(true)`), and delayed tasks still pending at shutdown are
 * dropped (`setExecuteExistingDelayedTasksAfterShutdownPolicy(false)`).
 *
 * @param poolSize the number of threads in the pool
 * @param keepAliveMs the keep-alive time, in milliseconds, for idle threads; a
 *                    non-positive value leaves the pool's default behaviour in place
 */
class ThreadPoolTimeoutExecutor(
    poolSize: Int,
    keepAliveMs: Long) extends TimeoutExecutor with Logging {

  private val executor: ScheduledExecutorService = {
    val threadFactory = new NamedThreadFactory("operation-timeout-pool", daemon = true)
    val scheduledExecutor = new ScheduledThreadPoolExecutor(poolSize, threadFactory)
    scheduledExecutor.setRemoveOnCancelPolicy(true)
    scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
    // A ScheduledThreadPoolExecutor only has core threads, so the keep-alive takes effect
    // only when core threads are allowed to time out. allowCoreThreadTimeOut(true) rejects
    // a zero keep-alive, hence the guard.
    if (keepAliveMs > 0) {
      scheduledExecutor.setKeepAliveTime(keepAliveMs, TimeUnit.MILLISECONDS)
      scheduledExecutor.allowCoreThreadTimeOut(true)
    }
    info(s"operation-timeout-pool: pool size: $poolSize, keepalive time: $keepAliveMs ms")
    scheduledExecutor
  }

  /**
   * Schedule `timeoutAction` to run once on the shared pool after `timeoutSeconds`.
   * Exceptions thrown by the action are logged instead of being propagated.
   *
   * @param timeoutAction the action to execute when the timeout elapses
   * @param timeoutSeconds the delay, in seconds, before the action runs
   * @return the future of the scheduled action, usable with [[cancelTimeout]]
   */
  override def scheduleTimeout(
      timeoutAction: Runnable,
      timeoutSeconds: Long): ScheduledFuture[_] = {
    val wrappedAction: Runnable = () => {
      try {
        timeoutAction.run()
      } catch {
        case e: Exception =>
          error("Error during timeout action execution", e)
      }
    }

    executor.schedule(wrappedAction, timeoutSeconds, TimeUnit.SECONDS)
  }

  /**
   * Cancel a pending timeout without interrupting an action that is already running.
   *
   * @param future the future returned by [[scheduleTimeout]]; `null` is tolerated
   */
  override def cancelTimeout(future: ScheduledFuture[_]): Unit = {
    if (future != null && !future.isCancelled) {
      future.cancel(false)
    }
  }

  /** Shut down the shared pool via `ThreadUtils.shutdown`. */
  override def shutdown(): Unit = {
    ThreadUtils.shutdown(executor)
  }

  /** @return true once the shared pool has been shut down */
  override def isShutdown: Boolean = {
    executor.isShutdown
  }
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.operation.timeout
19+
20+
import java.util.concurrent.ScheduledFuture
21+
22+
/**
 * Strategy abstraction for operation timeout handling.
 * Implementations decide which threads the timeout actions are scheduled on.
 */
trait TimeoutExecutor {

  /**
   * Arrange for an action to be executed once a timeout elapses.
   *
   * @param timeoutAction the work to run when the timeout fires
   * @param timeoutSeconds how long to wait, in seconds, before running the action
   * @return a handle to the scheduled action that can be passed to [[cancelTimeout]]
   */
  def scheduleTimeout(timeoutAction: Runnable, timeoutSeconds: Long): ScheduledFuture[_]

  /**
   * Cancel a timeout that was scheduled earlier.
   *
   * @param future the handle obtained from [[scheduleTimeout]]
   */
  def cancelTimeout(future: ScheduledFuture[_]): Unit

  /**
   * Stop this executor and free the resources it holds.
   */
  def shutdown(): Unit

  /**
   * Report whether this executor has been shut down.
   *
   * @return true if shut down, false otherwise
   */
  def isShutdown: Boolean
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.operation.timeout
19+
20+
import org.apache.kyuubi.Logging
21+
import org.apache.kyuubi.config.KyuubiConf
22+
23+
/**
 * Factory for creating timeout executors based on configuration.
 *
 * The 'thread-pool' strategy is backed by one lazily created [[ThreadPoolTimeoutExecutor]]
 * shared by all callers; the 'per-operation' strategy returns a new
 * [[PerOperationTimeoutExecutor]] on every call.
 */
object TimeoutExecutorFactory extends Logging {

  // Read without the lock on the fast path, so it must be volatile for the lazily
  // created instance to be safely published to other threads. Writes happen under `this`.
  @volatile private var threadPoolExecutor: Option[ThreadPoolTimeoutExecutor] = None

  /**
   * Create a timeout executor based on configuration.
   *
   * @param conf the Kyuubi configuration
   * @return the shared pool-backed executor for 'thread-pool', otherwise a new
   *         per-operation executor (also used, with a warning, for unknown types)
   */
  def getExecutor(conf: KyuubiConf): TimeoutExecutor = {
    val executorTypeStr = conf.get(KyuubiConf.OPERATION_TIMEOUT_EXECUTOR_TYPE)
    // Locale.ROOT keeps the comparison independent of the JVM default locale.
    executorTypeStr.toLowerCase(java.util.Locale.ROOT) match {
      case "thread-pool" =>
        sharedThreadPoolExecutor(conf)
      case "per-operation" =>
        new PerOperationTimeoutExecutor
      case _ =>
        warn(s"Unknown executor type: $executorTypeStr, falling back to per-operation")
        new PerOperationTimeoutExecutor
    }
  }

  /**
   * Return the live shared pool executor, creating it on first use and re-creating it
   * if the previous one has been shut down, so callers never receive a terminated pool.
   */
  private def sharedThreadPoolExecutor(conf: KyuubiConf): ThreadPoolTimeoutExecutor = {
    threadPoolExecutor.filterNot(_.isShutdown).getOrElse {
      synchronized {
        // Re-check under the lock: another thread may have created it in the meantime.
        threadPoolExecutor.filterNot(_.isShutdown).getOrElse {
          val poolSize = conf.get(KyuubiConf.OPERATION_TIMEOUT_POOL_SIZE)
          val keepAliveMs = conf.get(KyuubiConf.OPERATION_TIMEOUT_POOL_KEEPALIVE_TIME)
          info(s"Creating thread pool timeout executor: poolSize=$poolSize, " +
            s"keepAliveMs=$keepAliveMs")
          val created = new ThreadPoolTimeoutExecutor(poolSize, keepAliveMs)
          threadPoolExecutor = Some(created)
          created
        }
      }
    }
  }

  /**
   * Shut down the shared pool executor, if one was created, and drop the reference so
   * that a later [[getExecutor]] call builds a fresh pool.
   */
  def shutdown(): Unit = synchronized {
    threadPoolExecutor.filterNot(_.isShutdown).foreach(_.shutdown())
    threadPoolExecutor = None
  }
}

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
147147
}
148148
}
149149
}
150+
// Clean up timeout monitor when operation is cancelled
151+
shutdownTimeoutMonitor()
150152
}
151153

152154
override def close(): Unit = withLockRequired {
@@ -162,6 +164,8 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
162164
}
163165
}
164166
}
167+
// Clean up timeout monitor to prevent memory leaks
168+
shutdownTimeoutMonitor()
165169
try {
166170
// For launch engine operation, we use OperationLog to pass engine submit log but
167171
// at that time we do not have remoteOpHandle

0 commit comments

Comments
 (0)