Skip to content

Commit 3c44a55

Browse files
committed
[core] Apply 'file-operation-thread-num' to commit
1 parent 1f937be commit 3c44a55

File tree

10 files changed

+39
-42
lines changed

10 files changed

+39
-42
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -362,12 +362,6 @@
362362
<td>Boolean</td>
363363
<td>Enable data file thin mode to avoid duplicate columns storage.</td>
364364
</tr>
365-
<tr>
366-
<td><h5>delete-file.thread-num</h5></td>
367-
<td style="word-wrap: break-word;">(none)</td>
368-
<td>Integer</td>
369-
<td>The maximum number of concurrent deleting files. By default is the number of processors available to the Java virtual machine.</td>
370-
</tr>
371365
<tr>
372366
<td><h5>delete.force-produce-changelog</h5></td>
373367
<td style="word-wrap: break-word;">false</td>
@@ -452,6 +446,12 @@
452446
<td>Boolean</td>
453447
<td>Whether enabled read file index.</td>
454448
</tr>
449+
<tr>
450+
<td><h5>file-operation.thread-num</h5></td>
451+
<td style="word-wrap: break-word;">(none)</td>
452+
<td>Integer</td>
453+
<td>The maximum number of concurrent file operations. By default is the number of processors available to the Java virtual machine.</td>
454+
</tr>
455455
<tr>
456456
<td><h5>file-reader-async-threshold</h5></td>
457457
<td style="word-wrap: break-word;">10 mb</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1769,12 +1769,13 @@ public InlineElement getDescription() {
17691769
+ "a forced lookup compaction will be performed to flush L0 files to higher level. "
17701770
+ "This option is only valid when lookup-compact mode is gentle.");
17711771

1772-
public static final ConfigOption<Integer> DELETE_FILE_THREAD_NUM =
1773-
key("delete-file.thread-num")
1772+
public static final ConfigOption<Integer> FILE_OPERATION_THREAD_NUM =
1773+
key("file-operation.thread-num")
17741774
.intType()
17751775
.noDefaultValue()
1776+
.withFallbackKeys("delete-file.thread-num")
17761777
.withDescription(
1777-
"The maximum number of concurrent deleting files. "
1778+
"The maximum number of concurrent file operations. "
17781779
+ "By default is the number of processors available to the Java virtual machine.");
17791780

17801781
public static final ConfigOption<String> SCAN_FALLBACK_BRANCH =
@@ -2266,8 +2267,8 @@ public boolean cleanEmptyDirectories() {
22662267
return options.get(SNAPSHOT_CLEAN_EMPTY_DIRECTORIES);
22672268
}
22682269

2269-
public int deleteFileThreadNum() {
2270-
return options.getOptional(DELETE_FILE_THREAD_NUM)
2270+
public int fileOperationThreadNum() {
2271+
return options.getOptional(FILE_OPERATION_THREAD_NUM)
22712272
.orElseGet(() -> Runtime.getRuntime().availableProcessors());
22722273
}
22732274

paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java renamed to paimon-common/src/main/java/org/apache/paimon/utils/FileOperationThreadPool.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424

2525
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
2626

27-
/** Thread pool to delete files using {@link FileIO}. */
28-
public class FileDeletionThreadPool {
27+
/** Thread pool to operate files using {@link FileIO}. */
28+
public class FileOperationThreadPool {
2929

30-
private static final String THREAD_NAME = "DELETE-FILE-THREAD-POOL";
30+
private static final String THREAD_NAME = "FILE-OPERATION-THREAD-POOL";
3131

3232
private static ThreadPoolExecutor executorService =
3333
createCachedThreadPool(Runtime.getRuntime().availableProcessors(), THREAD_NAME);

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ public SnapshotDeletion newSnapshotDeletion() {
312312
newStatsFileHandler(),
313313
options.changelogProducer() != CoreOptions.ChangelogProducer.NONE,
314314
options.cleanEmptyDirectories(),
315-
options.deleteFileThreadNum());
315+
options.fileOperationThreadNum());
316316
}
317317

318318
@Override
@@ -325,7 +325,7 @@ public ChangelogDeletion newChangelogDeletion() {
325325
newIndexFileHandler(),
326326
newStatsFileHandler(),
327327
options.cleanEmptyDirectories(),
328-
options.deleteFileThreadNum());
328+
options.fileOperationThreadNum());
329329
}
330330

