Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,30 @@ public class DiskFileInfo extends FileInfo {
private static final Logger logger = LoggerFactory.getLogger(DiskFileInfo.class);
private final String filePath;
private final StorageInfo.Type storageType;
private final DiskInfo diskInfo;

/**
 * Creates a disk-backed file info bound to a specific disk.
 *
 * @param diskInfo disk this file lives on; consulted to reserve space before each
 *     flush (see canAcquireBytes). May be null, in which case reservation is skipped.
 * @param userIdentifier owner (tenant/user) of the file
 * @param partitionSplitEnabled whether partition splitting is enabled for this file
 * @param fileMeta metadata describing the file layout
 * @param filePath absolute path of the file on disk
 * @param storageType storage medium type (e.g. HDD/SSD)
 */
public DiskFileInfo(
    DiskInfo diskInfo,
    UserIdentifier userIdentifier,
    boolean partitionSplitEnabled,
    FileMeta fileMeta,
    String filePath,
    StorageInfo.Type storageType) {
  super(userIdentifier, partitionSplitEnabled, fileMeta);
  this.diskInfo = diskInfo;
  this.filePath = filePath;
  this.storageType = storageType;
}

/**
 * Convenience constructor for files that need no disk-level bookkeeping.
 * Delegates with a null diskInfo, so byte acquisition always succeeds.
 */
public DiskFileInfo(
    UserIdentifier userIdentifier,
    boolean partitionSplitEnabled,
    FileMeta fileMeta,
    String filePath,
    StorageInfo.Type storageType) {
  this(null, userIdentifier, partitionSplitEnabled, fileMeta, filePath, storageType);
}

// only called when restore from pb or in UT
public DiskFileInfo(
UserIdentifier userIdentifier,
Expand All @@ -60,6 +72,13 @@ public DiskFileInfo(
StorageInfo.Type storageType,
long bytesFlushed) {
super(userIdentifier, partitionSplitEnabled, fileMeta);

// TODO: Figure out a way to map the right diskInfo when restoring from pb,
// currently we just set it to null and skip the acquireBytesFlushed logic in
// DiskFileInfo#acquireBytesFlushed
// However, during graceful shutdown we have likely already hard split, therefore no more writes
// to this file.
this.diskInfo = null;
this.filePath = filePath;
if (storageType != null) {
this.storageType = storageType;
Expand All @@ -83,6 +102,9 @@ public DiskFileInfo(UserIdentifier userIdentifier, FileMeta fileMeta, String fil
super(userIdentifier, true, fileMeta);
this.filePath = filePath;
this.storageType = StorageInfo.Type.HDD;

// Only used by sorter, hence we know no diskInfo acquires needed
this.diskInfo = null;
}

public File getFile() {
Expand All @@ -94,6 +116,15 @@ public String getFilePath() {
return filePath;
}

/**
 * Reserves {@code bytes} against the backing disk's transient space accounting.
 * A null diskInfo (restored from PB, or sorter/test-created files) means there is
 * no disk-level bookkeeping, so the acquisition trivially succeeds.
 */
@Override
protected boolean canAcquireBytes(long bytes) {
  return diskInfo == null || diskInfo.acquireBytesFlushed(bytes);
}

/** Returns the path of the sorted variant of this file, derived from {@link #getFilePath()}. */
public String getSortedPath() {
  return Utils.getSortedFilePath(filePath);
}
Expand Down
21 changes: 21 additions & 0 deletions common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public abstract class FileInfo {
protected FileMeta fileMeta;
protected final Set<Long> streams = ConcurrentHashMap.newKeySet();
protected volatile long bytesFlushed;
protected volatile long acquiredBytes;
private boolean isReduceFileMeta;

public FileInfo(UserIdentifier userIdentifier, boolean partitionSplitEnabled, FileMeta fileMeta) {
Expand All @@ -56,13 +57,31 @@ public long getFileLength() {
return bytesFlushed;
}

/** Total bytes reserved against the backing storage via {@link #acquireBytes(long)}. */
public long getAcquiredBytes() {
  return acquiredBytes;
}

/**
 * Records {@code bytes} flushed to this file, first reserving them against the
 * backing storage (throws IllegalStateException via acquireBytes when the storage
 * cannot accommodate them — nothing is recorded in that case).
 */
public synchronized void updateBytesFlushed(long bytes) {
  // Reserve before mutating bytesFlushed so a failed reservation leaves state untouched.
  acquireBytes(bytes);
  bytesFlushed += bytes;
  if (isReduceFileMeta) {
    getReduceFileMeta().updateChunkOffset(bytesFlushed, false);
  }
}

/**
 * Reserves {@code bytes} against the backing storage, tracking the running total in
 * {@link #acquiredBytes}.
 *
 * @throws IllegalStateException if the storage cannot accommodate the requested bytes
 */
public synchronized void acquireBytes(long bytes) {
  if (canAcquireBytes(bytes)) {
    acquiredBytes += bytes;
    return;
  }
  throw new IllegalStateException(
      String.format(
          "Failed to acquire bytes for file: %s, current bytesFlushed: %d, trying to acquire: %d",
          getFilePath(), bytesFlushed, bytes));
}

public UserIdentifier getUserIdentifier() {
return userIdentifier;
}
Expand Down Expand Up @@ -112,4 +131,6 @@ public boolean isStreamsEmpty() {
public boolean isReduceFileMeta() {
return isReduceFileMeta;
}

/**
 * Returns whether {@code bytes} can be reserved on this file's backing storage.
 * Implementations that perform no storage-level bookkeeping simply return true.
 */
protected abstract boolean canAcquireBytes(long bytes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,10 @@ public int releaseMemoryBuffers() {
public String getFilePath() {
return "";
}

@Override
protected boolean canAcquireBytes(long bytes) {
  // Memory-backed files reserve no disk space, so acquisition always succeeds.
  return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT)
def workerStorageBaseDirDiskType: String = get(WORKER_STORAGE_BASE_DIR_DISK_TYPE)
def workerStorageExpireDirTimeout: Long = get(WORKER_STORAGE_EXPIRE_DIR_TIMEOUT)
def workerDiskStorageStrictReserveEnabled: Boolean =
get(WORKER_DISK_STORAGE_STRICT_RESERVE_ENABLED)
def creditStreamThreadsPerMountpoint: Int = get(WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT)
def workerDirectMemoryRatioForReadBuffer: Double = get(WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER)
def partitionReadBuffersMin: Int = get(WORKER_PARTITION_READ_BUFFERS_MIN)
Expand Down Expand Up @@ -3313,6 +3315,16 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1h")

// Strict per-flush reservation of disk space; fixes typos ("wrtiers") and the
// missing inter-sentence spaces that leaked into the generated configuration docs.
val WORKER_DISK_STORAGE_STRICT_RESERVE_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.worker.disk.storage.strictReserve.enabled")
    .categories("worker")
    .version("0.6.0")
    .doc("Whether to enable strict bookkeeping for worker's disk storage. " +
      "With this set to true, data writers try to acquire storage space before each flush, " +
      "ensuring that disk full based HARD_SPLITs are accurately triggered.")
    .booleanConf
    .createWithDefault(false)

val HDFS_DIR: OptionalConfigEntry[String] =
buildConf("celeborn.storage.hdfs.dir")
.categories("worker", "master", "client")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.celeborn.common.meta

import java.io.File
import java.util
import java.util.concurrent.atomic.AtomicLong
import java.util.function.LongUnaryOperator

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
Expand All @@ -29,7 +31,7 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.StorageInfo
import org.apache.celeborn.common.util.{JavaUtils, Utils}
import org.apache.celeborn.common.util.Utils.runCommand
import org.apache.celeborn.common.util.Utils.{runCommand, userPort}

class DiskInfo(
val mountPoint: String,
Expand Down Expand Up @@ -88,6 +90,31 @@ class DiskInfo(
lazy val shuffleAllocations = new util.HashMap[String, Integer]()
lazy val applicationAllocations = new util.HashMap[String, Integer]()

@volatile
var transientAvailableBytes = new AtomicLong(actualUsableSpace)

// Bytes still available for reservation on this disk: actualUsableSpace minus
// everything already reserved through acquireBytesFlushed.
def getTransientAvailableBytes: Long = {
  transientAvailableBytes.get()
}

/**
 * Atomically reserves `bytes` from the transient available space.
 *
 * Returns true iff enough space was available; on failure the counter is left
 * unchanged. Implemented as an explicit CAS loop: the previous getAndUpdate-based
 * version smuggled the success flag out of the update lambda, but AtomicLong
 * documents that the update function must be side-effect-free because it may be
 * re-applied when attempted updates fail due to contention.
 */
def acquireBytesFlushed(bytes: Long): Boolean = {
  // Read the @volatile field once so a concurrent setUsableSpace (which swaps
  // in a new AtomicLong) cannot split our get/CAS across two counters.
  val counter = transientAvailableBytes
  while (true) {
    val available = counter.get()
    if (available < bytes) {
      return false
    }
    if (counter.compareAndSet(available, available - bytes)) {
      return true
    }
    // Lost a race with a concurrent acquire; retry with the fresh value.
  }
  false // unreachable; satisfies the declared return type
}

// Overrides the storage medium type (HDD/SSD/...) reported for this disk.
def setStorageType(storageType: StorageInfo.Type) = {
  this.storageType = storageType
}
Expand All @@ -99,6 +126,7 @@ class DiskInfo(

// Replaces the usable-space figure and resets the transient reservation counter
// to match. NOTE(review): this reset discards any reservations made against the
// previous counter — presumably callers only invoke this during initialization
// or periodic usage refresh; confirm against call sites.
def setUsableSpace(usableSpace: Long): this.type = this.synchronized {
  this.actualUsableSpace = usableSpace
  transientAvailableBytes = new AtomicLong(usableSpace)
  this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.celeborn.common.meta
import java.util

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.protocol.StorageInfo

class DeviceInfoSuite extends CelebornFunSuite {

Expand All @@ -38,4 +39,30 @@ class DeviceInfoSuite extends CelebornFunSuite {
assert(DeviceInfo.getMountPoint("/data/data", mountPoints) === "/data")
assert(DeviceInfo.getMountPoint("/data1/data", mountPoints) === "/")
}

test("Test diskInfo usableSpace accounting") {
  val usableSpace = 1000L
  val diskInfo = new DiskInfo(
    "SSD",
    usableSpace,
    Integer.MAX_VALUE,
    Integer.MAX_VALUE,
    Integer.MAX_VALUE,
    StorageInfo.Type.SSD)

  assert(diskInfo.actualUsableSpace === usableSpace)
  assert(diskInfo.getTransientAvailableBytes === usableSpace)
  assert(diskInfo.acquireBytesFlushed(100L), "Should be able to acquire 100 bytes")
  assert(diskInfo.getTransientAvailableBytes === usableSpace - 100L)

  // Only 900 bytes remain, so acquiring 901 must fail without mutating the counter.
  assert(!diskInfo.acquireBytesFlushed(901L), "Should not be able to acquire 901 bytes")
  assert(
    diskInfo.getTransientAvailableBytes === usableSpace - 100L,
    "Usable space should not change after failed acquire")

  diskInfo.setUsableSpace(800L)
  assert(
    diskInfo.getTransientAvailableBytes === 800L,
    "Usable space should reflect the new usable space")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.common.meta

import java.nio.file.Files

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.protocol.StorageInfo

class DiskFileInfoSuite extends CelebornFunSuite {

  test("test diskFileInfoUsageAccounting positive + negative") {
    val usableSpace = 1000L
    val diskInfo = new DiskInfo(
      "SSD",
      usableSpace,
      Integer.MAX_VALUE,
      Integer.MAX_VALUE,
      Integer.MAX_VALUE,
      StorageInfo.Type.SSD)
    val tmpFilePath = Files.createTempFile("testDiskUsageAccounting", ".tmp")
    val diskFileInfo = new DiskFileInfo(
      diskInfo,
      new UserIdentifier("tenant1", "user1"),
      false,
      new ReduceFileMeta(8192),
      tmpFilePath.toString,
      StorageInfo.Type.SSD)

    diskFileInfo.updateBytesFlushed(100)
    assert(diskFileInfo.getFileLength == 100, "file length should be updated to flushed bytes")
    assert(
      diskInfo.getTransientAvailableBytes == (usableSpace - 100),
      "available bytes should be reduced by flushed bytes")

    // 901 bytes exceeds the 900 bytes still available, so the flush must fail.
    try {
      diskFileInfo.updateBytesFlushed(901)
      fail("should throw IllegalStateException when flush bytes exceed usable space")
    } catch {
      case _: IllegalStateException =>
        assert(
          diskInfo.getTransientAvailableBytes == usableSpace - 100,
          "available bytes should not be reduced when flush bytes exceed usable space")
    }
    // The failed acquire must leave the counter untouched.
    assert(
      diskInfo.getTransientAvailableBytes == usableSpace - 100,
      "available bytes should be reduced by flushed bytes")

    // With a null diskInfo nothing is reserved, so no exception even past capacity.
    val diskFileInfo2 = new DiskFileInfo(
      null,
      new UserIdentifier("tenant1", "user1"),
      false,
      new ReduceFileMeta(8192),
      tmpFilePath.toString,
      StorageInfo.Type.SSD)
    diskFileInfo2.updateBytesFlushed(5000)
    assert(diskFileInfo2.getFileLength == 5000, "file length should be updated to flushed bytes")
    tmpFilePath.toFile.deleteOnExit()
  }

  test("Multi threaded acquisition") {
    val usableSpace = 1000L
    val diskInfo = new DiskInfo(
      "SSD",
      usableSpace,
      Integer.MAX_VALUE,
      Integer.MAX_VALUE,
      Integer.MAX_VALUE,
      StorageInfo.Type.SSD)
    val tmpFilePath = Files.createTempFile("testDiskUsageAccountingMultiThreaded", ".tmp")

    val failures = new java.util.concurrent.atomic.AtomicInteger(0)
    val totalSuccessfulAcquisition = new java.util.concurrent.atomic.AtomicInteger(0)
    val perThreadFlushBytes = 101

    val threads = (1 to 10).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = {
          try {
            val diskFileInfo = new DiskFileInfo(
              diskInfo,
              new UserIdentifier("tenant1", "user1"),
              false,
              new ReduceFileMeta(8192),
              tmpFilePath.toString,
              StorageInfo.Type.SSD)
            diskFileInfo.updateBytesFlushed(perThreadFlushBytes)
            totalSuccessfulAcquisition.addAndGet(perThreadFlushBytes)
          } catch {
            case _: IllegalStateException =>
              failures.incrementAndGet()
            // expected when flush bytes exceed usable space
          }
        }
      })
    }

    threads.foreach(_.start())
    threads.foreach(_.join())

    // Each thread flushes 101 bytes; only 9 of the 10 acquisitions fit into the
    // 1000 available bytes (9 * 101 = 909), so exactly one thread must fail.
    assert(
      diskInfo.getTransientAvailableBytes == (usableSpace - totalSuccessfulAcquisition.get()),
      "available bytes should be reduced by flushed bytes")
    assert(diskInfo.getTransientAvailableBytes > 0, "available bytes should not be negative")
    assert(failures.get() == 1, "1 thread should fail due to insufficient space")
  }

}
1 change: 1 addition & 0 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ license: |
| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | false | If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. This value should be higher than celeborn.worker.directMemoryRatioToPauseReceive. | 0.2.0 | |
| celeborn.worker.directMemoryRatioToResume | 0.7 | false | If direct memory usage is less than this limit, worker will resume. | 0.2.0 | |
| celeborn.worker.disk.clean.threads | 4 | false | Thread number of worker to clean up directories of expired shuffle keys on disk. | 0.3.2 | |
| celeborn.worker.disk.storage.strictReserve.enabled | false | false | Whether to enable strict bookkeeping for worker's disk storage. With this set to true, data writers try to acquire storage space before each flush, ensuring that disk full based HARD_SPLITs are accurately triggered. | 0.6.0 | |
| celeborn.worker.fetch.heartbeat.enabled | false | false | enable the heartbeat from worker to client when fetching data | 0.3.0 | |
| celeborn.worker.fetch.io.threads | &lt;undefined&gt; | false | Netty IO thread number of worker to handle client fetch data. The default threads number is the number of flush thread. | 0.2.0 | |
| celeborn.worker.fetch.port | 0 | false | Server port for Worker to receive fetch data request from ShuffleClient. | 0.2.0 | |
Expand Down
Loading
Loading