Skip to content
Closed
Show file tree
Hide file tree
Changes from 115 commits
Commits
Show all changes
121 commits
Select commit Hold shift + click to select a range
8668754
compilable
CodingCat Feb 19, 2025
1bf53bd
fetch failure suite
CodingCat Feb 19, 2025
4f6acca
add disk clean suite
CodingCat Feb 19, 2025
935f11b
fix compilation error
CodingCat Feb 19, 2025
36af3f3
continue fixing compilation error
CodingCat Feb 19, 2025
3599e47
fix compilation error
CodingCat Feb 19, 2025
23a76fa
param doc
CodingCat Feb 19, 2025
8932e4b
change param ver
CodingCat Feb 19, 2025
4db2d88
add debug info
CodingCat Feb 19, 2025
422784e
lint
CodingCat Feb 19, 2025
29d2094
try less number of workers
CodingCat Feb 19, 2025
136df5e
ignore fetchfailure test for now to see whether it is concurrency issue
CodingCat Feb 21, 2025
7c4ff32
lint
CodingCat Feb 21, 2025
8714dce
try 1 worker
CodingCat Feb 21, 2025
ee6dcaa
resume celeborn fetch failure suite
CodingCat Feb 21, 2025
446bd71
make it only available for spark 3
CodingCat Feb 22, 2025
f30b5a0
Revert "make it only available for spark 3"
CodingCat Feb 22, 2025
eac8e4b
compatible with 2.11
CodingCat Feb 22, 2025
35f6db0
fix rebase errors
CodingCat Mar 7, 2025
59ea64f
more time to finish test
CodingCat Mar 8, 2025
1e471f8
Revert "more time to finish test"
CodingCat Mar 8, 2025
fbf36b4
add more msg got storage
CodingCat Mar 8, 2025
2ed92b2
remove first few tests and test what happened
CodingCat Mar 8, 2025
4d9139d
test
CodingCat Mar 8, 2025
5b557b0
more test
CodingCat Mar 8, 2025
b6f7285
add back one more test
CodingCat Mar 8, 2025
23b230b
one more test
CodingCat Mar 8, 2025
72a170c
more debugging info
CodingCat Mar 8, 2025
fa80ed3
add back one more test
CodingCat Mar 8, 2025
e40c4c1
handle empty message
CodingCat Mar 8, 2025
413dbc6
rm useless println
CodingCat Mar 8, 2025
7339267
allow more time in the suspicious test
CodingCat Mar 9, 2025
3702259
more
CodingCat Mar 9, 2025
257e649
try to separate test and see whether it works
CodingCat Mar 9, 2025
574bc51
check more frequently
CodingCat Mar 9, 2025
7b9bcb4
override shutdown minicluster in expensive suite
CodingCat Mar 9, 2025
0a1dc80
try persist
CodingCat Mar 9, 2025
9fb22d9
move back test and see
CodingCat Mar 9, 2025
14c53a5
Revert "move back test and see"
CodingCat Mar 9, 2025
633fc2a
addr comments1
CodingCat Apr 4, 2025
c539533
addr comments 2
CodingCat Apr 4, 2025
142966f
addr comments 3
CodingCat Apr 5, 2025
ed8ebff
fix compilation
CodingCat Apr 5, 2025
80c397f
use runnable to be compatible with spark 2
CodingCat Apr 5, 2025
2476f3a
update param doc
CodingCat Apr 5, 2025
9d705c3
fix NPE
CodingCat Apr 5, 2025
0f45cb8
fix tests
CodingCat Apr 6, 2025
d48e6a8
add debugging info2
CodingCat Apr 6, 2025
d5344da
remove flaky test
CodingCat Apr 6, 2025
b76fbfb
addr comments
CodingCat Apr 14, 2025
61bae50
fix compile
CodingCat Apr 14, 2025
8a5692b
fix spark 2 compile
CodingCat Apr 15, 2025
bbe9638
fix build
CodingCat Apr 15, 2025
e35f996
refactor encode/decode app identifier and remove runningstagemanagers
CodingCat Apr 21, 2025
c121ece
stylistic fixes
CodingCat Apr 21, 2025
33b4145
addr comments
CodingCat Apr 25, 2025
c6e2f81
license
CodingCat Apr 25, 2025
5adfe0b
addr comments
CodingCat Apr 28, 2025
a948f92
update param doc
CodingCat Apr 28, 2025
fc5142a
addr comments
CodingCat Apr 29, 2025
e658641
comments
turboFei Apr 29, 2025
f407158
ensure type safe
CodingCat Apr 30, 2025
dbb1423
make it compilable with spark 2
CodingCat Apr 30, 2025
d8ed331
add unit test to guard runningstagemanagerimpl
CodingCat Apr 30, 2025
c896f47
add unit test
CodingCat Apr 30, 2025
6ab91a7
add header
CodingCat Apr 30, 2025
e0c2ee7
update test
CodingCat Apr 30, 2025
b724959
RunningStageManager UT
turboFei Apr 30, 2025
ec21555
avoid using property
CodingCat May 1, 2025
5d8ade0
merge
CodingCat May 1, 2025
ef55def
param fix
CodingCat May 1, 2025
b0d9bb2
handle indeterminstic case
CodingCat May 1, 2025
2395e26
resume tests
CodingCat May 1, 2025
b896120
lint
CodingCat May 1, 2025
0b39821
fix typos
CodingCat May 1, 2025
fb5a84e
fix spark 2
CodingCat May 1, 2025
4e1aa67
change debugging string
CodingCat May 1, 2025
7151992
simplify code
CodingCat May 6, 2025
018c75e
addr comments
CodingCat May 9, 2025
43e50c6
4 mins
CodingCat May 10, 2025
3025a39
avoid driver oom
CodingCat May 10, 2025
c8ed30a
16g?
CodingCat May 10, 2025
68ed11e
change
CodingCat May 10, 2025
6f1a7fe
smaller test data?
CodingCat May 10, 2025
5e2b507
recover test data size to ensure enough partitions
CodingCat May 10, 2025
61ef8fc
code cleanup
CodingCat May 10, 2025
6a10ef8
use more cores
CodingCat May 11, 2025
76e92e4
add back original test
CodingCat May 14, 2025
f63c81e
stylistic fixes
CodingCat May 14, 2025
6a60db6
less data
CodingCat May 14, 2025
ba7b882
further reduce memory overhead
CodingCat May 14, 2025
e6f87fd
addr comments
CodingCat May 14, 2025
00914c3
doc update
CodingCat May 14, 2025
c5a6495
10g
CodingCat May 14, 2025
802431f
further reduce test data
CodingCat May 14, 2025
7b09c43
enlength timeout
CodingCat May 14, 2025
d8f0a27
recover to 240
CodingCat May 14, 2025
7916059
rm one expensive test
CodingCat May 14, 2025
c5dbaf5
check faster
CodingCat May 14, 2025
8b99b7a
check per sec
CodingCat May 15, 2025
e18a6d9
addr comments
CodingCat May 16, 2025
6ae7fe0
test param
CodingCat May 16, 2025
f595e57
data
CodingCat May 16, 2025
10f2c18
4g
CodingCat May 16, 2025
c03adcd
addr comments2
CodingCat May 16, 2025
518a680
10000
CodingCat May 16, 2025
5fcb5da
10 mins
CodingCat May 16, 2025
c6af16d
20 mins
CodingCat May 16, 2025
23539b4
delete most of tests
CodingCat May 17, 2025
b0302ef
Merge branch 'main' into delete_fi
turboFei May 20, 2025
3e1bd1a
Update tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/…
turboFei May 20, 2025
2fa6907
Update tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/…
turboFei May 20, 2025
a2b23f9
Update tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/…
turboFei May 20, 2025
757fb1a
NIT
turboFei May 20, 2025
9f9c2aa
style
turboFei May 20, 2025
bc133c9
Revert tests module refactor
turboFei May 20, 2025
3679598
Revert "Revert tests module refactor"
CodingCat May 20, 2025
a13b3f0
fix ut
CodingCat May 20, 2025
9264c88
Revert "fix ut"
CodingCat May 20, 2025
37365bc
Revert "Revert "Revert tests module refactor""
CodingCat May 20, 2025
4f513d6
further clean up
CodingCat May 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ public static void validateAttemptConfig(SparkConf conf) throws IllegalArgumentE
}
}

