1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add a mapper for context aware segments grouping criteria ([#19233](https://github.com/opensearch-project/OpenSearch/pull/19233))

### Changed
- Combine filter rewrite and skip list to optimize sub aggregation ([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573))
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))
- Refactor to move prepareIndex and prepareDelete methods to Engine class ([#19551](https://github.com/opensearch-project/OpenSearch/pull/19551))

@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import org.apache.lucene.search.CheckedIntConsumer;
import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.MathUtil;

import java.io.IOException;

/**
* DocIdStream implementation using FixedBitSet. This is a duplicate of the implementation in Lucene
* and should ideally be removed eventually.
*
* @opensearch.internal
*/
public final class BitSetDocIdStream extends DocIdStream {
private final FixedBitSet bitSet;
private final int offset, max;
private int upTo;

public BitSetDocIdStream(FixedBitSet bitSet, int offset) {
this.bitSet = bitSet;
this.offset = offset;
upTo = offset;
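// Clamp to Integer.MAX_VALUE in case offset + bitSet.length() overflows int.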
max = MathUtil.unsignedMin(Integer.MAX_VALUE, offset + bitSet.length());
}

@Override
public boolean mayHaveRemaining() {
return upTo < max;
}

@Override
public void forEach(int upTo, CheckedIntConsumer<IOException> consumer) throws IOException {
if (upTo > this.upTo) {
upTo = Math.min(upTo, max);
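// Iterate set bits in the window [this.upTo - offset, upTo - offset); the offset base is added back so the consumer receives absolute doc IDs.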
bitSet.forEach(this.upTo - offset, upTo - offset, offset, consumer);
this.upTo = upTo;
}
}

@Override
public int count(int upTo) throws IOException {
if (upTo > this.upTo) {
upTo = Math.min(upTo, max);
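// Count the set bits in the bit-set-relative window [this.upTo - offset, upTo - offset).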
int count = bitSet.cardinality(this.upTo - offset, upTo - offset);
this.upTo = upTo;
return count;
} else {
return 0;
}
}
}
@@ -32,8 +32,10 @@

package org.opensearch.search.aggregations;

import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;

import java.io.IOException;
@@ -123,6 +125,28 @@ public void collect(int doc) throws IOException {
collect(doc, 0);
}

/**
* Bulk-collect doc IDs within owningBucketOrd.
*
* <p>Note: The provided {@link DocIdStream} may be reused across calls and should be consumed
* immediately.
*
* <p>Note: The provided {@link DocIdStream} typically holds all the doc IDs for the corresponding
* owningBucketOrd. This method may be called multiple times per segment (but only once per owningBucketOrd).
*
* <p>While the {@link DocIdStream} for each owningBucketOrd is sorted by doc ID, it is NOT GUARANTEED
* that doc IDs arrive in order across invocations for different owningBucketOrds.
*
* <p>It is NOT LEGAL for callers to mix calls to {@link #collect(DocIdStream, long)} and {@link
* #collect(int, long)}.
*
* <p>The default implementation calls {@code stream.forEach(doc -> collect(doc, owningBucketOrd))}.
*/
@ExperimentalApi
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
stream.forEach(doc -> collect(doc, owningBucketOrd));
}

@Override
public void setScorer(Scorable scorer) throws IOException {
// no-op by default
@@ -0,0 +1,168 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket;

import org.apache.lucene.index.DocValuesSkipper;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.search.Scorable;
import org.opensearch.common.Rounding;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;

import java.io.IOException;

/**
* Histogram collection logic using skip list.
*
* @opensearch.internal
*/
public class HistogramSkiplistLeafCollector extends LeafBucketCollector {

private final NumericDocValues values;
private final DocValuesSkipper skipper;
private final Rounding.Prepared preparedRounding;
private final LongKeyedBucketOrds bucketOrds;
private final LeafBucketCollector sub;
private final BucketsAggregator aggregator;

/**
* Max doc ID (inclusive) up to which the values of all docs may map to the same bucket.
*/
private int upToInclusive = -1;

/**
* Whether the values of all docs up to {@link #upToInclusive} map to the same bucket.
*/
private boolean upToSameBucket;

/**
* Index in bucketOrds for docs up to {@link #upToInclusive}.
*/
private long upToBucketIndex;

public HistogramSkiplistLeafCollector(
NumericDocValues values,
DocValuesSkipper skipper,
Rounding.Prepared preparedRounding,
LongKeyedBucketOrds bucketOrds,
LeafBucketCollector sub,
BucketsAggregator aggregator
) {
this.values = values;
this.skipper = skipper;
this.preparedRounding = preparedRounding;
this.bucketOrds = bucketOrds;
this.sub = sub;
this.aggregator = aggregator;
}

@Override
public void setScorer(Scorable scorer) throws IOException {
if (sub != null) {
sub.setScorer(scorer);
}
}

private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {
if (doc > skipper.maxDocID(0)) {
skipper.advance(doc);
}
upToSameBucket = false;

if (skipper.minDocID(0) > doc) {
// Corner case which happens if `doc` doesn't have a value and is between two intervals of
// the doc-value skip index.
upToInclusive = skipper.minDocID(0) - 1;
return;
}

upToInclusive = skipper.maxDocID(0);

// Now find the highest level where all docs map to the same bucket.
for (int level = 0; level < skipper.numLevels(); ++level) {
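// A level qualifies only if every doc in its doc ID range has a value and its min and max values round to the same bucket.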
int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1;
long minBucket = preparedRounding.round(skipper.minValue(level));
long maxBucket = preparedRounding.round(skipper.maxValue(level));

if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) {
// All docs at this level have a value, and all values map to the same bucket.
upToInclusive = skipper.maxDocID(level);
upToSameBucket = true;
upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket);
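// A negative ordinal from add() means the bucket already exists; decode it back to the existing ordinal.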
if (upToBucketIndex < 0) {
upToBucketIndex = -1 - upToBucketIndex;
}
} else {
break;
}
}
}

@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
if (doc > upToInclusive) {
advanceSkipper(doc, owningBucketOrd);
}

if (upToSameBucket) {
aggregator.incrementBucketDocCount(upToBucketIndex, 1L);
sub.collect(doc, upToBucketIndex);
} else if (values.advanceExact(doc)) {
final long value = values.longValue();
long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value));
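// add() returns a negative value when the bucket already exists; decode it and collect into the existing bucket, otherwise create a new one.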
if (bucketIndex < 0) {
bucketIndex = -1 - bucketIndex;
aggregator.collectExistingBucket(sub, doc, bucketIndex);
} else {
aggregator.collectBucket(sub, doc, bucketIndex);
}
}
}

@Override
public void collect(DocIdStream stream) throws IOException {
// This will only be called if this is the top-level agg
collect(stream, 0);
}

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
// This will only be called if this is a sub-aggregation
for (;;) {
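// Consume the stream one skipper interval at a time: docs up to upToInclusive either all map to one bucket (fast path) or fall back to per-doc collection.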
int upToExclusive = upToInclusive + 1;
if (upToExclusive < 0) { // overflow
upToExclusive = Integer.MAX_VALUE;
}

if (upToSameBucket) {
if (sub == NO_OP_COLLECTOR) {
// stream.count may be faster when we don't need to handle sub-aggs
long count = stream.count(upToExclusive);
aggregator.incrementBucketDocCount(upToBucketIndex, count);
} else {
final int[] count = { 0 };
stream.forEach(upToExclusive, doc -> {
sub.collect(doc, upToBucketIndex);
count[0]++;
});
aggregator.incrementBucketDocCount(upToBucketIndex, count[0]);
}
} else {
stream.forEach(upToExclusive, doc -> collect(doc, owningBucketOrd));
}

if (stream.mayHaveRemaining()) {
advanceSkipper(upToExclusive, owningBucketOrd);
} else {
break;
}
}
}
}
@@ -14,6 +14,7 @@
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.BitDocIdSet;
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.search.aggregations.BitSetDocIdStream;
import org.opensearch.search.aggregations.BucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
@@ -23,8 +24,6 @@
import java.util.function.BiConsumer;
import java.util.function.Function;

import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;

/**
* Range collector implementation that supports sub-aggregations by collecting doc IDs.
*/
@@ -85,10 +84,7 @@ public void finalizePreviousRange() {
DocIdSetIterator iterator = bitDocIdSet.iterator();
// build a new leaf collector for each bucket
LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(leafCtx);
while (iterator.nextDoc() != NO_MORE_DOCS) {
int currentDoc = iterator.docID();
sub.collect(currentDoc, bucketOrd);
}
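// Bulk-collect the bucket's doc IDs through a DocIdStream instead of iterating them one at a time.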
sub.collect(new BitSetDocIdStream(bitSet, 0), bucketOrd);
logger.trace("collected sub aggregation for bucket {}", bucketOrd);
} catch (IOException e) {
throw new RuntimeException(e);
@@ -31,7 +31,10 @@

package org.opensearch.search.aggregations.bucket.histogram;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.DocValuesSkipper;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.CollectionUtil;
@@ -51,6 +54,7 @@
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.DeferableBucketAggregator;
import org.opensearch.search.aggregations.bucket.DeferringBucketCollector;
import org.opensearch.search.aggregations.bucket.HistogramSkiplistLeafCollector;
import org.opensearch.search.aggregations.bucket.MergingBucketsDeferringCollector;
import org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge;
import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
@@ -135,6 +139,7 @@ static AutoDateHistogramAggregator build(
protected int roundingIdx;
protected Rounding.Prepared preparedRounding;

private final String fieldName;
private final FilterRewriteOptimizationContext filterRewriteOptimizationContext;

private AutoDateHistogramAggregator(
@@ -218,6 +223,10 @@ protected Function<Long, Long> bucketOrdProducer() {
return (key) -> getBucketOrds().add(0, preparedRounding.round(key));
}
};

this.fieldName = (valuesSource instanceof ValuesSource.Numeric.FieldData)
? ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName()
: null;
filterRewriteOptimizationContext = new FilterRewriteOptimizationContext(bridge, parent, subAggregators.length, context);
}

@@ -260,7 +269,21 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc
return LeafBucketCollector.NO_OP_COLLECTOR;
}

DocValuesSkipper skipper = null;
if (this.fieldName != null) {
skipper = ctx.reader().getDocValuesSkipper(this.fieldName);
}
final SortedNumericDocValues values = valuesSource.longValues(ctx);
final NumericDocValues singleton = DocValues.unwrapSingleton(values);

if (skipper != null && singleton != null) {
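// The skip-list path requires both a doc values skipper and single-valued numeric doc values; otherwise we fall back to the iterating collector below.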
// TODO: add hard bounds support
// TODO: SkipListLeafCollector should be used if the getLeafCollector invocation is from
// filterRewriteOptimizationContext when parent != null. Removing the check to collect
// performance numbers for now
return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, getBucketOrds(), sub, this);
Contributor commented:
So it turns out this preparedRounding is too granular, so upToSameBucket is always false,

i.e. within the same skip list block (4k) min != max.

Auto date histogram does some estimation of rounding based on a starting interval, so we'll need to do the same in the skip-list version.

For now the game plan is:

  1. Starting with auto date at the top level, find a reasonable rounding (see the getRounding() method)
  2. Then try it out for sub-aggs, where for each top-level bucket ord we'll need to calculate a new rounding. One option is to use the top-level skip list to find the max doc ID and use its value as the max.

}

final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub);
return new LeafBucketCollectorBase(sub, values) {
@Override