Skip to content

Commit 38847e8

Browse files
Added changes for full file cache stats.
Signed-off-by: Mayank Sharma <[email protected]>
1 parent 444df2c commit 38847e8

22 files changed

+960
-184
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java

+92
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212

1313
import org.apache.lucene.store.Directory;
1414
import org.apache.lucene.store.FilterDirectory;
15+
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
16+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
17+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
1518
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
1619
import org.opensearch.action.admin.indices.get.GetIndexRequest;
1720
import org.opensearch.action.admin.indices.get.GetIndexResponse;
@@ -28,6 +31,8 @@
2831
import org.opensearch.index.store.CompositeDirectory;
2932
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
3033
import org.opensearch.index.store.remote.filecache.FileCache;
34+
import org.opensearch.index.store.remote.filecache.FileCacheStats;
35+
import org.opensearch.index.store.remote.filecache.FullFileCacheStats;
3136
import org.opensearch.index.store.remote.utils.FileTypeUtils;
3237
import org.opensearch.indices.IndicesService;
3338
import org.opensearch.node.Node;
@@ -36,7 +41,9 @@
3641

3742
import java.util.Arrays;
3843
import java.util.HashSet;
44+
import java.util.Objects;
3945
import java.util.Set;
46+
import java.util.concurrent.ExecutionException;
4047
import java.util.stream.Collectors;
4148

