Skip to content

Commit a717a1c

Browse files
committed
[CELEBORN-2327] Add active-slot weight to load-aware placement
1 parent 9ebbc6b commit a717a1c

6 files changed

Lines changed: 63 additions & 4 deletions

File tree

common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
679679
get(MASTER_SLOT_ASSIGN_LOADAWARE_FLUSHTIME_WEIGHT)
680680
def masterSlotAssignLoadAwareFetchTimeWeight: Double =
681681
get(MASTER_SLOT_ASSIGN_LOADAWARE_FETCHTIME_WEIGHT)
682+
def masterSlotAssignLoadAwareActiveSlotsWeight: Double =
683+
get(MASTER_SLOT_ASSIGN_LOADAWARE_ACTIVE_SLOTS_WEIGHT)
682684
def masterSlotAssignExtraSlots: Int = get(MASTER_SLOT_ASSIGN_EXTRA_SLOTS)
683685
def masterSlotAssignMaxWorkers: Int = get(MASTER_SLOT_ASSIGN_MAX_WORKERS)
684686
def masterSlotAssignMinWorkers: Int = get(MASTER_SLOT_ASSIGN_MIN_WORKERS)
@@ -3154,6 +3156,16 @@ object CelebornConf extends Logging {
31543156
.doubleConf
31553157
.createWithDefault(1)
31563158

3159+
val MASTER_SLOT_ASSIGN_LOADAWARE_ACTIVE_SLOTS_WEIGHT: ConfigEntry[Double] =
3160+
buildConf("celeborn.master.slot.assign.loadAware.activeSlotsWeight")
3161+
.withAlternative("celeborn.slots.assign.loadAware.activeSlotsWeight")
3162+
.categories("master")
3163+
.doc(
3164+
"Weight of active slots when calculating ordering in load-aware assignment strategy")
3165+
.version("0.7.0")
3166+
.doubleConf
3167+
.createWithDefault(0)
3168+
31573169
val MASTER_SLOT_ASSIGN_EXTRA_SLOTS: ConfigEntry[Int] =
31583170
buildConf("celeborn.master.slot.assign.extraSlots")
31593171
.withAlternative("celeborn.slots.assign.extraSlots")

docs/configuration/master.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ license: |
7777
| celeborn.master.slot.assign.extraSlots | 2 | false | Extra slots number when master assign slots. Provided enough workers are available. | 0.3.0 | celeborn.slots.assign.extraSlots |
7878
| celeborn.master.slot.assign.interruptionAware | false | false | If this is set to true, Celeborn master will prioritize partition placement on workers that are not in scope for maintenance soon. | 0.7.0 | |
7979
| celeborn.master.slot.assign.interruptionAware.threshold | 50 | false | This controls what percentage of hosts would be selected for slot selection in the first iteration of creating partitions. Default is 50%. | 0.7.0 | |
80+
| celeborn.master.slot.assign.loadAware.activeSlotsWeight | 0.0 | false | Weight of active slots when calculating ordering in load-aware assignment strategy | 0.7.0 | celeborn.slots.assign.loadAware.activeSlotsWeight |
8081
| celeborn.master.slot.assign.loadAware.diskGroupGradient | 0.1 | false | This value means how many more workload will be placed into a faster disk group than a slower group. | 0.3.0 | celeborn.slots.assign.loadAware.diskGroupGradient |
8182
| celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | false | Weight of average fetch time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.fetchTimeWeight |
8283
| celeborn.master.slot.assign.loadAware.flushTimeWeight | 0.0 | false | Weight of average flush time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.flushTimeWeight |

docs/developers/slotsallocation.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ celeborn.master.slot.assign.loadAware.numDiskGroups 5
3333
celeborn.master.slot.assign.loadAware.diskGroupGradient 0.1
3434
celeborn.master.slot.assign.loadAware.flushTimeWeight 0
3535
celeborn.master.slot.assign.loadAware.fetchTimeWeight 1
36+
celeborn.master.slot.assign.loadAware.activeSlotsWeight 0
3637
[spark.client.]celeborn.storage.availableTypes HDD,SSD
3738
```
3839
### Detail
@@ -44,7 +45,7 @@ Load-aware slots allocation will take following elements into consideration.
4445
- disk's used slot
4546

4647
Slots allocator will find out all worker involved in this allocation and sort their disks by
47-
`disk's average flushtime * flush time weight + disk's average fetch time * fetch time weight`.
48+
`disk's average flushtime * flush time weight + disk's average fetch time * fetch time weight + disk's active slots * active slots weight`.
4849
After getting the sorted disks list, Celeborn will split the disks into
4950
`celeborn.master.slot.assign.loadAware.numDiskGroups` groups. The slots number to be placed into a disk group
5051
is controlled by the `celeborn.master.slot.assign.loadAware.diskGroupGradient` which means that a group's

master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ static class UsableDiskInfo {
124124
double diskGroupGradient,
125125
double flushTimeWeight,
126126
double fetchTimeWeight,
127+
double activeSlotsWeight,
127128
int availableStorageTypes,
128129
boolean interruptionAware,
129130
int interruptionAwareThreshold) {
@@ -193,7 +194,8 @@ static class UsableDiskInfo {
193194

194195
Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions =
195196
getSlotsRestrictionsByLoadAwareAlgorithm(
196-
placeDisksToGroups(usableDisks, diskGroupCount, flushTimeWeight, fetchTimeWeight),
197+
placeDisksToGroups(
198+
usableDisks, diskGroupCount, flushTimeWeight, fetchTimeWeight, activeSlotsWeight),
197199
diskToWorkerMap,
198200
shouldReplicate ? partitionIds.size() * 2 : partitionIds.size());
199201
return locateSlots(
@@ -685,13 +687,17 @@ private static List<List<DiskInfo>> placeDisksToGroups(
685687
List<DiskInfo> usableDisks,
686688
int diskGroupCount,
687689
double flushTimeWeight,
688-
double fetchTimeWeight) {
690+
double fetchTimeWeight,
691+
double activeSlotsWeight) {
689692
List<List<DiskInfo>> diskGroups = new ArrayList<>();
690693
usableDisks.sort(
691694
(o1, o2) -> {
692695
double delta =
693696
(o1.avgFlushTime() * flushTimeWeight + o1.avgFetchTime() * fetchTimeWeight)
694-
- (o2.avgFlushTime() * flushTimeWeight + o2.avgFetchTime() * fetchTimeWeight);
697+
+ o1.activeSlots() * activeSlotsWeight
698+
- (o2.avgFlushTime() * flushTimeWeight
699+
+ o2.avgFetchTime() * fetchTimeWeight
700+
+ o2.activeSlots() * activeSlotsWeight);
695701
return delta < 0 ? -1 : (delta > 0 ? 1 : 0);
696702
});
697703
int diskCount = usableDisks.size();

master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ private[celeborn] class Master(
212212
conf.masterSlotAssignLoadAwareDiskGroupGradient
213213
private val loadAwareFlushTimeWeight = conf.masterSlotAssignLoadAwareFlushTimeWeight
214214
private val loadAwareFetchTimeWeight = conf.masterSlotAssignLoadAwareFetchTimeWeight
215+
private val loadAwareActiveSlotsWeight = conf.masterSlotAssignLoadAwareActiveSlotsWeight
215216

216217
private val estimatedPartitionSizeUpdaterInitialDelay =
217218
conf.estimatedPartitionSizeUpdaterInitialDelay
@@ -981,6 +982,7 @@ private[celeborn] class Master(
981982
slotsAssignLoadAwareDiskGroupGradient,
982983
loadAwareFlushTimeWeight,
983984
loadAwareFetchTimeWeight,
985+
loadAwareActiveSlotsWeight,
984986
requestSlots.availableStorageTypes,
985987
slotsAssignInterruptionAware,
986988
slotsAssignInterruptionAwareThreshold)

master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,41 @@ public void testAllocate3000ReduceIdsWithReplicateOnRoundRobin() {
144144
check(workers, partitionIds, shouldReplicate, true, true, false, 0);
145145
}
146146

147+
@Test
148+
public void testLoadAwarePrefersLowerActiveSlotsWhenConfigured() {
149+
final List<WorkerInfo> workers =
150+
basePrepareWorkers(
151+
2,
152+
true,
153+
ImmutableMap.of("/mnt/disk1", 100 * 1024 * 1024 * 1024L),
154+
64 * 1024 * 1024L,
155+
1,
156+
false,
157+
new Random(0));
158+
final DiskInfo overloadedDisk = workers.get(0).diskInfos().get("/mnt/disk1");
159+
final DiskInfo lightlyReservedDisk = workers.get(1).diskInfos().get("/mnt/disk1");
160+
overloadedDisk.activeSlots_$eq(1000);
161+
lightlyReservedDisk.activeSlots_$eq(0);
162+
163+
final Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots =
164+
SlotsAllocator.offerSlotsLoadAware(
165+
workers,
166+
Collections.singletonList(0),
167+
false,
168+
false,
169+
2,
170+
1,
171+
0,
172+
0,
173+
1,
174+
StorageInfo.ALL_TYPES_AVAILABLE_MASK,
175+
false,
176+
0);
177+
178+
assertTrue(slots.containsKey(workers.get(1)));
179+
assertFalse(slots.containsKey(workers.get(0)));
180+
}
181+
147182
private void check(
148183
List<WorkerInfo> workers,
149184
List<Integer> partitionIds,
@@ -186,6 +221,7 @@ private Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>
186221
conf.masterSlotAssignLoadAwareDiskGroupGradient(),
187222
conf.masterSlotAssignLoadAwareFlushTimeWeight(),
188223
conf.masterSlotAssignLoadAwareFetchTimeWeight(),
224+
conf.masterSlotAssignLoadAwareActiveSlotsWeight(),
189225
StorageInfo.ALL_TYPES_AVAILABLE_MASK,
190226
interruptionAware,
191227
interruptionAwareThreshold);
@@ -298,6 +334,7 @@ private void checkSlotsOnDFS(
298334
0.1,
299335
0,
300336
1,
337+
0,
301338
StorageInfo.LOCAL_DISK_MASK | availableStorageTypes,
302339
false,
303340
0);

0 commit comments

Comments
 (0)