Skip to content

Commit 4a91093

Browse files
committed
[core] Introduce 'cache.expire-after-write' to expire a max time interval
1 parent 9b379a0 commit 4a91093

File tree

5 files changed

+85
-23
lines changed

5 files changed

+85
-23
lines changed

docs/layouts/shortcodes/generated/catalog_configuration.html

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,16 @@
3333
<td>Controls whether the catalog will cache databases, tables, manifests and partitions.</td>
3434
</tr>
3535
<tr>
36-
<td><h5>cache.expiration-interval</h5></td>
36+
<td><h5>cache.expire-after-access</h5></td>
3737
<td style="word-wrap: break-word;">10 min</td>
3838
<td>Duration</td>
39-
<td>Controls the duration for which databases and tables in the catalog are cached.</td>
39+
<td>Cache expiration policy: marks cache entries to expire after a specified duration has passed since their last access.</td>
40+
</tr>
41+
<tr>
42+
<td><h5>cache.expire-after-write</h5></td>
43+
<td style="word-wrap: break-word;">30 min</td>
44+
<td>Duration</td>
45+
<td>Cache expiration policy: marks cache entries to expire after a specified duration has passed since their last refresh.</td>
4046
</tr>
4147
<tr>
4248
<td><h5>cache.manifest.max-memory</h5></td>

paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,20 @@ public class CatalogOptions {
8989
.withDescription(
9090
"Controls whether the catalog will cache databases, tables, manifests and partitions.");
9191

92-
public static final ConfigOption<Duration> CACHE_EXPIRATION_INTERVAL_MS =
93-
key("cache.expiration-interval")
92+
public static final ConfigOption<Duration> CACHE_EXPIRE_AFTER_ACCESS =
93+
key("cache.expire-after-access")
9494
.durationType()
9595
.defaultValue(Duration.ofMinutes(10))
96+
.withFallbackKeys("cache.expiration-interval")
9697
.withDescription(
97-
"Controls the duration for which databases and tables in the catalog are cached.");
98+
"Cache expiration policy: marks cache entries to expire after a specified duration has passed since their last access.");
99+
100+
public static final ConfigOption<Duration> CACHE_EXPIRE_AFTER_WRITE =
101+
key("cache.expire-after-write")
102+
.durationType()
103+
.defaultValue(Duration.ofMinutes(30))
104+
.withDescription(
105+
"Cache expiration policy: marks cache entries to expire after a specified duration has passed since their last refresh.");
98106

99107
public static final ConfigOption<Long> CACHE_PARTITION_MAX_NUM =
100108
key("cache.partition.max-num")

paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@
4444
import java.util.Optional;
4545

4646
import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
47-
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
47+
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS;
48+
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_WRITE;
4849
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
4950
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
5051
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
@@ -57,7 +58,8 @@ public class CachingCatalog extends DelegateCatalog {
5758

5859
private final Options options;
5960

60-
private final Duration expirationInterval;
61+
private final Duration expireAfterAccess;
62+
private final Duration expireAfterWrite;
6163
private final int snapshotMaxNumPerTable;
6264
private final long cachedPartitionMaxNum;
6365

@@ -80,11 +82,17 @@ public CachingCatalog(Catalog wrapped, Options options) {
8082
manifestCacheThreshold = Long.MAX_VALUE;
8183
}
8284

83-
this.expirationInterval = options.get(CACHE_EXPIRATION_INTERVAL_MS);
84-
if (expirationInterval.isZero() || expirationInterval.isNegative()) {
85+
this.expireAfterAccess = options.get(CACHE_EXPIRE_AFTER_ACCESS);
86+
if (expireAfterAccess.isZero() || expireAfterAccess.isNegative()) {
8587
throw new IllegalArgumentException(
86-
"When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled.");
88+
"When 'cache.expire-after-access' is set to negative or 0, the catalog cache should be disabled.");
8789
}
90+
this.expireAfterWrite = options.get(CACHE_EXPIRE_AFTER_WRITE);
91+
if (expireAfterWrite.isZero() || expireAfterWrite.isNegative()) {
92+
throw new IllegalArgumentException(
93+
"When 'cache.expire-after-write' is set to negative or 0, the catalog cache should be disabled.");
94+
}
95+
8896
this.snapshotMaxNumPerTable = options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE);
8997
this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);
9098

