
Commit 71a7d0a

Dzeri96 authored and SteNicholas committed
[CELEBORN-2257] Add reporting of remote disks during registration
### What changes were proposed in this pull request?

1. Disks reported to the master on registration now include remote disks (HDFS, S3, OSS).
2. Refactored method names to clarify the difference between local and remote disks.
3. Embedded disk type information into the enum.
4. Refactored unnecessarily complicated code in the slot assignment and worker registration path.

### Why are the changes needed?

1. Before the first heartbeat, the master won't be able to assign slots from the remote disks on the worker.
2. All other changes are in preparation for better support of remote disks.

### Does this PR resolve a correctness bug?

Not a correctness bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

**Important**: I want help from the community on how to write tests for this.

Closes #3597 from Dzeri96/CELEBORN-2257.

Authored-by: Filip Darmanovic <dzeri96@proton.me>
Signed-off-by: SteNicholas <programgeek@163.com>
1 parent 9ebbc6b · commit 71a7d0a

10 files changed

Lines changed: 131 additions & 90 deletions
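For readers skimming the diffs below: change 3 replaces per-type branching with data carried on the enum itself. A minimal Scala sketch of the idea follows; the mask values here are illustrative assumptions, not Celeborn's actual constants (those live in `StorageInfo.java`).

```scala
// Sketch only: illustrative mask values, not Celeborn's real constants.
object Masks {
  val MEMORY_MASK     = 0x01
  val LOCAL_DISK_MASK = 0x02
  val HDFS_MASK       = 0x04
  val OSS_MASK        = 0x08
  val S3_MASK         = 0x10
  val ALL_TYPES_AVAILABLE_MASK = 0 // assumed sentinel meaning "everything"
}

// Each storage type carries its own "is remote?" flag and bitmask,
// so callers no longer enumerate HDFS/S3/OSS case by case.
sealed abstract class StorageType(val isDFS: Boolean, val mask: Int)
case object HDD  extends StorageType(isDFS = false, mask = Masks.LOCAL_DISK_MASK)
case object SSD  extends StorageType(isDFS = false, mask = Masks.LOCAL_DISK_MASK)
case object HDFS extends StorageType(isDFS = true, mask = Masks.HDFS_MASK)
case object OSS  extends StorageType(isDFS = true, mask = Masks.OSS_MASK)
case object S3   extends StorageType(isDFS = true, mask = Masks.S3_MASK)

// One generic check replaces four type-specific branches:
def isAvailable(t: StorageType, availableStorageTypes: Int): Boolean =
  availableStorageTypes == Masks.ALL_TYPES_AVAILABLE_MASK ||
    (availableStorageTypes & t.mask) > 0
```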


common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java

Lines changed: 24 additions & 7 deletions
@@ -22,22 +22,34 @@
 
 public class StorageInfo implements Serializable {
   public enum Type {
-    MEMORY(0),
-    HDD(1),
-    SSD(2),
-    HDFS(3),
-    OSS(4),
-    S3(5);
+    MEMORY(0, false, MEMORY_MASK),
+    HDD(1, false, LOCAL_DISK_MASK),
+    SSD(2, false, LOCAL_DISK_MASK),
+    HDFS(3, true, HDFS_MASK),
+    OSS(4, true, OSS_MASK),
+    S3(5, true, S3_MASK);
 
     private final int value;
+    private final boolean isDFS;
+    private final int mask;
 
-    Type(int value) {
+    Type(int value, boolean isDFS, int mask) {
       this.value = value;
+      this.isDFS = isDFS;
+      this.mask = mask;
     }
 
     public int getValue() {
       return value;
     }
+
+    public boolean isDFS() {
+      return isDFS;
+    }
+
+    public int getMask() {
+      return mask;
+    }
   }
 
   public static final Map<Integer, Type> typesMap = new HashMap<>();
@@ -232,6 +244,11 @@ public boolean S3Available() {
     return S3Available(availableStorageTypes);
   }
 
+  public static boolean isAvailable(Type type, int availableStorageTypes) {
+    return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
+        || (availableStorageTypes & type.getMask()) > 0;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
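A quick worked example of the new `isAvailable` check, using the illustrative mask values from the sketch above rather than StorageInfo's real constants:

```scala
// Assumed values for illustration: LOCAL_DISK_MASK = 0x02,
// HDFS_MASK = 0x04, S3_MASK = 0x10.
val localDiskMask = 0x02
val hdfsMask      = 0x04
val s3Mask        = 0x10

// A caller that accepts local disks and HDFS, but not S3:
val availableStorageTypes = localDiskMask | hdfsMask // == 0x06

assert((availableStorageTypes & hdfsMask) > 0) // HDFS disk qualifies
assert((availableStorageTypes & s3Mask) == 0)  // S3 disk is filtered out
```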

common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala

Lines changed: 6 additions & 10 deletions
@@ -214,25 +214,21 @@ class WorkerInfo(
     for (newDisk <- newDiskInfos.values().asScala) {
       val mountPoint: String = newDisk.mountPoint
       val curDisk = diskInfos.get(mountPoint)
+      if (estimatedPartitionSize.nonEmpty && !newDisk.storageType.isDFS) {
+        newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
+        newDisk.availableSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
+      }
       if (curDisk != null) {
         curDisk.actualUsableSpace = newDisk.actualUsableSpace
         curDisk.totalSpace = newDisk.totalSpace
         // Update master's diskinfo activeslots to worker's value
         curDisk.activeSlots = newDisk.activeSlots
         curDisk.avgFlushTime = newDisk.avgFlushTime
         curDisk.avgFetchTime = newDisk.avgFetchTime
-        if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS
-          && curDisk.storageType != StorageInfo.Type.S3 && curDisk.storageType != StorageInfo.Type.OSS) {
-          curDisk.maxSlots = curDisk.totalSpace / estimatedPartitionSize.get
-          curDisk.availableSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get
-        }
+        curDisk.maxSlots = newDisk.maxSlots
+        curDisk.availableSlots = newDisk.availableSlots
         curDisk.setStatus(newDisk.status)
       } else {
-        if (estimatedPartitionSize.nonEmpty && newDisk.storageType != StorageInfo.Type.HDFS
-          && newDisk.storageType != StorageInfo.Type.S3 && newDisk.storageType != StorageInfo.Type.OSS) {
-          newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
-          newDisk.availableSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
-        }
         diskInfos.put(mountPoint, newDisk)
       }
     }
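The hunk above hoists the slot computation so it runs once per incoming disk, before the known-vs-new branch, and skips DFS-backed disks, whose capacity is not sliced into fixed-size slots. A simplified sketch of that invariant, with field names trimmed from the real `DiskInfo`:

```scala
// Simplified stand-in for DiskInfo; the real class carries more state.
final case class Disk(
    totalSpace: Long,
    actualUsableSpace: Long,
    isDFS: Boolean,
    var maxSlots: Long = 0L,
    var availableSlots: Long = 0L)

// Slots are derived only for local disks; DFS disks keep their defaults.
def refreshSlots(disk: Disk, estimatedPartitionSize: Option[Long]): Unit =
  estimatedPartitionSize match {
    case Some(size) if !disk.isDFS =>
      disk.maxSlots = disk.totalSpace / size
      disk.availableSlots = disk.actualUsableSpace / size
    case _ => () // remote disk or no size estimate yet: leave slots untouched
  }
```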

master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java

Lines changed: 19 additions & 40 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.celeborn.service.deploy.master;
 
+import static org.apache.celeborn.common.protocol.StorageInfo.Type.*;
+
 import java.util.*;
 import java.util.function.IntUnaryOperator;
 import java.util.stream.Collectors;
@@ -40,6 +42,12 @@ static class UsableDiskInfo {
     DiskInfo diskInfo;
     long usableSlots;
 
+    /** @param diskInfo will be used as source for usableSlots. */
+    UsableDiskInfo(DiskInfo diskInfo) {
+      this.diskInfo = diskInfo;
+      this.usableSlots = diskInfo.getAvailableSlots();
+    }
+
     UsableDiskInfo(DiskInfo diskInfo, long usableSlots) {
       this.diskInfo = diskInfo;
       this.usableSlots = usableSlots;
@@ -70,31 +78,10 @@ static class UsableDiskInfo {
     for (WorkerInfo worker : workers) {
       List<UsableDiskInfo> usableDisks =
           slotsRestrictions.computeIfAbsent(worker, v -> new ArrayList<>());
-      for (Map.Entry<String, DiskInfo> diskInfoEntry : worker.diskInfos().entrySet()) {
-        if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
-          if (StorageInfo.localDiskAvailable(availableStorageTypes)
-              && diskInfoEntry.getValue().storageType() != StorageInfo.Type.HDFS
-              && diskInfoEntry.getValue().storageType() != StorageInfo.Type.S3
-              && diskInfoEntry.getValue().storageType() != StorageInfo.Type.OSS) {
-            usableDisks.add(
-                new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
-          } else if (StorageInfo.HDFSAvailable(availableStorageTypes)
-              && diskInfoEntry.getValue().storageType() == StorageInfo.Type.HDFS) {
-            usableDisks.add(
-                new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
-          } else if (StorageInfo.S3Available(availableStorageTypes)
-              && diskInfoEntry.getValue().storageType() == StorageInfo.Type.S3) {
-            usableDisks.add(
-                new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
-          } else if (StorageInfo.OSSAvailable(availableStorageTypes)
-              && diskInfoEntry.getValue().storageType() == StorageInfo.Type.OSS) {
-            usableDisks.add(
-                new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
-          }
+      for (DiskInfo diskInfo : worker.diskInfos().values()) {
+        if (DiskStatus.HEALTHY.equals(diskInfo.status())
+            && StorageInfo.isAvailable(diskInfo.storageType(), availableStorageTypes)) {
+          usableDisks.add(new UsableDiskInfo(diskInfo));
         }
       }
     }
@@ -157,9 +144,7 @@ static class UsableDiskInfo {
               diskToWorkerMap.put(diskInfo, i);
               if (diskInfo.actualUsableSpace() > 0
                   && diskInfo.status().equals(DiskStatus.HEALTHY)
-                  && diskInfo.storageType() != StorageInfo.Type.HDFS
-                  && diskInfo.storageType() != StorageInfo.Type.S3
-                  && diskInfo.storageType() != StorageInfo.Type.OSS) {
+                  && !diskInfo.storageType().isDFS()) {
                 usableDisks.add(diskInfo);
               }
             }));
@@ -225,12 +210,8 @@ static StorageInfo buildStorageInfo(
       }
       usableDiskInfos.get(diskIndex).usableSlots--;
       DiskInfo selectedDiskInfo = usableDiskInfos.get(diskIndex).diskInfo;
-      if (selectedDiskInfo.storageType() == StorageInfo.Type.HDFS) {
-        storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes);
-      } else if (selectedDiskInfo.storageType() == StorageInfo.Type.S3) {
-        storageInfo = new StorageInfo("", StorageInfo.Type.S3, availableStorageTypes);
-      } else if (selectedDiskInfo.storageType() == StorageInfo.Type.OSS) {
-        storageInfo = new StorageInfo("", StorageInfo.Type.OSS, availableStorageTypes);
+      if (selectedDiskInfo.storageType().isDFS()) {
+        storageInfo = new StorageInfo("", selectedDiskInfo.storageType(), availableStorageTypes);
       } else {
         storageInfo =
             new StorageInfo(
@@ -243,9 +224,7 @@ static StorageInfo buildStorageInfo(
       if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
         DiskInfo[] diskInfos =
             selectedWorker.diskInfos().values().stream()
-                .filter(p -> p.storageType() != StorageInfo.Type.HDFS)
-                .filter(p -> p.storageType() != StorageInfo.Type.S3)
-                .filter(p -> p.storageType() != StorageInfo.Type.OSS)
+                .filter(p -> !p.storageType().isDFS())
                 .collect(Collectors.toList())
                 .toArray(new DiskInfo[0]);
         int diskIndex =
@@ -257,11 +236,11 @@ static StorageInfo buildStorageInfo(
                 availableStorageTypes);
         workerDiskIndex.put(selectedWorker, (diskIndex + 1) % diskInfos.length);
       } else if (StorageInfo.S3Available(availableStorageTypes)) {
-        storageInfo = new StorageInfo("", StorageInfo.Type.S3, availableStorageTypes);
+        storageInfo = new StorageInfo("", S3, availableStorageTypes);
       } else if (StorageInfo.OSSAvailable(availableStorageTypes)) {
-        storageInfo = new StorageInfo("", StorageInfo.Type.OSS, availableStorageTypes);
+        storageInfo = new StorageInfo("", OSS, availableStorageTypes);
       } else if (StorageInfo.HDFSAvailable(availableStorageTypes)) {
-        storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes);
+        storageInfo = new StorageInfo("", HDFS, availableStorageTypes);
       } else if (StorageInfo.memoryAvailable(availableStorageTypes)) {
         storageInfo = new StorageInfo("", StorageInfo.Type.MEMORY, availableStorageTypes);
       } else {
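One detail worth calling out from the local-disk branch above: the `(diskIndex + 1) % diskInfos.length` update keeps a per-worker cursor so consecutive slot assignments rotate across that worker's local disks. A tiny sketch of the same rotation, with placeholder disk names:

```scala
// Round-robin over a worker's local disks, mirroring the
// workerDiskIndex update above (disk names are placeholders).
val localDisks = Vector("/mnt/disk1", "/mnt/disk2", "/mnt/disk3")
var diskIndex = 0

def nextDisk(): String = {
  val chosen = localDisks(diskIndex)
  diskIndex = (diskIndex + 1) % localDisks.length
  chosen
}

// Four slots land on disk1, disk2, disk3, then disk1 again.
val placements = Seq.fill(4)(nextDisk())
```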

tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -75,7 +75,7 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
     // shuffle key not expired, diskInfo.actualUsableSpace <= 0, no space
     workers.foreach { worker =>
       worker.storageManager.updateDiskInfos()
-      worker.storageManager.disksSnapshot().foreach { diskInfo =>
+      worker.storageManager.localDisksSnapshot().foreach { diskInfo =>
         assert(diskInfo.actualUsableSpace <= 0)
       }
     }
@@ -89,7 +89,7 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
       assert(t.size() === 0)
     }
     // after shuffle key expired, diskInfo.actualUsableSpace will equal capacity=1000
-    worker.storageManager.disksSnapshot().foreach { diskInfo =>
+    worker.storageManager.localDisksSnapshot().foreach { diskInfo =>
      assert(diskInfo.actualUsableSpace === 1000)
    }
  }

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala

Lines changed: 1 addition & 1 deletion
@@ -190,7 +190,7 @@ private[deploy] class Controller(
       return
     }
 
-    if (storageManager.healthyWorkingDirs().size <= 0 && remoteStorageDirs.isEmpty) {
+    if (storageManager.healthyLocalWorkingDirs().size <= 0 && remoteStorageDirs.isEmpty) {
       val msg = "Local storage has no available dirs!"
       logError(s"[handleReserveSlots] $msg")
       context.reply(ReserveSlotsResponse(StatusCode.NO_AVAILABLE_WORKING_DIR, msg))

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala

Lines changed: 8 additions & 9 deletions
@@ -284,11 +284,10 @@ private[celeborn] class Worker(
   storageManager.updateDiskInfos()
   storageManager.startDeviceMonitor()
 
-  // WorkerInfo's diskInfos is a reference to storageManager.diskInfos
-  val diskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()
-  storageManager.disksSnapshot().foreach { diskInfo =>
-    diskInfos.put(diskInfo.mountPoint, diskInfo)
-  }
+  private val diskInfos = storageManager
+    .allDisksSnapshot()
+    .map { diskInfo => diskInfo.mountPoint -> diskInfo }
+    .toMap.asJava
 
   val workerInfo =
     new WorkerInfo(
@@ -515,10 +514,10 @@ private[celeborn] class Worker(
     activeShuffleKeys.addAll(partitionLocationInfo.shuffleKeySet)
     activeShuffleKeys.addAll(storageManager.shuffleKeySet())
     storageManager.updateDiskInfos()
-    val diskInfos =
-      workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map { disk =>
-        disk.mountPoint -> disk
-      }.toMap.asJava).values().asScala.toSeq ++ storageManager.remoteDiskInfos.getOrElse(Set.empty)
+    val currentDiskMap = storageManager.allDisksSnapshot().map { disk =>
+      disk.mountPoint -> disk
+    }.toMap.asJava
+    val diskInfos = workerInfo.updateThenGetDiskInfos(currentDiskMap).asScala.values.toSeq
     workerStatusManager.checkIfNeedTransitionStatus()
     val response = masterClient.askSync[HeartbeatFromWorkerResponse](
       HeartbeatFromWorker(
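This is the heart of the fix: the map handed to `WorkerInfo` at construction (and refreshed on heartbeat) is now built from `allDisksSnapshot()`, so remote disks are present from registration onward instead of first appearing at heartbeat time. A sketch of the map-building step, with placeholder disk data and simplified types:

```scala
import scala.jdk.CollectionConverters._

// Placeholder stand-in for DiskInfo keyed by mount point; the key
// used for a remote disk here is a made-up label, not necessarily
// what Celeborn records.
final case class DiskSnap(mountPoint: String, isDFS: Boolean)

val snapshot = List(
  DiskSnap("/mnt/disk1", isDFS = false),
  DiskSnap("HDFS", isDFS = true)) // now included before the first heartbeat

val diskInfos: java.util.Map[String, DiskSnap] =
  snapshot.map(d => d.mountPoint -> d).toMap.asJava
```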

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala

Lines changed: 18 additions & 14 deletions
@@ -103,19 +103,23 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
     if (diskInfoSet.nonEmpty) Some(diskInfoSet) else None
   }
 
-  def disksSnapshot(): List[DiskInfo] = {
+  def localDisksSnapshot(): List[DiskInfo] = {
     diskInfos.synchronized {
       val disks = new util.ArrayList[DiskInfo](diskInfos.values())
       disks.asScala.toList
     }
   }
 
-  def healthyWorkingDirs(): List[File] =
-    disksSnapshot().filter(_.status == DiskStatus.HEALTHY).flatMap(_.dirs)
+  def allDisksSnapshot(): List[DiskInfo] = {
+    localDisksSnapshot() ++ remoteDiskInfos.getOrElse(Nil)
+  }
+
+  def healthyLocalWorkingDirs(): List[File] =
+    localDisksSnapshot().filter(_.status == DiskStatus.HEALTHY).flatMap(_.dirs)
 
   private val diskOperators: ConcurrentHashMap[String, ThreadPoolExecutor] = {
     val cleaners = JavaUtils.newConcurrentHashMap[String, ThreadPoolExecutor]()
-    disksSnapshot().foreach {
+    localDisksSnapshot().foreach {
       diskInfo =>
         cleaners.put(
           diskInfo.mountPoint,
@@ -127,7 +131,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   }
 
   val tmpDiskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()
-  disksSnapshot().foreach { diskInfo =>
+  localDisksSnapshot().foreach { diskInfo =>
     tmpDiskInfos.put(diskInfo.mountPoint, diskInfo)
   }
   private val deviceMonitor =
@@ -142,7 +146,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
       _totalLocalFlusherThread: Int) = {
     val flushers = JavaUtils.newConcurrentHashMap[String, LocalFlusher]()
     var totalThread = 0
-    disksSnapshot().foreach { diskInfo =>
+    localDisksSnapshot().foreach { diskInfo =>
      if (!flushers.containsKey(diskInfo.mountPoint)) {
        val flusher = new LocalFlusher(
          workerSource,
@@ -269,7 +273,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
  private val counter = new AtomicInteger()
  private val counterOperator = new IntUnaryOperator() {
    override def applyAsInt(operand: Int): Int = {
-      val dirs = healthyWorkingDirs()
+      val dirs = healthyLocalWorkingDirs()
      if (dirs.nonEmpty) {
        (operand + 1) % dirs.length
      } else 0
@@ -489,7 +493,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
      userIdentifier: UserIdentifier,
      partitionSplitEnabled: Boolean,
      isSegmentGranularityVisible: Boolean): PartitionDataWriter = {
-    if (healthyWorkingDirs().isEmpty && remoteStorageDirs.isEmpty) {
+    if (healthyLocalWorkingDirs().isEmpty && remoteStorageDirs.isEmpty) {
      throw new IOException("No available working dirs!")
    }
    val partitionDataWriterContext = new PartitionDataWriterContext(
@@ -687,7 +691,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
      }
    }
    val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
-    disksSnapshot().filter(diskInfo =>
+    localDisksSnapshot().filter(diskInfo =>
      diskInfo.status == DiskStatus.HEALTHY
        || diskInfo.status == DiskStatus.HIGH_DISK_USAGE).foreach { diskInfo =>
      diskInfo.dirs.foreach { dir =>
@@ -751,7 +755,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
    TimeUnit.MINUTES)
 
  private def cleanupExpiredAppDirs(expireDuration: Long): Unit = {
-    val diskInfoAndAppDirs = disksSnapshot()
+    val diskInfoAndAppDirs = localDisksSnapshot()
      .filter(diskInfo =>
        diskInfo.status == DiskStatus.HEALTHY
          || diskInfo.status == DiskStatus.HIGH_DISK_USAGE)
@@ -801,7 +805,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
    val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
    while (retryTimes < conf.workerCheckFileCleanMaxRetries) {
      val localCleaned =
-        !disksSnapshot().filter(_.status != DiskStatus.IO_HANG).exists { diskInfo =>
+        !localDisksSnapshot().filter(_.status != DiskStatus.IO_HANG).exists { diskInfo =>
          diskInfo.dirs.exists {
            case workingDir if workingDir.exists() =>
              // Don't check appDirs that store information in the fileInfos
@@ -946,7 +950,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
  }
 
  def updateDiskInfos(): Unit = this.synchronized {
-    disksSnapshot()
+    localDisksSnapshot()
      .filter(diskInfo =>
        diskInfo.status != DiskStatus.IO_HANG && diskInfo.status != DiskStatus.READ_OR_WRITE_FAILURE)
      .foreach {
@@ -985,7 +989,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
      diskInfo.updateFlushTime()
      diskInfo.updateFetchTime()
    }
-    logInfo(s"Updated diskInfos:\n${disksSnapshot().mkString("\n")}")
+    logInfo(s"Updated diskInfos:\n${localDisksSnapshot().mkString("\n")}")
  }
 
  def getFileSystemReportedSpace(mountPoint: String): (Long, Long) = {
@@ -1148,7 +1152,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
        logInfo(s"Disk(${diskInfo.mountPoint}) unavailable for $suggestedMountPoint, return all healthy" +
          s" working dirs.")
      }
-      healthyWorkingDirs()
+      healthyLocalWorkingDirs()
    }
    if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty && ossFlusher.isEmpty) {
      throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")
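The rename makes the intended audience of each snapshot explicit: everything that touches mount points and working directories (flushers, cleaners, the device monitor) stays on `localDisksSnapshot()`, while anything reported to the master uses `allDisksSnapshot()`. A toy sketch of the split, with simplified types standing in for `DiskInfo`:

```scala
// Simplified types; the real methods return List[DiskInfo].
final case class Disk(mountPoint: String, isDFS: Boolean)

def localDisksSnapshot(): List[Disk] =
  List(Disk("/mnt/disk1", isDFS = false), Disk("/mnt/disk2", isDFS = false))

def remoteDiskInfos: Option[List[Disk]] =
  Some(List(Disk("HDFS", isDFS = true))) // placeholder remote entry

// Local plus remote, exactly as in the hunk above:
def allDisksSnapshot(): List[Disk] =
  localDisksSnapshot() ++ remoteDiskInfos.getOrElse(Nil)

val forFlushers  = localDisksSnapshot() // mount-point work stays local
val forHeartbeat = allDisksSnapshot()   // the master sees everything
```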

worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala

Lines changed: 1 addition & 1 deletion
@@ -213,12 +213,12 @@ trait MiniClusterFeature extends Logging {
     val workers = new Array[Worker](workerNum)
     val flagUpdateLock = new ReentrantLock()
     val threads = (1 to workerNum).map { i =>
+      val worker = createWorker(workerConf)
       val workerThread = new RunnerWrap({
         var workerStartRetry = 0
         var workerStarted = false
         while (!workerStarted) {
           try {
-            val worker = createWorker(workerConf)
             flagUpdateLock.lock()
             workers(i - 1) = worker
             flagUpdateLock.unlock()
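A side note on the test-harness tweak above: `createWorker` now runs once per thread, before the retry loop, so a failed start retries the same `Worker` instance rather than constructing a fresh one on every attempt. A reduced sketch of that control flow, with hypothetical names standing in for the real MiniClusterFeature API:

```scala
// Hypothetical reduction of the retry loop; FakeWorker/createWorker
// are stand-ins, not the real test-harness types.
final class FakeWorker { def initialize(): Unit = () }
def createWorker(): FakeWorker = new FakeWorker

val worker = createWorker() // created once, outside the retries
var started = false
var retries = 0
while (!started && retries < 3) {
  try {
    worker.initialize() // each retry restarts the same instance
    started = true
  } catch {
    case _: Exception => retries += 1
  }
}
```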
