Skip to content

Commit 3094c92

Browse files
committed
Fixes
1 parent 83dee51 commit 3094c92

6 files changed

Lines changed: 25 additions & 16 deletions

File tree

client/src/main/scala/org/apache/celeborn/client/CommitManager.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,8 @@ class CommitManager(appUniqueId: String, val conf: CelebornConf, lifecycleManage
372372
}
373373

374374
private class UnknownWorkerListener extends WorkerStatusListener {
375-
private val shuffleDataLostOnUnknownWorkerEnabled = conf.clientShuffleDataLostOnUnknownWorkerEnabled
375+
private val shuffleDataLostOnUnknownWorkerEnabled =
376+
conf.clientShuffleDataLostOnUnknownWorkerEnabled
376377
private val pushReplicateEnabled = conf.clientPushReplicateEnabled
377378

378379
override def notifyChangedWorkersStatus(workersStatus: WorkersStatus): Unit = {

client/src/test/scala/org/apache/celeborn/client/CommitManagerSuite.scala

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,11 @@ import org.apache.celeborn.CelebornFunSuite
3333
import org.apache.celeborn.client.LifecycleManager.ShuffleAllocatedWorkers
3434
import org.apache.celeborn.client.listener.WorkerStatusListener
3535
import org.apache.celeborn.common.CelebornConf
36-
import org.apache.celeborn.common.CelebornConf.{
37-
CLIENT_BATCH_HANDLE_COMMIT_PARTITION_ENABLED,
38-
CLIENT_PUSH_REPLICATE_ENABLED,
39-
CLIENT_SHUFFLE_DATA_LOST_ON_UNKNOWN_WORKER_ENABLED
40-
}
36+
import org.apache.celeborn.common.CelebornConf.{CLIENT_BATCH_HANDLE_COMMIT_PARTITION_ENABLED, CLIENT_PUSH_REPLICATE_ENABLED, CLIENT_SHUFFLE_DATA_LOST_ON_UNKNOWN_WORKER_ENABLED}
4137
import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, WorkerInfo}
4238
import org.apache.celeborn.common.network.protocol.SerdeVersion
4339
import org.apache.celeborn.common.protocol.PartitionType
44-
import org.apache.celeborn.common.protocol.message.ControlMessages.{
45-
GetReducerFileGroupResponse,
46-
HeartbeatFromApplicationResponse
47-
}
40+
import org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse, HeartbeatFromApplicationResponse}
4841
import org.apache.celeborn.common.protocol.message.StatusCode
4942
import org.apache.celeborn.common.rpc.RpcAddress
5043
import org.apache.celeborn.common.rpc.netty.LocalNettyRpcCallContext

client/src/test/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandlerSuite.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ import scala.concurrent.{Await, Promise}
2323
import scala.concurrent.duration._
2424