public static String encodeAppShuffleIdentifier(int appShuffleId, TaskContext context) {
return appShuffleId + "-" + context.stageId() + "-" + context.stageAttemptNumber();
}

public static String[] decodeAppShuffleIdentifier(String appShuffleIdentifier) {
return appShuffleIdentifier.split("-");
}

public static int getEncodedAttemptNumber(TaskContext context) {
return (context.stageAttemptNumber() << 16) | context.attemptNumber();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.spark

import java.util
import java.util.concurrent.{LinkedBlockingQueue, ScheduledExecutorService, TimeUnit}

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.shuffle.celeborn.SparkCommonUtils

import org.apache.celeborn.client.LifecycleManager
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.ThreadUtils

private[celeborn] class FailedShuffleCleaner(lifecycleManager: LifecycleManager) extends Logging {

Comment thread
turboFei marked this conversation as resolved.
// in celeborn ids
private val shufflesToBeCleaned = new LinkedBlockingQueue[Int]()
private val cleanedShuffleIds = new mutable.HashSet[Int]
Comment thread
CodingCat marked this conversation as resolved.

private lazy val cleanInterval =
lifecycleManager.conf.clientFetchCleanFailedShuffleIntervalMS

// for test
def reset(): Unit = {
shufflesToBeCleaned.clear()
cleanedShuffleIds.clear()
if (cleanerThreadPool != null) {
cleanerThreadPool.shutdownNow()
cleanerThreadPool = null
}
}

def addShuffleIdToBeCleaned(appShuffleIdentifier: String): Unit = {
Comment thread
CodingCat marked this conversation as resolved.
val Array(appShuffleId, _, _) = SparkCommonUtils.decodeAppShuffleIdentifier(
appShuffleIdentifier)
lifecycleManager.getShuffleIdMapping.get(appShuffleId.toInt).foreach {
case (_, (celebornShuffleId, _)) => shufflesToBeCleaned.put(celebornShuffleId)
}
}

def init(): Unit = {
cleanerThreadPool = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"failedShuffleCleanerThreadPool")
cleanerThreadPool.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
try {
val allShuffleIds = new util.ArrayList[Int]
shufflesToBeCleaned.drainTo(allShuffleIds)
allShuffleIds.asScala.foreach { shuffleId =>
if (!cleanedShuffleIds.contains(shuffleId)) {
lifecycleManager.unregisterShuffle(shuffleId)
logInfo(
s"sent unregister shuffle request for shuffle $shuffleId (celeborn shuffle id)")
cleanedShuffleIds += shuffleId
}
Comment thread
CodingCat marked this conversation as resolved.
}
} catch {
case e: Exception =>
logError("unexpected exception in cleaner thread", e)
}
}
},
cleanInterval,
cleanInterval,
TimeUnit.MILLISECONDS)
}

