Skip to content

Commit 5c79dc5

Browse files
committed
fix
1 parent 47b324f commit 5c79dc5

12 files changed

Lines changed: 155 additions & 112 deletions

File tree

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -586,7 +586,7 @@
586586
</tr>
587587
<tr>
588588
<td><h5>file-cache.max-size</h5></td>
589-
<td style="word-wrap: break-word;">9223372036854775807 bytes</td>
589+
<td style="word-wrap: break-word;">(none)</td>
590590
<td>MemorySize</td>
591591
<td>Maximum total size of the local disk block cache. Unlimited by default.</td>
592592
</tr>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -727,7 +727,7 @@ public InlineElement getDescription() {
727727
public static final ConfigOption<MemorySize> FILE_CACHE_MAX_SIZE =
728728
key("file-cache.max-size")
729729
.memoryType()
730-
.defaultValue(MemorySize.MAX_VALUE)
730+
.noDefaultValue()
731731
.withDescription(
732732
"Maximum total size of the local disk block cache. Unlimited by default.");
733733

@@ -2931,6 +2931,7 @@ public String fileCacheDir() {
29312931
return options.get(FILE_CACHE_DIR);
29322932
}
29332933

2934+
@Nullable
29342935
public MemorySize fileCacheMaxSize() {
29352936
return options.get(FILE_CACHE_MAX_SIZE);
29362937
}

paimon-common/src/main/java/org/apache/paimon/fs/cache/BlockDiskCache.java

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -96,6 +96,7 @@ public void putBlock(String filePath, int blockIndex, byte[] data) {
9696
}
9797
if (!tmpFile.renameTo(path)) {
9898
tmpFile.delete();
99+
// another thread won the race — don't update currentSize
99100
return;
100101
}
101102
} catch (IOException e) {
@@ -138,13 +139,20 @@ private void evict() {
138139

139140
entries.sort(Comparator.comparingLong(e -> e.mtime));
140141

142+
List<CacheEntry> toDelete = new ArrayList<>();
141143
synchronized (lock) {
142144
for (CacheEntry entry : entries) {
143145
if (currentSize <= maxSizeBytes) {
144146
break;
145147
}
146-
if (entry.file.delete()) {
147-
currentSize -= entry.size;
148+
toDelete.add(entry);
149+
currentSize -= entry.size;
150+
}
151+
}
152+
for (CacheEntry entry : toDelete) {
153+
if (!entry.file.delete()) {
154+
synchronized (lock) {
155+
currentSize += entry.size;
148156
}
149157
}
150158
}

paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingFileIO.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -59,8 +59,7 @@ public SeekableInputStream newInputStream(Path path) throws IOException {
5959
if (!whitelist.contains(fileType) || FileType.isMutable(path)) {
6060
return delegate.newInputStream(path);
6161
}
62-
long fileSize = delegate.getFileStatus(path).getLen();
63-
return new CachingSeekableInputStream(delegate, path, fileSize, cache);
62+
return new CachingSeekableInputStream(delegate, path, cache);
6463
}
6564

6665
@Override

paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingSeekableInputStream.java

Lines changed: 14 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -31,23 +31,28 @@ public class CachingSeekableInputStream extends SeekableInputStream {
3131

3232
private final FileIO fileIO;
3333
private final Path path;
34-
private final long fileSize;
3534
private final BlockDiskCache cache;
3635
private long pos;
36+
private long fileSize = -1;
3737
@Nullable private SeekableInputStream remoteStream;
3838

39-
public CachingSeekableInputStream(
40-
FileIO fileIO, Path path, long fileSize, BlockDiskCache cache) {
39+
public CachingSeekableInputStream(FileIO fileIO, Path path, BlockDiskCache cache) {
4140
this.fileIO = fileIO;
4241
this.path = path;
43-
this.fileSize = fileSize;
4442
this.cache = cache;
4543
this.pos = 0;
4644
}
4745

46+
private long fileSize() throws IOException {
47+
if (fileSize == -1) {
48+
fileSize = fileIO.getFileStatus(path).getLen();
49+
}
50+
return fileSize;
51+
}
52+
4853
@Override
4954
public void seek(long desired) throws IOException {
50-
this.pos = Math.max(0, Math.min(desired, fileSize));
55+
this.pos = Math.max(0, Math.min(desired, fileSize()));
5156
}
5257

5358
@Override
@@ -57,7 +62,7 @@ public long getPos() throws IOException {
5762

5863
@Override
5964
public int read() throws IOException {
60-
if (pos >= fileSize) {
65+
if (pos >= fileSize()) {
6166
return -1;
6267
}
6368
int blockSize = cache.blockSize();
@@ -73,12 +78,12 @@ public int read(byte[] b, int off, int len) throws IOException {
7378
if (len == 0) {
7479
return 0;
7580
}
76-
if (pos >= fileSize) {
81+
if (pos >= fileSize()) {
7782
return -1;
7883
}
7984

8085
int blockSize = cache.blockSize();
81-
long end = Math.min(pos + len, fileSize);
86+
long end = Math.min(pos + len, fileSize());
8287
int totalRead = 0;
8388

8489
while (pos < end) {
@@ -106,7 +111,7 @@ private byte[] readBlock(int blockIndex) throws IOException {
106111

107112
int blockSize = cache.blockSize();
108113
long offset = (long) blockIndex * blockSize;
109-
int readSize = (int) Math.min(blockSize, fileSize - offset);
114+
int readSize = (int) Math.min(blockSize, fileSize() - offset);
110115

111116
SeekableInputStream stream = getRemoteStream();
112117
stream.seek(offset);

paimon-common/src/main/java/org/apache/paimon/utils/FileType.java

Lines changed: 29 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,9 @@
2020

2121
import org.apache.paimon.fs.Path;
2222

23+
import java.util.EnumSet;
24+
import java.util.Set;
25+
2326
/**
2427
* Classification of Paimon files.
2528
*
@@ -70,6 +73,32 @@ public boolean isIndex() {
7073
return this == BUCKET_INDEX || this == GLOBAL_INDEX || this == FILE_INDEX;
7174
}
7275

76+
/** Parse a comma-separated whitelist string into a set of {@link FileType}s. */
77+
public static Set<FileType> parseWhitelist(String whitelist) {
78+
Set<FileType> result = EnumSet.noneOf(FileType.class);
79+
for (String name : whitelist.split(",")) {
80+
name = name.trim();
81+
switch (name) {
82+
case "meta":
83+
result.add(META);
84+
break;
85+
case "global-index":
86+
result.add(GLOBAL_INDEX);
87+
break;
88+
case "bucket-index":
89+
result.add(BUCKET_INDEX);
90+
break;
91+
case "data":
92+
result.add(DATA);
93+
break;
94+
case "file-index":
95+
result.add(FILE_INDEX);
96+
break;
97+
}
98+
}
99+
return result;
100+
}
101+
73102
/** Returns {@code true} if the file is mutable and should not be cached. */
74103
public static boolean isMutable(Path filePath) {
75104
String name = filePath.getName();

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

Lines changed: 4 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@
3232
import org.apache.paimon.manifest.ManifestFileMeta;
3333
import org.apache.paimon.operation.FileStoreScan;
3434
import org.apache.paimon.options.ExpireConfig;
35+
import org.apache.paimon.options.MemorySize;
3536
import org.apache.paimon.options.Options;
3637
import org.apache.paimon.predicate.Predicate;
3738
import org.apache.paimon.schema.SchemaManager;
@@ -78,7 +79,6 @@
7879
import java.io.IOException;
7980
import java.io.UncheckedIOException;
8081
import java.time.Duration;
81-
import java.util.EnumSet;
8282
import java.util.HashMap;
8383
import java.util.List;
8484
import java.util.Map;
@@ -138,32 +138,12 @@ private static FileIO wrapWithCachingIfNeeded(FileIO fileIO, TableSchema tableSc
138138
+ java.io.File.separator
139139
+ "paimon-file-cache";
140140
}
141-
long maxSize = options.fileCacheMaxSize().getBytes();
141+
MemorySize maxSizeOpt = options.fileCacheMaxSize();
142+
long maxSize = maxSizeOpt == null ? Long.MAX_VALUE : maxSizeOpt.getBytes();
142143
int blockSize = (int) options.fileCacheBlockSize().getBytes();
143144
BlockDiskCache cache = new BlockDiskCache(cacheDir, maxSize, blockSize);
144145

145-
String whitelistStr = options.fileCacheWhitelist();
146-
Set<FileType> whitelist = EnumSet.noneOf(FileType.class);
147-
for (String name : whitelistStr.split(",")) {
148-
name = name.trim();
149-
switch (name) {
150-
case "meta":
151-
whitelist.add(FileType.META);
152-
break;
153-
case "global-index":
154-
whitelist.add(FileType.GLOBAL_INDEX);
155-
break;
156-
case "bucket-index":
157-
whitelist.add(FileType.BUCKET_INDEX);
158-
break;
159-
case "data":
160-
whitelist.add(FileType.DATA);
161-
break;
162-
case "file-index":
163-
whitelist.add(FileType.FILE_INDEX);
164-
break;
165-
}
166-
}
146+
Set<FileType> whitelist = FileType.parseWhitelist(options.fileCacheWhitelist());
167147
if (whitelist.isEmpty()) {
168148
return fileIO;
169149
}

paimon-python/pypaimon/common/options/core_options.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -408,7 +408,7 @@ class CoreOptions:
408408
FILE_CACHE_MAX_SIZE: ConfigOption[MemorySize] = (
409409
ConfigOptions.key("file-cache.max-size")
410410
.memory_type()
411-
.default_value(MemorySize.MAX_VALUE)
411+
.no_default_value()
412412
.with_description("Maximum total size of the local disk block cache. Unlimited by default.")
413413
)
414414

@@ -627,7 +627,7 @@ def file_cache_enabled(self) -> bool:
627627
def file_cache_dir(self) -> Optional[str]:
628628
return self.options.get(CoreOptions.FILE_CACHE_DIR)
629629

630-
def file_cache_max_size(self) -> MemorySize:
630+
def file_cache_max_size(self) -> Optional[MemorySize]:
631631
return self.options.get(CoreOptions.FILE_CACHE_MAX_SIZE)
632632

633633
def file_cache_block_size(self) -> MemorySize:

0 commit comments

Comments
 (0)