Skip to content

Commit 5c443c0

Browse files
authored
Standardize executor limit enforcement for dynamic and static allocation (#426)
* Standardize executor limit enforcement for dynamic and static allocation
* Fix linting
1 parent 345818f commit 5c443c0

File tree

1 file changed

+16
-8
lines changed

1 file changed

+16
-8
lines changed

core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -260,34 +260,42 @@ class RayAppMaster(host: String,
260260
.coresPerExecutor
261261
.getOrElse(SparkOnRayConfigs.DEFAULT_SPARK_CORES_PER_EXECUTOR)
262262
val rayActorCPU = this.appInfo.desc.rayActorCPU
263-
264263
val memory = appInfo.desc.memoryPerExecutorMB
265264
val executorId = s"${appInfo.getNextExecutorId()}"
266265

267-
logInfo(s"Requesting Spark executor with Ray logical resource " +
268-
s"{ CPU: ${rayActorCPU}, " +
269-
s"${appInfo.desc.resourceReqsPerExecutor
270-
.map{ case (name, amount) => s"${name}: ${amount}"}.mkString(", ")} }..")
266+
logInfo(
267+
s"Requesting Spark executor with Ray logical resource " +
268+
s"{ CPU: $rayActorCPU, " +
269+
s"${appInfo.desc.resourceReqsPerExecutor
270+
.map { case (name, amount) => s"$name: $amount" }.mkString(", ")} }..")
271271
// TODO: Support generic fractional logical resources using prefix spark.ray.actor.resource.*
272272

273273
// When dynamic allocation is enabled, ensure no pending executor actors are added beyond the
274274
// max executors count, since that leaves executors running even after job completion
275275
val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
276+
// FIX: Check total executors (current + restarted) against maxExecutor, executorInstances
276277
if (dynamicAllocationEnabled) {
277278
val maxExecutor = conf.getInt("spark.dynamicAllocation.maxExecutors", 0)
278-
if (restartedExecutors.size >= maxExecutor) {
279+
if ((appInfo.executors.size + restartedExecutors.size) >= maxExecutor) {
280+
return
281+
}
282+
} else {
283+
val executorInstances = conf.getInt("spark.executor.instances", 0)
284+
if (executorInstances != 0 &&
285+
(appInfo.executors.size + restartedExecutors.size) >= executorInstances) {
279286
return
280287
}
281288
}
282289

283290
val handler = RayExecutorUtils.createExecutorActor(
284-
executorId, getAppMasterEndpointUrl(),
291+
executorId,
292+
getAppMasterEndpointUrl(),
285293
rayActorCPU,
286294
memory,
287295
// This won't work: Spark expects integers in custom resources,
288296
// please see python test test_spark_on_fractional_custom_resource
289297
appInfo.desc.resourceReqsPerExecutor
290-
.map{ case (name, amount) => (name, Double.box(amount))}.asJava,
298+
.map { case (name, amount) => (name, Double.box(amount)) }.asJava,
291299
placementGroup,
292300
getNextBundleIndex,
293301
seqAsJavaList(appInfo.desc.command.javaOpts))

0 commit comments

Comments
 (0)