4249
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -49,6 +56,7 @@
4956
public class WritableWarmIT extends RemoteStoreBaseIntegTestCase {
5057

5158
protected static final String INDEX_NAME = "test-idx-1";
59+
protected static final String INDEX_NAME_2 = "test-idx-2";
5260
protected static final int NUM_DOCS_IN_BULK = 1000;
5361

5462
/*
@@ -168,4 +176,88 @@ public void testWritableWarmBasic() throws Exception {
168176
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
169177
fileCache.prune();
170178
}
179+
180+
public void testFullFileAndFileCacheStats() throws ExecutionException, InterruptedException {
181+
182+
InternalTestCluster internalTestCluster = internalCluster();
183+
internalTestCluster.startClusterManagerOnlyNode();
184+
internalTestCluster.startDataAndSearchNodes(1);
185+
186+
Settings settings = Settings.builder()
187+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
188+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
189+
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
190+
.build();
191+
192+
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_2).setSettings(settings).get());
193+
194+
// Verify from the cluster settings if the data locality is partial
195+
GetIndexResponse getIndexResponse = client().admin()
196+
.indices()
197+
.getIndex(new GetIndexRequest().indices(INDEX_NAME_2).includeDefaults(true))
198+
.get();
199+
200+
Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME_2);
201+
assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));
202+
203+
// Ingesting docs again before force merge
204+
indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK);
205+
flushAndRefresh(INDEX_NAME_2);
206+
207+
// ensuring cluster is green
208+
ensureGreen();
209+
210+
SearchResponse searchResponse = client().prepareSearch(INDEX_NAME_2).setQuery(QueryBuilders.matchAllQuery()).get();
211+
// Asserting that search returns same number of docs as ingested
212+
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);
213+
214+
// Ingesting docs again before force merge
215+
indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK);
216+
flushAndRefresh(INDEX_NAME_2);
217+
218+
FileCache fileCache = internalTestCluster.getDataNodeInstance(Node.class).fileCache();
219+
220+
// TODO: Make these validation more robust, when SwitchableIndexInput is implemented.
221+
222+
NodesStatsResponse nodesStatsResponse = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
223+
224+
FileCacheStats fileCacheStats = nodesStatsResponse.getNodes()
225+
.stream()
226+
.filter(n -> n.getNode().isDataNode())
227+
.toList()
228+
.getFirst()
229+
.getFileCacheStats();
230+
231+
if (Objects.isNull(fileCacheStats)) {
232+
fail("File Cache Stats should not be null");
233+
}
234+
235+
FullFileCacheStats fullFileCacheStats = fileCacheStats.fullFileCacheStats();
236+
237+
if (Objects.isNull(fullFileCacheStats)) {
238+
fail("Full File Cache Stats should not be null");
239+
}
240+
241+
// Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file
242+
// leaks
243+
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME_2)).get());
244+
fileCache.prune();
245+
246+
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
247+
int nonEmptyFileCacheNodes = 0;
248+
for (NodeStats stats : response.getNodes()) {
249+
FileCacheStats fcStats = stats.getFileCacheStats();
250+
if (Objects.isNull(fcStats) == false) {
251+
if (isFileCacheEmpty(fcStats) == false) {
252+
nonEmptyFileCacheNodes++;
253+
}
254+
}
255+
}
256+
assertEquals(0, nonEmptyFileCacheNodes);
257+
258+
}
259+
260+
private boolean isFileCacheEmpty(FileCacheStats stats) {
261+
return stats.getUsed().getBytes() == 0L && stats.getActive().getBytes() == 0L;
262+
}
171263
}

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java

+20-9
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.opensearch.common.annotation.PublicApi;
1515
import org.opensearch.core.common.breaker.CircuitBreaker;
1616
import org.opensearch.core.common.breaker.CircuitBreakingException;
17-
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
1817
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
1918
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
2019
import org.opensearch.index.store.remote.utils.cache.stats.CacheStats;
@@ -133,10 +132,15 @@ public long prune(Predicate<Path> keyPredicate) {
133132
}
134133

135134
@Override
136-
public CacheUsage usage() {
135+
public long usage() {
137136
return theCache.usage();
138137
}
139138

139+
@Override
140+
public long activeUsage() {
141+
return theCache.activeUsage();
142+
}
143+
140144
@Override
141145
public CacheStats stats() {
142146
return theCache.stats();
@@ -145,8 +149,8 @@ public CacheStats stats() {
145149
// To be used only for debugging purposes
146150
public void logCurrentState() {
147151
logger.trace("CURRENT STATE OF FILE CACHE \n");
148-
CacheUsage cacheUsage = theCache.usage();
149-
logger.trace("Total Usage: " + cacheUsage.usage() + " , Active Usage: " + cacheUsage.activeUsage());
152+
long cacheUsage = theCache.usage();
153+
logger.trace("Total Usage: " + cacheUsage);
150154
theCache.logCurrentState();
151155
}
152156

@@ -206,16 +210,23 @@ public void restoreFromDirectory(List<Path> fileCacheDataPaths) {
206210
* Returns the current {@link FileCacheStats}
207211
*/
208212
public FileCacheStats fileCacheStats() {
209-
CacheStats stats = stats();
210-
CacheUsage usage = usage();
213+
final CacheStats stats = stats();
214+
final CacheStats.FullFileStats fullFileStats = stats.fullFileStats();
215+
211216
return new FileCacheStats(
212217
System.currentTimeMillis(),
213-
usage.activeUsage(),
218+
stats.activeUsage(),
214219
capacity(),
215-
usage.usage(),
220+
stats.usage(),
216221
stats.evictionWeight(),
217222
stats.hitCount(),
218-
stats.missCount()
223+
stats.missCount(),
224+
new FullFileCacheStats(
225+
fullFileStats.getActiveUsage(),
226+
fullFileStats.getUsage(),
227+
fullFileStats.getEvictionWeight(),
228+
fullFileStats.getHitCount()
229+
)
219230
);
220231
}
221232

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheStats.java

+22-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,15 @@
1919
import java.io.IOException;
2020

2121
/**
22-
* Statistics on file cache
22+
* Statistics for the file cache system that tracks memory usage and performance metrics.
23+
* {@link FileCache} internally uses a {@link org.opensearch.index.store.remote.utils.cache.SegmentedCache}
24+
* to manage cached file data in memory segments.
25+
* This class aggregates statistics across all cache segments including:
26+
* - Memory usage (total, active, used)
27+
* - Cache performance (hits, misses, evictions)
28+
* - Utilization percentages
29+
* The statistics are exposed via {@link org.opensearch.action.admin.cluster.node.stats.NodeStats}
30+
* to provide visibility into cache behavior and performance.
2331
*
2432
* @opensearch.api
2533
*/
@@ -33,6 +41,7 @@ public class FileCacheStats implements Writeable, ToXContentFragment {
3341
private final long evicted;
3442
private final long hits;
3543
private final long misses;
44+
private final FullFileCacheStats fullFileCacheStats;
3645

3746
public FileCacheStats(
3847
final long timestamp,
@@ -41,7 +50,8 @@ public FileCacheStats(
4150
final long used,
4251
final long evicted,
4352
final long hits,
44-
final long misses
53+
final long misses,
54+
final FullFileCacheStats fullFileCacheStats
4555
) {
4656
this.timestamp = timestamp;
4757
this.active = active;
@@ -50,6 +60,7 @@ public FileCacheStats(
5060
this.evicted = evicted;
5161
this.hits = hits;
5262
this.misses = misses;
63+
this.fullFileCacheStats = fullFileCacheStats;
5364
}
5465

5566
public FileCacheStats(final StreamInput in) throws IOException {
@@ -60,6 +71,7 @@ public FileCacheStats(final StreamInput in) throws IOException {
6071
this.evicted = in.readLong();
6172
this.hits = in.readLong();
6273
this.misses = in.readLong();
74+
this.fullFileCacheStats = new FullFileCacheStats(in);
6375
}
6476

6577
public static short calculatePercentage(long used, long max) {
@@ -75,6 +87,7 @@ public void writeTo(final StreamOutput out) throws IOException {
7587
out.writeLong(evicted);
7688
out.writeLong(hits);
7789
out.writeLong(misses);
90+
if (fullFileCacheStats != null) fullFileCacheStats.writeTo(out);
7891
}
7992

8093
public long getTimestamp() {
@@ -113,6 +126,10 @@ public long getCacheMisses() {
113126
return misses;
114127
}
115128

129+
public FullFileCacheStats fullFileCacheStats() {
130+
return fullFileCacheStats;
131+
}
132+
116133
@Override
117134
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
118135
builder.startObject(Fields.FILE_CACHE);
@@ -125,6 +142,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
125142
builder.field(Fields.USED_PERCENT, getUsedPercent());
126143
builder.field(Fields.HIT_COUNT, getCacheHits());
127144
builder.field(Fields.MISS_COUNT, getCacheMisses());
145+
if (fullFileCacheStats != null) {
146+
fullFileCacheStats.toXContent(builder, params);
147+
}
128148
builder.endObject();
129149
return builder;
130150
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.store.remote.filecache;
10+
11+
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.core.common.io.stream.StreamInput;
13+
import org.opensearch.core.common.io.stream.StreamOutput;
14+
import org.opensearch.core.common.io.stream.Writeable;
15+
import org.opensearch.core.xcontent.ToXContentFragment;
16+
import org.opensearch.core.xcontent.XContentBuilder;
17+
18+
import java.io.IOException;
19+
20+
import static org.opensearch.index.store.remote.filecache.FileCacheStats.calculatePercentage;
21+
22+
/**
23+
* Statistics for the file cache system that tracks memory usage and performance metrics.
24+
* Aggregates statistics across all cache segments including:
25+
* - Memory usage: active and used bytes.
26+
* - Cache performance: hit counts and eviction counts.
27+
* - Utilization: active percentage of total used memory.
28+
* The statistics are exposed as part of {@link FileCacheStats} and via {@link org.opensearch.action.admin.cluster.node.stats.NodeStats}
29+
* to provide visibility into cache behavior and performance.
30+
*
31+
* @opensearch.api
32+
*/
33+
@ExperimentalApi
34+
public class FullFileCacheStats implements Writeable, ToXContentFragment {
35+
36+
private final long active;
37+
private final long used;
38+
private final long evicted;
39+
private final long hits;
40+
41+
public FullFileCacheStats(final long active, final long used, final long evicted, final long hits) {
42+
this.active = active;
43+
this.used = used;
44+
this.evicted = evicted;
45+
this.hits = hits;
46+
}
47+
48+
public FullFileCacheStats(final StreamInput in) throws IOException {
49+
this.active = in.readLong();
50+
this.used = in.readLong();
51+
this.evicted = in.readLong();
52+
this.hits = in.readLong();
53+
}
54+
55+
@Override
56+
public void writeTo(final StreamOutput out) throws IOException {
57+
out.writeLong(active);
58+
out.writeLong(used);
59+
out.writeLong(evicted);
60+
out.writeLong(hits);
61+
}
62+
63+
public long getActive() {
64+
return active;
65+
}
66+
67+
public long getUsed() {
68+
return used;
69+
}
70+
71+
public long getEvicted() {
72+
return evicted;
73+
}
74+
75+
public long getHits() {
76+
return hits;
77+
}
78+
79+
public short getActivePercent() {
80+
return calculatePercentage(active, used);
81+
}
82+
83+
static final class Fields {
84+
static final String FULL_FILE_STATS = "full_file_stats";
85+
static final String ACTIVE = "active";
86+
static final String ACTIVE_IN_BYTES = "active_in_bytes";
87+
static final String USED = "used";
88+
static final String USED_IN_BYTES = "used_in_bytes";
89+
static final String EVICTIONS = "evictions";
90+
static final String EVICTIONS_IN_BYTES = "evictions_in_bytes";
91+
static final String ACTIVE_PERCENT = "active_percent";
92+
static final String HIT_COUNT = "hit_count";
93+
}
94+
95+
@Override
96+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
97+
builder.startObject(Fields.FULL_FILE_STATS);
98+
builder.humanReadableField(FullFileCacheStats.Fields.ACTIVE_IN_BYTES, FullFileCacheStats.Fields.ACTIVE, getActive());
99+
builder.humanReadableField(FullFileCacheStats.Fields.USED_IN_BYTES, FullFileCacheStats.Fields.USED, getUsed());
100+
builder.humanReadableField(FullFileCacheStats.Fields.EVICTIONS_IN_BYTES, FullFileCacheStats.Fields.EVICTIONS, getEvicted());
101+
builder.field(FullFileCacheStats.Fields.ACTIVE_PERCENT, getActivePercent());
102+
builder.field(FullFileCacheStats.Fields.HIT_COUNT, getHits());
103+
builder.endObject();
104+
return builder;
105+
}
106+
}

server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,13 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, Stream
111111
try {
112112
// This local file cache is ref counted and may not strictly enforce configured capacity.
113113
// If we find available capacity is exceeded, deny further BlobFetchRequests.
114-
if (fileCache.capacity() < fileCache.usage().usage()) {
114+
if (fileCache.capacity() < fileCache.usage()) {
115115
fileCache.prune();
116116
throw new IOException(
117117
"Local file cache capacity ("
118118
+ fileCache.capacity()
119119
+ ") exceeded ("
120-
+ fileCache.usage().usage()
120+
+ fileCache.usage()
121121
+ ") - BlobFetchRequest failed: "
122122
+ request.getFilePath()
123123
);

0 commit comments

Comments
 (0)