diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java
index ed7965b85a36a..b0e3ad5e61d0b 100644
--- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java
+++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java
@@ -32,8 +32,10 @@
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.monitor.fs.FsProbe;
@@ -590,6 +592,128 @@ public void maybeFetchRegion(
}
}
+ /**
+ * Fetch and write in cache multiple regions of a blob if there are enough free pages in the cache to do so.
+ *
+ * This method returns as soon as the download tasks are instantiated, but the tasks themselves
+ * are run on the bulk executor.
+ *
+ * If an exception is thrown from the writer then the cache entry being downloaded is freed
+ * and unlinked
+ *
+ * @param cacheKey the key to fetch data for
+ * @param firstRegion the first region of the blob to fetch
+ * @param lastRegion the last region of the blob to fetch
+ * @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
+ * @param writer a writer that handles writing of newly downloaded data to the shared cache
+ * @param fetchExecutor an executor to use for reading from the blob store
+ * @param listener a listener that is completed when the triggered fetches are done. The listener carries no result
+ * ({@code null}): it is completed whether the regions were fetched, were already available in cache or pending
+ * fetching via another thread, or were skipped because there were not enough free pages to fetch them.
+ */
+ public void maybeFetchRegions(
+ final KeyType cacheKey,
+ final int firstRegion,
+ final int lastRegion,
+ final long blobLength,
+ final RangeMissingHandler writer,
+ final Executor fetchExecutor,
+ final ActionListener<Void> listener
+ ) {
+ int regionCount = lastRegion + 1 - firstRegion;
+ ArrayList<CacheFileRegion<KeyType>> regionsToFetch = new ArrayList<>(regionCount);
+ for (int i = firstRegion; i <= lastRegion; i++) {
+ if (freeRegionCount() < 1 && maybeEvictLeastUsed() == false) {
+ // no free page available and no old enough unused region to be evicted
+ logger.info("No free regions, skipping loading regions [{}-{}]", i, lastRegion);
+ break;
+ } else {
+ try {
+ final CacheFileRegion<KeyType> entry = get(cacheKey, blobLength, i);
+ entry.incRefEnsureOpen();
+ regionsToFetch.add(entry);
+ } catch (AlreadyClosedException e) {
+ // Fine to ignore. It is only added to the list after we have inc()
+ }
+
+ }
+ }
+ if (regionsToFetch.isEmpty()) {
+ listener.onResponse(null);
+ return;
+ }
+
+ try (
+ RefCountingListener regionsListener = new RefCountingListener(
+ ActionListener.releaseAfter(listener, () -> regionsToFetch.forEach(AbstractRefCounted::decRef))
+ )
+ ) {
+ try {
+ final List<Tuple<CacheFileRegion<KeyType>, RegionGaps>> gaps = new ArrayList<>(regionsToFetch.size());
+ for (CacheFileRegion<KeyType> toFetch : regionsToFetch) {
+ int region = toFetch.regionKey.region();
+ ByteRange regionRange = ByteRange.of(0, computeCacheFileRegionSize(blobLength, region));
+ if (regionRange.isEmpty() == false) {
+ List<SparseFileTracker.Gap> regionGaps = toFetch.tracker.waitForRange(
+ regionRange,
+ regionRange,
+ regionsListener.acquire()
+ );
+ if (regionGaps.isEmpty() == false) {
+ gaps.add(new Tuple<>(toFetch, new RegionGaps((long) region * regionSize, regionGaps)));
+ }
+ }
+ }
+
+ if (gaps.isEmpty()) {
+ regionsListener.acquire().onResponse(null);
+ return;
+ }
+ final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps.stream().map(Tuple::v2).toList());
+ logger.trace(
+ () -> Strings.format(
+ "fill regions gaps %s %s shared input stream factory",
+ gaps,
+ streamFactory == null ? "without" : "with"
+ )
+ );
+
+ if (streamFactory == null) {
+ for (Tuple<CacheFileRegion<KeyType>, RegionGaps> gapsToFetch : gaps) {
+ for (SparseFileTracker.Gap gap : gapsToFetch.v2().gaps()) {
+ int offset = Math.toIntExact(gapsToFetch.v2().regionOffset());
+ fetchExecutor.execute(gapsToFetch.v1().fillGapRunnable(offset, gap, writer, null, regionsListener.acquire()));
+ }
+ }
+ } else {
+ var regionsToRelease = regionsListener.acquire();
+ try (
+ var sequentialGapsListener = new RefCountingListener(
+ ActionListener.runBefore(regionsToRelease, streamFactory::close)
+ )
+ ) {
+ ArrayList<Runnable> gapFillingTasks = new ArrayList<>();
+ for (Tuple<CacheFileRegion<KeyType>, RegionGaps> gapsToFetch : gaps) {
+ int offset = Math.toIntExact(gapsToFetch.v2().regionOffset());
+ gapFillingTasks.addAll(
+ gapsToFetch.v1()
+ .gapFillingTasks(offset, writer, sequentialGapsListener, streamFactory, gapsToFetch.v2().gaps())
+ );
+ }
+ fetchExecutor.execute(() -> {
+ // Fill the gaps in order. If a gap fails to fill for whatever reason, the task for filling the next
+ // gap will still be executed.
+ gapFillingTasks.forEach(Runnable::run);
+ });
+ }
+ }
+ } catch (Exception e) {
+ regionsListener.acquire().onFailure(e);
+ }
+ }
+ }
+
/**
* Fetch and write in cache a range within a blob region if there is at least a free page in the cache to do so.
*
@@ -967,7 +1091,9 @@ void populate(
listener.onResponse(false);
return;
}
- final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps);
+ final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(
+ new RegionGaps((long) regionKey.region() * blobCacheService.getRegionSize(), gaps)
+ );
logger.trace(
() -> Strings.format(
"fill gaps %s %s shared input stream factory",
@@ -989,21 +1115,16 @@ void populate(
}
}
} else {
+ var refsToRelease = refs.acquire();
try (
var sequentialGapsListener = new RefCountingListener(
- ActionListener.runBefore(listener.map(unused -> true), streamFactory::close)
+ ActionListener.runBefore(
+ listener.map(unused -> true),
+ () -> Releasables.close(refsToRelease, streamFactory::close)
+ )
)
) {
- final List<Runnable> gapFillingTasks = gaps.stream()
- .map(
- gap -> fillGapRunnable(
- gap,
- writer,
- streamFactory,
- ActionListener.releaseAfter(sequentialGapsListener.acquire(), refs.acquire())
- )
- )
- .toList();
+ List<Runnable> gapFillingTasks = gapFillingTasks(0, writer, sequentialGapsListener, streamFactory, gaps);
executor.execute(() -> {
// Fill the gaps in order. If a gap fails to fill for whatever reason, the task for filling the next
// gap will still be executed.
@@ -1050,7 +1171,9 @@ void populateAndRead(
);
if (gaps.isEmpty() == false) {
- final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps);
+ final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(
+ new RegionGaps((long) regionKey.region() * blobCacheService.getRegionSize(), gaps)
+ );
logger.trace(
() -> Strings.format(
"fill gaps %s %s shared input stream factory",
@@ -1082,7 +1205,29 @@ void populateAndRead(
}
}
+ List<Runnable> gapFillingTasks(
+ int gapOffset,
+ RangeMissingHandler writer,
+ RefCountingListener sequentialGapsListener,
+ SourceInputStreamFactory streamFactory,
+ List<SparseFileTracker.Gap> gaps
+ ) {
+ return gaps.stream()
+ .map(gap -> fillGapRunnable(gapOffset, gap, writer, streamFactory, sequentialGapsListener.acquire()))
+ .toList();
+ }
+
+ private Runnable fillGapRunnable(
+ SparseFileTracker.Gap gap,
+ RangeMissingHandler writer,
+ @Nullable SourceInputStreamFactory streamFactory,
+ ActionListener<Void> listener
+ ) {
+ return fillGapRunnable(0, gap, writer, streamFactory, listener);
+ }
+
private Runnable fillGapRunnable(
+ int gapOffset,
SparseFileTracker.Gap gap,
RangeMissingHandler writer,
@Nullable SourceInputStreamFactory streamFactory,
@@ -1097,7 +1242,7 @@ private Runnable fillGapRunnable(
ioRef,
start,
streamFactory,
- start,
+ gapOffset + start,
Math.toIntExact(gap.end() - start),
progress -> gap.onProgress(start + progress),
l.map(unused -> {
@@ -1126,6 +1271,8 @@ protected void alreadyClosed() {
}
}
+ public record RegionGaps(long regionOffset, List<SparseFileTracker.Gap> gaps) {}
+
public class CacheFile {
private final KeyType cacheKey;
@@ -1403,6 +1550,11 @@ public interface RangeAvailableHandler {
@FunctionalInterface
public interface RangeMissingHandler {
+
+ default SourceInputStreamFactory sharedInputStreamFactory(RegionGaps gaps) {
+ return sharedInputStreamFactory(List.of(gaps));
+ }
+
/**
* Attempt to get a shared {@link SourceInputStreamFactory} for the given list of Gaps so that all of them
* can be filled from the input stream created from the factory. If a factory is returned, the gaps must be
@@ -1413,7 +1565,7 @@ public interface RangeMissingHandler {
* its own input stream.
*/
@Nullable
- default SourceInputStreamFactory sharedInputStreamFactory(List<SparseFileTracker.Gap> gaps) {
+ default SourceInputStreamFactory sharedInputStreamFactory(List<RegionGaps> gaps) {
return null;
}
@@ -1463,7 +1615,7 @@ protected DelegatingRangeMissingHandler(RangeMissingHandler delegate) {
}
@Override
- public SourceInputStreamFactory sharedInputStreamFactory(List<SparseFileTracker.Gap> gaps) {
+ public SourceInputStreamFactory sharedInputStreamFactory(List<RegionGaps> gaps) {
return delegate.sharedInputStreamFactory(gaps);
}
diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java
index b6f5b550aea90..5784901b484c0 100644
--- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java
+++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java
@@ -14,7 +14,6 @@
import org.elasticsearch.blobcache.BlobCacheMetrics;
import org.elasticsearch.blobcache.BlobCacheUtils;
import org.elasticsearch.blobcache.common.ByteRange;
-import org.elasticsearch.blobcache.common.SparseFileTracker;
import org.elasticsearch.blobcache.shared.SharedBlobCacheService.RangeMissingHandler;
import org.elasticsearch.blobcache.shared.SharedBlobCacheService.SourceInputStreamFactory;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -1489,7 +1488,7 @@ public void close() {
final AtomicInteger position = new AtomicInteger(-1);
@Override
- public SourceInputStreamFactory sharedInputStreamFactory(List<SparseFileTracker.Gap> gaps) {
+ public SourceInputStreamFactory sharedInputStreamFactory(SharedBlobCacheService.RegionGaps gaps) {
return dummyStreamFactory;
}