Skip to content

Commit 7140244

Browse files
committed
Try Catch logic for openBatchSession
1 parent 73a1af1 commit 7140244

3 files changed

Lines changed: 155 additions & 45 deletions

File tree

kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala

Lines changed: 85 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -60,57 +60,97 @@ class KyuubiBatchService(
6060
}
6161

6262
override def start(): Unit = {
63+
val UNINITIALIZED_BATCH_ID = "UNINITIALIZED_BATCH_ID"
6364
assert(running.compareAndSet(false, true))
6465
val submitTask: Runnable = () => {
6566
restFrontend.waitForServerStarted()
6667
while (running.get) {
67-
metadataManager.pickBatchForSubmitting(kyuubiInstance) match {
68-
case None => Thread.sleep(1000)
69-
case Some(metadata) =>
70-
val batchId = metadata.identifier
71-
info(s"$batchId is picked for submission.")
72-
val batchSession = sessionManager.createBatchSession(
73-
metadata.username,
74-
"anonymous",
75-
metadata.ipAddress,
76-
metadata.requestConf,
77-
metadata.engineType,
78-
Option(metadata.requestName),
79-
metadata.resource,
80-
metadata.className,
81-
metadata.requestArgs,
82-
Some(metadata),
83-
fromRecovery = false)
84-
sessionManager.openBatchSession(batchSession)
85-
var submitted = false
86-
while (!submitted) { // block until batch job submitted
87-
submitted = metadataManager.getBatchSessionMetadata(batchId) match {
88-
case Some(metadata) if OperationState.isTerminal(metadata.opState) =>
89-
true
90-
case Some(metadata) if metadata.opState == OperationState.RUNNING =>
91-
metadata.appState match {
92-
// app that is not submitted to resource manager
93-
case None | Some(ApplicationState.NOT_FOUND) => false
94-
// app that is pending in resource manager while the local startup
95-
// process is alive. For example, in Spark YARN cluster mode, if set
96-
// spark.yarn.submit.waitAppCompletion=false, the local spark-submit
97-
// process exits immediately once Application goes ACCEPTED status,
98-
// even no resource could be allocated for the AM container.
99-
case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
100-
false
101-
// not sure, added for safe
102-
case Some(ApplicationState.UNKNOWN) => false
103-
case _ => true
104-
}
105-
case Some(_) =>
106-
false
107-
case None =>
108-
error(s"$batchId does not existed in metastore, assume it is finished")
109-
true
68+
var batchId = UNINITIALIZED_BATCH_ID
69+
try {
70+
metadataManager.pickBatchForSubmitting(kyuubiInstance) match {
71+
case None => Thread.sleep(1000)
72+
case Some(metadata) =>
73+
batchId = metadata.identifier
74+
info(s"$batchId is picked for submission.")
75+
val batchSession = sessionManager.createBatchSession(
76+
metadata.username,
77+
"anonymous",
78+
metadata.ipAddress,
79+
metadata.requestConf,
80+
metadata.engineType,
81+
Option(metadata.requestName),
82+
metadata.resource,
83+
metadata.className,
84+
metadata.requestArgs,
85+
Some(metadata),
86+
fromRecovery = false)
87+
sessionManager.openBatchSession(batchSession)
88+
var submitted = false
89+
while (!submitted) { // block until batch job submitted
90+
submitted = metadataManager.getBatchSessionMetadata(batchId) match {
91+
case Some(metadata) if OperationState.isTerminal(metadata.opState) =>
92+
true
93+
case Some(metadata) if metadata.opState == OperationState.RUNNING =>
94+
metadata.appState match {
95+
// app that is not submitted to resource manager
96+
case None | Some(ApplicationState.NOT_FOUND) => false
97+
// app that is pending in resource manager while the local startup
98+
// process is alive. For example, in Spark YARN cluster mode, if set
99+
// spark.yarn.submit.waitAppCompletion=false, the local spark-submit
100+
// process exits immediately once Application goes ACCEPTED status,
101+
// even no resource could be allocated for the AM container.
102+
case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
103+
false
104+
// not sure, added for safe
105+
case Some(ApplicationState.UNKNOWN) => false
106+
case _ => true
107+
}
108+
case Some(_) =>
109+
false
110+
case None =>
111+
error(s"$batchId does not exist in metastore, assume it is finished")
112+
true
113+
}
114+
if (!submitted) Thread.sleep(1000)
115+
}
116+
info(s"$batchId is submitted or finished.")
117+
}
118+
} catch {
119+
case e: InterruptedException =>
120+
if (batchId == UNINITIALIZED_BATCH_ID) {
121+
error(s"Interrupted while picking batch for submission", e)
122+
} else {
123+
error(s"Interrupted while opening batch session for $batchId", e)
124+
try {
125+
metadataManager.failScheduledBatch(batchId)
126+
} catch {
127+
case ex: Exception =>
128+
error(
129+
s"Unable to modify metadata for $batchId to ERROR; " +
130+
"an administrator may need to reset the batch state manually.",
131+
ex)
132+
}
133+
}
134+
throw e
135+
// If the batch session failed to open, reinitialize the batch state to ERROR
136+
// This can be due to a DB error or batch_connection_limits exceeded
137+
case e: Exception =>
138+
if (batchId == UNINITIALIZED_BATCH_ID) {
139+
error(s"Error picking batch for submission", e)
140+
} else {
141+
error(s"Error opening batch session for $batchId", e)
142+
try {
143+
metadataManager.failScheduledBatch(batchId)
144+
} catch {
145+
case ex: Exception =>
146+
error(
147+
s"Unable to modify metadata for $batchId to ERROR; " +
148+
"an administrator may need to reset the batch state manually.",
149+
ex)
110150
}
111-
if (!submitted) Thread.sleep(1000)
112151
}
113-
info(s"$batchId is submitted or finished.")
152+
// sleep 1 second to avoid excessive retries during transient network/DB failures
153+
Thread.sleep(1000)
114154
}
115155
}
116156
}

kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ class MetadataManager extends AbstractService("MetadataManager") {
181181
_metadataStore.transformMetadataState(batchId, "INITIALIZED", "CANCELED")
182182
}
183183

184+
def failScheduledBatch(batchId: String): Boolean = {
185+
_metadataStore.transformMetadataState(batchId, "PENDING", "ERROR")
186+
}
187+
184188
def getBatchesRecoveryMetadata(
185189
state: String,
186190
kyuubiInstance: String,

kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
4444
import org.apache.kyuubi.operation.OperationState.OperationState
4545
import org.apache.kyuubi.server.KyuubiRestFrontendService
4646
import org.apache.kyuubi.server.http.util.HttpAuthUtils.{basicAuthorizationHeader, AUTHORIZATION_HEADER}
47+
import org.apache.kyuubi.server.metadata.MetadataManager
4748
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
4849
import org.apache.kyuubi.service.authentication.{AnonymousAuthenticationProviderImpl, AuthUtils}
4950
import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle, SessionType}
@@ -72,6 +73,71 @@ class BatchesV2ResourceSuite extends BatchesResourceSuiteBase {
7273
Utils.tryLogNonFatalError { sessionManager.closeSession(session.handle) }
7374
}
7475
}
76+
77+
test("KyuubiBatchService catch block when openBatchSession fails during metadata update") {
78+
val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
79+
val realMetadataManager = sessionManager.metadataManager.get
80+
81+
val wrapperMetadataManager = new MetadataManager {
82+
override def updateMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = {
83+
throw new RuntimeException("test metadata update failure")
84+
}
85+
override def insertMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = {
86+
realMetadataManager.insertMetadata(metadata, asyncRetryOnError)
87+
}
88+
override def getBatch(batchId: String) = realMetadataManager.getBatch(batchId)
89+
}
90+
91+
val originalMetadataManager = sessionManager.metadataManager
92+
try {
93+
sessionManager.metadataManager = Some(wrapperMetadataManager)
94+
95+
val requestObj = newSparkBatchRequest(Map("spark.master" -> "local"))
96+
val response = webTarget.path("api/v1/batches")
97+
.request(MediaType.APPLICATION_JSON_TYPE)
98+
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
99+
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
100+
assert(response.getStatus == 200)
101+
val batch = response.readEntity(classOf[Batch])
102+
val batchId = batch.getId
103+
assert(batch.getState === "INITIALIZED")
104+
105+
eventually(timeout(15.seconds), interval(1.second)) {
106+
val batchInfoResponse = webTarget.path(s"api/v1/batches/$batchId")
107+
.request(MediaType.APPLICATION_JSON_TYPE)
108+
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
109+
.get()
110+
assert(batchInfoResponse.getStatus == 200)
111+
val batchInfo = batchInfoResponse.readEntity(classOf[Batch])
112+
assert(
113+
batchInfo.getState === "ERROR",
114+
"Batch should eventually become ERROR after being picked and failed by the " +
115+
"catch path, rather than remaining stuck in PENDING")
116+
}
117+
118+
sessionManager.metadataManager = originalMetadataManager
119+
120+
val requestObj2 = newSparkBatchRequest(Map("spark.master" -> "local"))
121+
val response2 = webTarget.path("api/v1/batches")
122+
.request(MediaType.APPLICATION_JSON_TYPE)
123+
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
124+
.post(Entity.entity(requestObj2, MediaType.APPLICATION_JSON_TYPE))
125+
assert(response2.getStatus == 200)
126+
val batch2Id = response2.readEntity(classOf[Batch]).getId
127+
eventually(timeout(30.seconds), interval(1.second)) {
128+
val batch2InfoResponse = webTarget.path(s"api/v1/batches/$batch2Id")
129+
.request(MediaType.APPLICATION_JSON_TYPE)
130+
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
131+
.get()
132+
val batch2Info = batch2InfoResponse.readEntity(classOf[Batch])
133+
assert(
134+
batch2Info.getState === "PENDING" || batch2Info.getState === "RUNNING",
135+
"Second batch should be processed (batch service still running after catch)")
136+
}
137+
} finally {
138+
sessionManager.metadataManager = originalMetadataManager
139+
}
140+
}
75141
}
76142

77143
abstract class BatchesResourceSuiteBase extends KyuubiFunSuite

0 commit comments

Comments
 (0)