Skip to content

Commit 62c39c6

Browse files
committed
Add cache support for Java
1 parent 49ee9f0 commit 62c39c6

10 files changed

Lines changed: 1227 additions & 2 deletions

File tree

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,41 @@ public InlineElement getDescription() {
710710
.defaultValue(MemorySize.parse("64 kb"))
711711
.withDescription("Memory page size for caching.");
712712

713+
public static final ConfigOption<Boolean> FILE_CACHE_ENABLED =
714+
key("file-cache.enabled")
715+
.booleanType()
716+
.defaultValue(false)
717+
.withDescription("Whether to enable local disk block cache for file reads.");
718+
719+
public static final ConfigOption<String> FILE_CACHE_DIR =
720+
key("file-cache.dir")
721+
.stringType()
722+
.noDefaultValue()
723+
.withDescription(
724+
"Directory for file block cache. "
725+
+ "Defaults to a 'paimon-file-cache' subdirectory under the system temp directory.");
726+
727+
public static final ConfigOption<MemorySize> FILE_CACHE_MAX_SIZE =
728+
key("file-cache.max-size")
729+
.memoryType()
730+
.defaultValue(MemorySize.MAX_VALUE)
731+
.withDescription(
732+
"Maximum total size of the local disk block cache. Unlimited by default.");
733+
734+
public static final ConfigOption<MemorySize> FILE_CACHE_BLOCK_SIZE =
735+
key("file-cache.block-size")
736+
.memoryType()
737+
.defaultValue(MemorySize.ofMebiBytes(1))
738+
.withDescription("Block size for local disk cache.");
739+
740+
public static final ConfigOption<String> FILE_CACHE_WHITELIST =
741+
key("file-cache.whitelist")
742+
.stringType()
743+
.defaultValue("meta,global-index")
744+
.withDescription(
745+
"Comma-separated list of file types to cache. "
746+
+ "Supported values: meta, global-index, bucket-index, data, file-index.");
747+
713748
public static final ConfigOption<MemorySize> TARGET_FILE_SIZE =
714749
key("target-file-size")
715750
.memoryType()
@@ -2887,6 +2922,27 @@ public int cachePageSize() {
28872922
return (int) options.get(CACHE_PAGE_SIZE).getBytes();
28882923
}
28892924

2925+
public boolean fileCacheEnabled() {
2926+
return options.get(FILE_CACHE_ENABLED);
2927+
}
2928+
2929+
@Nullable
2930+
public String fileCacheDir() {
2931+
return options.get(FILE_CACHE_DIR);
2932+
}
2933+
2934+
public MemorySize fileCacheMaxSize() {
2935+
return options.get(FILE_CACHE_MAX_SIZE);
2936+
}
2937+
2938+
public MemorySize fileCacheBlockSize() {
2939+
return options.get(FILE_CACHE_BLOCK_SIZE);
2940+
}
2941+
2942+
public String fileCacheWhitelist() {
2943+
return options.get(FILE_CACHE_WHITELIST);
2944+
}
2945+
28902946
public MemorySize lookupCacheMaxMemory() {
28912947
return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE);
28922948
}
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.fs.cache;
20+
21+
import org.apache.paimon.annotation.VisibleForTesting;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.io.Closeable;
27+
import java.io.File;
28+
import java.io.FileOutputStream;
29+
import java.io.IOException;
30+
import java.nio.file.FileVisitResult;
31+
import java.nio.file.Files;
32+
import java.nio.file.Path;
33+
import java.nio.file.SimpleFileVisitor;
34+
import java.nio.file.attribute.BasicFileAttributes;
35+
import java.security.MessageDigest;
36+
import java.security.NoSuchAlgorithmException;
37+
import java.util.ArrayList;
38+
import java.util.Comparator;
39+
import java.util.List;
40+
41+
/** Block-level local disk cache with LRU eviction. Thread-safe. */
42+
public class BlockDiskCache implements Closeable {
43+
44+
private static final Logger LOG = LoggerFactory.getLogger(BlockDiskCache.class);
45+
46+
private final File cacheDir;
47+
private final long maxSizeBytes;
48+
private final int blockSize;
49+
private final Object lock = new Object();
50+
private long currentSize;
51+
52+
public BlockDiskCache(String cacheDir, long maxSizeBytes, int blockSize) {
53+
this.cacheDir = new File(cacheDir);
54+
this.maxSizeBytes = maxSizeBytes;
55+
this.blockSize = blockSize;
56+
this.cacheDir.mkdirs();
57+
this.currentSize = scanSize();
58+
}
59+
60+
public int blockSize() {
61+
return blockSize;
62+
}
63+
64+
public byte[] getBlock(String filePath, int blockIndex) {
65+
File path = cachePath(filePath, blockIndex);
66+
if (!path.exists()) {
67+
return null;
68+
}
69+
try {
70+
byte[] data = Files.readAllBytes(path.toPath());
71+
// touch to update mtime for LRU
72+
path.setLastModified(System.currentTimeMillis());
73+
return data;
74+
} catch (IOException e) {
75+
LOG.debug("Failed to read cache block: {}", path, e);
76+
return null;
77+
}
78+
}
79+
80+
public void putBlock(String filePath, int blockIndex, byte[] data) {
81+
File path = cachePath(filePath, blockIndex);
82+
if (path.exists()) {
83+
return;
84+
}
85+
86+
File subDir = path.getParentFile();
87+
subDir.mkdirs();
88+
89+
File tmpFile =
90+
new File(
91+
path.getParent(),
92+
path.getName() + ".tmp." + Thread.currentThread().getId());
93+
try {
94+
try (FileOutputStream fos = new FileOutputStream(tmpFile)) {
95+
fos.write(data);
96+
}
97+
if (!tmpFile.renameTo(path)) {
98+
tmpFile.delete();
99+
return;
100+
}
101+
} catch (IOException e) {
102+
tmpFile.delete();
103+
LOG.debug("Failed to write cache block: {}", path, e);
104+
return;
105+
}
106+
107+
synchronized (lock) {
108+
currentSize += data.length;
109+
if (maxSizeBytes < Long.MAX_VALUE) {
110+
evictIfNeeded();
111+
}
112+
}
113+
}
114+
115+
private void evictIfNeeded() {
116+
if (currentSize <= maxSizeBytes) {
117+
return;
118+
}
119+
120+
List<CacheEntry> entries = new ArrayList<>();
121+
File[] prefixDirs = cacheDir.listFiles();
122+
if (prefixDirs == null) {
123+
return;
124+
}
125+
for (File prefixDir : prefixDirs) {
126+
if (!prefixDir.isDirectory()) {
127+
continue;
128+
}
129+
File[] files = prefixDir.listFiles();
130+
if (files == null) {
131+
continue;
132+
}
133+
for (File file : files) {
134+
if (file.getName().contains(".tmp.")) {
135+
continue;
136+
}
137+
entries.add(new CacheEntry(file.lastModified(), file.length(), file));
138+
}
139+
}
140+
141+
entries.sort(Comparator.comparingLong(e -> e.mtime));
142+
143+
for (CacheEntry entry : entries) {
144+
if (currentSize <= maxSizeBytes) {
145+
break;
146+
}
147+
if (entry.file.delete()) {
148+
currentSize -= entry.size;
149+
}
150+
}
151+
}
152+
153+
private long scanSize() {
154+
long total = 0;
155+
try {
156+
Path root = cacheDir.toPath();
157+
if (!Files.exists(root)) {
158+
return 0;
159+
}
160+
total = scanDirectory(root);
161+
} catch (IOException e) {
162+
LOG.warn("Failed to scan cache directory size", e);
163+
}
164+
return total;
165+
}
166+
167+
private long scanDirectory(Path root) throws IOException {
168+
long[] total = {0};
169+
Files.walkFileTree(
170+
root,
171+
new SimpleFileVisitor<Path>() {
172+
@Override
173+
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
174+
if (!file.getFileName().toString().contains(".tmp.")) {
175+
total[0] += attrs.size();
176+
}
177+
return FileVisitResult.CONTINUE;
178+
}
179+
});
180+
return total[0];
181+
}
182+
183+
private File cachePath(String filePath, int blockIndex) {
184+
String key = filePath + ":" + blockIndex;
185+
String hex = sha256Hex(key);
186+
String prefix = hex.substring(0, 2);
187+
return new File(new File(cacheDir, prefix), hex);
188+
}
189+
190+
private static String sha256Hex(String input) {
191+
try {
192+
MessageDigest md = MessageDigest.getInstance("SHA-256");
193+
byte[] hash = md.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
194+
StringBuilder sb = new StringBuilder(hash.length * 2);
195+
for (byte b : hash) {
196+
sb.append(String.format("%02x", b & 0xff));
197+
}
198+
return sb.toString();
199+
} catch (NoSuchAlgorithmException e) {
200+
throw new RuntimeException("SHA-256 not available", e);
201+
}
202+
}
203+
204+
@VisibleForTesting
205+
long currentSize() {
206+
synchronized (lock) {
207+
return currentSize;
208+
}
209+
}
210+
211+
@Override
212+
public void close() {}
213+
214+
private static class CacheEntry {
215+
final long mtime;
216+
final long size;
217+
final File file;
218+
219+
CacheEntry(long mtime, long size, File file) {
220+
this.mtime = mtime;
221+
this.size = size;
222+
this.file = file;
223+
}
224+
}
225+
}

0 commit comments

Comments
 (0)