Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.search.aggregations.metrics;

import org.apache.lucene.index.DocValuesSkipper;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
Expand All @@ -42,6 +43,7 @@
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.DoubleArray;
import org.opensearch.common.util.LongArray;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.fielddata.NumericDoubleValues;
Expand Down Expand Up @@ -81,8 +83,14 @@ class MinAggregator extends NumericMetricsAggregator.SingleValue implements Star

final String pointField;
final Function<byte[], Number> pointConverter;
final String fieldName;
final boolean fieldIsFloat;

DoubleArray mins;
LongArray skipUpTo;

private int defaultCollectorsUsed = 0;
private int skipListCollectorsUsed = 0;

MinAggregator(String name, ValuesSourceConfig config, SearchContext context, Aggregator parent, Map<String, Object> metadata)
throws IOException {
Expand All @@ -92,7 +100,13 @@ class MinAggregator extends NumericMetricsAggregator.SingleValue implements Star
if (valuesSource != null) {
mins = context.bigArrays().newDoubleArray(1, false);
mins.fill(0, mins.size(), Double.POSITIVE_INFINITY);
fieldName = valuesSource.getIndexFieldName();
fieldIsFloat = valuesSource.isFloatingPoint();
} else {
fieldName = null;
fieldIsFloat = false;
}

this.format = config.format();
this.pointConverter = pointReaderIfAvailable(config);
if (pointConverter != null) {
Expand Down Expand Up @@ -156,6 +170,23 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
final BigArrays bigArrays = context.bigArrays();
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
final NumericDoubleValues values = MultiValueMode.MIN.select(allValues);

// Try to use skiplist optimization if available
DocValuesSkipper skipper = null;
if (this.fieldName != null) {
skipper = ctx.reader().getDocValuesSkipper(this.fieldName);
}

// Use skiplist collector if conditions are met
if (skipper != null) {
skipListCollectorsUsed++;
this.skipUpTo = bigArrays.newLongArray(1, false);
this.skipUpTo.fill(0, this.skipUpTo.size(), -1);
return new MinSkiplistLeafCollector(values, skipper, mins, fieldIsFloat, MinAggregator.this, sub);
}
Comment on lines +183 to +186
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Memory leak: skipUpTo array is reallocated on every leaf without releasing previous allocation.

The skipUpTo array is created fresh for each leaf collector but never released until doClose(). If processing multiple leaf segments, each call to getLeafCollector() allocates a new LongArray, leaking the previous one.

Consider either:

  1. Allocating once in the constructor and resetting per-leaf
  2. Releasing the old array before allocating a new one
         // Use skiplist collector if conditions are met
         if (skipper != null) {
             skipListCollectorsUsed++;
+            if (this.skipUpTo != null) {
+                Releasables.close(this.skipUpTo);
+            }
             this.skipUpTo = bigArrays.newLongArray(1, false);
             this.skipUpTo.fill(0, this.skipUpTo.size(), -1);
             return new MinSkiplistLeafCollector(values, skipper, mins, fieldIsFloat, MinAggregator.this, sub);
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
this.skipUpTo = bigArrays.newLongArray(1, false);
this.skipUpTo.fill(0, this.skipUpTo.size(), -1);
return new MinSkiplistLeafCollector(values, skipper, mins, fieldIsFloat, MinAggregator.this, sub);
}
if (skipper != null) {
skipListCollectorsUsed++;
if (this.skipUpTo != null) {
Releasables.close(this.skipUpTo);
}
this.skipUpTo = bigArrays.newLongArray(1, false);
this.skipUpTo.fill(0, this.skipUpTo.size(), -1);
return new MinSkiplistLeafCollector(values, skipper, mins, fieldIsFloat, MinAggregator.this, sub);
}
🤖 Prompt for AI Agents
In
server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java
around lines 183 to 186, skipUpTo is reallocated on every call to
getLeafCollector() causing a memory leak because previous LongArray instances
are not released; fix by either allocating skipUpTo once in the MinAggregator
constructor and reusing/resetting its values before returning each leaf
collector (call fill or set sentinel values) or, if per-leaf allocation is
required, release the previous LongArray by calling its close/release method (or
bigArrays.releaseRef/appropriate deallocation API) before assigning a new one,
and ensure doClose() still closes the final array.


// Fall back to standard collector selection logic
defaultCollectorsUsed++;
return new LeafBucketCollectorBase(sub, allValues) {
@Override
public void collect(int doc, long bucket) throws IOException {
Expand Down Expand Up @@ -192,14 +223,166 @@ public void collectRange(int min, int max) throws IOException {
mins.set(0, minimum);
}

private void growMins(long bucket) {
if (bucket >= mins.size()) {
long from = mins.size();
mins = bigArrays.grow(mins, bucket + 1);
mins.fill(from, mins.size(), Double.POSITIVE_INFINITY);

};
}

/**
 * Ensures the {@code mins} array can hold the given bucket ordinal, growing it
 * and filling any newly added slots with {@code Double.POSITIVE_INFINITY}
 * (the identity value for a min aggregation).
 *
 * @param bucket the bucket ordinal that must be addressable
 * @return the (possibly reallocated) mins array
 */
private DoubleArray growMins(long bucket) {
    if (bucket < mins.size()) {
        return mins;
    }
    final long previousSize = mins.size();
    mins = context.bigArrays().grow(mins, bucket + 1);
    // New slots start at +Infinity so any real value observed later wins.
    mins.fill(previousSize, mins.size(), Double.POSITIVE_INFINITY);
    return mins;
}


/**
 * Ensures the {@code skipUpTo} array can hold the given bucket ordinal, growing
 * it and filling any newly added slots with {@code -1} (the "no skip range
 * evaluated yet" sentinel).
 *
 * @param bucket the bucket ordinal that must be addressable
 * @return the (possibly reallocated) skipUpTo array
 */
private LongArray growSkipUpTo(long bucket) {
    if (bucket < skipUpTo.size()) {
        return skipUpTo;
    }
    final long previousSize = skipUpTo.size();
    skipUpTo = context.bigArrays().grow(skipUpTo, bucket + 1);
    // -1 marks buckets whose skip range has not been computed.
    skipUpTo.fill(previousSize, skipUpTo.size(), -1);
    return skipUpTo;
}

/**
* Specialized leaf collector that uses DocValuesSkipper to efficiently skip document ranges
* that cannot improve the current minimum value.
*
* This collector leverages skip list metadata to avoid processing documents when the skip range's
* minimum value is greater than or equal to the current tracked minimum for a bucket.
*/
private static class MinSkiplistLeafCollector extends LeafBucketCollectorBase {
private final NumericDoubleValues values;
private final DocValuesSkipper skipper;
private final MinAggregator minAgg;
private DoubleArray mins;
private final boolean isFloat;
private LongArray skipUpTo;
private final LeafBucketCollector sub;
private final boolean isSubNoOp;

/**
 * Constructs a new MinSkiplistLeafCollector.
 *
 * @param values the numeric doc values for the field
 * @param skipper the doc values skipper for skip list optimization
 * @param mins the array storing minimum values per bucket
 * @param isFloat whether the field holds floating-point values; controls how the
 *                skipper's sortable-long min values are decoded back to doubles
 * @param minAggregator the owning aggregator, used to grow the shared
 *                      {@code mins}/{@code skipUpTo} arrays per bucket
 * @param sub the sub-aggregator collector
 */
MinSkiplistLeafCollector(
NumericDoubleValues values,
DocValuesSkipper skipper,
DoubleArray mins,
boolean isFloat,
MinAggregator minAggregator,
LeafBucketCollector sub
) {
// NOTE(review): null is passed as the values source to the base class; the
// docs iteration is driven here via the skipper instead — confirm intended.
super(sub, null);
this.values = values;
this.isFloat = isFloat;
this.skipper = skipper;
this.mins = mins;
this.minAgg = minAggregator;
this.sub = sub;
// Cache the NO_OP check so collect() can avoid a virtual call per doc.
this.isSubNoOp = sub == LeafBucketCollector.NO_OP_COLLECTOR;

}



/**
* Advances the skipper to the appropriate position and determines the skip range
* for a specific bucket. The result is stored in the skipUpTo array:
* - Positive value: Can skip up to and including this doc ID
* - Negative value: Cannot skip, must process documents individually
*
* @param doc the current document ID
* @param owningBucketOrd the bucket ordinal for which to evaluate skip range
* @throws IOException if an I/O error occurs
*/
private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {
if (doc > skipper.maxDocID(0)) {
skipper.advance(doc);
}

// Initialize to "do not skip" state
long upToInclusive = -1;

if (skipper.minDocID(0) > doc) {
// Corner case: doc is between skip intervals
// Set to (minDocID - 1) but keep negative to indicate no skipping
upToInclusive = -(skipper.minDocID(0) - 1) - 1;
mins = minAgg.growMins(owningBucketOrd);
skipUpTo = minAgg.growSkipUpTo(owningBucketOrd);
skipUpTo.set(owningBucketOrd, upToInclusive);
return;
}

upToInclusive = skipper.maxDocID(0);

// Ensure arrays are large enough
mins = minAgg.growMins(owningBucketOrd);
skipUpTo = minAgg.growSkipUpTo(owningBucketOrd);
double currentMin = mins.get(owningBucketOrd);

// Check progressively larger skip levels
boolean canSkip = false;
for (int level = 0; level < skipper.numLevels(); ++level) {
// Convert skipper's minValue (stored as sortable long) to double
long sortableLong = skipper.minValue(level);
double skipperMin = isFloat ? NumericUtils.sortableLongToDouble(sortableLong) : sortableLong;

if (skipperMin >= currentMin) {
// All values in this range are >= current min, can skip
upToInclusive = skipper.maxDocID(level);
canSkip = true;
} else {
// This range might contain better minimums
break;
}
}
};

// Store result: negative if cannot skip, positive if can skip
skipUpTo.set(owningBucketOrd, canSkip ? upToInclusive : -upToInclusive - 1);
}

@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
// Get skipUpTo value for this bucket
mins = minAgg.growMins(owningBucketOrd);
skipUpTo = minAgg.growSkipUpTo(owningBucketOrd);
long skipUpToValue = skipUpTo.get(owningBucketOrd);

// Extract the upToInclusive boundary (handle negative encoding)
long upToInclusive = skipUpToValue >= 0 ? skipUpToValue : -skipUpToValue - 1;

// If doc > upToInclusive, we need to advance the skipper
// TODO: check if it should be doc >= or doc >
if (doc >= upToInclusive) {
advanceSkipper(doc, owningBucketOrd);
skipUpToValue = skipUpTo.get(owningBucketOrd);
}
if (!isSubNoOp) {
sub.collect(doc, owningBucketOrd);
}
// If skipUpTo >= 0, we can skip this document
if (skipUpToValue >= 0) {
return;
}
Comment on lines +364 to +375
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

TODO comment and potential off-by-one error needs resolution.

The TODO at line 364 indicates uncertainty about whether the comparison should be >= or >. This needs to be resolved before merging.

Based on the logic:

  • upToInclusive represents the last doc ID that can be skipped (inclusive)
  • When doc >= upToInclusive, we need to re-evaluate the skip range

The current >= seems incorrect. If upToInclusive is the max doc that CAN be skipped, then:

  • doc == upToInclusive means doc is still in the skippable range
  • Only doc > upToInclusive should trigger re-evaluation
-            // TODO: check if it should be doc >= or doc >
-            if (doc >= upToInclusive) {
+            if (doc > upToInclusive) {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// TODO: check if it should be doc >= or doc >
if (doc >= upToInclusive) {
advanceSkipper(doc, owningBucketOrd);
skipUpToValue = skipUpTo.get(owningBucketOrd);
}
if (!isSubNoOp) {
sub.collect(doc, owningBucketOrd);
}
// If skipUpTo >= 0, we can skip this document
if (skipUpToValue >= 0) {
return;
}
if (doc > upToInclusive) {
advanceSkipper(doc, owningBucketOrd);
skipUpToValue = skipUpTo.get(owningBucketOrd);
}
if (!isSubNoOp) {
sub.collect(doc, owningBucketOrd);
}
// If skipUpTo >= 0, we can skip this document
if (skipUpToValue >= 0) {
return;
}
🤖 Prompt for AI Agents
In
server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java
around lines 364 to 375, the comparison using `doc >= upToInclusive` is
incorrect given `upToInclusive` is the last doc ID that may be skipped; change
the condition to `if (doc > upToInclusive) { advanceSkipper(doc,
owningBucketOrd); skipUpToValue = skipUpTo.get(owningBucketOrd); }`, remove the
TODO, and keep the rest of the logic (sub.collect and the subsequent
skipUpToValue check) unchanged so that documents equal to upToInclusive are
still considered skippable and only docs greater than it trigger re-evaluation.


// Otherwise, process the document
if (values.advanceExact(doc)) {
double value = values.doubleValue();
double min = mins.get(owningBucketOrd);
if (value < min) {
mins.set(owningBucketOrd, value);
}
}
}
}

private void precomputeLeafUsingStarTree(LeafReaderContext ctx, CompositeIndexFieldInfo starTree) throws IOException {
Expand Down Expand Up @@ -230,9 +413,14 @@ public InternalAggregation buildEmptyAggregation() {
return new InternalMin(name, Double.POSITIVE_INFINITY, format, metadata());
}

/**
 * Returns debug counters for profiling: how many leaf collectors fell back to the
 * default per-doc path versus how many used the skip-list optimized path.
 *
 * @return an immutable map of collector-usage counters
 */
public Map<String, Object> collectDebugInfo() {
return Map.of("defaultCollectorsUsed", defaultCollectorsUsed,
"skipListCollectorsUsed", skipListCollectorsUsed);
}

@Override
public void doClose() {
    // Release all big-array state in one call. skipUpTo may be null if the
    // skip-list collector was never used; Releasables.close tolerates nulls.
    // (Removed a stale duplicate Releasables.close(mins) left over from the
    // previous version of this method.)
    Releasables.close(mins, skipUpTo);
}

/**
Expand Down Expand Up @@ -305,6 +493,9 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
@Override
public void doReset() {
// Reset min state to the aggregation identity (+Infinity).
mins.fill(0, mins.size(), Double.POSITIVE_INFINITY);
// skipUpTo is only allocated when the skip-list collector is used; reset its
// cached skip decisions to the -1 "not evaluated" sentinel if present.
if (skipUpTo != null) {
skipUpTo.fill(0, skipUpTo.size(), -1);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@

package org.opensearch.search.profile.aggregation;

import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.search.Scorable;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.profile.Timer;

Expand Down Expand Up @@ -63,6 +65,26 @@ public void collect(int doc, long bucket) throws IOException {
}
}

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
// Time the bulk doc-id-stream collection path so profiling covers it the same
// way as the per-document collect(int, long) override above.
collectTimer.start();
try {
delegate.collect(stream, owningBucketOrd);
} finally {
// Stop in finally so the timer stays balanced even if the delegate throws.
collectTimer.stop();
}
}

@Override
public void collectRange(int min, int max) throws IOException {
// Time range-based collection ([min, max) doc range delegated as-is) so the
// profiler attributes this bulk path to the wrapped collector.
collectTimer.start();
try {
delegate.collectRange(min, max);
} finally {
// Stop in finally so the timer stays balanced even if the delegate throws.
collectTimer.stop();
}
}

@Override
public void setScorer(Scorable scorer) throws IOException {
delegate.setScorer(scorer);
Expand Down
Loading
Loading