Skip to content

Commit 1e72e4b

Browse files
refactor(operation): Optimize operation timeout handling mechanism
- Removed ScheduledExecutorService from AbstractOperation and replaced it with Future[_] - Added a dedicated operationTimeoutMonitor thread in SessionManager - Updated timeout monitoring logic to use the new thread for unified management - Optimized shutdownTimeoutMonitor method by using Future.cancel instead of ScheduledExecutorService.shutdown
1 parent 4cbff4d commit 1e72e4b

2 files changed

Lines changed: 18 additions & 10 deletions

File tree

kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.kyuubi.operation
1919

2020
import java.io.IOException
21-
import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
21+
import java.util.concurrent.{Future, TimeUnit}
2222
import java.util.concurrent.locks.ReentrantLock
2323

2424
import scala.collection.JavaConverters._
@@ -31,8 +31,7 @@ import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
3131
import org.apache.kyuubi.operation.OperationState._
3232
import org.apache.kyuubi.operation.log.OperationLog
3333
import org.apache.kyuubi.session.Session
34-
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TStatus, TStatusCode}
35-
import org.apache.kyuubi.util.ThreadUtils
34+
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
3635

3736
abstract class AbstractOperation(session: Session) extends Operation with Logging {
3837

@@ -45,7 +44,7 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
4544

4645
final private[kyuubi] val statementId = handle.identifier.toString
4746

48-
private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
47+
private var statementTimeoutCleaner: Option[Future[_]] = None
4948

5049
private val lock: ReentrantLock = new ReentrantLock()
5150

@@ -60,8 +59,6 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
6059

6160
protected def addTimeoutMonitor(queryTimeout: Long): Unit = {
6261
if (queryTimeout > 0) {
63-
val timeoutExecutor =
64-
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
6562
val action: Runnable = () =>
6663
// Clients less than version 2.1 have no HIVE-4924 Patch,
6764
// no queryTimeout parameter and no TIMEOUT status.
@@ -74,13 +71,13 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
7471
} else {
7572
cleanup(OperationState.TIMEOUT)
7673
}
77-
timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
78-
statementTimeoutCleaner = Some(timeoutExecutor)
74+
statementTimeoutCleaner = Some(
75+
session.sessionManager.submitTimeoutMonitor(action, queryTimeout, TimeUnit.SECONDS))
7976
}
8077
}
8178

8279
protected def shutdownTimeoutMonitor(): Unit = {
83-
statementTimeoutCleaner.foreach(_.shutdown())
80+
statementTimeoutCleaner.foreach(_.cancel(true))
8481
}
8582

8683
override def getOperationLog: Option[OperationLog] = None

kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.kyuubi.session
1919

2020
import java.io.IOException
2121
import java.nio.file.{Files, Paths}
22-
import java.util.concurrent.{ConcurrentHashMap, Future, ThreadPoolExecutor, TimeUnit}
22+
import java.util.concurrent.{ConcurrentHashMap, Future, ScheduledExecutorService, ThreadPoolExecutor, TimeUnit}
2323

2424
import scala.collection.JavaConverters._
2525
import scala.concurrent.duration.Duration
@@ -74,12 +74,17 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
7474
private val timeoutChecker =
7575
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-timeout-checker")
7676

77+
private var operationTimeoutMonitor: ScheduledExecutorService = _
78+
7779
protected def isServer: Boolean
7880

7981
private var execPool: ThreadPoolExecutor = _
8082

8183
def submitBackgroundOperation(r: Runnable): Future[_] = execPool.submit(r)
8284

85+
def submitTimeoutMonitor(r: Runnable, delay: Long, unit: TimeUnit): Future[_] =
86+
operationTimeoutMonitor.schedule(r, delay, unit)
87+
8388
def operationManager: OperationManager
8489

8590
protected def createSession(
@@ -281,6 +286,11 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
281286
waitQueueSize,
282287
keepAliveMs,
283288
s"$name-exec-pool")
289+
290+
// Initialize timeout monitor pool
291+
operationTimeoutMonitor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
292+
s"$name-operation-timeout-monitor-pool")
293+
284294
super.initialize(conf)
285295
}
286296

@@ -301,6 +311,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
301311

302312
ThreadUtils.shutdown(timeoutChecker, Duration(shutdownTimeout, TimeUnit.MILLISECONDS))
303313
ThreadUtils.shutdown(execPool, Duration(shutdownTimeout, TimeUnit.MILLISECONDS))
314+
ThreadUtils.shutdown(operationTimeoutMonitor, Duration(shutdownTimeout, TimeUnit.MILLISECONDS))
304315
}
305316

306317
private def startTimeoutChecker(): Unit = {

0 commit comments

Comments
 (0)