Skip to content

Commit ff2e93b

Browse files
s0nskar authored and SteNicholas committed
[CELEBORN-1577][BUG] Quota cancel shuffle should use app shuffle id
### What changes were proposed in this pull request?

- Added a new mapping for celebornShuffleId -> appShuffleId.
- `cancelAllActiveStages` should pass appShuffleId, not celebornShuffleId.

### Why are the changes needed?

`shuffleAllocatedWorkers` contains celebornShuffleId, but we need to use `appShuffleId` because DAGScheduler only understands app shuffle ids.

### Does this PR resolve a correctness bug?

No

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

NA

Closes #3662 from s0nskar/fix_quota_shuffle_id.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
(cherry picked from commit 149f3b9)
Signed-off-by: SteNicholas <programgeek@163.com>
1 parent 31492b1 commit ff2e93b

1 file changed

Lines changed: 7 additions & 3 deletions

File tree

client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
109109
private val shuffleIdMapping = JavaUtils.newConcurrentHashMap[
110110
Int,
111111
scala.collection.mutable.LinkedHashMap[String, (Int, Boolean)]]()
112+
private val celebornShuffleIdToAppShuffleIdMap = JavaUtils.newConcurrentHashMap[Int, Int]()
112113
private val shuffleIdGenerator = new AtomicInteger(0)
113114
// app shuffle id -> whether shuffle is determinate, rerun of an indeterminate shuffle gets different result
114115
private val appShuffleDeterminateMap = JavaUtils.newConcurrentHashMap[Int, Boolean]();
@@ -977,6 +978,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
977978
: scala.collection.mutable.LinkedHashMap[String, (Int, Boolean)] = {
978979
val newShuffleId = shuffleIdGenerator.getAndIncrement()
979980
logInfo(s"generate new shuffleId $newShuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
981+
celebornShuffleIdToAppShuffleIdMap.put(newShuffleId, appShuffleId)
980982
scala.collection.mutable.LinkedHashMap(appShuffleIdentifier -> (newShuffleId, true))
981983
}
982984
})
@@ -1033,6 +1035,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
10331035
}
10341036
val newShuffleId = shuffleIdGenerator.getAndIncrement()
10351037
logInfo(s"generate new shuffleId $newShuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
1038+
celebornShuffleIdToAppShuffleIdMap.put(newShuffleId, appShuffleId)
10361039
validateCelebornShuffleIdForClean.foreach(callback =>
10371040
callback.accept(appShuffleIdentifier))
10381041
shuffleIds.put(appShuffleIdentifier, (newShuffleId, true))
@@ -1238,7 +1241,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
12381241
}
12391242
}
12401243
}
1241-
1244+
celebornShuffleIdToAppShuffleIdMap.remove(shuffleId)
12421245
// add shuffleKey to delay shuffle removal set
12431246
unregisterShuffleTime.put(shuffleId, System.currentTimeMillis())
12441247

@@ -2026,8 +2029,9 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
20262029
.asScala
20272030
.keys
20282031
.filter(!commitManager.isStageEnd(_))
2029-
.foreach(c.accept(_, reason))
2030-
2032+
.flatMap(shuffleId => Option(celebornShuffleIdToAppShuffleIdMap.get(shuffleId)))
2033+
.toSet
2034+
.foreach((shuffleId: Int) => c.accept(shuffleId, reason))
20312035
case _ =>
20322036
}
20332037

0 commit comments

Comments
 (0)