Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.search.aggregations;

import org.apache.lucene.index.LeafReaderContext;
import org.opensearch.OpenSearchParseException;
import org.opensearch.common.SetOnce;
import org.opensearch.common.annotation.PublicApi;
Expand All @@ -45,6 +46,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.aggregations.support.AggregationPath;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.aggregation.ProfilingAggregator;
import org.opensearch.search.sort.SortOrder;

import java.io.IOException;
Expand Down Expand Up @@ -113,6 +115,32 @@ public InternalAggregation getPostCollectionAggregation() {
*/
public abstract Aggregator subAggregator(String name);

/**
 * Subclasses may override this method if they have an efficient way of computing their aggregation for the given
 * segment (versus collecting matching documents). If this method returns true, collection for the given segment
 * will be terminated, rather than executing normally.
 * <p>
 * If this method returns true, the aggregator's state should be identical to what it would be if matching
 * documents from the segment were fully collected. If this method returns false, the aggregator's state should
 * be unchanged from before this method is called.
 * @param ctx the context for the given segment
 * @return true if and only if results for this segment have been precomputed
 * @throws IOException if an I/O error occurs while reading the segment
 */
public boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
// Default: no precomputation strategy available — callers fall back to normal document collection.
return false;
}

/**
 * Subclasses can choose to override this method to retrieve the leaf collector without attempting to precompute
 * the aggregation for the segment. Used in {@link ProfilingAggregator}.
 * @param ctx the context for the given segment
 * @return the {@link LeafBucketCollector} for the segment, or {@code null} if this aggregator does not support
 *         retrieving a collector without the precompute path
 * @throws IOException if an I/O error occurs while setting up the collector
 */
public LeafBucketCollector getLeafCollectorWithoutPrecompute(LeafReaderContext ctx) throws IOException {
    // Default: not supported here; concrete aggregator bases are expected to override this.
    return null;
}

/**
* Resolve the next step of the sort path as though this aggregation
* supported sorting. This is usually the "first step" when resolving
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,13 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws
return getLeafCollector(ctx, sub);
}

/**
 * Builds the leaf collector for the given segment without going through the precompute path.
 * NOTE(review): this body mirrors {@code getLeafCollector(LeafReaderContext)} — it prepares sub-leaf collectors
 * and delegates to {@code getLeafCollector(ctx, sub)}; presumably the precompute attempt lives only in the other
 * overload, making this one safe for profiling callers — confirm against the full class.
 * @param ctx the context for the given segment
 * @return the collector used to collect matching documents for this segment
 * @throws IOException if an I/O error occurs while creating the collector
 */
@Override
public final LeafBucketCollector getLeafCollectorWithoutPrecompute(LeafReaderContext ctx) throws IOException {
preGetSubLeafCollectors(ctx);
final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);
return getLeafCollector(ctx, sub);
}

/**
* Can be overridden by aggregator implementations that like the perform an operation before the leaf collectors
* of children aggregators are instantiated for the next segment.
Expand All @@ -221,21 +228,6 @@ protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException
*/
protected void doPreCollection() throws IOException {}

/**
* Subclasses may override this method if they have an efficient way of computing their aggregation for the given
* segment (versus collecting matching documents). If this method returns true, collection for the given segment
* will be terminated, rather than executing normally.
* <p>
* If this method returns true, the aggregator's state should be identical to what it would be if matching
* documents from the segment were fully collected. If this method returns false, the aggregator's state should
* be unchanged from before this method is called.
* @param ctx the context for the given segment
* @return true if and only if results for this segment have been precomputed
*/
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
return false;
}

