diff --git a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java index d4571fa4bbe..86f8cb1353e 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java @@ -39,18 +39,30 @@ public class DiskFileInfo extends FileInfo { private static final Logger logger = LoggerFactory.getLogger(DiskFileInfo.class); private final String filePath; private final StorageInfo.Type storageType; + private final DiskInfo diskInfo; public DiskFileInfo( + DiskInfo diskInfo, UserIdentifier userIdentifier, boolean partitionSplitEnabled, FileMeta fileMeta, String filePath, StorageInfo.Type storageType) { super(userIdentifier, partitionSplitEnabled, fileMeta); + this.diskInfo = diskInfo; this.filePath = filePath; this.storageType = storageType; } + public DiskFileInfo( + UserIdentifier userIdentifier, + boolean partitionSplitEnabled, + FileMeta fileMeta, + String filePath, + StorageInfo.Type storageType) { + this(null, userIdentifier, partitionSplitEnabled, fileMeta, filePath, storageType); + } + // only called when restoring from pb or in UT public DiskFileInfo( UserIdentifier userIdentifier, @@ -60,6 +72,13 @@ public DiskFileInfo( StorageInfo.Type storageType, long bytesFlushed) { super(userIdentifier, partitionSplitEnabled, fileMeta); + + // TODO: Figure out a way to map the right diskInfo when restoring from pb, + // currently we just set it to null and skip the acquireBytesFlushed logic in + // DiskFileInfo#canAcquireBytes + // However during graceful shutdown, we likely have already hard split therefore no more writes + // to this file. 
+ this.diskInfo = null; this.filePath = filePath; if (storageType != null) { this.storageType = storageType; @@ -83,6 +102,9 @@ public DiskFileInfo(UserIdentifier userIdentifier, FileMeta fileMeta, String fil super(userIdentifier, true, fileMeta); this.filePath = filePath; this.storageType = StorageInfo.Type.HDD; + + // Only used by sorter, hence we know no diskInfo acquires needed + this.diskInfo = null; } public File getFile() { @@ -94,6 +116,15 @@ public String getFilePath() { return filePath; } + @Override + protected boolean canAcquireBytes(long bytes) { + if (diskInfo != null) { + return diskInfo.acquireBytesFlushed(bytes); + } else { + return true; + } + } + public String getSortedPath() { return Utils.getSortedFilePath(filePath); } diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java index e8511f1bff1..ef9a75e5804 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java @@ -30,6 +30,7 @@ public abstract class FileInfo { protected FileMeta fileMeta; protected final Set streams = ConcurrentHashMap.newKeySet(); protected volatile long bytesFlushed; + protected volatile long acquiredBytes; private boolean isReduceFileMeta; public FileInfo(UserIdentifier userIdentifier, boolean partitionSplitEnabled, FileMeta fileMeta) { @@ -56,13 +57,31 @@ public long getFileLength() { return bytesFlushed; } + public long getAcquiredBytes() { + return acquiredBytes; + } + public synchronized void updateBytesFlushed(long bytes) { + acquireBytes(bytes); bytesFlushed += bytes; if (isReduceFileMeta) { getReduceFileMeta().updateChunkOffset(bytesFlushed, false); } } + public synchronized void acquireBytes(long bytes) { + if (!canAcquireBytes(bytes)) { + throw new IllegalStateException( + "Failed to acquire bytes for file: " + + getFilePath() + + ", current bytesFlushed: " + + bytesFlushed + + ", 
trying to acquire: " + + bytes); + } + acquiredBytes += bytes; + } + public UserIdentifier getUserIdentifier() { return userIdentifier; } @@ -112,4 +131,6 @@ public boolean isStreamsEmpty() { public boolean isReduceFileMeta() { return isReduceFileMeta; } + + protected abstract boolean canAcquireBytes(long bytes); } diff --git a/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java index 9b933ea6c00..017ec5d781e 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java @@ -85,4 +85,10 @@ public int releaseMemoryBuffers() { public String getFilePath() { return ""; } + + @Override + protected boolean canAcquireBytes(long bytes) { + // NO-OP + return true; + } } diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 798099bcbd2..6ed49d8bf6f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1319,6 +1319,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT) def workerStorageBaseDirDiskType: String = get(WORKER_STORAGE_BASE_DIR_DISK_TYPE) def workerStorageExpireDirTimeout: Long = get(WORKER_STORAGE_EXPIRE_DIR_TIMEOUT) + def workerDiskStorageStrictReserveEnabled: Boolean = + get(WORKER_DISK_STORAGE_STRICT_RESERVE_ENABLED) def creditStreamThreadsPerMountpoint: Int = get(WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT) def workerDirectMemoryRatioForReadBuffer: Double = get(WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER) def partitionReadBuffersMin: Int = get(WORKER_PARTITION_READ_BUFFERS_MIN) @@ -3313,6 +3315,16 @@ object CelebornConf extends Logging { 
.timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1h") + val WORKER_DISK_STORAGE_STRICT_RESERVE_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.worker.disk.storage.strictReserve.enabled") + .categories("worker") + .version("0.6.0") + .doc("Whether to enable strict bookkeeping for worker's disk storage. " + + "With this set to true, data writers try to acquire storage space before each flush, " + + "ensuring that disk full based HARD_SPLITs are accurately triggered.") + .booleanConf + .createWithDefault(false) + val HDFS_DIR: OptionalConfigEntry[String] = buildConf("celeborn.storage.hdfs.dir") .categories("worker", "master", "client") diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala index f66df671116..8b138ba980f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala @@ -19,6 +19,8 @@ package org.apache.celeborn.common.meta import java.io.File import java.util +import java.util.concurrent.atomic.AtomicLong +import java.util.function.LongUnaryOperator import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer @@ -29,7 +31,7 @@ import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.protocol.StorageInfo import org.apache.celeborn.common.util.{JavaUtils, Utils} -import org.apache.celeborn.common.util.Utils.runCommand +import org.apache.celeborn.common.util.Utils.{runCommand, userPort} class DiskInfo( val mountPoint: String, @@ -88,6 +90,31 @@ class DiskInfo( lazy val shuffleAllocations = new util.HashMap[String, Integer]() lazy val applicationAllocations = new util.HashMap[String, Integer]() + @volatile + var transientAvailableBytes = new AtomicLong(actualUsableSpace) + + def getTransientAvailableBytes: Long = { + transientAvailableBytes.get() + } 
+ + def acquireBytesFlushed(bytes: Long): Boolean = { + // Update only if transientAvailableBytes is greater than or equal to bytes to acquire, otherwise return false. + var updated = false + transientAvailableBytes.getAndUpdate(new LongUnaryOperator() { + override def applyAsLong(availableBytes: Long): Long = { + if (availableBytes >= bytes) { + updated = true + availableBytes - bytes + } else { + updated = false + availableBytes + } + } + }) + + updated + } + def setStorageType(storageType: StorageInfo.Type) = { this.storageType = storageType } @@ -99,6 +126,7 @@ class DiskInfo( def setUsableSpace(usableSpace: Long): this.type = this.synchronized { this.actualUsableSpace = usableSpace + transientAvailableBytes = new AtomicLong(usableSpace) this } diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/DeviceInfoSuite.scala b/common/src/test/scala/org/apache/celeborn/common/meta/DeviceInfoSuite.scala index 250c20c3cbb..943545cf1a8 100644 --- a/common/src/test/scala/org/apache/celeborn/common/meta/DeviceInfoSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/meta/DeviceInfoSuite.scala @@ -20,6 +20,7 @@ package org.apache.celeborn.common.meta import java.util import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.common.protocol.StorageInfo class DeviceInfoSuite extends CelebornFunSuite { @@ -38,4 +39,30 @@ class DeviceInfoSuite extends CelebornFunSuite { assert(DeviceInfo.getMountPoint("/data/data", mountPoints) === "/data") assert(DeviceInfo.getMountPoint("/data1/data", mountPoints) === "/") } + + test("Test diskInfo usableSpace accounting") { + val usableSpace = 1000L + val diskInfo = new DiskInfo( + "SSD", + usableSpace, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + StorageInfo.Type.SSD) + + assert(diskInfo.actualUsableSpace === usableSpace) + assert(diskInfo.getTransientAvailableBytes === usableSpace) + assert(diskInfo.acquireBytesFlushed(100L), "Should be able to acquire 100 bytes") + 
assert(diskInfo.getTransientAvailableBytes === usableSpace - 100L) + + assert(!diskInfo.acquireBytesFlushed(901L), "Should not be able to acquire 901 bytes") + assert( + diskInfo.getTransientAvailableBytes === usableSpace - 100L, + "Usable space should not change after failed acquire") + + diskInfo.setUsableSpace(800L) + assert( + diskInfo.getTransientAvailableBytes === 800L, + "Usable space should reflect the new usable space") + } } diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoSuite.scala b/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoSuite.scala new file mode 100644 index 00000000000..79a7f34f46f --- /dev/null +++ b/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoSuite.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.celeborn.common.meta + +import java.nio.file.Files + +import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.common.identity.UserIdentifier +import org.apache.celeborn.common.protocol.StorageInfo + +class DiskFileInfoSuite extends CelebornFunSuite { + + test("test diskFileInfoUsageAccounting positive + negative") { + val usableSpace = 1000L + val diskInfo = new DiskInfo( + "SSD", + usableSpace, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + StorageInfo.Type.SSD) + val tmpFilePath = Files.createTempFile("testDiskUsageAccounting", ".tmp") + val diskFileInfo = new DiskFileInfo( + diskInfo, + new UserIdentifier("tenant1", "user1"), + false, + new ReduceFileMeta(8192), + tmpFilePath.toString, + StorageInfo.Type.SSD) + + diskFileInfo.updateBytesFlushed(100) + assert(diskFileInfo.getFileLength == 100, "file length should be updated to flushed bytes") + assert( + diskInfo.getTransientAvailableBytes == (usableSpace - 100), + "available bytes should be reduced by flushed bytes") + + try { + diskFileInfo.updateBytesFlushed(901) + fail("should throw IllegalStateException when flush bytes exceed usable space") + } catch { + case _: IllegalStateException => + assert( + diskInfo.getTransientAvailableBytes == usableSpace - 100, + "available bytes should not be reduced when flush bytes exceed usable space") + } + // The failed acquire should not affect the available bytes, and the successful acquire should reduce the available bytes + assert( + diskInfo.getTransientAvailableBytes == usableSpace - 100, + "available bytes should be reduced by flushed bytes") + + // With null diskInfo, no exceptions during acquisition + val diskFileInfo2 = new DiskFileInfo( + null, + new UserIdentifier("tenant1", "user1"), + false, + new ReduceFileMeta(8192), + tmpFilePath.toString, + StorageInfo.Type.SSD) + diskFileInfo2.updateBytesFlushed(5000) + assert(diskFileInfo2.getFileLength == 5000, "file length should be updated to flushed bytes") 
+ tmpFilePath.toFile.deleteOnExit() + } + + test("Multi threaded acquisition") { + val usableSpace = 1000L + val diskInfo = new DiskInfo( + "SSD", + usableSpace, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + StorageInfo.Type.SSD) + val tmpFilePath = Files.createTempFile("testDiskUsageAccountingMultiThreaded", ".tmp") + + val failures = new java.util.concurrent.atomic.AtomicInteger(0) + val totalSuccessfulAcquisition = new java.util.concurrent.atomic.AtomicInteger(0) + val perThreadFlushBytes = 101 + + val threads = (1 to 10).map { _ => + new Thread(new Runnable { + override def run(): Unit = { + try { + val diskFileInfo = new DiskFileInfo( + diskInfo, + new UserIdentifier("tenant1", "user1"), + false, + new ReduceFileMeta(8192), + tmpFilePath.toString, + StorageInfo.Type.SSD) + diskFileInfo.updateBytesFlushed(perThreadFlushBytes) + totalSuccessfulAcquisition.addAndGet(perThreadFlushBytes) + } catch { + case _: IllegalStateException => + failures.incrementAndGet() + // expected when flush bytes exceed usable space + } + } + }) + } + + threads.foreach(_.start()) + threads.foreach(_.join()) + + // Only the first 9 threads should succeed in flushing 101 bytes each (total 909 bytes), and the rest should fail due to insufficient space + assert( + diskInfo.getTransientAvailableBytes == (usableSpace - totalSuccessfulAcquisition.get()), + "available bytes should be reduced by flushed bytes") + assert(diskInfo.getTransientAvailableBytes > 0, "available bytes should remain positive") + assert(failures.get() == 1, "1 thread should fail due to insufficient space") + } + +} diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index bb2cec89bc3..0e16a9fb550 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -85,6 +85,7 @@ license: | | celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | false | If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. 
This value should be higher than celeborn.worker.directMemoryRatioToPauseReceive. | 0.2.0 | | | celeborn.worker.directMemoryRatioToResume | 0.7 | false | If direct memory usage is less than this limit, worker will resume. | 0.2.0 | | | celeborn.worker.disk.clean.threads | 4 | false | Thread number of worker to clean up directories of expired shuffle keys on disk. | 0.3.2 | | +| celeborn.worker.disk.storage.strictReserve.enabled | false | false | Whether to enable strict bookkeeping for worker's disk storage. With this set to true, data writers try to acquire storage space before each flush, ensuring that disk full based HARD_SPLITs are accurately triggered. | 0.6.0 | | | celeborn.worker.fetch.heartbeat.enabled | false | false | enable the heartbeat from worker to client when fetching data | 0.3.0 | | | celeborn.worker.fetch.io.threads | <undefined> | false | Netty IO thread number of worker to handle client fetch data. The default threads number is the number of flush thread. | 0.2.0 | | | celeborn.worker.fetch.port | 0 | false | Server port for Worker to receive fetch data request from ShuffleClient. 
| 0.2.0 | | diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index cdfb95c1e59..24e29cfcef3 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -863,6 +863,7 @@ private long transferBlock(long offset, long length) throws IOException { if (isDfs) { return transferStreamFully(dfsOriginInput, dfsSortedOutput, offset, length); } else { + originFileInfo.acquireBytes(length); return transferChannelFully(originFileChannel, sortedFileChannel, offset, length); } } @@ -876,6 +877,8 @@ public void deleteOriginFiles() throws IOException { } if (!deleteSuccess) { logger.warn("Clean origin file failed, origin file is : {}", originFilePath); + } else { + originFileInfo.acquireBytes(-originFileLen); } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index 50f085bf85e..af899b35668 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -1430,7 +1430,8 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler if (flusher.isInstanceOf[LocalFlusher]) { val mountPoint = flusher.asInstanceOf[LocalFlusher].mountPoint val diskInfo = workerInfo.diskInfos.get(mountPoint) - diskInfo.status.equals(DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace <= 0 + diskInfo.status.equals(DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace <= 0 || + diskInfo.getTransientAvailableBytes <= 0 } else { false } diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 6d375932b82..df7aa503b0d 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -75,6 +75,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val remoteStorageDirs = conf.remoteStorageDirs val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout + val diskStorageStrictReserveEnabled = conf.workerDiskStorageStrictReserveEnabled val storagePolicy = new StoragePolicy(conf, this, workerSource) val diskReserveSize = conf.workerDiskReserveSize @@ -110,6 +111,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } } + def healthyWorkingDirsWithDiskInfo(): List[(DiskInfo, File)] = + disksSnapshot().filter(_.status == DiskStatus.HEALTHY).flatMap(diskInfo => + diskInfo.dirs.map(dir => (diskInfo, dir))) + def healthyWorkingDirs(): List[File] = disksSnapshot().filter(_.status == DiskStatus.HEALTHY).flatMap(_.dirs) @@ -955,7 +960,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val writers = workingDirWriters.get(dir) if (writers != null) { writers.synchronized { - writers.values.asScala.map(_.getDiskFileInfo.getFileLength).sum + writers.values.asScala.map(_.getDiskFileInfo.getAcquiredBytes).sum } } else { 0 @@ -1138,9 +1143,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val shuffleKey = Utils.makeShuffleKey(appId, shuffleId) while (retryCount < conf.workerCreateWriterMaxAttempts) { val diskInfo = diskInfos.get(suggestedMountPoint) - val dirs = + val dirsWithDiskInfos: List[(DiskInfo, File)] = if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) { - diskInfo.dirs + 
diskInfo.dirs.map(dir => (diskInfo, dir)).toList } else { if (suggestedMountPoint.isEmpty) { logDebug(s"Location suggestedMountPoint is not set, return all healthy working dirs.") @@ -1148,9 +1153,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs logInfo(s"Disk(${diskInfo.mountPoint}) unavailable for $suggestedMountPoint, return all healthy" + s" working dirs.") } - healthyWorkingDirs() + healthyWorkingDirsWithDiskInfo() } - if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty && ossFlusher.isEmpty) { + if (dirsWithDiskInfos.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty && ossFlusher.isEmpty) { throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint") } @@ -1205,10 +1210,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs fileName, ossFileInfo) return (ossFlusher.get, ossFileInfo, null) - } else if (dirs.nonEmpty && location.getStorageInfo.localDiskAvailable()) { - val dir = dirs(getNextIndex % dirs.size) - val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, mountPoints) - val shuffleDir = new File(dir, s"$appId/$shuffleId") + } else if (dirsWithDiskInfos.nonEmpty && location.getStorageInfo.localDiskAvailable()) { + val dir = dirsWithDiskInfos(getNextIndex % dirsWithDiskInfos.size) + val mountPoint = DeviceInfo.getMountPoint(dir._2.getAbsolutePath, mountPoints) + val shuffleDir = new File(dir._2, s"$appId/$shuffleId") shuffleDir.mkdirs() val file = new File(shuffleDir, fileName) try { @@ -1226,6 +1231,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val fileMeta = getFileMeta(partitionType, mountPoint, conf.shuffleChunkSize) val storageType = diskInfos.get(mountPoint).storageType val diskFileInfo = new DiskFileInfo( + if (diskStorageStrictReserveEnabled) diskInfo else null, userIdentifier, partitionSplitEnabled, fileMeta, @@ -1238,7 +1244,7 @@ final private[worker] class StorageManager(conf: CelebornConf, 
workerSource: Abs return ( localFlushers.get(mountPoint), diskFileInfo, - dir) + dir._2) } catch { case fe: FileAlreadyExistsException => logError("Failed to create fileWriter because of existed file", fe)