Skip to content

Commit 905f6d2

Browse files
authored
wire lazyLoadOnStart back up to segment loader config (apache#18637)
1 parent 754e03d commit 905f6d2

3 files changed

Lines changed: 69 additions & 20 deletions

File tree

server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -639,9 +639,11 @@ private SegmentCacheEntry assignLocationAndMount(
639639
if (cacheEntry.checkExists(location.getPath())) {
640640
if (location.isReserved(cacheEntry.id) || location.reserve(cacheEntry)) {
641641
final SegmentCacheEntry entry = location.getCacheEntry(cacheEntry.id);
642-
entry.lazyLoadCallback = segmentLoadFailCallback;
643-
entry.mount(location);
644-
return entry;
642+
if (entry != null) {
643+
entry.lazyLoadCallback = segmentLoadFailCallback;
644+
entry.mount(location);
645+
return entry;
646+
}
645647
} else {
646648
// entry is not reserved, clean it up
647649
deleteCacheEntryDirectory(cacheEntry.toPotentialLocation(location.getPath()));
@@ -658,9 +660,11 @@ private SegmentCacheEntry assignLocationAndMount(
658660
if (location.reserve(cacheEntry)) {
659661
try {
660662
final SegmentCacheEntry entry = location.getCacheEntry(cacheEntry.id);
661-
entry.lazyLoadCallback = segmentLoadFailCallback;
662-
entry.mount(location);
663-
return entry;
663+
if (entry != null) {
664+
entry.lazyLoadCallback = segmentLoadFailCallback;
665+
entry.mount(location);
666+
return entry;
667+
}
664668
}
665669
catch (SegmentLoadingException e) {
666670
log.warn(e, "Failed to load segment[%s] in location[%s], trying next location", cacheEntry.id, location.getPath());
@@ -831,7 +835,9 @@ public void mount(StorageLocation mountLocation) throws SegmentLoadingException
831835
}
832836
final SegmentizerFactory factory = getSegmentFactory(storageDir);
833837

834-
final Segment segment = factory.factorize(dataSegment, storageDir, false, lazyLoadCallback);
838+
@SuppressWarnings("ObjectEquality")
839+
final boolean lazy = config.isLazyLoadOnStart() && lazyLoadCallback != SegmentLazyLoadFailCallback.NOOP;
840+
final Segment segment = factory.factorize(dataSegment, storageDir, lazy, lazyLoadCallback);
835841
// wipe load callback after calling
836842
lazyLoadCallback = SegmentLazyLoadFailCallback.NOOP;
837843
referenceProvider = ReferenceCountedSegmentProvider.of(segment);

server/src/main/java/org/apache/druid/server/ServerManager.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -308,21 +308,27 @@ private ArrayList<SegmentReference> getSegmentReferences(
308308
final ListenableFuture<ReferenceCountedObjectProvider<Segment>> future = futures.get(i);
309309
final ReferenceCountedObjectProvider<Segment> referenceProvider =
310310
future.get(timeoutAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
311-
final Optional<Segment> segment = referenceProvider.acquireReference();
312-
try {
313-
final Optional<Segment> mappedSegment = segmentMapFunction.apply(segment).map(safetyNet::register);
311+
if (referenceProvider == null) {
314312
segmentReferences.add(
315-
new SegmentReference(
316-
segmentAndDescriptor.getDescriptor(),
317-
mappedSegment,
318-
action
319-
)
313+
new SegmentReference(segmentAndDescriptor.getDescriptor(), Optional.empty(), action)
320314
);
321-
}
322-
catch (Throwable t) {
323-
// if applying the mapFn failed, attach the base segment to the closer and rethrow
324-
segment.ifPresent(safetyNet::register);
325-
throw t;
315+
} else {
316+
final Optional<Segment> segment = referenceProvider.acquireReference();
317+
try {
318+
final Optional<Segment> mappedSegment = segmentMapFunction.apply(segment).map(safetyNet::register);
319+
segmentReferences.add(
320+
new SegmentReference(
321+
segmentAndDescriptor.getDescriptor(),
322+
mappedSegment,
323+
action
324+
)
325+
);
326+
}
327+
catch (Throwable t) {
328+
// if applying the mapFn failed, attach the base segment to the closer and rethrow
329+
segment.ifPresent(safetyNet::register);
330+
throw t;
331+
}
326332
}
327333
}
328334
catch (Throwable t) {

server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -839,6 +839,43 @@ public void testGetBootstrapSegment() throws SegmentLoadingException
839839
Assert.assertEquals(dataSegment.getInterval(), actualBootstrapSegment.getDataInterval());
840840
}
841841

842+
843+
@Test
844+
public void testGetBootstrapSegmentLazy() throws SegmentLoadingException
845+
{
846+
final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheDir, 10000L, null);
847+
final SegmentLoaderConfig loaderConfig = new SegmentLoaderConfig()
848+
{
849+
@Override
850+
public boolean isLazyLoadOnStart()
851+
{
852+
return true;
853+
}
854+
855+
@Override
856+
public List<StorageLocationConfig> getLocations()
857+
{
858+
return List.of(locationConfig);
859+
}
860+
};
861+
final List<StorageLocation> storageLocations = loaderConfig.toStorageLocations();
862+
SegmentLocalCacheManager manager = new SegmentLocalCacheManager(
863+
storageLocations,
864+
loaderConfig,
865+
new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations),
866+
TestHelper.getTestIndexIO(jsonMapper, ColumnConfig.DEFAULT),
867+
jsonMapper
868+
);
869+
870+
final DataSegment dataSegment = TestSegmentUtils.makeSegment("foo", "v1", Intervals.of("2020/2021"));
871+
872+
manager.bootstrap(dataSegment, () -> {});
873+
Segment actualBootstrapSegment = manager.acquireCachedSegment(dataSegment).orElse(null);
874+
Assert.assertNotNull(actualBootstrapSegment);
875+
Assert.assertEquals(dataSegment.getId(), actualBootstrapSegment.getId());
876+
Assert.assertEquals(dataSegment.getInterval(), actualBootstrapSegment.getDataInterval());
877+
}
878+
842879
@Test
843880
public void testGetSegmentVirtualStorage() throws Exception
844881
{

0 commit comments

Comments
 (0)