Support reading multiple regions with single fetch #118496
base: main
Changes from all commits:
9379eb5
f4bec91
369a372
19deaac
5562861
38ea3c0
d08af89
af68a83
ec87064
ad35c29
@@ -32,8 +32,10 @@
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.monitor.fs.FsProbe;

@@ -590,6 +592,128 @@ public void maybeFetchRegion(
         }
     }

+    /**
+     * Fetch and write in cache multiple regions of a blob if there are enough free pages in the cache to do so.
+     * <p>
+     * This method returns as soon as the download tasks are instantiated; the tasks themselves
+     * are run on the bulk executor.
+     * <p>
+     * If an exception is thrown from the writer then the cache entry being downloaded is freed
+     * and unlinked.
+     *
+     * @param cacheKey      the key to fetch data for
+     * @param firstRegion   the first region of the blob to fetch
+     * @param lastRegion    the last region of the blob to fetch
+     * @param blobLength    the length of the blob from which the regions are fetched (used to compute the size of the ending region)
+     * @param writer        a writer that handles writing of newly downloaded data to the shared cache
+     * @param fetchExecutor an executor to use for reading from the blob store
+     * @param listener      a listener that is completed once every requested region has either been fetched, found to be already
+     *                      available in cache or pending fetching via another thread, or skipped because there were not enough
+     *                      free pages left to fetch it
+     */
+    public void maybeFetchRegions(
+        final KeyType cacheKey,
+        final int firstRegion,
+        final int lastRegion,
+        final long blobLength,
+        final RangeMissingHandler writer,
+        final Executor fetchExecutor,
+        final ActionListener<Void> listener
+    ) {
+        int regionCount = lastRegion + 1 - firstRegion;
+        ArrayList<CacheFileRegion<KeyType>> regionsToFetch = new ArrayList<>(regionCount);
+        for (int i = firstRegion; i <= lastRegion; i++) {
+            if (freeRegionCount() < 1 && maybeEvictLeastUsed() == false) {
+                // no free page available and no old enough unused region to be evicted
+                logger.info("No free regions, skipping loading regions [{}-{}]", i, lastRegion);
+                break;
+            } else {
+                try {
+                    final CacheFileRegion<KeyType> entry = get(cacheKey, blobLength, i);
+                    entry.incRefEnsureOpen();
+                    regionsToFetch.add(entry);
+                } catch (AlreadyClosedException e) {
+                    // Fine to ignore: the entry is only added to the list after incRefEnsureOpen() succeeds
+                }
+            }
+        }
+        if (regionsToFetch.isEmpty()) {
+            listener.onResponse(null);
+            return;
+        }
+
+        try (
+            RefCountingListener regionsListener = new RefCountingListener(
+                ActionListener.releaseAfter(listener, () -> regionsToFetch.forEach(AbstractRefCounted::decRef))
+            )
+        ) {
+            try {
+                final List<Tuple<CacheFileRegion<KeyType>, RegionGaps>> gaps = new ArrayList<>(regionsToFetch.size());
+                for (CacheFileRegion<KeyType> toFetch : regionsToFetch) {
+                    int region = toFetch.regionKey.region();
+                    ByteRange regionRange = ByteRange.of(0, computeCacheFileRegionSize(blobLength, region));
+                    if (regionRange.isEmpty() == false) {
+                        List<SparseFileTracker.Gap> regionGaps = toFetch.tracker.waitForRange(
+                            regionRange,
+                            regionRange,
+                            regionsListener.acquire()
+                        );
+                        if (regionGaps.isEmpty() == false) {
+                            gaps.add(new Tuple<>(toFetch, new RegionGaps((long) region * regionSize, regionGaps)));
+                        }
+                    }
+                }
+
+                if (gaps.isEmpty()) {
+                    regionsListener.acquire().onResponse(null);
+                    return;
+                }
+                final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps.stream().map(Tuple::v2).toList());
+                logger.trace(
+                    () -> Strings.format(
+                        "fill regions gaps %s %s shared input stream factory",
+                        gaps,
+                        streamFactory == null ? "without" : "with"
+                    )
+                );
+
+                if (streamFactory == null) {
+                    for (Tuple<CacheFileRegion<KeyType>, RegionGaps> gapsToFetch : gaps) {
+                        for (SparseFileTracker.Gap gap : gapsToFetch.v2().gaps()) {
+                            int offset = Math.toIntExact(gapsToFetch.v2().regionOffset());
+                            fetchExecutor.execute(gapsToFetch.v1().fillGapRunnable(offset, gap, writer, null, regionsListener.acquire()));
+                        }
+                    }
+                } else {
+                    var regionsToRelease = regionsListener.acquire();
+                    try (
+                        var sequentialGapsListener = new RefCountingListener(
+                            ActionListener.runBefore(regionsToRelease, streamFactory::close)
+                        )
+                    ) {
+                        ArrayList<Runnable> gapFillingTasks = new ArrayList<>();
+                        for (Tuple<CacheFileRegion<KeyType>, RegionGaps> gapsToFetch : gaps) {
+                            int offset = Math.toIntExact(gapsToFetch.v2().regionOffset());

Reviewer comment: This seems unsafe, I think we need this offset to be a long and this has to pass through all the way to …
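To see the concern concretely: the region offset is a long that grows with the region number, so narrowing it with Math.toIntExact fails once a blob is large enough. A standalone sketch with an assumed 16 MiB region size (the real size is a cache setting, not a value from this PR):

    class RegionOffsetOverflowDemo {
        public static void main(String[] args) {
            long regionSize = 16L * 1024 * 1024;          // assumed region size for illustration
            long regionOffset = 128L * regionSize;        // 2_147_483_648, one past Integer.MAX_VALUE
            System.out.println(regionOffset > Integer.MAX_VALUE); // prints: true
            int offset = Math.toIntExact(regionOffset);   // throws ArithmeticException: integer overflow
        }
    }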
+                            gapFillingTasks.addAll(
+                                gapsToFetch.v1()
+                                    .gapFillingTasks(offset, writer, sequentialGapsListener, streamFactory, gapsToFetch.v2().gaps())
+                            );
+                        }
+                        fetchExecutor.execute(() -> {
+                            // Fill the gaps in order. If a gap fails to fill for whatever reason, the task for filling the next
+                            // gap will still be executed.
+                            gapFillingTasks.forEach(Runnable::run);
+                        });
+                    }
+                }
+            } catch (Exception e) {
+                regionsListener.acquire().onFailure(e);
+            }
+        }
+    }
+
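For orientation, a rough sketch of how a caller might drive the new method; cacheService, cacheKey, writer and fetchExecutor are placeholder names, and deriving the last region from the blob length mirrors what the cache does internally:

    // Hypothetical caller, not part of this PR: prefetch every region of a blob in one call.
    int regionSize = cacheService.getRegionSize();
    int lastRegion = Math.toIntExact((blobLength - 1) / regionSize); // region holding the last byte
    cacheService.maybeFetchRegions(
        cacheKey,             // KeyType identifying the blob
        0,                    // first region to fetch
        lastRegion,           // last region to fetch
        blobLength,
        writer,               // RangeMissingHandler that reads missing ranges from the blob store
        fetchExecutor,        // executor the download tasks run on
        ActionListener.noop() // completed once every region is fetched, already cached, or skipped
    );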
     /**
      * Fetch and write in cache a range within a blob region if there is at least a free page in the cache to do so.
      * <p>

@@ -967,7 +1091,9 @@ void populate(
                 listener.onResponse(false);
                 return;
             }
-            final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps);
+            final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(
+                new RegionGaps((long) regionKey.region() * blobCacheService.getRegionSize(), gaps)
+            );
             logger.trace(
                 () -> Strings.format(
                     "fill gaps %s %s shared input stream factory",
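The RegionGaps wrapper above is what lets a handler turn region-relative gap positions into absolute blob positions. A worked example with assumed numbers:

    // Assumed values for illustration; gap positions are tracked relative to their region.
    long regionSize = 16L * 1024 * 1024;            // assumed 16 MiB region size
    int region = 3;
    long regionOffset = (long) region * regionSize; // 50_331_648, as wrapped into RegionGaps above
    long gapStart = 4_096;                          // gap.start(), relative to the region
    long absolutePos = regionOffset + gapStart;     // 50_335_744, the gap's position in the blob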
@@ -989,21 +1115,16 @@
                 }
             }
         } else {
+            var refsToRelease = refs.acquire();
             try (
                 var sequentialGapsListener = new RefCountingListener(
-                    ActionListener.runBefore(listener.map(unused -> true), streamFactory::close)
+                    ActionListener.runBefore(
+                        listener.map(unused -> true),
+                        () -> Releasables.close(refsToRelease, streamFactory::close)
+                    )
                 )
             ) {
-                final List<Runnable> gapFillingTasks = gaps.stream()
-                    .map(
-                        gap -> fillGapRunnable(
-                            gap,
-                            writer,
-                            streamFactory,
-                            ActionListener.releaseAfter(sequentialGapsListener.acquire(), refs.acquire())
-                        )
-                    )
-                    .toList();
+                List<Runnable> gapFillingTasks = gapFillingTasks(0, writer, sequentialGapsListener, streamFactory, gaps);
                 executor.execute(() -> {
                     // Fill the gaps in order. If a gap fails to fill for whatever reason, the task for filling the next
                     // gap will still be executed.
@@ -1050,7 +1171,9 @@ void populateAndRead(
         );

         if (gaps.isEmpty() == false) {
-            final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps);
+            final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(
+                new RegionGaps((long) regionKey.region() * blobCacheService.getRegionSize(), gaps)
+            );
             logger.trace(
                 () -> Strings.format(
                     "fill gaps %s %s shared input stream factory",
@@ -1082,7 +1205,29 @@
         }
     }

+    List<Runnable> gapFillingTasks(
+        int gapOffset,

Reviewer comment: Is this not the region offset more than the gap offset?

+        RangeMissingHandler writer,
+        RefCountingListener sequentialGapsListener,
+        SourceInputStreamFactory streamFactory,
+        List<SparseFileTracker.Gap> gaps
+    ) {
+        return gaps.stream()
+            .map(gap -> fillGapRunnable(gapOffset, gap, writer, streamFactory, sequentialGapsListener.acquire()))
+            .toList();
+    }
+
+    private Runnable fillGapRunnable(
+        SparseFileTracker.Gap gap,
+        RangeMissingHandler writer,
+        @Nullable SourceInputStreamFactory streamFactory,
+        ActionListener<Void> listener
+    ) {
+        return fillGapRunnable(0, gap, writer, streamFactory, listener);
+    }
+
     private Runnable fillGapRunnable(
+        int gapOffset,
         SparseFileTracker.Gap gap,
         RangeMissingHandler writer,
         @Nullable SourceInputStreamFactory streamFactory,

@@ -1097,7 +1242,7 @@ private Runnable fillGapRunnable(
                 ioRef,
                 start,
                 streamFactory,
-                start,
+                gapOffset + start,
                 Math.toIntExact(gap.end() - start),
                 progress -> gap.onProgress(start + progress),
                 l.<Void>map(unused -> {
@@ -1126,6 +1271,8 @@ protected void alreadyClosed() {
         }
     }

+    public record RegionGaps(long regionOffset, List<SparseFileTracker.Gap> gaps) {}
+
     public class CacheFile {

         private final KeyType cacheKey;

@@ -1403,6 +1550,11 @@ public interface RangeAvailableHandler {

     @FunctionalInterface
     public interface RangeMissingHandler {

+        default SourceInputStreamFactory sharedInputStreamFactory(RegionGaps gaps) {
+            return sharedInputStreamFactory(List.of(gaps));
+        }
+
         /**
          * Attempt to get a shared {@link SourceInputStreamFactory} for the given list of Gaps so that all of them
          * can be filled from the input stream created from the factory. If a factory is returned, the gaps must be

@@ -1413,7 +1565,7 @@ public interface RangeMissingHandler {
          * its own input stream.
          */
         @Nullable
-        default SourceInputStreamFactory sharedInputStreamFactory(List<SparseFileTracker.Gap> gaps) {
+        default SourceInputStreamFactory sharedInputStreamFactory(List<RegionGaps> gaps) {
             return null;
         }

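A shared factory only pays off when the gaps line up into a single ranged read. A minimal sketch of the kind of check an implementation of this interface might run before returning a factory; contiguousAcrossRegions is a hypothetical helper, not part of this PR:

    // Hypothetical: true when all gaps across all regions form one contiguous byte range of the
    // blob, so a single ranged read from the blob store can fill every gap in order.
    static boolean contiguousAcrossRegions(List<RegionGaps> regionGapsList) {
        long expectedNext = -1;
        for (RegionGaps regionGaps : regionGapsList) {
            for (SparseFileTracker.Gap gap : regionGaps.gaps()) {
                long absoluteStart = regionGaps.regionOffset() + gap.start();
                if (expectedNext != -1 && absoluteStart != expectedNext) {
                    return false; // a hole between gaps: a shared stream would have to over-read
                }
                expectedNext = regionGaps.regionOffset() + gap.end();
            }
        }
        return true;
    }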
@@ -1463,7 +1615,7 @@ protected DelegatingRangeMissingHandler(RangeMissingHandler delegate) {
         }

         @Override
-        public SourceInputStreamFactory sharedInputStreamFactory(List<SparseFileTracker.Gap> gaps) {
+        public SourceInputStreamFactory sharedInputStreamFactory(List<RegionGaps> gaps) {
             return delegate.sharedInputStreamFactory(gaps);
         }

Reviewer comment: I wonder if we should break it up if we hit a region that needs no data read? We could in principle risk having everything but the first and last region of a file and thus we could download an excess amount of extra data.
We can also build this into the shared stream handler.
Or forego it for now. I'd be ok with that too, but wanted to get views on it.
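One way this could look, sketched under stated assumptions: split the requested range into maximal runs of regions that still need data, and issue one maybeFetchRegions call per run. regionNeedsData and perRunListener are hypothetical names, and a real version would share one ref-counted listener across the calls:

    // Hypothetical sketch of the suggestion above, not part of the PR.
    int runStart = -1;
    for (int region = firstRegion; region <= lastRegion + 1; region++) {
        boolean needsData = region <= lastRegion && regionNeedsData(cacheKey, region);
        if (needsData && runStart == -1) {
            runStart = region; // open a new contiguous run
        } else if (needsData == false && runStart != -1) {
            // close the run before the first fully-cached region, so data that is already
            // present in the middle of the file is never re-downloaded by one oversized read
            cacheService.maybeFetchRegions(cacheKey, runStart, region - 1, blobLength, writer, fetchExecutor, perRunListener());
            runStart = -1;
        }
    }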