2525
import org.apache.celeborn.CelebornFunSuite
26-
import org.apache.celeborn.client.WorkerStatusTracker
2726
import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
2827
import org.apache.celeborn.client.LifecycleManager.ShuffleAllocatedWorkers
28+
import org.apache.celeborn.client.WorkerStatusTracker
2929
import org.apache.celeborn.common.CelebornConf
3030
import org.apache.celeborn.common.network.protocol.SerdeVersion
3131
import org.apache.celeborn.common.protocol.message.ControlMessages.GetReducerFileGroupResponse
@@ -76,7 +76,11 @@ class ReducePartitionCommitHandlerSuite extends CelebornFunSuite {
7676
test("markShuffleDataLost replies SHUFFLE_DATA_LOST to GetReducerFileGroup contexts") {
7777
val handler = newHandler()
7878
val shuffleId = 1
79-
handler.registerShuffle(shuffleId, numMappers = 2, isSegmentGranularityVisible = false, numPartitions = 4)
79+
handler.registerShuffle(
80+
shuffleId,
81+
numMappers = 2,
82+
isSegmentGranularityVisible = false,
83+
numPartitions = 4)
8084

8185
val (ctx1, p1) = pendingContext()
8286
handler.handleGetReducerFileGroup(ctx1, shuffleId, SerdeVersion.V1)
@@ -97,10 +101,15 @@ class ReducePartitionCommitHandlerSuite extends CelebornFunSuite {
97101
}
98102
}
99103

100-
test("markShuffleDataLost marks data lost even when stage already ended (worker crash after commit)") {
104+
test(
105+
"markShuffleDataLost marks data lost even when stage already ended (worker crash after commit)") {
101106
val handler = newHandler()
102107
val shuffleId = 1
103-
handler.registerShuffle(shuffleId, numMappers = 1, isSegmentGranularityVisible = false, numPartitions = 2)
108+
handler.registerShuffle(
109+
shuffleId,
110+
numMappers = 1,
111+
isSegmentGranularityVisible = false,
112+
numPartitions = 2)
104113

105114
// Clean stage-end
106115
handler.setStageEnd(shuffleId)

common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6998,7 +6998,11 @@ object CelebornConf extends Logging {
69986998
buildConf("celeborn.client.shuffleDataLostOnUnknownWorker.enabled")
69996999
.categories("client")
70007000
.version("0.6.3")
7001-
.doc("Whether to mark shuffle data lost when unknown worker is detected.")
7001+
.doc("When enabled, any shuffle that had partitions on the (crashed) " +
7002+
"unknown worker is immediately marked as data lost. " +
7003+
"On the write flow revive/commit request for that shuffle will fast fail. " +
7004+
"GetReducerFileGroup requests are replied with SHUFFLE_DATA_LOST. " +
7005+
"This has no effect when ${CLIENT_PUSH_REPLICATE_ENABLED.key}=true")
70027006
.booleanConf
70037007
.createWithDefault(true)
70047008

docs/configuration/client.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ license: |
122122
| celeborn.client.shuffle.rangeReadFilter.enabled | false | false | If a spark application have skewed partition, this value can set to true to improve performance. | 0.2.0 | celeborn.shuffle.rangeReadFilter.enabled |
123123
| celeborn.client.shuffle.register.filterExcludedWorker.enabled | false | false | Whether to filter excluded worker when register shuffle. | 0.4.0 | |
124124
| celeborn.client.shuffle.reviseLostShuffles.enabled | false | false | Whether to revise lost shuffles. | 0.6.0 | |
125-
| celeborn.client.shuffleDataLostOnUnknownWorker.enabled | false | false | Whether to mark shuffle data lost when unknown worker is detected. | 0.6.3 | |
125+
| celeborn.client.shuffleDataLostOnUnknownWorker.enabled | true | false | When enabled, any shuffle that had partitions on the (crashed) unknown worker is immediately marked as data lost. On the write flow revive/commit request for that shuffle will fast fail. GetReducerFileGroup requests are replied with SHUFFLE_DATA_LOST. This has no effect when ${CLIENT_PUSH_REPLICATE_ENABLED.key}=true | 0.6.3 | |
126126
| celeborn.client.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. | 0.3.1 | |
127127
| celeborn.client.spark.batch.openStream.parallelClientCreation.enabled | true | false | Whether to create data clients in parallel before sending Spark batch open-stream requests. When false, data clients are created serially. | 0.6.3 | |
128128
| celeborn.client.spark.fetch.cleanFailedShuffle | false | false | whether to clean those disk space occupied by shuffles which cannot be fetched | 0.6.0 | |

docs/migration.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ license: |
3737

3838
- Since 0.7.0, Celeborn changed the default value of `celeborn.port.maxRetries` from `1` to `16`.
3939

40+
- Since 0.7.0, Celeborn change the default value of `celeborn.client.shuffleDataLostOnUnknownWorker.enabled` from `false` to `true`, which means Celeborn will treat shuffle data lost when unknown worker is detected at default.
41+
4042
# Upgrading from 0.5 to 0.6
4143

4244
- Since 0.6.0, Celeborn deprecate `celeborn.client.spark.fetch.throwsFetchFailure`. Please use `celeborn.client.spark.stageRerun.enabled` instead.

0 commit comments

Comments
 (0)