331331
@Override
@@ -343,7 +343,7 @@ public TagDeletion newTagDeletion() {
343343
newIndexFileHandler(),
344344
newStatsFileHandler(),
345345
options.cleanEmptyDirectories(),
346-
options.deleteFileThreadNum());
346+
options.fileOperationThreadNum());
347347
}
348348

349349
public abstract Comparator<InternalRow> newKeyComparator();

paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import org.apache.paimon.manifest.ManifestList;
3636
import org.apache.paimon.stats.StatsFileHandler;
3737
import org.apache.paimon.utils.DataFilePathFactories;
38-
import org.apache.paimon.utils.FileDeletionThreadPool;
38+
import org.apache.paimon.utils.FileOperationThreadPool;
3939
import org.apache.paimon.utils.FileStorePathFactory;
4040
import org.apache.paimon.utils.Pair;
4141
import org.apache.paimon.utils.SnapshotManager;
@@ -102,7 +102,7 @@ public FileDeletionBase(
102102
this.statsFileHandler = statsFileHandler;
103103
this.cleanEmptyDirectories = cleanEmptyDirectories;
104104
this.deletionBuckets = new HashMap<>();
105-
this.deleteFileExecutor = FileDeletionThreadPool.getExecutorService(deleteFileThreadNum);
105+
this.deleteFileExecutor = FileOperationThreadPool.getExecutorService(deleteFileThreadNum);
106106
}
107107

108108
/**

paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.paimon.table.FileStoreTable;
2626
import org.apache.paimon.table.source.DataSplit;
2727
import org.apache.paimon.table.source.Split;
28+
import org.apache.paimon.utils.FileOperationThreadPool;
2829
import org.apache.paimon.utils.FileStorePathFactory;
2930
import org.apache.paimon.utils.ThreadPoolUtils;
3031

@@ -38,8 +39,6 @@
3839
import java.util.Map;
3940
import java.util.concurrent.ThreadPoolExecutor;
4041

41-
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
42-
4342
/** List what data files recorded in manifests are missing from the filesystem. */
4443
public class ListUnexistingFiles {
4544

@@ -51,8 +50,8 @@ public ListUnexistingFiles(FileStoreTable table) {
5150
this.table = table;
5251
this.pathFactory = table.store().pathFactory();
5352
this.executor =
54-
createCachedThreadPool(
55-
table.coreOptions().deleteFileThreadNum(), "LIST_UNEXISTING_FILES");
53+
FileOperationThreadPool.getExecutorService(
54+
table.coreOptions().fileOperationThreadNum());
5655
}
5756

5857
public Map<Integer, Map<String, DataFileMeta>> list(BinaryRow partition) throws Exception {

paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis, boolean
8989
this.deleteFiles = new ArrayList<>();
9090
this.executor =
9191
createCachedThreadPool(
92-
table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
92+
table.coreOptions().fileOperationThreadNum(), "ORPHAN_FILES_CLEAN");
9393
this.dryRun = dryRun;
9494
}
9595

@@ -276,7 +276,7 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
276276
: new HashMap<String, String>() {
277277
{
278278
put(
279-
CoreOptions.DELETE_FILE_THREAD_NUM.key(),
279+
CoreOptions.FILE_OPERATION_THREAD_NUM.key(),
280280
parallelism.toString());
281281
}
282282
};

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,8 @@ public TableCommitImpl newCommit(String commitUser) {
455455
new ConsumerManager(fileIO, path, snapshotManager().branch()),
456456
options.snapshotExpireExecutionMode(),
457457
name(),
458-
options.forceCreatingSnapshot());
458+
options.forceCreatingSnapshot(),
459+
options.fileOperationThreadNum());
459460
}
460461

461462
@Override

paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.paimon.utils.CompactedChangelogPathResolver;
3737
import org.apache.paimon.utils.DataFilePathFactories;
3838
import org.apache.paimon.utils.ExecutorThreadFactory;
39+
import org.apache.paimon.utils.FileOperationThreadPool;
3940
import org.apache.paimon.utils.IndexFilePathFactories;
4041

4142
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -57,6 +58,7 @@
5758
import java.util.Map;
5859
import java.util.concurrent.ExecutorService;
5960
import java.util.concurrent.Executors;
61+
import java.util.concurrent.ThreadPoolExecutor;
6062
import java.util.concurrent.atomic.AtomicReference;
6163
import java.util.function.Consumer;
6264
import java.util.function.Predicate;
@@ -66,7 +68,6 @@
6668
import static java.util.Collections.singletonList;
6769
import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
6870
import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER;
69-
import static org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
7071
import static org.apache.paimon.utils.Preconditions.checkState;
7172
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
7273