@@ -98,14 +106,16 @@ void init(Ticker ticker) {
98106
Caffeine.newBuilder()
99107
.softValues()
100108
.executor(Runnable::run)
101-
.expireAfterAccess(expirationInterval)
109+
.expireAfterAccess(expireAfterAccess)
110+
.expireAfterWrite(expireAfterWrite)
102111
.ticker(ticker)
103112
.build();
104113
this.tableCache =
105114
Caffeine.newBuilder()
106115
.softValues()
107116
.executor(Runnable::run)
108-
.expireAfterAccess(expirationInterval)
117+
.expireAfterAccess(expireAfterAccess)
118+
.expireAfterWrite(expireAfterWrite)
109119
.ticker(ticker)
110120
.build();
111121
this.partitionCache =
@@ -114,7 +124,8 @@ void init(Ticker ticker) {
114124
: Caffeine.newBuilder()
115125
.softValues()
116126
.executor(Runnable::run)
117-
.expireAfterAccess(expirationInterval)
127+
.expireAfterAccess(expireAfterAccess)
128+
.expireAfterWrite(expireAfterWrite)
118129
.weigher(
119130
(Weigher<Identifier, List<Partition>>)
120131
(identifier, v) -> v.size())
@@ -239,14 +250,16 @@ private void putTableCache(Identifier identifier, Table table) {
239250
storeTable.setSnapshotCache(
240251
Caffeine.newBuilder()
241252
.softValues()
242-
.expireAfterAccess(expirationInterval)
253+
.expireAfterAccess(expireAfterAccess)
254+
.expireAfterWrite(expireAfterWrite)
243255
.maximumSize(snapshotMaxNumPerTable)
244256
.executor(Runnable::run)
245257
.build());
246258
storeTable.setStatsCache(
247259
Caffeine.newBuilder()
248260
.softValues()
249-
.expireAfterAccess(expirationInterval)
261+
.expireAfterAccess(expireAfterAccess)
262+
.expireAfterWrite(expireAfterWrite)
250263
.maximumSize(5)
251264
.executor(Runnable::run)
252265
.build());

paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
import static java.util.Collections.emptyList;
6565
import static java.util.Collections.singletonList;
6666
import static org.apache.paimon.data.BinaryString.fromString;
67-
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
67+
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS;
6868
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
6969
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
7070
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
@@ -199,6 +199,34 @@ public void testTableExpiresAfterInterval() throws Exception {
199199
.isNotSameAs(table);
200200
}
201201

202+
@Test
203+
public void testTableExpiresAfterWrite() throws Exception {
204+
TestableCachingCatalog catalog =
205+
new TestableCachingCatalog(
206+
this.catalog, Duration.ofMinutes(5), Duration.ofMinutes(8), ticker);
207+
208+
Identifier tableIdent = new Identifier("db", "tbl");
209+
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
210+
Table table = catalog.getTable(tableIdent);
211+
212+
ticker.advance(Duration.ofMinutes(2));
213+
214+
// refresh from get
215+
catalog.getTable(tableIdent);
216+
217+
// not expire
218+
ticker.advance(Duration.ofMinutes(4));
219+
assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
220+
catalog.getTable(tableIdent);
221+
222+
// advance 10 minutes to expire from write
223+
ticker.advance(HALF_OF_EXPIRATION.plus(Duration.ofSeconds(4)));
224+
assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
225+
assertThat(catalog.getTable(tableIdent))
226+
.as("CachingCatalog should return a new instance after expiration")
227+
.isNotSameAs(table);
228+
}
229+
202230
@Test
203231
public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog() throws Exception {
204232
TestableCachingCatalog catalog =
@@ -320,7 +348,7 @@ public void testCachingCatalogRejectsExpirationIntervalOfZero() {
320348
() -> new TestableCachingCatalog(this.catalog, Duration.ZERO, ticker))
321349
.isInstanceOf(IllegalArgumentException.class)
322350
.hasMessage(
323-
"When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled.");
351+
"When 'cache.expire-after-access' is set to negative or 0, the catalog cache should be disabled.");
324352
}
325353

326354
@Test
@@ -378,7 +406,7 @@ public void testManifestCache() throws Exception {
378406

379407
private void innerTestManifestCache(long manifestCacheThreshold) throws Exception {
380408
Options options = new Options();
381-
options.set(CACHE_EXPIRATION_INTERVAL_MS, Duration.ofSeconds(10));
409+
options.set(CACHE_EXPIRE_AFTER_ACCESS, Duration.ofSeconds(10));
382410
options.set(CACHE_MANIFEST_SMALL_FILE_MEMORY, MemorySize.ofMebiBytes(1));
383411
options.set(
384412
CACHE_MANIFEST_SMALL_FILE_THRESHOLD, MemorySize.ofBytes(manifestCacheThreshold));

paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
import java.util.List;
3030
import java.util.Optional;
3131

32-
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
32+
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS;
33+
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_WRITE;
3334
import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
3435

3536
/**
@@ -41,14 +42,20 @@ public class TestableCachingCatalog extends CachingCatalog {
4142
private final Duration cacheExpirationInterval;
4243

4344
public TestableCachingCatalog(Catalog catalog, Duration expirationInterval, Ticker ticker) {
44-
super(catalog, createOptions(expirationInterval));
45+
this(catalog, expirationInterval, Duration.ofDays(1), ticker);
46+
}
47+
48+
public TestableCachingCatalog(
49+
Catalog catalog, Duration expireAfterAccess, Duration expireAfterWrite, Ticker ticker) {
50+
super(catalog, createOptions(expireAfterAccess, expireAfterWrite));
4551
init(ticker);
46-
this.cacheExpirationInterval = expirationInterval;
52+
this.cacheExpirationInterval = expireAfterAccess;
4753
}
4854

49-
private static Options createOptions(Duration expirationInterval) {
55+
private static Options createOptions(Duration expireAfterAccess, Duration expireAfterWrite) {
5056
Options options = new Options();
51-
options.set(CACHE_EXPIRATION_INTERVAL_MS, expirationInterval);
57+
options.set(CACHE_EXPIRE_AFTER_ACCESS, expireAfterAccess);
58+
options.set(CACHE_EXPIRE_AFTER_WRITE, expireAfterWrite);
5259
options.set(CACHE_PARTITION_MAX_NUM, 100L);
5360
return options;
5461
}

0 commit comments

Comments
 (0)