@Override
public final void preCollection() throws IOException {
List<BucketCollector> collectors = Arrays.asList(subAggregators);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;

import java.io.IOException;
Expand All @@ -45,6 +46,7 @@
*
 * @opensearch.api
*/
@PublicApi(since = "3.4.0")
public abstract class LeafBucketCollector implements LeafCollector {

public static final LeafBucketCollector NO_OP_COLLECTOR = new LeafBucketCollector() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@
package org.opensearch.search.aggregations;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.Timer;
import org.opensearch.search.profile.aggregation.startree.StarTreeAggregationTimingType;
import org.opensearch.search.profile.aggregation.startree.StarTreeProfileBreakdown;
import org.opensearch.search.startree.StarTreeQueryHelper;
import org.opensearch.search.startree.filter.DimensionFilter;

import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;

/**
* This interface is used to pre-compute the star tree bucket collector for each segment/leaf.
Expand Down Expand Up @@ -45,4 +53,183 @@ StarTreeBucketCollector getStarTreeBucketCollector(
// NOTE(review): returns null (not an empty list) when there are no dimension filters — callers must null-check.
default List<DimensionFilter> getDimensionFilters() {
return null;
}

/**
 * If this aggregator supports star tree precomputation, this method will represent the first phase of scanning
 * the star tree index and return the matching values to be added to the buckets. Can be overridden by subclasses
 * supporting star tree precomputation.
 * @param context the search context for the aggregator
 * @param valuesSource the data for values in this aggregator
 * @param ctx the context for the given segment
 * @param starTree field info details of composite index
 * @param metric type of metric used for aggregation (e.g. sum, max, min, etc...)
 * @return bitset of document ids matching the star tree query, consumed later by the bucket-building phase
 * @throws IOException if an I/O error occurs while scanning the star tree index
 */
default FixedBitSet scanStarTree(
SearchContext context,
ValuesSource valuesSource,
LeafReaderContext ctx,
CompositeIndexFieldInfo starTree,
String metric
) throws IOException {
// Delegates to the shared helper; overriding implementations may scan more efficiently.
return StarTreeQueryHelper.scanStarTree(context, valuesSource, ctx, starTree, metric);
}

/**
 * If this aggregator supports star tree precomputation, this method will represent the first phase of scanning
 * the star tree index and return the matching values to be added to the buckets and also profile the time it
 * takes to complete this phase. Can be overridden by subclasses supporting star tree precomputation.
 * @param context the search context for the aggregator
 * @param valuesSource the data for values in this aggregator
 * @param ctx the context for the given segment
 * @param starTree field info details of composite index
 * @param metric type of metric used for aggregation (e.g. sum, max, min, etc...)
 * @param profileBreakdown the profiling breakdown to record the time taken for this phase
 * @return bitset of document ids matching the star tree query, as produced by {@link #scanStarTree}
 * @throws IOException if an I/O error occurs while scanning the star tree index
 */
default FixedBitSet scanStarTreeProfiling(
SearchContext context,
ValuesSource valuesSource,
LeafReaderContext ctx,
CompositeIndexFieldInfo starTree,
String metric,
StarTreeProfileBreakdown profileBreakdown
) throws IOException {
assert profileBreakdown != null;
Timer timer = profileBreakdown.getTimer(StarTreeAggregationTimingType.SCAN_STAR_TREE_SEGMENTS);
timer.start();
try {
return scanStarTree(context, valuesSource, ctx, starTree, metric);
} finally {
// Record the elapsed time even when the scan throws.
timer.stop();
}
}

/**
 * For bucket aggregations, this method will return a StarTreeBucketCollector as part of the phase of
 * scanning the star tree for matching entries to be added to the buckets and defining which buckets the
 * collected documents should go into. Profiles the time it
 * takes to complete this phase. Can be overridden by subclasses
 * @param ctx the context for the given segment
 * @param starTree field info details of composite index
 * @param parent the {@link StarTreeBucketCollector} for the parent aggregator if any
 * @param profileBreakdown the profiling breakdown to record the time taken for this phase
 * @return the {@link StarTreeBucketCollector} created by {@link #getStarTreeBucketCollector}
 * @throws IOException if an I/O error occurs while building the collector
 */
default StarTreeBucketCollector getStarTreeBucketCollectorProfiling(
LeafReaderContext ctx,
CompositeIndexFieldInfo starTree,
StarTreeBucketCollector parent,
StarTreeProfileBreakdown profileBreakdown
) throws IOException {
assert profileBreakdown != null;
// Collector construction is billed to the scan phase of the star tree timing breakdown.
Timer timer = profileBreakdown.getTimer(StarTreeAggregationTimingType.SCAN_STAR_TREE_SEGMENTS);
timer.start();
try {
return getStarTreeBucketCollector(ctx, starTree, parent);
} finally {
// Record the elapsed time even when collector creation throws.
timer.stop();
}
}

/**
 * After scanning the star tree, this method will apply the aggregation operation to create the buckets.
 * Can be overridden by subclasses supporting star tree precomputation.
 * @param context the search context for the aggregator
 * @param valuesSource the data for values in this aggregator
 * @param ctx the context for the given segment
 * @param starTree field info details of composite index
 * @param metric type of metric used for aggregation (e.g. sum, max, min, etc...)
 * @param valueConsumer consumer to accept the values in documents matching the conditions
 * @param finalConsumer consumer to set the final answer after iterating over all values
 * @param filteredValues bitset for document ids matching the star tree query
 * @throws IOException if an I/O error occurs while reading metric values
 */
default void buildBucketsFromStarTree(
SearchContext context,
ValuesSource valuesSource,
LeafReaderContext ctx,
CompositeIndexFieldInfo starTree,
String metric,
Consumer<Long> valueConsumer,
Runnable finalConsumer,
FixedBitSet filteredValues
) throws IOException {
// Delegates to the shared helper; overriding implementations may build buckets more efficiently.
StarTreeQueryHelper.buildBucketsFromStarTree(
context,
valuesSource,
ctx,
starTree,
metric,
valueConsumer,
finalConsumer,
filteredValues
);
}

/**
 * After scanning the star tree, this method will apply the aggregation operation to create the buckets
 * and also profile the time it takes to complete this phase. Can be overridden by subclasses
 * supporting star tree precomputation.
 * @param context the search context for the aggregator
 * @param valuesSource the data for values in this aggregator
 * @param ctx the context for the given segment
 * @param starTree field info details of composite index
 * @param metric type of metric used for aggregation (e.g. sum, max, min, etc...)
 * @param valueConsumer consumer to accept the values in documents matching the conditions
 * @param finalConsumer consumer to set the final answer after iterating over all values
 * @param filteredValues bitset for document ids matching the star tree query
 * @param profileBreakdown the profiling breakdown to record the time taken for this phase
 * @throws IOException if an I/O error occurs while building the buckets
 */
default void buildBucketsFromStarTreeProfiling(
SearchContext context,
ValuesSource valuesSource,
LeafReaderContext ctx,
CompositeIndexFieldInfo starTree,
String metric,
Consumer<Long> valueConsumer,
Runnable finalConsumer,
FixedBitSet filteredValues,
StarTreeProfileBreakdown profileBreakdown
) throws IOException {
// Consistent with the other *Profiling defaults in this interface, which assert a non-null breakdown.
assert profileBreakdown != null;
Timer timer = profileBreakdown.getTimer(StarTreeAggregationTimingType.BUILD_BUCKETS_FROM_STAR_TREE);
timer.start();
try {
buildBucketsFromStarTree(context, valuesSource, ctx, starTree, metric, valueConsumer, finalConsumer, filteredValues);
} finally {
// Record the elapsed time even when bucket building throws.
timer.stop();
}
}

/**
 * This method applies the StarTreeBucketCollector to collect the documents for bucket aggregations
 * @param starTreeBucketCollector the star tree bucket collector for adding the documents
 * @throws IOException if an I/O error occurs while collecting documents
 */
default void preComputeBucketsWithStarTree(StarTreeBucketCollector starTreeBucketCollector) throws IOException {
// Delegates to the shared helper which drives the collector over the star tree entries.
StarTreeQueryHelper.preComputeBucketsWithStarTree(starTreeBucketCollector);
}

/**
 * This method applies the StarTreeBucketCollector to collect the documents for bucket aggregations
 * and profiles the time taken to do so.
 * @param starTreeBucketCollector the star tree bucket collector for adding the documents
 * @param profileBreakdown the profiling breakdown to record the time taken for this phase
 * @throws IOException if an I/O error occurs while collecting documents
 */
default void preComputeBucketsWithStarTreeProfiling(
StarTreeBucketCollector starTreeBucketCollector,
StarTreeProfileBreakdown profileBreakdown
) throws IOException {
// Consistent with the other *Profiling defaults in this interface, which assert a non-null breakdown.
assert profileBreakdown != null;
Timer timer = profileBreakdown.getTimer(StarTreeAggregationTimingType.BUILD_BUCKETS_FROM_STAR_TREE);
timer.start();
try {
preComputeBucketsWithStarTree(starTreeBucketCollector);
} finally {
// Record the elapsed time even when collection throws.
timer.stop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
}

@Override
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
public boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
finishLeaf(); // May need to wrap up previous leaf if it could not be precomputed
return filterRewriteOptimizationContext.tryOptimize(
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public final DeferringBucketCollector getDeferringCollector() {
protected abstract LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException;

@Override
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
public boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
return filterRewriteOptimizationContext.tryOptimize(
ctx,
this::incrementBucketDocCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.aggregation.AggregationProfileBreakdown;
import org.opensearch.search.profile.aggregation.startree.StarTreeProfileBreakdown;
import org.opensearch.search.startree.StarTreeQueryHelper;
import org.opensearch.search.startree.filter.DimensionFilter;
import org.opensearch.search.startree.filter.MatchAllFilter;
Expand All @@ -81,6 +83,7 @@
import java.util.function.BiConsumer;
import java.util.function.Function;

import static org.opensearch.search.aggregations.bucket.filterrewrite.AggregatorBridge.segmentMatchAll;
import static org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge.segmentMatchAll;
import static org.opensearch.search.startree.StarTreeQueryHelper.getSupportedStarTree;

Expand Down Expand Up @@ -202,11 +205,28 @@ public ScoreMode scoreMode() {
}

@Override
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
public boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context.getQueryShardContext());
if (supportedStarTree != null) {
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, supportedStarTree, null);
StarTreeQueryHelper.preComputeBucketsWithStarTree(starTreeBucketCollector);

if (context.getProfilers() != null) {
StarTreeProfileBreakdown breakdown = context.getProfilers().getAggregationProfiler().getStarTreeProfileBreakdown(this);
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollectorProfiling(
ctx,
supportedStarTree,
null,
breakdown
);
preComputeBucketsWithStarTreeProfiling(starTreeBucketCollector, breakdown);
AggregationProfileBreakdown aggregationProfileBreakdown = context.getProfilers()
.getAggregationProfiler()
.getQueryBreakdown(this);
aggregationProfileBreakdown.setStarTreeProfileBreakdown(breakdown);
aggregationProfileBreakdown.setStarTreePrecomputed();
} else {
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, supportedStarTree, null);
preComputeBucketsWithStarTree(starTreeBucketCollector);
}
return true;
}

Expand Down
Loading
Loading