Skip to content

Commit adf02a5

Browse files
committed
Iceberg manifest-list cache
1 parent 34af2a5 commit adf02a5

File tree

5 files changed

+183
-6
lines changed

5 files changed

+183
-6
lines changed

fe/fe-core/src/main/java/com/starrocks/connector/iceberg/CachingIcebergCatalog.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.iceberg.DataFile;
3636
import org.apache.iceberg.DeleteFile;
3737
import org.apache.iceberg.ManifestFile;
38+
import org.apache.iceberg.Snapshot;
3839
import org.apache.iceberg.PartitionSpec;
3940
import org.apache.iceberg.Schema;
4041
import org.apache.iceberg.SortOrder;
@@ -81,6 +82,11 @@ public class CachingIcebergCatalog implements IcebergCatalog {
8182
private final Map<IcebergTableName, Long> tableLatestAccessTime = new ConcurrentHashMap<>();
8283
private final Map<IcebergTableName, Long> tableLatestRefreshTime = new ConcurrentHashMap<>();
8384

85+
// Cache for manifest-list: snapshotId -> SnapshotManifests (dataManifests + deleteManifests)
86+
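// Keyed by snapshotId alone, without table-level indexing.
// NOTE(review): Iceberg only guarantees snapshot-id uniqueness within a single table, so this
// assumes ids never collide across tables served by this catalog - confirm.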
87+
// Entries expire via TTL (same as tables cache); invalidateCache additionally drops the entry of the
// cached table's current snapshot. Snapshots themselves are immutable.
88+
private final com.github.benmanes.caffeine.cache.Cache<Long, SnapshotManifests> manifestListCache;
89+
8490
private final com.github.benmanes.caffeine.cache.LoadingCache<IcebergTableName, Map<String, Partition>> partitionCache;
8591

8692
public CachingIcebergCatalog(String catalogName, IcebergCatalog delegate, IcebergCatalogProperties icebergProperties,
@@ -167,6 +173,17 @@ public Map<String, Partition> load(IcebergTableName key) throws Exception {
167173
})
168174
.build() : null;
169175

176+
// Initialize manifest-list cache (snapshotId -> SnapshotManifests)
177+
// Using simple count-based eviction since ManifestFile objects are lightweight
178+
this.manifestListCache = enableCache ? Caffeine.newBuilder()
179+
.executor(executorService)
180+
.expireAfterWrite(icebergProperties.getIcebergMetaCacheTtlSec(), SECONDS)
181+
.maximumSize(DEFAULT_CACHE_NUM)
182+
.removalListener((Long snapshotId, SnapshotManifests value, RemovalCause cause) -> {
183+
LOG.debug("Manifest list cache removal: snapshotId={}, cause={}", snapshotId, cause);
184+
})
185+
.build() : null;
186+
170187
this.backgroundExecutor = executorService;
171188
}
172189

@@ -390,8 +407,15 @@ private void refreshTable(BaseTable currentTable, BaseTable updatedTable,
390407
partitionCache.invalidate(baseIcebergTableName);
391408
partitionCache.get(updatedIcebergTableName);
392409

410+
// Cache manifest-list for the new snapshot
411+
List<ManifestFile> dataManifests = updatedTable.currentSnapshot().dataManifests(updatedTable.io());
412+
List<ManifestFile> deleteManifests = updatedTable.currentSnapshot().deleteManifests(updatedTable.io());
413+
if (manifestListCache != null) {
414+
manifestListCache.put(updatedSnapshotId, new SnapshotManifests(dataManifests, deleteManifests));
415+
}
416+
393417
TableMetadata updatedTableMetadata = updatedTable.operations().current();
394-
List<ManifestFile> manifestFiles = updatedTable.currentSnapshot().dataManifests(updatedTable.io()).stream()
418+
List<ManifestFile> manifestFiles = dataManifests.stream()
395419
.filter(f -> updatedTableMetadata.snapshot(f.snapshotId()) != null)
396420
.filter(f -> updatedTableMetadata.snapshot(f.snapshotId()).timestampMillis() > latestRefreshTime)
397421
.filter(f -> dataFileCache.getIfPresent(f.path()) == null)
@@ -451,7 +475,18 @@ public void invalidateCache(String dbName, String tableName) {
451475
}
452476

453477
private void invalidateCache(IcebergTableName key) {
454-
tables.invalidate(new IcebergTableCacheKey(key, new ConnectContext()));
478+
// Get snapshotId before invalidating table cache
479+
IcebergTableCacheKey tableKey = new IcebergTableCacheKey(key, new ConnectContext());
480+
Table cachedTable = tables.getIfPresent(tableKey);
481+
Long snapshotId = null;
482+
if (cachedTable != null && cachedTable instanceof BaseTable) {
483+
Snapshot snapshot = ((BaseTable) cachedTable).currentSnapshot();
484+
if (snapshot != null) {
485+
snapshotId = snapshot.snapshotId();
486+
}
487+
}
488+
489+
tables.invalidate(tableKey);
455490
// will invalidate all snapshots of this table
456491
partitionCache.invalidate(key);
457492
tableLatestAccessTime.remove(key);
@@ -462,6 +497,11 @@ private void invalidateCache(IcebergTableName key) {
462497
dataFileCache.invalidateAll(paths);
463498
deleteFileCache.invalidateAll(paths);
464499
}
500+
501+
// Invalidate manifest-list cache for current snapshot
502+
if (manifestListCache != null && snapshotId != null) {
503+
manifestListCache.invalidate(snapshotId);
504+
}
465505
}
466506

467507
@Override
@@ -472,10 +512,15 @@ public StarRocksIcebergTableScan getTableScan(Table table, StarRocksIcebergTable
472512
scanContext.setDataFileCacheWithMetrics(icebergProperties.isIcebergManifestCacheWithColumnStatistics());
473513
scanContext.setEnableCacheDataFileIdentifierColumnMetrics(
474514
icebergProperties.enableCacheDataFileIdentifierColumnStatistics());
515+
scanContext.setManifestListCache(manifestListCache);
475516

476517
return delegate.getTableScan(table, scanContext);
477518
}
478519

520+
public com.github.benmanes.caffeine.cache.Cache<Long, SnapshotManifests> getManifestListCache() {
521+
return manifestListCache;
522+
}
523+
479524
private Caffeine<Object, Object> newCacheBuilder(long expiresAfterWriteSec, long refreshInterval,
480525
long maximumSize) {
481526
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();

fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergMetadata.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -714,14 +714,15 @@ public SerializedMetaSpec getSerializedMetaSpec(String dbName, String tableName,
714714
return new IcebergMetaSpec(serializedTable, List.of(IcebergMetaSplit.placeholderSplit()), false);
715715
}
716716

717-
List<ManifestFile> dataManifests = snapshot.dataManifests(nativeTable.io());
717+
SnapshotManifests manifests = getSnapshotManifestsWithCache(snapshot, nativeTable);
718+
List<ManifestFile> dataManifests = manifests.getDataManifests();
718719

719720
List<ManifestFile> matchingDataManifests = filterManifests(dataManifests, nativeTable, predicate);
720721
for (ManifestFile file : matchingDataManifests) {
721722
remoteMetaSplits.add(IcebergMetaSplit.from(file));
722723
}
723724

724-
List<ManifestFile> deleteManifests = snapshot.deleteManifests(nativeTable.io());
725+
List<ManifestFile> deleteManifests = manifests.getDeleteManifests();
725726
List<ManifestFile> matchingDeleteManifests = filterManifests(deleteManifests, nativeTable, predicate);
726727
if (metadataTableType == MetadataTableType.FILES || metadataTableType == MetadataTableType.PARTITIONS) {
727728
for (ManifestFile file : matchingDeleteManifests) {
@@ -737,6 +738,34 @@ public SerializedMetaSpec getSerializedMetaSpec(String dbName, String tableName,
737738
return new IcebergMetaSpec(serializedTable, remoteMetaSplits, loadColumnStats);
738739
}
739740

741+
/**
742+
* Get snapshot manifests with cache support.
743+
* Uses manifestListCache from CachingIcebergCatalog if available.
744+
*/
745+
private SnapshotManifests getSnapshotManifestsWithCache(Snapshot snapshot, org.apache.iceberg.Table nativeTable) {
746+
if (icebergCatalog instanceof CachingIcebergCatalog) {
747+
CachingIcebergCatalog cachingCatalog = (CachingIcebergCatalog) icebergCatalog;
748+
com.github.benmanes.caffeine.cache.Cache<Long, SnapshotManifests> cache = cachingCatalog.getManifestListCache();
749+
if (cache != null) {
750+
long snapshotId = snapshot.snapshotId();
751+
SnapshotManifests cached = cache.getIfPresent(snapshotId);
752+
if (cached != null) {
753+
return cached;
754+
}
755+
// Cache miss - read and cache
756+
List<ManifestFile> dataManifests = snapshot.dataManifests(nativeTable.io());
757+
List<ManifestFile> deleteManifests = snapshot.deleteManifests(nativeTable.io());
758+
SnapshotManifests manifests = new SnapshotManifests(dataManifests, deleteManifests);
759+
cache.put(snapshotId, manifests);
760+
return manifests;
761+
}
762+
}
763+
// No cache available - read directly
764+
return new SnapshotManifests(
765+
snapshot.dataManifests(nativeTable.io()),
766+
snapshot.deleteManifests(nativeTable.io()));
767+
}
768+
740769
private void triggerIcebergPlanFilesIfNeeded(PredicateSearchKey key, Table table) {
741770
triggerIcebergPlanFilesIfNeeded(key, table, null, ConnectContext.get());
742771
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright 2021-present StarRocks, Inc. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.starrocks.connector.iceberg;
16+
17+
import org.apache.iceberg.ManifestFile;
18+
19+
import java.util.List;
20+
21+
/**
22+
* Holds cached manifest lists for an Iceberg snapshot.
23+
* This avoids re-reading and re-parsing the manifest-list file (snap-*.avro)
24+
* on every query to the same snapshot.
25+
*/
26+
public class SnapshotManifests {
27+
private final List<ManifestFile> dataManifests;
28+
private final List<ManifestFile> deleteManifests;
29+
30+
public SnapshotManifests(List<ManifestFile> dataManifests, List<ManifestFile> deleteManifests) {
31+
this.dataManifests = dataManifests;
32+
this.deleteManifests = deleteManifests;
33+
}
34+
35+
public List<ManifestFile> getDataManifests() {
36+
return dataManifests;
37+
}
38+
39+
public List<ManifestFile> getDeleteManifests() {
40+
return deleteManifests;
41+
}
42+
}

fe/fe-core/src/main/java/com/starrocks/connector/iceberg/StarRocksIcebergTableScanContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.github.benmanes.caffeine.cache.Cache;
1818
import com.starrocks.connector.PlanMode;
1919
import com.starrocks.connector.iceberg.CachingIcebergCatalog.IcebergTableName;
20+
import com.starrocks.connector.iceberg.SnapshotManifests;
2021
import com.starrocks.qe.ConnectContext;
2122
import org.apache.iceberg.DataFile;
2223
import org.apache.iceberg.DeleteFile;
@@ -33,6 +34,7 @@ public class StarRocksIcebergTableScanContext {
3334
private Cache<String, Set<DataFile>> dataFileCache;
3435
private Cache<String, Set<DeleteFile>> deleteFileCache;
3536
private Map<IcebergTableName, Set<String>> metaFileCacheMap;
37+
private Cache<Long, SnapshotManifests> manifestListCache;
3638
private boolean onlyReadCache;
3739
private int localParallelism;
3840
private long localPlanningMaxSlotSize;
@@ -96,6 +98,14 @@ public void setMetaFileCacheMap(Map<IcebergTableName, Set<String>> metaFileCache
9698
this.metaFileCacheMap = metaFileCacheMap;
9799
}
98100

101+
public Cache<Long, SnapshotManifests> getManifestListCache() {
102+
return manifestListCache;
103+
}
104+
105+
public void setManifestListCache(Cache<Long, SnapshotManifests> manifestListCache) {
106+
this.manifestListCache = manifestListCache;
107+
}
108+
99109
public boolean isOnlyReadCache() {
100110
return onlyReadCache;
101111
}

fe/fe-core/src/main/java/org/apache/iceberg/StarRocksIcebergTableScan.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.starrocks.connector.iceberg.AsyncIterable;
2525
import com.starrocks.connector.iceberg.CachingIcebergCatalog.IcebergTableName;
2626
import com.starrocks.connector.iceberg.IcebergApiConverter;
27+
import com.starrocks.connector.iceberg.SnapshotManifests;
2728
import com.starrocks.connector.iceberg.StarRocksIcebergTableScanContext;
2829
import com.starrocks.connector.iceberg.cost.IcebergMetricsReporter;
2930
import com.starrocks.connector.metadata.MetadataCollectJob;
@@ -72,6 +73,7 @@ public class StarRocksIcebergTableScan
7273
private final Cache<String, Set<DataFile>> dataFileCache;
7374
private final Cache<String, Set<DeleteFile>> deleteFileCache;
7475
private final Map<IcebergTableName, Set<String>> metaFileCacheMap;
76+
private final Cache<Long, SnapshotManifests> manifestListCache;
7577
private final Map<Integer, String> specStringCache;
7678
private final Map<Integer, ResidualEvaluator> residualCache;
7779
private final Map<Integer, Evaluator> partitionEvaluatorCache;
@@ -117,6 +119,7 @@ public StarRocksIcebergTableScan(Table table,
117119
this.dataFileCache = scanContext.getDataFileCache();
118120
this.deleteFileCache = scanContext.getDeleteFileCache();
119121
this.metaFileCacheMap = scanContext.getMetaFileCacheMap();
122+
this.manifestListCache = scanContext.getManifestListCache();
120123
this.dataFileCacheWithMetrics = scanContext.isDataFileCacheWithMetrics();
121124
this.enableCacheDataFileIdentifierColumnMetrics = scanContext.isEnableCacheDataFileIdentifierColumnMetrics();
122125
this.onlyReadCache = scanContext.isOnlyReadCache();
@@ -206,7 +209,7 @@ private DeleteFileIndex planDeletesLocally(List<ManifestFile> deleteManifests, S
206209
}
207210

208211
private List<ManifestFile> findMatchingDataManifests(Snapshot snapshot) {
209-
List<ManifestFile> dataManifests = snapshot.dataManifests(io());
212+
List<ManifestFile> dataManifests = getDataManifestsWithCache(snapshot);
210213
scanMetrics().totalDataManifests().increment(dataManifests.size());
211214

212215
List<ManifestFile> matchingDataManifests = IcebergApiConverter.filterManifests(dataManifests, table(), filter());
@@ -217,7 +220,7 @@ private List<ManifestFile> findMatchingDataManifests(Snapshot snapshot) {
217220
}
218221

219222
private List<ManifestFile> findMatchingDeleteManifests(Snapshot snapshot) {
220-
List<ManifestFile> deleteManifests = snapshot.deleteManifests(io());
223+
List<ManifestFile> deleteManifests = getDeleteManifestsWithCache(snapshot);
221224
List<ManifestFile> matchingDeleteManifests = IcebergApiConverter.filterManifests(deleteManifests, table(), filter());
222225

223226
scanMetrics().totalDeleteManifests().increment(deleteManifests.size());
@@ -226,6 +229,54 @@ private List<ManifestFile> findMatchingDeleteManifests(Snapshot snapshot) {
226229
return matchingDeleteManifests;
227230
}
228231

232+
/**
233+
* Get data manifests from cache if available, otherwise read from snapshot and cache.
234+
*/
235+
private List<ManifestFile> getDataManifestsWithCache(Snapshot snapshot) {
236+
if (manifestListCache == null) {
237+
return snapshot.dataManifests(io());
238+
}
239+
240+
long snapshotId = snapshot.snapshotId();
241+
SnapshotManifests cached = manifestListCache.getIfPresent(snapshotId);
242+
if (cached != null) {
243+
LOG.debug("Manifest-list cache HIT for snapshot {}", snapshotId);
244+
return cached.getDataManifests();
245+
}
246+
247+
// Cache miss - read from snapshot and cache
248+
LOG.debug("Manifest-list cache MISS for snapshot {}", snapshotId);
249+
List<ManifestFile> dataManifests = snapshot.dataManifests(io());
250+
List<ManifestFile> deleteManifests = snapshot.deleteManifests(io());
251+
manifestListCache.put(snapshotId, new SnapshotManifests(dataManifests, deleteManifests));
252+
253+
return dataManifests;
254+
}
255+
256+
/**
257+
* Get delete manifests from cache if available, otherwise read from snapshot and cache.
258+
*/
259+
private List<ManifestFile> getDeleteManifestsWithCache(Snapshot snapshot) {
260+
if (manifestListCache == null) {
261+
return snapshot.deleteManifests(io());
262+
}
263+
264+
long snapshotId = snapshot.snapshotId();
265+
SnapshotManifests cached = manifestListCache.getIfPresent(snapshotId);
266+
if (cached != null) {
267+
LOG.debug("Manifest-list cache HIT for snapshot {} (delete)", snapshotId);
268+
return cached.getDeleteManifests();
269+
}
270+
271+
// Cache miss - read from snapshot and cache
272+
LOG.debug("Manifest-list cache MISS for snapshot {} (delete)", snapshotId);
273+
List<ManifestFile> dataManifests = snapshot.dataManifests(io());
274+
List<ManifestFile> deleteManifests = snapshot.deleteManifests(io());
275+
manifestListCache.put(snapshotId, new SnapshotManifests(dataManifests, deleteManifests));
276+
277+
return deleteManifests;
278+
}
279+
229280
private CloseableIterable<FileScanTask> planFileTasksLocally(
230281
List<ManifestFile> dataManifests, List<ManifestFile> deleteManifests) {
231282
if (useCache()) {

0 commit comments

Comments
 (0)