@@ -79,18 +80,16 @@ public class TableCommitImpl implements InnerTableCommit {
7980
@Nullable private final Runnable expireSnapshots;
8081
@Nullable private final PartitionExpire partitionExpire;
8182
@Nullable private final TagAutoManager tagAutoManager;
82-
8383
@Nullable private final Duration consumerExpireTime;
8484
private final ConsumerManager consumerManager;
85-
8685
private final ExecutorService maintainExecutor;
8786
private final AtomicReference<Throwable> maintainError;
88-
8987
private final String tableName;
88+
private final boolean forceCreatingSnapshot;
89+
private final ThreadPoolExecutor fileCheckExecutor;
9090

9191
@Nullable private Map<String, String> overwritePartition = null;
9292
private boolean batchCommitted = false;
93-
private final boolean forceCreatingSnapshot;
9493
private boolean expireForEmptyCommit = true;
9594

9695
public TableCommitImpl(
@@ -102,7 +101,8 @@ public TableCommitImpl(
102101
ConsumerManager consumerManager,
103102
ExpireExecutionMode expireExecutionMode,
104103
String tableName,
105-
boolean forceCreatingSnapshot) {
104+
boolean forceCreatingSnapshot,
105+
int threadNum) {
106106
if (partitionExpire != null) {
107107
commit.withPartitionExpire(partitionExpire);
108108
}
@@ -125,6 +125,7 @@ public TableCommitImpl(
125125

126126
this.tableName = tableName;
127127
this.forceCreatingSnapshot = forceCreatingSnapshot;
128+
this.fileCheckExecutor = FileOperationThreadPool.getExecutorService(threadNum);
128129
}
129130

130131
public boolean forceCreatingSnapshot() {
@@ -294,17 +295,12 @@ private void checkFilesExistence(List<ManifestCommittable> committables) {
294295
msg.newFilesIncrement().newIndexFiles().stream()
295296
.map(indexFileFactory::toPath)
296297
.forEach(files::add);
297-
msg.newFilesIncrement().deletedIndexFiles().stream()
298-
.map(indexFileFactory::toPath)
299-
.forEach(files::add);
300-
msg.compactIncrement().compactBefore().forEach(collector);
301298
msg.compactIncrement().compactAfter().forEach(collector);
302299
msg.compactIncrement().newIndexFiles().stream()
303300
.map(indexFileFactory::toPath)
304301
.forEach(files::add);
305-
msg.compactIncrement().deletedIndexFiles().stream()
306-
.map(indexFileFactory::toPath)
307-
.forEach(files::add);
302+
303+
// skip compact before files, deleted index files
308304
}
309305
}
310306

@@ -329,7 +325,7 @@ private void checkFilesExistence(List<ManifestCommittable> committables) {
329325
List<Path> nonExistFiles =
330326
Lists.newArrayList(
331327
randomlyExecuteSequentialReturn(
332-
getExecutorService(null),
328+
fileCheckExecutor,
333329
f -> nonExists.test(f) ? singletonList(f) : emptyList(),
334330
resolvedFiles));
335331

paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -751,7 +751,7 @@ public void testDataFileSkippingSetException() throws Exception {
751751
store.newStatsFileHandler(),
752752
store.options().changelogProducer() != CoreOptions.ChangelogProducer.NONE,
753753
store.options().cleanEmptyDirectories(),
754-
store.options().deleteFileThreadNum());
754+
store.options().fileOperationThreadNum());
755755

756756
ExpireSnapshots expireSnapshots =
757757
new ExpireSnapshotsImpl(
@@ -816,7 +816,7 @@ public void testManifestFileSkippingSetException() throws Exception {
816816
store.newStatsFileHandler(),
817817
store.options().changelogProducer() != CoreOptions.ChangelogProducer.NONE,
818818
store.options().cleanEmptyDirectories(),
819-
store.options().deleteFileThreadNum());
819+
store.options().fileOperationThreadNum());
820820
ExpireSnapshots expireSnapshots =
821821
new ExpireSnapshotsImpl(
822822
snapshotManager, changelogManager, snapshotDeletion, tagManager);

0 commit comments

Comments
 (0)