We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent 3094c92 commit ac8d572Copy full SHA for ac8d572
1 file changed
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
@@ -151,7 +151,9 @@ class ReducePartitionCommitHandler(
151
override def markShuffleDataLost(shuffleId: Int): Unit = {
152
logWarning(s"Marking shuffle $shuffleId data as lost due to unknown/crashed worker.")
153
dataLostShuffleSet.add(shuffleId)
154
- setStageEnd(shuffleId) // unblocks all pending GetReducerFileGroup waiters immediately
+ if (!isStageEnd(shuffleId)) {
155
+ setStageEnd(shuffleId)
156
+ }
157
}
158
159
override def isPartitionInProcess(shuffleId: Int, partitionId: Int): Boolean = {
0 commit comments