init()

def removeCleanedShuffleId(celebornShuffleId: Int): Unit = {
cleanedShuffleIds.remove(celebornShuffleId)
}

private var cleanerThreadPool: ScheduledExecutorService = _
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.ShuffleMode;
import org.apache.celeborn.reflect.DynMethods;
import org.apache.celeborn.spark.FailedShuffleCleaner;

/**
* In order to support Spark Stage resubmit with ShuffleReader FetchFails, Celeborn shuffleId has to
Expand Down Expand Up @@ -84,6 +85,8 @@ public class SparkShuffleManager implements ShuffleManager {
ConcurrentHashMap.newKeySet();
private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;

private FailedShuffleCleaner failedShuffleCleaner = null;

private long sendBufferPoolCheckInterval;
private long sendBufferPoolExpireTimeout;

Expand Down Expand Up @@ -158,6 +161,23 @@ private void initializeLifecycleManager(String appId) {
}
}

if (lifecycleManager.conf().clientFetchCleanFailedShuffle()) {
Comment thread
CodingCat marked this conversation as resolved.
if (!lifecycleManager.conf().clientStageRerunEnabled()) {
throw new IllegalArgumentException(
CelebornConf.CLIENT_STAGE_RERUN_ENABLED().key()
+ " has to be "
+ "enabled, when "
+ CelebornConf.CLIENT_FETCH_CLEAN_FAILED_SHUFFLE().key()
+ " is set to true");
}
failedShuffleCleaner = new FailedShuffleCleaner(lifecycleManager);
lifecycleManager.registerValidateCelebornShuffleIdForCleanCallback(
Comment thread
CodingCat marked this conversation as resolved.
(appShuffleIdentifier) ->
SparkUtils.addWriterShuffleIdsToBeCleaned(this, appShuffleIdentifier));
lifecycleManager.registerUnregisterShuffleCallback(
(celebornShuffleId) -> SparkUtils.removeCleanedShuffleId(this, celebornShuffleId));
}

if (celebornConf.getReducerFileGroupBroadcastEnabled()) {
lifecycleManager.registerBroadcastGetReducerFileGroupResponseCallback(
(shuffleId, getReducerFileGroupResponse) ->
Expand Down Expand Up @@ -249,6 +269,9 @@ public void stop() {
_sortShuffleManager.stop();
_sortShuffleManager = null;
}
if (celebornConf.clientFetchCleanFailedShuffle()) {
failedShuffleCleaner.reset();
}
}

@Override
Expand Down Expand Up @@ -470,4 +493,8 @@ private void checkUserClassPathFirst(ShuffleHandle handle) {
public LifecycleManager getLifecycleManager() {
return this.lifecycleManager;
}

public FailedShuffleCleaner getFailedShuffleCleaner() {
return this.failedShuffleCleaner;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,14 @@ public static String appUniqueId(SparkContext context) {
.getOrElse(context::applicationId);
}

public static String getAppShuffleIdentifier(int appShuffleId, TaskContext context) {
return appShuffleId + "-" + context.stageId() + "-" + context.stageAttemptNumber();
}

public static int celebornShuffleId(
ShuffleClient client,
CelebornShuffleHandle<?, ?, ?> handle,
TaskContext context,
Boolean isWriter) {
if (handle.throwsFetchFailure()) {
String appShuffleIdentifier = getAppShuffleIdentifier(handle.shuffleId(), context);
String appShuffleIdentifier =
SparkCommonUtils.encodeAppShuffleIdentifier(handle.shuffleId(), context);
Tuple2<Integer, Boolean> res =
client.getShuffleId(
handle.shuffleId(),
Expand Down Expand Up @@ -327,7 +324,8 @@ public static void addFailureListenerIfBarrierTask(

if (!(taskContext instanceof BarrierTaskContext)) return;
int appShuffleId = handle.shuffleId();
String appShuffleIdentifier = SparkUtils.getAppShuffleIdentifier(appShuffleId, taskContext);
String appShuffleIdentifier =
SparkCommonUtils.encodeAppShuffleIdentifier(appShuffleId, taskContext);

BarrierTaskContext barrierContext = (BarrierTaskContext) taskContext;
barrierContext.addTaskFailureListener(
Expand Down Expand Up @@ -625,4 +623,14 @@ public static void invalidateSerializedGetReducerFileGroupResponse(Integer shuff
return null;
});
}

public static void addWriterShuffleIdsToBeCleaned(
SparkShuffleManager sparkShuffleManager, String appShuffleIdentifier) {
sparkShuffleManager.getFailedShuffleCleaner().addShuffleIdToBeCleaned(appShuffleIdentifier);
}

public static void removeCleanedShuffleId(
SparkShuffleManager sparkShuffleManager, int celebornShuffleId) {
sparkShuffleManager.getFailedShuffleCleaner().removeCleanedShuffleId(celebornShuffleId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
logInfo(s"reuse existing shuffleId $id for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
id
} else {
// this branch means it is a redo of previous write stage
if (isBarrierStage) {
// unregister previous shuffle(s) which are still valid
val mapUpdates = shuffleIds.filter(_._2._2).map { kv =>
Expand All @@ -941,6 +942,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
}
val newShuffleId = shuffleIdGenerator.getAndIncrement()
logInfo(s"generate new shuffleId $newShuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
validateCelebornShuffleIdForClean.foreach(callback =>
callback.accept(appShuffleIdentifier))
shuffleIds.put(appShuffleIdentifier, (newShuffleId, true))
newShuffleId
}
Expand All @@ -954,11 +957,12 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
} else {
shuffleIds.values.filter(v => v._2).map(v => v._1).toSeq.reverse.find(
areAllMapTasksEnd) match {
case Some(shuffleId) =>
case Some(celebornShuffleId) =>
val pbGetShuffleIdResponse = {
logDebug(
s"get shuffleId $shuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier isWriter $isWriter")
PbGetShuffleIdResponse.newBuilder().setShuffleId(shuffleId).setSuccess(true).build()
s"get shuffleId $celebornShuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier isWriter $isWriter")
PbGetShuffleIdResponse.newBuilder().setShuffleId(celebornShuffleId).setSuccess(
true).build()
}
context.reply(pbGetShuffleIdResponse)
case None =>
Expand Down Expand Up @@ -1160,6 +1164,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
shuffleIds.values.map {
case (shuffleId, _) =>
unregisterShuffle(shuffleId)
unregisterShuffleCallback.foreach(c => c.accept(shuffleId))
})
}
} else {
Expand Down Expand Up @@ -1850,6 +1855,19 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
appShuffleTrackerCallback = Some(callback)
}

// expecting celeborn shuffle id and application shuffle identifier
@volatile private var validateCelebornShuffleIdForClean: Option[Consumer[String]] =
None
def registerValidateCelebornShuffleIdForCleanCallback(
callback: Consumer[String]): Unit = {
validateCelebornShuffleIdForClean = Some(callback)
}

@volatile private var unregisterShuffleCallback: Option[Consumer[Integer]] = None
def registerUnregisterShuffleCallback(callback: Consumer[Integer]): Unit = {
unregisterShuffleCallback = Some(callback)
}

def registerAppShuffleDeterminate(appShuffleId: Int, determinate: Boolean): Unit = {
appShuffleDeterminateMap.put(appShuffleId, determinate)
}
Expand Down Expand Up @@ -1943,4 +1961,6 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
}
})
}

def getShuffleIdMapping = shuffleIdMapping
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class ReducePartitionCommitHandler(

private val getReducerFileGroupRequest =
JavaUtils.newConcurrentHashMap[Int, util.Set[MultiSerdeVersionRpcContext]]()
private val dataLostShuffleSet = ConcurrentHashMap.newKeySet[Int]()
private[celeborn] val dataLostShuffleSet = ConcurrentHashMap.newKeySet[Int]()
private val stageEndShuffleSet = ConcurrentHashMap.newKeySet[Int]()
private val inProcessStageEndShuffleSet = ConcurrentHashMap.newKeySet[Int]()
private val shuffleMapperAttempts = JavaUtils.newConcurrentHashMap[Int, Array[Int]]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se

def clientFetchMaxRetriesForEachReplica: Int = get(CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA)
def clientStageRerunEnabled: Boolean = get(CLIENT_STAGE_RERUN_ENABLED)
def clientFetchCleanFailedShuffle: Boolean = get(CLIENT_FETCH_CLEAN_FAILED_SHUFFLE)
def clientFetchCleanFailedShuffleIntervalMS: Long =
get(CLIENT_FETCH_CLEAN_FAILED_SHUFFLE_INTERVAL)
def clientFetchExcludeWorkerOnFailureEnabled: Boolean =
get(CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED)
def clientFetchExcludedWorkerExpireTimeout: Long =
Expand Down Expand Up @@ -4813,6 +4816,23 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)

val CLIENT_FETCH_CLEAN_FAILED_SHUFFLE: ConfigEntry[Boolean] =
buildConf("celeborn.client.spark.fetch.cleanFailedShuffle")
.categories("client")
.version("0.6.0")
.doc("whether to clean those disk space occupied by shuffles which cannot be fetched")
.booleanConf
.createWithDefault(false)

val CLIENT_FETCH_CLEAN_FAILED_SHUFFLE_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.client.spark.fetch.cleanFailedShuffleInterval")
.categories("client")
.version("0.6.0")
.doc("the interval to clean the failed-to-fetch shuffle files, only valid when" +
s" ${CLIENT_FETCH_CLEAN_FAILED_SHUFFLE.key} is enabled")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
Comment thread
mridulm marked this conversation as resolved.

val CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.fetch.excludeWorkerOnFailure.enabled")
.categories("client")
Expand Down
2 changes: 2 additions & 0 deletions docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ license: |
| celeborn.client.shuffle.register.filterExcludedWorker.enabled | false | false | Whether to filter excluded worker when register shuffle. | 0.4.0 | |
| celeborn.client.shuffle.reviseLostShuffles.enabled | false | false | Whether to revise lost shuffles. | 0.6.0 | |
| celeborn.client.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. | 0.3.1 | |
| celeborn.client.spark.fetch.cleanFailedShuffle | false | false | whether to clean those disk space occupied by shuffles which cannot be fetched | 0.6.0 | |
| celeborn.client.spark.fetch.cleanFailedShuffleInterval | 1s | false | the interval to clean the failed-to-fetch shuffle files, only valid when celeborn.client.spark.fetch.cleanFailedShuffle is enabled | 0.6.0 | |
| celeborn.client.spark.push.dynamicWriteMode.enabled | false | false | Whether to dynamically switch push write mode based on conditions.If true, shuffle mode will be only determined by partition count | 0.5.0 | |
| celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold | 2000 | false | Threshold of shuffle partition number for dynamically switching push writer mode. When the shuffle partition number is greater than this value, use the sort-based shuffle writer for memory efficiency; otherwise use the hash-based shuffle writer for speed. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is true. | 0.5.0 | |
| celeborn.client.spark.push.sort.memory.maxMemoryFactor | 0.4 | false | the max portion of executor memory which can be used for SortBasedWriter buffer (only valid when celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is enabled | 0.5.0 | |
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@
<log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration>
<log4j2.configurationFile>src/test/resources/log4j2-test.xml</log4j2.configurationFile>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
<spark.driver.memory>1g</spark.driver.memory>
<spark.driver.memory>8g</spark.driver.memory>
<spark.shuffle.sort.io.plugin.class>${spark.shuffle.plugin.class}</spark.shuffle.sort.io.plugin.class>
</systemProperties>
<environmentVariables>
Expand Down Expand Up @@ -946,7 +946,7 @@
<log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration>
<log4j2.configurationFile>src/test/resources/log4j2-test.xml</log4j2.configurationFile>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
<spark.driver.memory>1g</spark.driver.memory>
<spark.driver.memory>8g</spark.driver.memory>
<spark.shuffle.sort.io.plugin.class>${spark.shuffle.plugin.class}</spark.shuffle.sort.io.plugin.class>
</systemProperties>
<environmentVariables>
Expand Down
Loading
Loading