Skip to content

Commit 8ce2f36

Browse files
authored
feat: add async CursorFactory API and migrate MSQ frame processors to use it (#19397)
changes: * add `AsyncCursorHolder` to manage the async loading lifecycle of a cursor holder until ownership of the `CursorHolder` it produces can be transferred to the consumer (see javadoc for details) * add `CursorFactory.makeCursorHolderAsync(CursorBuildSpec)` so that cursor factories backed by partial downloads can do I/O without blocking processing threads, with a default implementation returning `AsyncCursorHolder.completed(makeCursorHolder(spec))` so existing implementations remain async-correct without changes * add `GroupingEngine.makeCursorHolderAsync` returning `AsyncCursorHolder`, and extract a shared `processCursorHolder` helper from `GroupingEngine.process()`, so that a caller which can yield and then resume can wait for the `CursorHolder` to be ready and later process it * migrate `ScanQueryFrameProcessor.runWithSegment` to call `makeCursorHolderAsync` and yield via `ReturnOrAwait` while the load is pending * migrate the `GroupByPreShuffleFrameProcessor.runWithSegment` cursor path to call `GroupingEngine.makeCursorHolderAsync` and yield via `ReturnOrAwait` while the load is pending
1 parent 12e31b2 commit 8ce2f36

7 files changed

Lines changed: 906 additions & 39 deletions

File tree

multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java

Lines changed: 64 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.google.common.collect.Iterables;
2323
import com.google.common.util.concurrent.ListenableFuture;
24+
import com.google.common.util.concurrent.SettableFuture;
2425
import org.apache.druid.collections.NonBlockingPool;
2526
import org.apache.druid.collections.ResourceHolder;
2627
import org.apache.druid.common.guava.FutureUtils;
@@ -58,8 +59,10 @@
5859
import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper;
5960
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
6061
import org.apache.druid.query.spec.SpecificSegmentSpec;
62+
import org.apache.druid.segment.AsyncCursorHolder;
6163
import org.apache.druid.segment.ColumnSelectorFactory;
6264
import org.apache.druid.segment.CursorFactory;
65+
import org.apache.druid.segment.CursorHolder;
6366
import org.apache.druid.segment.Segment;
6467
import org.apache.druid.segment.SegmentMapFunction;
6568
import org.apache.druid.segment.TimeBoundaryInspector;
@@ -94,6 +97,18 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor
9497
private SegmentsInputSlice handedOffSegments = null;
9598
private Yielder<Yielder<ResultRow>> currentResultsYielder;
9699
private ListenableFuture<DataServerQueryResult<ResultRow>> dataServerQueryResultFuture;
100+
@Nullable
101+
private CursorFactory currentCursorFactory;
102+
@Nullable
103+
private TimeBoundaryInspector currentTimeBoundaryInspector;
104+
/**
105+
* In-flight {@link GroupingEngine#makeCursorHolderAsync} handle for the current segment, when {@link #resultYielder}
106+
* has not yet been derived. Registered on {@link #closer} so the produced {@link CursorHolder} is always disposed
107+
* regardless of where the underlying load is in its lifecycle. Cleared after we transfer ownership of the holder to
108+
* {@link GroupingEngine#processCursorHolder} (which moves it onto the resulting Sequence's baggage closer).
109+
*/
110+
@Nullable
111+
private AsyncCursorHolder asyncCursorHolder;
97112

98113
public GroupByPreShuffleFrameProcessor(
99114
final GroupByQuery query,
@@ -185,28 +200,60 @@ protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(DataServerQue
185200
protected ReturnOrAwait<Unit> runWithSegment(final SegmentReferenceHolder segmentHolder) throws IOException
186201
{
187202
if (resultYielder == null) {
188-
final Segment segment = mapSegment(segmentHolder, closer);
189-
final TimeBoundaryInspector tbi = segment.as(TimeBoundaryInspector.class);
190-
final Sequence<ResultRow> rowSequence;
203+
if (asyncCursorHolder == null && currentCursorFactory == null) {
204+
// First invocation for this segment: map it, check the TimeBoundary fast path, otherwise kick off the async
205+
// cursor-holder load and cache the cursor factory + time-boundary inspector for the follow-up invocation.
206+
final Segment segment = mapSegment(segmentHolder, closer);
207+
currentTimeBoundaryInspector = segment.as(TimeBoundaryInspector.class);
208+
209+
if (GroupByTimeBoundaryUtils.canUseTimeBoundaryInspector(
210+
query,
211+
currentTimeBoundaryInspector,
212+
segmentHolder.getDescriptor()
213+
)) {
214+
// Resolve this query using the TimeBoundaryInspector, no need for a cursor.
215+
resultYielder = Yielders.each(
216+
Sequences.simple(
217+
List.of(GroupByTimeBoundaryUtils.computeTimeBoundaryResult(query, currentTimeBoundaryInspector))
218+
)
219+
);
220+
} else {
221+
currentCursorFactory = Objects.requireNonNull(segment.as(CursorFactory.class));
222+
// Resolve this query using a cursor.
223+
final GroupByQuery segmentQuery = (GroupByQuery) query
224+
.withQuerySegmentSpec(new SpecificSegmentSpec(segmentHolder.getDescriptor()))
225+
.optimizeForSegment(new PerSegmentQueryOptimizationContext(segmentHolder.getDescriptor()));
226+
asyncCursorHolder = closer.register(
227+
groupingEngine.makeCursorHolderAsync(
228+
segmentQuery,
229+
currentCursorFactory,
230+
null
231+
)
232+
);
233+
}
234+
}
191235

192-
if (GroupByTimeBoundaryUtils.canUseTimeBoundaryInspector(query, tbi, segmentHolder.getDescriptor())) {
193-
// Resolve this query using the TimeBoundaryInspector, no need for a cursor.
194-
rowSequence = Sequences.simple(List.of(GroupByTimeBoundaryUtils.computeTimeBoundaryResult(query, tbi)));
195-
} else {
196-
// Resolve this query using a cursor.
197-
final GroupByQuery segmentQuery = (GroupByQuery) query
198-
.withQuerySegmentSpec(new SpecificSegmentSpec(segmentHolder.getDescriptor()))
199-
.optimizeForSegment(new PerSegmentQueryOptimizationContext(segmentHolder.getDescriptor()));
200-
rowSequence = groupingEngine.process(
201-
segmentQuery,
202-
Objects.requireNonNull(segment.as(CursorFactory.class)),
203-
tbi,
236+
if (asyncCursorHolder != null) {
237+
if (!asyncCursorHolder.isReady()) {
238+
final SettableFuture<?> awaitFuture = SettableFuture.create();
239+
asyncCursorHolder.addReadyCallback(() -> awaitFuture.set(null));
240+
return ReturnOrAwait.awaitAllFutures(List.of(awaitFuture));
241+
}
242+
// The holder is ready; its ownership now transfers onto the returned Sequence's baggage closer.
243+
final CursorHolder holder = asyncCursorHolder.release();
244+
asyncCursorHolder = null;
245+
// currentCursorFactory is non-null whenever asyncCursorHolder is non-null (both are set together in the
246+
// first-invocation branch above). The requireNonNull pins the invariant for static analysis.
247+
final Sequence<ResultRow> rowSequence = groupingEngine.processCursorHolder(
248+
query.withQuerySegmentSpec(new SpecificSegmentSpec(segmentHolder.getDescriptor())),
249+
Objects.requireNonNull(currentCursorFactory),
250+
holder,
251+
currentTimeBoundaryInspector,
204252
bufferPool,
205253
null
206254
);
255+
resultYielder = Yielders.each(rowSequence);
207256
}
208-
209-
resultYielder = Yielders.each(rowSequence);
210257
}
211258

212259
populateFrameWriterAndFlushIfNeeded();

multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.common.collect.ImmutableList;
2727
import com.google.common.collect.Iterables;
2828
import com.google.common.util.concurrent.ListenableFuture;
29+
import com.google.common.util.concurrent.SettableFuture;
2930
import it.unimi.dsi.fastutil.ints.IntSet;
3031
import org.apache.druid.collections.ResourceHolder;
3132
import org.apache.druid.common.guava.FutureUtils;
@@ -69,6 +70,7 @@
6970
import org.apache.druid.query.scan.ScanQueryEngine;
7071
import org.apache.druid.query.scan.ScanResultValue;
7172
import org.apache.druid.query.spec.SpecificSegmentSpec;
73+
import org.apache.druid.segment.AsyncCursorHolder;
7274
import org.apache.druid.segment.ColumnSelectorFactory;
7375
import org.apache.druid.segment.Cursor;
7476
import org.apache.druid.segment.CursorFactory;
@@ -118,6 +120,14 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
118120
private final Closer closer = Closer.create();
119121

120122
private Cursor cursor;
123+
/**
124+
* In-flight {@link CursorFactory#makeCursorHolderAsync} handle for the current segment, when {@link #cursor} has not
125+
* yet been derived. Registered on {@link #closer} as soon as it is created so the produced {@link CursorHolder} is
126+
* always disposed regardless of where the underlying load is in its lifecycle. Cleared after the holder is consumed
127+
* and ownership transitions to {@link #cursorCloser}.
128+
*/
129+
@Nullable
130+
private AsyncCursorHolder asyncCursorHolder;
121131
private ListenableFuture<DataServerQueryResult<Object[]>> dataServerQueryResultFuture;
122132
private Closeable cursorCloser;
123133
/**
@@ -297,21 +307,36 @@ protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(final DataSer
297307
protected ReturnOrAwait<Unit> runWithSegment(final SegmentReferenceHolder segmentHolder) throws IOException
298308
{
299309
if (cursor == null) {
300-
final Segment segment = mapSegment(segmentHolder, closer);
301-
final CursorFactory cursorFactory = segment.as(CursorFactory.class);
302-
if (cursorFactory == null) {
303-
throw DruidException.defensive(
304-
"Null cursor factory found. Probably trying to issue a query against a segment being memory unmapped."
310+
if (asyncCursorHolder == null) {
311+
final Segment segment = mapSegment(segmentHolder, closer);
312+
final CursorFactory cursorFactory = segment.as(CursorFactory.class);
313+
if (cursorFactory == null) {
314+
throw DruidException.defensive(
315+
"Null cursor factory found. Probably trying to issue a query against a segment being memory unmapped."
316+
);
317+
}
318+
319+
asyncCursorHolder = closer.register(
320+
cursorFactory.makeCursorHolderAsync(
321+
ScanQueryEngine.makeCursorBuildSpec(
322+
query.withQuerySegmentSpec(new SpecificSegmentSpec(segmentHolder.getDescriptor())),
323+
null
324+
)
325+
)
305326
);
306327
}
307328

308-
final CursorHolder nextCursorHolder =
309-
cursorFactory.makeCursorHolder(
310-
ScanQueryEngine.makeCursorBuildSpec(
311-
query.withQuerySegmentSpec(new SpecificSegmentSpec(segmentHolder.getDescriptor())),
312-
null
313-
)
314-
);
329+
if (!asyncCursorHolder.isReady()) {
330+
final SettableFuture<?> awaitFuture = SettableFuture.create();
331+
asyncCursorHolder.addReadyCallback(() -> awaitFuture.set(null));
332+
return ReturnOrAwait.awaitAllFutures(ImmutableList.of(awaitFuture));
333+
}
334+
335+
// Transfer ownership of the holder out of the AsyncCursorHolder; setNextCursor manages the holder's lifecycle
336+
// from here on. The wrapper stays registered on closer (close() is now a no-op since release was called) so
337+
// we don't need to track it further.
338+
final CursorHolder nextCursorHolder = asyncCursorHolder.release();
339+
asyncCursorHolder = null;
315340

316341
final Cursor nextCursor;
317342

0 commit comments

Comments
 (0)