Skip to content

Commit 47b324f

Browse files
committed
fix
1 parent 51dc38c commit 47b324f

5 files changed

Lines changed: 79 additions & 45 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/fs/cache/BlockDiskCache.java

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -104,19 +104,17 @@ public void putBlock(String filePath, int blockIndex, byte[] data) {
104104
return;
105105
}
106106

107+
boolean needEvict = false;
107108
synchronized (lock) {
108109
currentSize += data.length;
109-
if (maxSizeBytes < Long.MAX_VALUE) {
110-
evictIfNeeded();
111-
}
110+
needEvict = maxSizeBytes < Long.MAX_VALUE && currentSize > maxSizeBytes;
112111
}
113-
}
114-
115-
private void evictIfNeeded() {
116-
if (currentSize <= maxSizeBytes) {
117-
return;
112+
if (needEvict) {
113+
evict();
118114
}
115+
}
119116

117+
private void evict() {
120118
List<CacheEntry> entries = new ArrayList<>();
121119
File[] prefixDirs = cacheDir.listFiles();
122120
if (prefixDirs == null) {
@@ -140,12 +138,14 @@ private void evictIfNeeded() {
140138

141139
entries.sort(Comparator.comparingLong(e -> e.mtime));
142140

143-
for (CacheEntry entry : entries) {
144-
if (currentSize <= maxSizeBytes) {
145-
break;
146-
}
147-
if (entry.file.delete()) {
148-
currentSize -= entry.size;
141+
synchronized (lock) {
142+
for (CacheEntry entry : entries) {
143+
if (currentSize <= maxSizeBytes) {
144+
break;
145+
}
146+
if (entry.file.delete()) {
147+
currentSize -= entry.size;
148+
}
149149
}
150150
}
151151
}
@@ -187,18 +187,28 @@ private File cachePath(String filePath, int blockIndex) {
187187
return new File(new File(cacheDir, prefix), hex);
188188
}
189189

190+
private static final ThreadLocal<MessageDigest> SHA256_DIGEST =
191+
ThreadLocal.withInitial(
192+
() -> {
193+
try {
194+
return MessageDigest.getInstance("SHA-256");
195+
} catch (NoSuchAlgorithmException e) {
196+
throw new RuntimeException("SHA-256 not available", e);
197+
}
198+
});
199+
200+
private static final char[] HEX_CHARS = "0123456789abcdef".toCharArray();
201+
190202
private static String sha256Hex(String input) {
191-
try {
192-
MessageDigest md = MessageDigest.getInstance("SHA-256");
193-
byte[] hash = md.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
194-
StringBuilder sb = new StringBuilder(hash.length * 2);
195-
for (byte b : hash) {
196-
sb.append(String.format("%02x", b & 0xff));
197-
}
198-
return sb.toString();
199-
} catch (NoSuchAlgorithmException e) {
200-
throw new RuntimeException("SHA-256 not available", e);
203+
MessageDigest md = SHA256_DIGEST.get();
204+
md.reset();
205+
byte[] hash = md.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
206+
char[] chars = new char[hash.length * 2];
207+
for (int i = 0; i < hash.length; i++) {
208+
chars[i * 2] = HEX_CHARS[(hash[i] >> 4) & 0x0f];
209+
chars[i * 2 + 1] = HEX_CHARS[hash[i] & 0x0f];
201210
}
211+
return new String(chars);
202212
}
203213

204214
@VisibleForTesting

paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingFileIO.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,15 @@ public CachingFileIO(FileIO delegate, BlockDiskCache cache, Set<FileType> whitel
5252

5353
@Override
5454
public SeekableInputStream newInputStream(Path path) throws IOException {
55+
if (cache == null) {
56+
return delegate.newInputStream(path);
57+
}
5558
FileType fileType = FileType.classify(path);
5659
if (!whitelist.contains(fileType) || FileType.isMutable(path)) {
5760
return delegate.newInputStream(path);
5861
}
59-
SeekableInputStream stream = delegate.newInputStream(path);
6062
long fileSize = delegate.getFileStatus(path).getLen();
61-
return new CachingSeekableInputStream(stream, path.toString(), fileSize, cache);
63+
return new CachingSeekableInputStream(delegate, path, fileSize, cache);
6264
}
6365

6466
@Override
@@ -113,7 +115,9 @@ public void setRuntimeContext(Map<String, String> options) {
113115

114116
@Override
115117
public void close() throws IOException {
116-
cache.close();
118+
if (cache != null) {
119+
cache.close();
120+
}
117121
delegate.close();
118122
}
119123
}

paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingSeekableInputStream.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,28 @@
1818

1919
package org.apache.paimon.fs.cache;
2020

21+
import org.apache.paimon.fs.FileIO;
22+
import org.apache.paimon.fs.Path;
2123
import org.apache.paimon.fs.SeekableInputStream;
22-
import org.apache.paimon.fs.SeekableInputStreamWrapper;
24+
25+
import javax.annotation.Nullable;
2326

2427
import java.io.IOException;
2528

2629
/** A {@link SeekableInputStream} that caches reads at block granularity on local disk. */
27-
public class CachingSeekableInputStream extends SeekableInputStreamWrapper {
30+
public class CachingSeekableInputStream extends SeekableInputStream {
2831

29-
private final String filePath;
32+
private final FileIO fileIO;
33+
private final Path path;
3034
private final long fileSize;
3135
private final BlockDiskCache cache;
3236
private long pos;
37+
@Nullable private SeekableInputStream remoteStream;
3338

3439
public CachingSeekableInputStream(
35-
SeekableInputStream in, String filePath, long fileSize, BlockDiskCache cache) {
36-
super(in);
37-
this.filePath = filePath;
40+
FileIO fileIO, Path path, long fileSize, BlockDiskCache cache) {
41+
this.fileIO = fileIO;
42+
this.path = path;
3843
this.fileSize = fileSize;
3944
this.cache = cache;
4045
this.pos = 0;
@@ -94,7 +99,7 @@ public int read(byte[] b, int off, int len) throws IOException {
9499
}
95100

96101
private byte[] readBlock(int blockIndex) throws IOException {
97-
byte[] cached = cache.getBlock(filePath, blockIndex);
102+
byte[] cached = cache.getBlock(path.toString(), blockIndex);
98103
if (cached != null) {
99104
return cached;
100105
}
@@ -103,14 +108,22 @@ private byte[] readBlock(int blockIndex) throws IOException {
103108
long offset = (long) blockIndex * blockSize;
104109
int readSize = (int) Math.min(blockSize, fileSize - offset);
105110

106-
in.seek(offset);
107-
byte[] data = readFully(readSize);
111+
SeekableInputStream stream = getRemoteStream();
112+
stream.seek(offset);
113+
byte[] data = readFully(stream, readSize);
108114

109-
cache.putBlock(filePath, blockIndex, data);
115+
cache.putBlock(path.toString(), blockIndex, data);
110116
return data;
111117
}
112118

113-
private byte[] readFully(int size) throws IOException {
119+
private SeekableInputStream getRemoteStream() throws IOException {
120+
if (remoteStream == null) {
121+
remoteStream = fileIO.newInputStream(path);
122+
}
123+
return remoteStream;
124+
}
125+
126+
private static byte[] readFully(SeekableInputStream in, int size) throws IOException {
114127
byte[] buf = new byte[size];
115128
int remaining = size;
116129
int off = 0;
@@ -124,4 +137,12 @@ private byte[] readFully(int size) throws IOException {
124137
}
125138
return buf;
126139
}
140+
141+
@Override
142+
public void close() throws IOException {
143+
if (remoteStream != null) {
144+
remoteStream.close();
145+
remoteStream = null;
146+
}
147+
}
127148
}

paimon-common/src/test/java/org/apache/paimon/fs/cache/CachingFileIOTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,16 +197,15 @@ void testCacheHitAvoidsRemoteRead() throws IOException {
197197
readAll(s, data.length);
198198
}
199199
int firstReadCount = delegate.newInputStreamCallCount("snapshot-1");
200+
assertThat(firstReadCount).isEqualTo(1);
200201

201-
// second read should hit cache — delegate.newInputStream still called (we create the
202-
// wrapper), but the underlying stream's read should not be invoked for cached blocks
202+
// second read should hit cache — delegate.newInputStream should NOT be called
203+
// because the remote stream is lazily opened and all blocks are cached
203204
try (SeekableInputStream s = cachingIO.newInputStream(new Path("snapshot-1"))) {
204205
byte[] result = readAll(s, data.length);
205206
assertThat(result).isEqualTo(data);
206207
}
207-
// newInputStream is called each time (we need the stream as fallback),
208-
// but the data comes from cache
209-
assertThat(delegate.newInputStreamCallCount("snapshot-1")).isEqualTo(firstReadCount + 1);
208+
assertThat(delegate.newInputStreamCallCount("snapshot-1")).isEqualTo(firstReadCount);
210209
}
211210

212211
@Test

paimon-python/pypaimon/filesystem/caching_file_io.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def _evict_if_needed(self) -> None:
9999
entries = []
100100
for dirpath, _, filenames in os.walk(self._cache_dir):
101101
for fn in filenames:
102-
if fn.endswith('.tmp'):
102+
if '.tmp.' in fn:
103103
continue
104104
fp = os.path.join(dirpath, fn)
105105
try:
@@ -123,7 +123,7 @@ def _scan_size(self) -> int:
123123
total = 0
124124
for dirpath, _, filenames in os.walk(self._cache_dir):
125125
for fn in filenames:
126-
if fn.endswith('.tmp'):
126+
if '.tmp.' in fn:
127127
continue
128128
try:
129129
total += os.path.getsize(os.path.join(dirpath, fn))

0 commit comments

Comments
 (0)