Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ public CombineGroupByOperator(List<Operator> operators, BrokerRequest brokerRequ
*/
@Override
protected IntermediateResultsBlock getNextBlock() {
ConcurrentHashMap<String, Object[]> resultsMap = new ConcurrentHashMap<>();
AtomicInteger numGroups = new AtomicInteger();
List<Map<String, Object>> results = new ArrayList<>();
ConcurrentLinkedQueue<ProcessingException> mergedProcessingExceptions = new ConcurrentLinkedQueue<>();

AggregationFunctionContext[] aggregationFunctionContexts =
Expand All @@ -113,6 +112,7 @@ protected IntermediateResultsBlock getNextBlock() {
AggregationFunction[] aggregationFunctions = new AggregationFunction[numAggregationFunctions];
for (int i = 0; i < numAggregationFunctions; i++) {
aggregationFunctions[i] = aggregationFunctionContexts[i].getAggregationFunction();
results.add(new ConcurrentHashMap<>(1000, 0.2f, 1000));
}

// We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished
Expand Down Expand Up @@ -154,26 +154,27 @@ public void runJob() {
// Merge aggregation group-by result.
AggregationGroupByResult aggregationGroupByResult = intermediateResultsBlock.getAggregationGroupByResult();
if (aggregationGroupByResult != null) {
// Iterate over the group-by keys, for each key, update the group-by result in the resultsMap.
Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
while (groupKeyIterator.hasNext()) {
GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
resultsMap.compute(groupKey._stringKey, (key, value) -> {
if (value == null) {
if (numGroups.getAndIncrement() < _interSegmentNumGroupsLimit) {
value = new Object[numAggregationFunctions];
for (int i = 0; i < numAggregationFunctions; i++) {
value[i] = aggregationGroupByResult.getResultForKey(groupKey, i);
int index = 0;
for (Map<String, Object> resultsMap : results) {
// Iterate over the group-by keys, for each key, update the group-by result in the resultsMap.
Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
final int i = index;
AtomicInteger numGroups = new AtomicInteger();
while (groupKeyIterator.hasNext()) {
GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
resultsMap.compute(groupKey._stringKey, (key, value) -> {
if (value == null) {
if (numGroups.getAndIncrement() < _interSegmentNumGroupsLimit) {
return aggregationGroupByResult.getResultForKey(groupKey, i);
}
} else {
return aggregationFunctions[i].merge(value, aggregationGroupByResult.getResultForKey(groupKey, i));
}
} else {
for (int i = 0; i < numAggregationFunctions; i++) {
value[i] = aggregationFunctions[i]
.merge(value[i], aggregationGroupByResult.getResultForKey(groupKey, i));
}
}
return value;
});
return value;
});
}

index ++;
}
}
} catch (Exception e) {
Expand All @@ -200,8 +201,11 @@ public void runJob() {
// Trim the results map.
AggregationGroupByTrimmingService aggregationGroupByTrimmingService =
new AggregationGroupByTrimmingService(aggregationFunctions, (int) _brokerRequest.getGroupBy().getTopN());

int resultSize = (numAggregationFunctions == 0) ? 0 : results.get(0).size();

List<Map<String, Object>> trimmedResults =
aggregationGroupByTrimmingService.trimIntermediateResultsMap(resultsMap);
aggregationGroupByTrimmingService.trimIntermediateResults(results);
IntermediateResultsBlock mergedBlock =
new IntermediateResultsBlock(aggregationFunctionContexts, trimmedResults, true);

Expand All @@ -227,7 +231,7 @@ public void runJob() {

// TODO: this value should be set in the inner-segment operators. Setting it here might cause false positive as we
// are comparing number of groups across segments with the groups limit for each segment.
if (resultsMap.size() >= _innerSegmentNumGroupsLimit) {
if (resultSize >= _innerSegmentNumGroupsLimit) {
mergedBlock.setNumGroupsLimitReached(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,42 @@ public List<Map<String, Object>> trimIntermediateResultsMap(@Nonnull Map<String,
return Arrays.asList(trimmedResultMaps);
}

@Nonnull
public List<Map<String, Object>> trimIntermediateResults(@Nonnull List<Map<String, Object>> intermediateResults) {
int numAggregationFunctions = _aggregationFunctions.length;

if (intermediateResults.size() == 0) {
return intermediateResults;
}

int numGroups = intermediateResults.get(0).size();
if (numGroups > _trimThreshold) {
List<Map<String, Object>> trimmedResultMaps = new ArrayList<>(numAggregationFunctions);

// Trim the result only if number of groups is larger than the threshold
Sorter[] sorters = new Sorter[numAggregationFunctions];
for (int i = 0; i < numAggregationFunctions; i++) {
AggregationFunction aggregationFunction = _aggregationFunctions[i];
Sorter sorter = getSorter(_trimSize, aggregationFunction, aggregationFunction.isIntermediateResultComparable());
for (Map.Entry<String, Object> entry : intermediateResults.get(i).entrySet()) {
sorter.add(entry.getKey(), entry.getValue());
}
sorters[i] = sorter;
}

// Dump trimmed results into maps
for (int i = 0; i < numAggregationFunctions; i++) {
Map<String, Object> trimmedResultMap = new HashMap<>(_trimSize);
sorters[i].dumpToMap(trimmedResultMap);
trimmedResultMaps.add(trimmedResultMap);
}

return trimmedResultMaps;
}

return intermediateResults;
}

/**
* Given an array of maps from group key to final result for each aggregation function, trim the results to topN size.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
* </ul>
*/
public abstract class BaseMultiValueQueriesTest extends BaseQueriesTest {
private static final String AVRO_DATA = "data" + File.separator + "test_data-mv.avro";
private static final String AVRO_DATA = "data/test_data-mv.avro";
private static final String SEGMENT_NAME = "testTable_1756015683_1756015683";
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "MultiValueQueriesTest");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@
package org.apache.pinot.queries;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

import org.apache.pinot.common.response.broker.AggregationResult;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.GroupByResult;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.startree.hll.HllUtil;
import org.testng.annotations.Test;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.*;


public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValueQueriesTest {
Expand Down Expand Up @@ -407,4 +411,24 @@ public void testNumGroupsLimit() {
brokerResponse = getBrokerResponseForQuery(query, new InstancePlanMakerImplV2(1000, 1000));
assertTrue(brokerResponse.isNumGroupsLimitReached());
}

@Test
public void testNumGroupsMultiLimit() {
String query = "SELECT COUNT(*), SUM(column1) FROM testTable GROUP BY column7";

BrokerResponseNative brokerResponse = getBrokerResponseForQuery(query);
assertFalse(brokerResponse.isNumGroupsLimitReached());

List<AggregationResult> results = brokerResponse.getAggregationResults();
Iterator<GroupByResult> resultsIter = results.get(0).getGroupByResult().iterator();
assertEquals(resultsIter.next().getValue(), "199756");
assertEquals(resultsIter.next().getValue(), "29944");

resultsIter = results.get(1).getGroupByResult().iterator();
assertEquals(resultsIter.next().getValue(), "190754303720564.00000");
assertEquals(resultsIter.next().getValue(), "31917445702108.00000");

brokerResponse = getBrokerResponseForQuery(query, new InstancePlanMakerImplV2(5, 5));
assertTrue(brokerResponse.isNumGroupsLimitReached());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,21 @@ public void setUp() {
_trimmingService = new AggregationGroupByTrimmingService(AGGREGATION_FUNCTIONS, GROUP_BY_TOP_N);
}

private int checkTrimmedResults(List<Map<String, Object>> trimmedIntermediateResultMaps) {
Map<String, Object> trimmedSumResultMap = trimmedIntermediateResultMaps.get(0);
Map<String, Object> trimmedDistinctCountResultMap = trimmedIntermediateResultMaps.get(1);
int trimSize = trimmedSumResultMap.size();
Assert.assertEquals(trimmedDistinctCountResultMap.size(), trimSize, ERROR_MESSAGE);
for (int i = NUM_GROUPS - trimSize; i < NUM_GROUPS; i++) {
String group = _groups.get(i);
Assert.assertEquals(((Double) trimmedSumResultMap.get(group)).intValue(), i, ERROR_MESSAGE);
Assert.assertEquals(((IntOpenHashSet) trimmedDistinctCountResultMap.get(group)).size(),
i / (NUM_GROUPS / MAX_SIZE_OF_SET) + 1, ERROR_MESSAGE);
}

return trimSize;
}

@SuppressWarnings("unchecked")
@Test
public void testTrimming() {
Expand All @@ -95,24 +110,15 @@ public void testTrimming() {
}
List<Map<String, Object>> trimmedIntermediateResultMaps =
_trimmingService.trimIntermediateResultsMap(intermediateResultsMap);
Map<String, Object> trimmedSumResultMap = trimmedIntermediateResultMaps.get(0);
Map<String, Object> trimmedDistinctCountResultMap = trimmedIntermediateResultMaps.get(1);
int trimSize = trimmedSumResultMap.size();
Assert.assertEquals(trimmedDistinctCountResultMap.size(), trimSize, ERROR_MESSAGE);
for (int i = NUM_GROUPS - trimSize; i < NUM_GROUPS; i++) {
String group = _groups.get(i);
Assert.assertEquals(((Double) trimmedSumResultMap.get(group)).intValue(), i, ERROR_MESSAGE);
Assert.assertEquals(((IntOpenHashSet) trimmedDistinctCountResultMap.get(group)).size(),
i / (NUM_GROUPS / MAX_SIZE_OF_SET) + 1, ERROR_MESSAGE);
}
int trimSize = checkTrimmedResults(trimmedIntermediateResultMaps);

// Test Broker side trimming
Map<String, Comparable> finalDistinctCountResultMap = new HashMap<>(trimSize);
for (Map.Entry<String, Object> entry : trimmedDistinctCountResultMap.entrySet()) {
for (Map.Entry<String, Object> entry : trimmedIntermediateResultMaps.get(1).entrySet()) {
finalDistinctCountResultMap.put(entry.getKey(), ((IntOpenHashSet) entry.getValue()).size());
}
List[] groupByResultLists =
_trimmingService.trimFinalResults(new Map[]{trimmedSumResultMap, finalDistinctCountResultMap});
_trimmingService.trimFinalResults(new Map[]{trimmedIntermediateResultMaps.get(0), finalDistinctCountResultMap});
List<GroupByResult> sumGroupByResultList = groupByResultLists[0];
List<GroupByResult> distinctCountGroupByResultList = groupByResultLists[1];
for (int i = 0; i < GROUP_BY_TOP_N; i++) {
Expand All @@ -132,6 +138,39 @@ public void testTrimming() {
}
}

@Test
public void testTrimmingResults() {
// Test Server side trimming
List<Map<String, Object>> intermediateResults = new ArrayList<>();
for (int i = 0; i < 2; i++) {
intermediateResults.add(new HashMap<>(NUM_GROUPS));
}

for (int i = 0; i < NUM_GROUPS; i++) {
IntOpenHashSet set = new IntOpenHashSet();
for (int j = 0; j <= i; j += NUM_GROUPS / MAX_SIZE_OF_SET) {
set.add(j);
}
intermediateResults.get(0).put(_groups.get(i), (double) i);
intermediateResults.get(1).put(_groups.get(i), set);
}

List<Map<String, Object>> trimmedIntermediateResultMaps =
_trimmingService.trimIntermediateResults(intermediateResults);
Map<String, Object> trimmedSumResultMap = trimmedIntermediateResultMaps.get(0);
Map<String, Object> trimmedDistinctCountResultMap = trimmedIntermediateResultMaps.get(1);
int trimSize = trimmedSumResultMap.size();
Assert.assertEquals(trimmedDistinctCountResultMap.size(), trimSize, ERROR_MESSAGE);
for (int i = NUM_GROUPS - trimSize; i < NUM_GROUPS; i++) {
String group = _groups.get(i);
Assert.assertEquals(((Double) trimmedSumResultMap.get(group)).intValue(), i, ERROR_MESSAGE);
Assert.assertEquals(((IntOpenHashSet) trimmedDistinctCountResultMap.get(group)).size(),
i / (NUM_GROUPS / MAX_SIZE_OF_SET) + 1, ERROR_MESSAGE);
}

checkTrimmedResults(trimmedIntermediateResultMaps);
}

private static String buildGroupString(List<String> group) {
StringBuilder groupStringBuilder = new StringBuilder();
for (int i = 0; i < NUM_GROUP_KEYS; i++) {
Expand Down