From 9379eb588a5daca7a3a6f1984ea9fc69353800a3 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 27 Nov 2024 21:53:12 -0700 Subject: [PATCH 1/6] CHnage --- .../shared/SharedBlobCacheService.java | 132 ++++++++++++++++-- 1 file changed, 120 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index ed7965b85a36a..e11054d4a5a9d 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -32,8 +32,10 @@ import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.monitor.fs.FsProbe; @@ -590,6 +592,97 @@ public void maybeFetchRegion( } } + public void maybeFetchRegions( + final KeyType cacheKey, + final int firstRegion, + final int lastRegion, + final long blobLength, + final RangeMissingHandler writer, + final Executor fetchExecutor, + final ActionListener listener + ) { + ArrayList> regionsToFetch = new ArrayList<>(lastRegion + 1 - firstRegion); + for (int i = firstRegion; i <= lastRegion; i++) { + if (freeRegionCount() < 1 && maybeEvictLeastUsed() == false) { + // no free page available and no old enough unused region to be evicted + logger.info("No free regions, skipping loading regions [{}-{}]", i, lastRegion); + break; + } else { + final CacheFileRegion entry = get(cacheKey, blobLength, i); + try { + entry.incRefEnsureOpen(); + regionsToFetch.add(entry); + } catch (AlreadyClosedException e) {} + + } + } + if (regionsToFetch.isEmpty()) { + listener.onResponse(null); + return; + } + + try ( + RefCountingListener regionsListener = new RefCountingListener( + ActionListener.releaseAfter(listener, () -> regionsToFetch.forEach(AbstractRefCounted::decRef)) + ) + ) { + final List, List>> gaps = new ArrayList<>(); + for (CacheFileRegion toFetch : regionsToFetch) { + ByteRange regionRange = ByteRange.of(0, computeCacheFileRegionSize(blobLength, toFetch.regionKey.region)); + if (regionRange.isEmpty() == false) { + List regionGaps = toFetch.tracker.waitForRange( + regionRange, + regionRange, + regionsListener.acquire() + ); + if (regionGaps.isEmpty() == false) { + gaps.add(new Tuple<>(toFetch, regionGaps)); + } + } + } + + if (gaps.isEmpty()) { + regionsListener.acquire().onResponse(null); + return; + } + final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory( + gaps.stream().flatMap(g -> g.v2().stream()).toList() + ); + logger.trace( + () -> Strings.format("fill gaps %s %s shared input stream factory", gaps, streamFactory == null ? "without" : "with") + ); + + if (streamFactory == null) { + for (Tuple, List> gapsToFetch : gaps) { + for (SparseFileTracker.Gap gap : gapsToFetch.v2()) { + fetchExecutor.execute(gapsToFetch.v1().fillGapRunnable(gap, writer, null, regionsListener.acquire())); + } + } + } else { + try ( + var sequentialGapsListener = new RefCountingListener( + ActionListener.runBefore(regionsListener.acquire(), streamFactory::close) + ) + ) { + ArrayList gapFillingTasks = new ArrayList<>(); + for (Tuple, List> gapsToFetch : gaps) { + System.err.println(gapsToFetch.v1().regionKey.region() + " " + gapsToFetch.v2()); + gapFillingTasks.addAll( + gapsToFetch.v1().gapFillingTasks(writer, sequentialGapsListener, streamFactory, gapsToFetch.v2()) + ); + } + fetchExecutor.execute(() -> { + // Fill the gaps in order. If a gap fails to fill for whatever reason, the task for filling the next + // gap will still be executed. + gapFillingTasks.forEach(Runnable::run); + }); + } + } + } catch (Exception e) { + listener.onFailure(e); + } + } + /** * Fetch and write in cache a range within a blob region if there is at least a free page in the cache to do so. *

@@ -936,6 +1029,17 @@ boolean tryRead(ByteBuffer buf, long offset) throws IOException { } } + List gapsToPopulate(RefCountingRunnable refs, final ByteRange rangeToWrite) { + final List gaps = tracker.waitForRange( + rangeToWrite, + rangeToWrite, + Assertions.ENABLED ? ActionListener.releaseAfter(ActionListener.running(() -> { + assert blobCacheService.regionOwners.get(nonVolatileIO()) == this; + }), refs.acquire()) : refs.acquireListener() + ); + return gaps; + } + /** * Populates a range in cache if the range is not available nor pending to be available in cache. * @@ -991,19 +1095,13 @@ void populate( } else { try ( var sequentialGapsListener = new RefCountingListener( - ActionListener.runBefore(listener.map(unused -> true), streamFactory::close) + ActionListener.runBefore( + listener.map(unused -> true), + () -> Releasables.close(refs.acquire(), streamFactory::close) + ) ) ) { - final List gapFillingTasks = gaps.stream() - .map( - gap -> fillGapRunnable( - gap, - writer, - streamFactory, - ActionListener.releaseAfter(sequentialGapsListener.acquire(), refs.acquire()) - ) - ) - .toList(); + List gapFillingTasks = gapFillingTasks(writer, sequentialGapsListener, streamFactory, gaps); executor.execute(() -> { // Fill the gaps in order. If a gap fails to fill for whatever reason, the task for filling the next // gap will still be executed. @@ -1017,6 +1115,15 @@ void populate( } } + List gapFillingTasks( + RangeMissingHandler writer, + RefCountingListener sequentialGapsListener, + SourceInputStreamFactory streamFactory, + List gaps + ) { + return gaps.stream().map(gap -> fillGapRunnable(gap, writer, streamFactory, sequentialGapsListener.acquire())).toList(); + } + void populateAndRead( final ByteRange rangeToWrite, final ByteRange rangeToRead, @@ -1719,7 +1826,8 @@ private boolean assertChunkActiveOrEvicted(LFUCacheEntry entry) { } SharedBytes.IO io = entry.chunk.nonVolatileIO(); assert io != null || entry.chunk.isEvicted(); - assert io == null || regionOwners.get(io) == entry.chunk || entry.chunk.isEvicted(); + assert io == null || regionOwners.get(io) == entry.chunk || entry.chunk.isEvicted() + : io + " " + regionOwners.get(io) + " " + entry.chunk + " " + entry.chunk.isEvicted(); return true; } From 369a3720b49331446b8bbbf552cffccbbe8d5e90 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 10 Dec 2024 17:56:23 -0700 Subject: [PATCH 2/6] WIP --- .../shared/SharedBlobCacheService.java | 72 ++++++++++++------- .../shared/SharedBlobCacheServiceTests.java | 2 +- 2 files changed, 48 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index e11054d4a5a9d..0529455f2e1d9 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -626,7 +626,7 @@ public void maybeFetchRegions( ActionListener.releaseAfter(listener, () -> regionsToFetch.forEach(AbstractRefCounted::decRef)) ) ) { - final List, List>> gaps = new ArrayList<>(); + final List, RegionGaps>> gaps = new ArrayList<>(); for (CacheFileRegion toFetch : regionsToFetch) { ByteRange regionRange = ByteRange.of(0, computeCacheFileRegionSize(blobLength, toFetch.regionKey.region)); if (regionRange.isEmpty() == false) { @@ -636,7 +636,7 @@ public void maybeFetchRegions( regionsListener.acquire() ); if (regionGaps.isEmpty() == false) { - gaps.add(new Tuple<>(toFetch, regionGaps)); + gaps.add(new Tuple<>(toFetch, new RegionGaps(toFetch.regionKey.region(), regionGaps))); } } } @@ -645,16 +645,14 @@ public void maybeFetchRegions( regionsListener.acquire().onResponse(null); return; } - final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory( - gaps.stream().flatMap(g -> g.v2().stream()).toList() - ); + final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps.stream().map(Tuple::v2).toList()); logger.trace( () -> Strings.format("fill gaps %s %s shared input stream factory", gaps, streamFactory == null ? "without" : "with") ); if (streamFactory == null) { - for (Tuple, List> gapsToFetch : gaps) { - for (SparseFileTracker.Gap gap : gapsToFetch.v2()) { + for (Tuple, RegionGaps> gapsToFetch : gaps) { + for (SparseFileTracker.Gap gap : gapsToFetch.v2().gaps()) { fetchExecutor.execute(gapsToFetch.v1().fillGapRunnable(gap, writer, null, regionsListener.acquire())); } } @@ -665,10 +663,10 @@ public void maybeFetchRegions( ) ) { ArrayList gapFillingTasks = new ArrayList<>(); - for (Tuple, List> gapsToFetch : gaps) { - System.err.println(gapsToFetch.v1().regionKey.region() + " " + gapsToFetch.v2()); + for (Tuple, RegionGaps> gapsToFetch : gaps) { + int offset = (gapsToFetch.v1().regionKey.region() - firstRegion) * regionSize; gapFillingTasks.addAll( - gapsToFetch.v1().gapFillingTasks(writer, sequentialGapsListener, streamFactory, gapsToFetch.v2()) + gapsToFetch.v1().gapFillingTasks(offset, writer, sequentialGapsListener, streamFactory, gapsToFetch.v2().gaps()) ); } fetchExecutor.execute(() -> { @@ -1071,7 +1069,9 @@ void populate( listener.onResponse(false); return; } - final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps); + final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory( + new RegionGaps(regionKey.region(), gaps) + ); logger.trace( () -> Strings.format( "fill gaps %s %s shared input stream factory", @@ -1101,7 +1101,7 @@ void populate( ) ) ) { - List gapFillingTasks = gapFillingTasks(writer, sequentialGapsListener, streamFactory, gaps); + List gapFillingTasks = gapFillingTasks(0, writer, sequentialGapsListener, streamFactory, gaps); executor.execute(() -> { // Fill the gaps in order. If a gap fails to fill for whatever reason, the task for filling the next // gap will still be executed. @@ -1115,15 +1115,6 @@ void populate( } } - List gapFillingTasks( - RangeMissingHandler writer, - RefCountingListener sequentialGapsListener, - SourceInputStreamFactory streamFactory, - List gaps - ) { - return gaps.stream().map(gap -> fillGapRunnable(gap, writer, streamFactory, sequentialGapsListener.acquire())).toList(); - } - void populateAndRead( final ByteRange rangeToWrite, final ByteRange rangeToRead, @@ -1157,7 +1148,9 @@ void populateAndRead( ); if (gaps.isEmpty() == false) { - final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps); + final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory( + new RegionGaps(regionKey.region(), gaps) + ); logger.trace( () -> Strings.format( "fill gaps %s %s shared input stream factory", @@ -1189,7 +1182,29 @@ void populateAndRead( } } + List gapFillingTasks( + int gapOffset, + RangeMissingHandler writer, + RefCountingListener sequentialGapsListener, + SourceInputStreamFactory streamFactory, + List gaps + ) { + return gaps.stream() + .map(gap -> fillGapRunnable(gapOffset, gap, writer, streamFactory, sequentialGapsListener.acquire())) + .toList(); + } + + private Runnable fillGapRunnable( + SparseFileTracker.Gap gap, + RangeMissingHandler writer, + @Nullable SourceInputStreamFactory streamFactory, + ActionListener listener + ) { + return fillGapRunnable(0, gap, writer, streamFactory, listener); + } + private Runnable fillGapRunnable( + int gapOffset, SparseFileTracker.Gap gap, RangeMissingHandler writer, @Nullable SourceInputStreamFactory streamFactory, @@ -1204,7 +1219,7 @@ private Runnable fillGapRunnable( ioRef, start, streamFactory, - start, + gapOffset + start, Math.toIntExact(gap.end() - start), progress -> gap.onProgress(start + progress), l.map(unused -> { @@ -1233,6 +1248,8 @@ protected void alreadyClosed() { } } + public record RegionGaps(int region, List gaps) {} + public class CacheFile { private final KeyType cacheKey; @@ -1510,6 +1527,11 @@ public interface RangeAvailableHandler { @FunctionalInterface public interface RangeMissingHandler { + + default SourceInputStreamFactory sharedInputStreamFactory(RegionGaps gaps) { + return sharedInputStreamFactory(List.of(gaps)); + } + /** * Attempt to get a shared {@link SourceInputStreamFactory} for the given list of Gaps so that all of them * can be filled from the input stream created from the factory. If a factory is returned, the gaps must be @@ -1520,7 +1542,7 @@ public interface RangeMissingHandler { * its own input stream. */ @Nullable - default SourceInputStreamFactory sharedInputStreamFactory(List gaps) { + default SourceInputStreamFactory sharedInputStreamFactory(List gaps) { return null; } @@ -1570,7 +1592,7 @@ protected DelegatingRangeMissingHandler(RangeMissingHandler delegate) { } @Override - public SourceInputStreamFactory sharedInputStreamFactory(List gaps) { + public SourceInputStreamFactory sharedInputStreamFactory(RegionGaps gaps) { return delegate.sharedInputStreamFactory(gaps); } diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java index b6f5b550aea90..1d281eeaaaac3 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java @@ -1489,7 +1489,7 @@ public void close() { final AtomicInteger position = new AtomicInteger(-1); @Override - public SourceInputStreamFactory sharedInputStreamFactory(List gaps) { + public SourceInputStreamFactory sharedInputStreamFactory(SharedBlobCacheService.RegionGaps gaps) { return dummyStreamFactory; } From 19deaacc43dda51aef2465629d6bde2ba0362098 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 11 Dec 2024 12:28:23 -0700 Subject: [PATCH 3/6] Wip --- .../blobcache/shared/SharedBlobCacheService.java | 13 +++++++------ .../shared/SharedBlobCacheServiceTests.java | 1 - 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 0529455f2e1d9..70cda117da85d 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -626,9 +626,10 @@ public void maybeFetchRegions( ActionListener.releaseAfter(listener, () -> regionsToFetch.forEach(AbstractRefCounted::decRef)) ) ) { - final List, RegionGaps>> gaps = new ArrayList<>(); + final List, RegionGaps>> gaps = new ArrayList<>(regionsToFetch.size()); for (CacheFileRegion toFetch : regionsToFetch) { - ByteRange regionRange = ByteRange.of(0, computeCacheFileRegionSize(blobLength, toFetch.regionKey.region)); + int region = toFetch.regionKey.region(); + ByteRange regionRange = ByteRange.of(0, computeCacheFileRegionSize(blobLength, region)); if (regionRange.isEmpty() == false) { List regionGaps = toFetch.tracker.waitForRange( regionRange, @@ -636,7 +637,7 @@ public void maybeFetchRegions( regionsListener.acquire() ); if (regionGaps.isEmpty() == false) { - gaps.add(new Tuple<>(toFetch, new RegionGaps(toFetch.regionKey.region(), regionGaps))); + gaps.add(new Tuple<>(toFetch, new RegionGaps((long) region * regionSize, regionGaps))); } } } @@ -1070,7 +1071,7 @@ void populate( return; } final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory( - new RegionGaps(regionKey.region(), gaps) + new RegionGaps((long) regionKey.region() * blobCacheService.getRegionSize(), gaps) ); logger.trace( () -> Strings.format( @@ -1149,7 +1150,7 @@ void populateAndRead( if (gaps.isEmpty() == false) { final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory( - new RegionGaps(regionKey.region(), gaps) + new RegionGaps((long) regionKey.region() * blobCacheService.getRegionSize(), gaps) ); logger.trace( () -> Strings.format( @@ -1248,7 +1249,7 @@ protected void alreadyClosed() { } } - public record RegionGaps(int region, List gaps) {} + public record RegionGaps(long regionOffset, List gaps) {} public class CacheFile { diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java index 1d281eeaaaac3..5784901b484c0 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.blobcache.BlobCacheMetrics; import org.elasticsearch.blobcache.BlobCacheUtils; import org.elasticsearch.blobcache.common.ByteRange; -import org.elasticsearch.blobcache.common.SparseFileTracker; import org.elasticsearch.blobcache.shared.SharedBlobCacheService.RangeMissingHandler; import org.elasticsearch.blobcache.shared.SharedBlobCacheService.SourceInputStreamFactory; import org.elasticsearch.cluster.node.DiscoveryNodeRole; From 38ea3c0e5cdbdb81590af6a17cc918759e1b4a66 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 11 Dec 2024 15:42:17 -0700 Subject: [PATCH 4/6] Change --- .../shared/SharedBlobCacheService.java | 58 ++++++++++++------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 70cda117da85d..0967bea94c6a3 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -592,6 +592,26 @@ public void maybeFetchRegion( } } + /** + * Fetch and write in cache multiple region of a blob if there are enough free pages in the cache to do so. + *

+ * This method returns as soon as the download tasks are instantiated, but the tasks themselves + * are run on the bulk executor. + *

+ * If an exception is thrown from the writer then the cache entry being downloaded is freed + * and unlinked + * + * @param cacheKey the key to fetch data for + * @param firstRegion the first region of the blob to fetch + * @param lastRegion the last region of the blob to fetch + * @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region) + * @param writer a writer that handles writing of newly downloaded data to the shared cache + * @param fetchExecutor an executor to use for reading from the blob store + * @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the region, in + * which case the data is available in cache. The listener is completed with {@code false} in every other cases: if + * the region to write is already available in cache, if the region is pending fetching via another thread or if + * there is not enough free pages to fetch the region. + */ public void maybeFetchRegions( final KeyType cacheKey, final int firstRegion, @@ -601,7 +621,8 @@ public void maybeFetchRegions( final Executor fetchExecutor, final ActionListener listener ) { - ArrayList> regionsToFetch = new ArrayList<>(lastRegion + 1 - firstRegion); + int regionCount = lastRegion + 1 - firstRegion; + ArrayList> regionsToFetch = new ArrayList<>(regionCount); for (int i = firstRegion; i <= lastRegion; i++) { if (freeRegionCount() < 1 && maybeEvictLeastUsed() == false) { // no free page available and no old enough unused region to be evicted @@ -612,7 +633,9 @@ public void maybeFetchRegions( try { entry.incRefEnsureOpen(); regionsToFetch.add(entry); - } catch (AlreadyClosedException e) {} + } catch (AlreadyClosedException e) { + // Fine to ignore. It is only added to the list after we have inc() + } } } @@ -648,7 +671,11 @@ public void maybeFetchRegions( } final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps.stream().map(Tuple::v2).toList()); logger.trace( - () -> Strings.format("fill gaps %s %s shared input stream factory", gaps, streamFactory == null ? "without" : "with") + () -> Strings.format( + "fill regions gaps %s %s shared input stream factory", + gaps, + streamFactory == null ? "without" : "with" + ) ); if (streamFactory == null) { @@ -658,10 +685,9 @@ public void maybeFetchRegions( } } } else { + var regionsToRelease = regionsListener.acquire(); try ( - var sequentialGapsListener = new RefCountingListener( - ActionListener.runBefore(regionsListener.acquire(), streamFactory::close) - ) + var sequentialGapsListener = new RefCountingListener(ActionListener.runBefore(regionsToRelease, streamFactory::close)) ) { ArrayList gapFillingTasks = new ArrayList<>(); for (Tuple, RegionGaps> gapsToFetch : gaps) { @@ -678,6 +704,7 @@ public void maybeFetchRegions( } } } catch (Exception e) { + // TODO: Double call? listener.onFailure(e); } } @@ -1028,17 +1055,6 @@ boolean tryRead(ByteBuffer buf, long offset) throws IOException { } } - List gapsToPopulate(RefCountingRunnable refs, final ByteRange rangeToWrite) { - final List gaps = tracker.waitForRange( - rangeToWrite, - rangeToWrite, - Assertions.ENABLED ? ActionListener.releaseAfter(ActionListener.running(() -> { - assert blobCacheService.regionOwners.get(nonVolatileIO()) == this; - }), refs.acquire()) : refs.acquireListener() - ); - return gaps; - } - /** * Populates a range in cache if the range is not available nor pending to be available in cache. * @@ -1094,11 +1110,12 @@ void populate( } } } else { + var refsToRelease = refs.acquire(); try ( var sequentialGapsListener = new RefCountingListener( ActionListener.runBefore( listener.map(unused -> true), - () -> Releasables.close(refs.acquire(), streamFactory::close) + () -> Releasables.close(refsToRelease, streamFactory::close) ) ) ) { @@ -1593,7 +1610,7 @@ protected DelegatingRangeMissingHandler(RangeMissingHandler delegate) { } @Override - public SourceInputStreamFactory sharedInputStreamFactory(RegionGaps gaps) { + public SourceInputStreamFactory sharedInputStreamFactory(List gaps) { return delegate.sharedInputStreamFactory(gaps); } @@ -1849,8 +1866,7 @@ private boolean assertChunkActiveOrEvicted(LFUCacheEntry entry) { } SharedBytes.IO io = entry.chunk.nonVolatileIO(); assert io != null || entry.chunk.isEvicted(); - assert io == null || regionOwners.get(io) == entry.chunk || entry.chunk.isEvicted() - : io + " " + regionOwners.get(io) + " " + entry.chunk + " " + entry.chunk.isEvicted(); + assert io == null || regionOwners.get(io) == entry.chunk || entry.chunk.isEvicted(); return true; } From d08af89d28a3e461e1253552e24106d598a8d5b7 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 11 Dec 2024 17:29:07 -0700 Subject: [PATCH 5/6] Fixes --- .../shared/SharedBlobCacheService.java | 104 +++++++++--------- 1 file changed, 54 insertions(+), 50 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 0967bea94c6a3..7d14ffa8da04f 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -629,8 +629,8 @@ public void maybeFetchRegions( logger.info("No free regions, skipping loading regions [{}-{}]", i, lastRegion); break; } else { - final CacheFileRegion entry = get(cacheKey, blobLength, i); try { + final CacheFileRegion entry = get(cacheKey, blobLength, i); entry.incRefEnsureOpen(); regionsToFetch.add(entry); } catch (AlreadyClosedException e) { @@ -649,63 +649,67 @@ public void maybeFetchRegions( ActionListener.releaseAfter(listener, () -> regionsToFetch.forEach(AbstractRefCounted::decRef)) ) ) { - final List, RegionGaps>> gaps = new ArrayList<>(regionsToFetch.size()); - for (CacheFileRegion toFetch : regionsToFetch) { - int region = toFetch.regionKey.region(); - ByteRange regionRange = ByteRange.of(0, computeCacheFileRegionSize(blobLength, region)); - if (regionRange.isEmpty() == false) { - List regionGaps = toFetch.tracker.waitForRange( - regionRange, - regionRange, - regionsListener.acquire() - ); - if (regionGaps.isEmpty() == false) { - gaps.add(new Tuple<>(toFetch, new RegionGaps((long) region * regionSize, regionGaps))); + try { + final List, RegionGaps>> gaps = new ArrayList<>(regionsToFetch.size()); + for (CacheFileRegion toFetch : regionsToFetch) { + int region = toFetch.regionKey.region(); + ByteRange regionRange = ByteRange.of(0, computeCacheFileRegionSize(blobLength, region)); + if (regionRange.isEmpty() == false) { + List regionGaps = toFetch.tracker.waitForRange( + regionRange, + regionRange, + regionsListener.acquire() + ); + if (regionGaps.isEmpty() == false) { + gaps.add(new Tuple<>(toFetch, new RegionGaps((long) region * regionSize, regionGaps))); + } } } - } - if (gaps.isEmpty()) { - regionsListener.acquire().onResponse(null); - return; - } - final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps.stream().map(Tuple::v2).toList()); - logger.trace( - () -> Strings.format( - "fill regions gaps %s %s shared input stream factory", - gaps, - streamFactory == null ? "without" : "with" - ) - ); - - if (streamFactory == null) { - for (Tuple, RegionGaps> gapsToFetch : gaps) { - for (SparseFileTracker.Gap gap : gapsToFetch.v2().gaps()) { - fetchExecutor.execute(gapsToFetch.v1().fillGapRunnable(gap, writer, null, regionsListener.acquire())); - } + if (gaps.isEmpty()) { + regionsListener.acquire().onResponse(null); + return; } - } else { - var regionsToRelease = regionsListener.acquire(); - try ( - var sequentialGapsListener = new RefCountingListener(ActionListener.runBefore(regionsToRelease, streamFactory::close)) - ) { - ArrayList gapFillingTasks = new ArrayList<>(); + final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps.stream().map(Tuple::v2).toList()); + logger.trace( + () -> Strings.format( + "fill regions gaps %s %s shared input stream factory", + gaps, + streamFactory == null ? "without" : "with" + ) + ); + + if (streamFactory == null) { for (Tuple, RegionGaps> gapsToFetch : gaps) { - int offset = (gapsToFetch.v1().regionKey.region() - firstRegion) * regionSize; - gapFillingTasks.addAll( - gapsToFetch.v1().gapFillingTasks(offset, writer, sequentialGapsListener, streamFactory, gapsToFetch.v2().gaps()) - ); + for (SparseFileTracker.Gap gap : gapsToFetch.v2().gaps()) { + fetchExecutor.execute(gapsToFetch.v1().fillGapRunnable(gap, writer, null, regionsListener.acquire())); + } + } + } else { + var regionsToRelease = regionsListener.acquire(); + try ( + var sequentialGapsListener = new RefCountingListener( + ActionListener.runBefore(regionsToRelease, streamFactory::close) + ) + ) { + ArrayList gapFillingTasks = new ArrayList<>(); + for (Tuple, RegionGaps> gapsToFetch : gaps) { + int offset = (gapsToFetch.v1().regionKey.region() - firstRegion) * regionSize; + gapFillingTasks.addAll( + gapsToFetch.v1() + .gapFillingTasks(offset, writer, sequentialGapsListener, streamFactory, gapsToFetch.v2().gaps()) + ); + } + fetchExecutor.execute(() -> { + // Fill the gaps in order. If a gap fails to fill for whatever reason, the task for filling the next + // gap will still be executed. + gapFillingTasks.forEach(Runnable::run); + }); } - fetchExecutor.execute(() -> { - // Fill the gaps in order. If a gap fails to fill for whatever reason, the task for filling the next - // gap will still be executed. - gapFillingTasks.forEach(Runnable::run); - }); } + } catch (Exception e) { + regionsListener.acquire().onFailure(e); } - } catch (Exception e) { - // TODO: Double call? - listener.onFailure(e); } } From af68a83f02c9da93d6a3aa5c2c76954d7f4b217c Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 11 Dec 2024 21:28:16 -0700 Subject: [PATCH 6/6] Fix --- .../blobcache/shared/SharedBlobCacheService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 7d14ffa8da04f..b0e3ad5e61d0b 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -682,7 +682,8 @@ public void maybeFetchRegions( if (streamFactory == null) { for (Tuple, RegionGaps> gapsToFetch : gaps) { for (SparseFileTracker.Gap gap : gapsToFetch.v2().gaps()) { - fetchExecutor.execute(gapsToFetch.v1().fillGapRunnable(gap, writer, null, regionsListener.acquire())); + int offset = Math.toIntExact(gapsToFetch.v2().regionOffset()); + fetchExecutor.execute(gapsToFetch.v1().fillGapRunnable(offset, gap, writer, null, regionsListener.acquire())); } } } else { @@ -694,7 +695,7 @@ public void maybeFetchRegions( ) { ArrayList gapFillingTasks = new ArrayList<>(); for (Tuple, RegionGaps> gapsToFetch : gaps) { - int offset = (gapsToFetch.v1().regionKey.region() - firstRegion) * regionSize; + int offset = Math.toIntExact(gapsToFetch.v2().regionOffset()); gapFillingTasks.addAll( gapsToFetch.v1() .gapFillingTasks(offset, writer, sequentialGapsListener, streamFactory, gapsToFetch.v2().gaps())