Commit 6c1e9f4

MSQ TimeBoundary optimization. (#19012)
Implement two timeBoundary-style optimizations for MSQ:

1) Filter base inputs to include only the earliest (for min) or latest (for max) segments.
2) Use TimeBoundaryInspector when available.

Unlike the native query path, the SQL planner still emits a groupBy query or groupBy stage. The decisions about what to optimize happen at execution time. This makes the optimization simpler, as there is no need for a special timeBoundary query type that must be capable of operating over all kinds of data. It also simplifies planning.
1 parent e3b8e7c commit 6c1e9f4

7 files changed

Lines changed: 968 additions & 10 deletions


multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java

Lines changed: 12 additions & 2 deletions
@@ -223,6 +223,15 @@ protected abstract FrameProcessor<Object> makeProcessor(
       FrameContext providerThingy
   );
 
+  /**
+   * Filters the physical input slices before they are used to create a {@link ReadableInputQueue}.
+   * Subclasses can override this to reduce the set of segments that need to be read.
+   */
+  protected List<PhysicalInputSlice> filterBaseInput(final List<PhysicalInputSlice> slices)
+  {
+    return slices;
+  }
+
   /**
    * Read base inputs, where "base" is meant in the same sense as in {@link ExecutionVertex}: the primary datasource
    * that drives query processing.
@@ -231,7 +240,7 @@ protected abstract FrameProcessor<Object> makeProcessor(
    * segments. Once {@link ReadableInputQueue#nextInput()} or {@link ReadableInputQueue#start()} is called,
    * the queue must be closed when done being used.
    */
-  private static ReadableInputQueue makeBaseInputQueue(
+  private ReadableInputQueue makeBaseInputQueue(
       final List<InputSlice> inputSlices,
       final ExecutionContext context
   )
@@ -252,12 +261,13 @@ private static ReadableInputQueue makeBaseInputQueue(
       }
     }
 
+    final List<PhysicalInputSlice> filteredSlices = filterBaseInput(physicalInputSlices);
     final Integer segmentLoadAheadCount =
        MultiStageQueryContext.getSegmentLoadAheadCount(context.workOrder().getWorkerContext());
     return new ReadableInputQueue(
         stageDef.getId().getQueryId(),
         new StandardPartitionReader(context),
-        physicalInputSlices,
+        filteredSlices,
         segmentLoadAheadCount != null ? segmentLoadAheadCount : context.threadCount()
     );
   }

multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java

Lines changed: 16 additions & 8 deletions
@@ -184,14 +184,22 @@ protected ReturnOrAwait<Unit> runWithSegment(final SegmentReferenceHolder segmen
         closer.register(() -> segmentHolder.getInputCounters().addFile(rowCount, 0));
       }
 
-      final Sequence<ResultRow> rowSequence =
-          groupingEngine.process(
-              query.withQuerySegmentSpec(new SpecificSegmentSpec(segmentHolder.getDescriptor())),
-              Objects.requireNonNull(segment.as(CursorFactory.class)),
-              segment.as(TimeBoundaryInspector.class),
-              bufferPool,
-              null
-          );
+      final TimeBoundaryInspector tbi = segment.as(TimeBoundaryInspector.class);
+
+      final Sequence<ResultRow> rowSequence;
+      if (GroupByTimeBoundaryUtils.canUseTimeBoundaryInspector(query, tbi, segmentHolder.getDescriptor())) {
+        // Resolve this query using the TimeBoundaryInspector, no need for a cursor.
+        rowSequence = Sequences.simple(List.of(GroupByTimeBoundaryUtils.computeTimeBoundaryResult(query, tbi)));
+      } else {
+        // Resolve this query using a cursor.
+        rowSequence = groupingEngine.process(
+            query.withQuerySegmentSpec(new SpecificSegmentSpec(segmentHolder.getDescriptor())),
+            Objects.requireNonNull(segment.as(CursorFactory.class)),
+            tbi,
+            bufferPool,
+            null
+        );
+      }
 
       resultYielder = Yielders.each(rowSequence);
     }

multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java

Lines changed: 63 additions & 0 deletions
@@ -28,10 +28,16 @@
 import org.apache.druid.frame.processor.FrameProcessor;
 import org.apache.druid.frame.write.FrameWriterFactory;
 import org.apache.druid.msq.exec.FrameContext;
+import org.apache.druid.msq.input.LoadableSegment;
+import org.apache.druid.msq.input.PhysicalInputSlice;
 import org.apache.druid.msq.querykit.BaseLeafStageProcessor;
 import org.apache.druid.msq.querykit.ReadableInput;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.segment.SegmentMapFunction;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.List;
 
 @JsonTypeName("groupByPreShuffle")
 public class GroupByPreShuffleStageProcessor extends BaseLeafStageProcessor
@@ -71,6 +77,63 @@ protected FrameProcessor<Object> makeProcessor(
     );
   }
 
+  @Override
+  protected List<PhysicalInputSlice> filterBaseInput(final List<PhysicalInputSlice> slices)
+  {
+    if (!GroupByTimeBoundaryUtils.isTimeBoundaryQuery(query)) {
+      return slices;
+    }
+
+    // This is a time-boundary style query (see GroupByTimeBoundaryUtils.isTimeBoundaryQuery).
+    // This means we can look at just the earliest (for min) and latest (for max) segments,
+    // ignoring the ones in the middle.
+    final boolean needsMin = GroupByTimeBoundaryUtils.needsMinTime(query);
+    final boolean needsMax = GroupByTimeBoundaryUtils.needsMaxTime(query);
+
+    final List<PhysicalInputSlice> filteredSlices = new ArrayList<>(slices.size());
+
+    for (final PhysicalInputSlice slice : slices) {
+      final List<LoadableSegment> segments = slice.getLoadableSegments();
+
+      if (segments.size() <= 1) {
+        filteredSlices.add(slice);
+        continue;
+      }
+
+      // Find the earliest and latest intervals by start time.
+      Interval minInterval = null;
+      Interval maxInterval = null;
+
+      for (final LoadableSegment segment : segments) {
+        final Interval interval = segment.descriptor().getInterval();
+        if (needsMin) {
+          if (minInterval == null || interval.getStart().isBefore(minInterval.getStart())) {
+            minInterval = interval;
+          }
+        }
+        if (needsMax) {
+          if (maxInterval == null || interval.getEnd().isAfter(maxInterval.getEnd())) {
+            maxInterval = interval;
+          }
+        }
+      }
+
+      // Keep only segments whose interval overlaps with the earliest or latest interval.
+      final List<LoadableSegment> kept = new ArrayList<>();
+      for (final LoadableSegment segment : segments) {
+        final Interval segmentInterval = segment.descriptor().getInterval();
+        if ((minInterval != null && segmentInterval.overlaps(minInterval))
+            || (maxInterval != null && segmentInterval.overlaps(maxInterval))) {
+          kept.add(segment);
+        }
+      }
+
+      filteredSlices.add(new PhysicalInputSlice(slice.getReadablePartitions(), kept, slice.getQueryableServers()));
+    }
+
+    return filteredSlices;
+  }
+
   @Override
   public boolean usesProcessingBuffers()
   {
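
To make the pruning rule above concrete, here is a minimal standalone sketch, not part of the commit: it applies the same earliest-start / latest-end overlap test to plain interval values. The class name, the prune method, and the example intervals are invented for illustration; only Joda-Time and Druid's Intervals utility are assumed.

import org.apache.druid.java.util.common.Intervals;
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.List;

// Illustration of the filterBaseInput pruning rule: keep only intervals that overlap the
// earliest-starting interval (when MIN(__time) is needed) or the latest-ending interval
// (when MAX(__time) is needed).
public class TimeBoundaryPruneSketch
{
  static List<Interval> prune(final List<Interval> intervals, final boolean needsMin, final boolean needsMax)
  {
    Interval minInterval = null;
    Interval maxInterval = null;

    // Find the earliest-starting and latest-ending intervals.
    for (final Interval interval : intervals) {
      if (needsMin && (minInterval == null || interval.getStart().isBefore(minInterval.getStart()))) {
        minInterval = interval;
      }
      if (needsMax && (maxInterval == null || interval.getEnd().isAfter(maxInterval.getEnd()))) {
        maxInterval = interval;
      }
    }

    // Keep only intervals that overlap one of those two.
    final List<Interval> kept = new ArrayList<>();
    for (final Interval interval : intervals) {
      if ((minInterval != null && interval.overlaps(minInterval))
          || (maxInterval != null && interval.overlaps(maxInterval))) {
        kept.add(interval);
      }
    }
    return kept;
  }

  public static void main(String[] args)
  {
    final List<Interval> segments = List.of(
        Intervals.of("2000-01-01/2000-02-01"),
        Intervals.of("2000-02-01/2000-03-01"),
        Intervals.of("2000-03-01/2000-04-01")
    );

    // With both MIN(__time) and MAX(__time) requested, only the first and last intervals survive.
    System.out.println(prune(segments, true, true));
  }
}

With both MIN(__time) and MAX(__time) requested, the middle segment is dropped; with only one of them requested, only the corresponding end of the timeline is kept.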

multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByTimeBoundaryUtils.java

Lines changed: 150 additions & 0 deletions
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.groupby;
+
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
+import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
+import org.apache.druid.query.aggregation.SimpleLongAggregatorFactory;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.segment.TimeBoundaryInspector;
+import org.apache.druid.segment.column.ColumnHolder;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * Utility methods for detecting and optimizing GroupBy queries that are effectively time boundary queries:
+ * no dimensions, {@link Granularities#ALL}, and only {@code MIN(__time)} / {@code MAX(__time)} aggregators.
+ */
+public class GroupByTimeBoundaryUtils
+{
+  /**
+   * Returns true if the query is a "time boundary" GroupBy: no dimensions, {@link Granularities#ALL},
+   * no filter, at least one aggregator, and every aggregator is {@link LongMinAggregatorFactory} or
+   * {@link LongMaxAggregatorFactory} on {@link ColumnHolder#TIME_COLUMN_NAME}.
+   */
+  public static boolean isTimeBoundaryQuery(final GroupByQuery query)
+  {
+    if (!query.getDimensions().isEmpty()) {
+      return false;
+    }
+
+    if (!Granularities.ALL.equals(query.getGranularity())) {
+      return false;
+    }
+
+    if (query.getDimFilter() != null) {
+      return false;
+    }
+
+    final List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
+
+    if (aggregatorSpecs.isEmpty()) {
+      return false;
+    }
+
+    for (final AggregatorFactory agg : aggregatorSpecs) {
+      if (!isTimeBoundaryAggregator(agg)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Returns true if the query needs the minimum time (has at least one {@link LongMinAggregatorFactory}
+   * on {@link ColumnHolder#TIME_COLUMN_NAME}).
+   */
+  public static boolean needsMinTime(final GroupByQuery query)
+  {
+    for (final AggregatorFactory agg : query.getAggregatorSpecs()) {
+      if (isTimeBoundaryAggregator(agg) && agg instanceof LongMinAggregatorFactory) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns true if the query needs the maximum time (has at least one {@link LongMaxAggregatorFactory}
+   * on {@link ColumnHolder#TIME_COLUMN_NAME}).
+   */
+  public static boolean needsMaxTime(final GroupByQuery query)
+  {
+    for (final AggregatorFactory agg : query.getAggregatorSpecs()) {
+      if (isTimeBoundaryAggregator(agg) && agg instanceof LongMaxAggregatorFactory) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns true if the {@link TimeBoundaryInspector} can be used to answer the query without scanning data.
+   * Requires that the query is a time boundary query, the inspector is non-null and exact, and that the
+   * descriptor's interval fully contains the inspector's min/max interval.
+   */
+  public static boolean canUseTimeBoundaryInspector(
+      final GroupByQuery query,
+      @Nullable final TimeBoundaryInspector tbi,
+      final SegmentDescriptor descriptor
+  )
+  {
+    return isTimeBoundaryQuery(query)
+           && tbi != null
+           && tbi.isMinMaxExact()
+           && descriptor.getInterval().contains(tbi.getMinMaxInterval());
+  }
+
+  /**
+   * Constructs a {@link ResultRow} from the time boundary inspector, filling each aggregator position
+   * with the appropriate min or max time.
+   */
+  public static ResultRow computeTimeBoundaryResult(final GroupByQuery query, final TimeBoundaryInspector tbi)
+  {
+    final int size = query.getResultRowSizeWithoutPostAggregators();
+    final ResultRow row = ResultRow.create(size);
+    final int aggStart = query.getResultRowAggregatorStart();
+    final List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
+
+    for (int i = 0; i < aggregatorSpecs.size(); i++) {
+      final AggregatorFactory agg = aggregatorSpecs.get(i);
+
+      if (agg instanceof LongMinAggregatorFactory) {
+        row.set(aggStart + i, tbi.getMinTime().getMillis());
+      } else if (agg instanceof LongMaxAggregatorFactory) {
+        row.set(aggStart + i, tbi.getMaxTime().getMillis());
+      }
+    }
+
+    return row;
+  }
+
+  private static boolean isTimeBoundaryAggregator(final AggregatorFactory agg)
+  {
+    return (agg instanceof LongMinAggregatorFactory || agg instanceof LongMaxAggregatorFactory)
+           && ColumnHolder.TIME_COLUMN_NAME.equals(((SimpleLongAggregatorFactory) agg).getFieldName());
+  }
+}
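
A minimal usage sketch for these helpers, not part of the commit: it builds a groupBy of the qualifying shape (no dimensions, ALL granularity, no filter, only min/max aggregators on __time) and runs the detection checks. It assumes the usual GroupByQuery.Builder setters; the class name, datasource name, and aggregator output names are arbitrary.

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.querykit.groupby.GroupByTimeBoundaryUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.segment.column.ColumnHolder;

import java.util.List;

public class TimeBoundaryDetectionSketch
{
  public static void main(String[] args)
  {
    // The same shape that "SELECT MIN(__time), MAX(__time) FROM foo" plans to:
    // only min/max aggregators on the __time column.
    final List<AggregatorFactory> aggregators = List.of(
        new LongMinAggregatorFactory("a0", ColumnHolder.TIME_COLUMN_NAME),
        new LongMaxAggregatorFactory("a1", ColumnHolder.TIME_COLUMN_NAME)
    );

    final GroupByQuery query = GroupByQuery.builder()
                                           .setDataSource("foo")
                                           .setInterval(Intervals.ETERNITY)
                                           .setGranularity(Granularities.ALL)
                                           .setAggregatorSpecs(aggregators)
                                           .build();

    // All three checks pass for this shape, so a worker may prune base inputs to the
    // earliest/latest segments and, where a segment exposes an exact TimeBoundaryInspector,
    // answer without opening a cursor.
    System.out.println(GroupByTimeBoundaryUtils.isTimeBoundaryQuery(query));
    System.out.println(GroupByTimeBoundaryUtils.needsMinTime(query));
    System.out.println(GroupByTimeBoundaryUtils.needsMaxTime(query));
  }
}

Because the checks run against the planned groupBy itself, no separate timeBoundary query type is needed; workers recognize this shape at execution time, which is the simplification the commit message describes.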

multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java

Lines changed: 57 additions & 0 deletions
@@ -63,6 +63,8 @@
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
+import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
+import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
 import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
 import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
@@ -75,6 +77,7 @@
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.query.policy.Policy;
 import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.join.JoinType;
@@ -2954,4 +2957,58 @@ public boolean isPageSizeLimited(String contextName)
   {
     return QUERY_RESULTS_WITH_DURABLE_STORAGE.equals(contextName);
   }
+
+  @MethodSource("data")
+  @ParameterizedTest(name = "{index}:with context {0}")
+  public void testTimeBoundaryGroupBy(String contextName, Map<String, Object> context)
+  {
+    final RowSignature rowSignature = RowSignature.builder()
+                                                  .add("EXPR$0", ColumnType.LONG)
+                                                  .add("EXPR$1", ColumnType.LONG)
+                                                  .build();
+
+    testSelectQuery()
+        .setSql("SELECT MIN(__time), MAX(__time) FROM foo")
+        .setExpectedMSQSpec(
+            LegacyMSQSpec.builder()
+                         .query(
+                             GroupByQuery.builder()
+                                         .setDataSource(CalciteTests.DATASOURCE1)
+                                         .setInterval(querySegmentSpec(Filtration.eternity()))
+                                         .setGranularity(Granularities.ALL)
+                                         .setAggregatorSpecs(
+                                             aggregators(
+                                                 new LongMinAggregatorFactory("a0", ColumnHolder.TIME_COLUMN_NAME),
+                                                 new LongMaxAggregatorFactory("a1", ColumnHolder.TIME_COLUMN_NAME)
+                                             )
+                                         )
+                                         .setContext(context)
+                                         .build()
+                         )
+                         .columnMappings(
+                             new ColumnMappings(
+                                 ImmutableList.of(
+                                     new ColumnMapping("a0", "EXPR$0"),
+                                     new ColumnMapping("a1", "EXPR$1")
+                                 )
+                             )
+                         )
+                         .tuningConfig(MSQTuningConfig.defaultConfig())
+                         .destination(isDurableStorageDestination(contextName, context)
+                                      ? DurableStorageMSQDestination.INSTANCE
+                                      : TaskReportMSQDestination.INSTANCE)
+                         .build()
+        )
+        .setExpectedRowSignature(rowSignature)
+        .setQueryContext(context)
+        .setExpectedResultRows(
+            ImmutableList.of(
+                new Object[]{
+                    DateTimes.of("2000-01-01").getMillis(),
+                    DateTimes.of("2001-01-03").getMillis()
+                }
+            )
+        )
+        .verifyResults();
+  }
 }
