diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java index 95e0f1b19aac..1baa5ac9be09 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByDeserializationBenchmark.java @@ -36,7 +36,6 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; -import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.ColumnType; @@ -114,7 +113,7 @@ public boolean isIntermediateResultAsMapCompat() }, null, null, - new GroupByStatsProvider() + null ); decoratedMapper = groupByQueryQueryToolChest.decorateObjectMapper(undecoratedMapper, sqlQuery); diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java index 300841fab5d7..2950e70de1ea 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java @@ -54,7 +54,6 @@ import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; -import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; @@ -369,7 +368,6 @@ public String getFormatString() }; final Supplier configSupplier = Suppliers.ofInstance(config); - final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(); final GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool(mergePool, config); final GroupingEngine groupingEngine = new GroupingEngine( @@ -378,8 +376,7 @@ public String getFormatString() groupByResourcesReservationPool, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - QueryBenchmarkUtil.NOOP_QUERYWATCHER, - groupByStatsProvider + QueryBenchmarkUtil.NOOP_QUERYWATCHER ); factory = new GroupByQueryRunnerFactory( diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index 2a3aede2a904..63e55e85adf3 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -81,7 +81,6 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; -import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; @@ -346,7 +345,6 @@ private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory( bufferSupplier, processingConfig.getNumMergeBuffers() ); - final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(); final GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool(mergeBufferPool, config); final GroupingEngine groupingEngine = new GroupingEngine( @@ -355,8 +353,7 @@ private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory( groupByResourcesReservationPool, mapper, mapper, - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - groupByStatsProvider + QueryRunnerTestHelper.NOOP_QUERYWATCHER ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool); return new GroupByQueryRunnerFactory(groupingEngine, toolChest, bufferPool); diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java index 2056ffbea0d3..4e22333d4e9c 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java @@ -65,7 +65,6 @@ import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; -import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; @@ -486,7 +485,6 @@ public String getFormatString() }; final Supplier configSupplier = Suppliers.ofInstance(config); - final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(); final GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool(mergePool, config); final GroupingEngine groupingEngine = new GroupingEngine( @@ -495,8 +493,7 @@ public String getFormatString() groupByResourcesReservationPool, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - QueryBenchmarkUtil.NOOP_QUERYWATCHER, - groupByStatsProvider + QueryBenchmarkUtil.NOOP_QUERYWATCHER ); factory = new GroupByQueryRunnerFactory( diff --git a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java index 46248da5a7ec..dfbc3654d949 100644 --- a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java +++ b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java @@ -44,7 +44,6 @@ import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; -import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; @@ -74,7 +73,6 @@ public void setup() throws IOException final BlockingPool mergePool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1); - final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(); final GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool(mergePool, config); @@ -110,8 +108,7 @@ public int getNumThreads() groupByResourcesReservationPool, TestHelper.makeJsonMapper(), new DefaultObjectMapper(), - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - groupByStatsProvider + QueryRunnerTestHelper.NOOP_QUERYWATCHER ); final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/segment/DatasketchesProjectionTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/segment/DatasketchesProjectionTest.java index 75c7c4a5a1ba..fb768d9ea48e 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/segment/DatasketchesProjectionTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/segment/DatasketchesProjectionTest.java @@ -62,7 +62,6 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; -import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.segment.column.ColumnType; @@ -278,9 +277,7 @@ public DatasketchesProjectionTest( ), TestHelper.makeJsonMapper(), TestHelper.makeSmileMapper(), - (query, future) -> { - }, - new GroupByStatsProvider() + (query, future) -> {} ); } diff --git a/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryRunner.java index 4baef60639ff..9673be4ebe2d 100644 --- a/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryRunner.java @@ -34,6 +34,8 @@ */ public class MetricsEmittingQueryRunner implements QueryRunner { + public static final ObjLongConsumer> NOOP_METRIC_REPORTER = (metrics, value) -> {}; + private final ServiceEmitter emitter; private final QueryToolChest> queryToolChest; private final QueryRunner queryRunner; diff --git a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java index 5fc3ab5b8eeb..61db5b62f39a 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java @@ -40,7 +40,7 @@ public class QueryDataSource implements DataSource private final Query query; @JsonCreator - public QueryDataSource(@JsonProperty("query") Query query) + public QueryDataSource(@JsonProperty("query") Query query) { this.query = Preconditions.checkNotNull(query, "'query' must be nonnull"); } @@ -56,7 +56,7 @@ public Set getTableNames() } @JsonProperty - public Query getQuery() + public Query getQuery() { return query; } diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index a2a068207188..0c94683b30d2 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -244,7 +244,7 @@ public Object mergeValues(Object oldValue, Object newValue) */ public static class LongKey extends AbstractKey { - LongKey(String name, boolean inHeader) + public LongKey(String name, boolean inHeader) { super(name, inHeader, false, Long.class); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetrics.java index 6b02d8c29bb3..f058586fd43e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetrics.java @@ -58,4 +58,49 @@ public void granularity(GroupByQuery query) { //Don't emit by default } + + @Override + public void mergeBufferAcquisitionTime(long mergeBufferAcquisitionTime) + { + reportMetric("mergeBufferAcquisitionTime", mergeBufferAcquisitionTime); + this.mergeBufferAcquisitonTime = mergeBufferAcquisitionTime; + } + + @Override + public void bytesSpilledToStorage(long bytesSpilledToStorage) + { + reportMetric("bytesSpilledToStorage", bytesSpilledToStorage); + this.bytesSpilledToStorage = bytesSpilledToStorage; + } + + @Override + public void mergeDictionarySize(long mergeDictionarySize) + { + reportMetric("mergeDictionarySize", mergeDictionarySize); + this.mergeDictionarySize = mergeDictionarySize; + } + + // The following are used for channeling the metrics to GroupByStatsMonitor. + // We can remove these methods should we decide to sunset the GroupByStatsMonitor. + private long mergeBufferAcquisitonTime = 0L; + private long bytesSpilledToStorage = 0L; + private long mergeDictionarySize = 0L; + + @Override + public long getSpilledBytes() + { + return bytesSpilledToStorage; + } + + @Override + public long getMergeDictionarySize() + { + return mergeDictionarySize; + } + + @Override + public long getMergeBufferAcquisitionTime() + { + return mergeBufferAcquisitonTime; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryMetrics.java index 292828fc9858..e55ecc8572ed 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryMetrics.java @@ -42,7 +42,7 @@ public interface GroupByQueryMetrics extends QueryMetrics void numMetrics(GroupByQuery query); /** - * Sets the number of "complex" metrics of the given groupBy query as dimension. By default it is assumed that + * Sets the number of "complex" metrics of the given groupBy query as dimension. By default, it is assumed that * "complex" metric is a metric of not long or double type, but it could be redefined in the implementation of this * method. */ @@ -54,4 +54,23 @@ public interface GroupByQueryMetrics extends QueryMetrics */ @PublicApi void granularity(GroupByQuery query); + + @PublicApi + void mergeBufferAcquisitionTime(long mergeBufferAcquisitionTime); + + @PublicApi + void bytesSpilledToStorage(long bytesSpilledToStorage); + + @PublicApi + void mergeDictionarySize(long mergeDictionarySize); + + // The get metrics methods below are used for GroupByStatsMonitor + @PublicApi + long getSpilledBytes(); + + @PublicApi + long getMergeDictionarySize(); + + @PublicApi + long getMergeBufferAcquisitionTime(); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index aa838591ccb7..dfc8457a6996 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -88,8 +88,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest { private static final byte GROUPBY_QUERY = 0x14; - private static final TypeReference OBJECT_TYPE_REFERENCE = - new TypeReference<>() {}; + private static final TypeReference OBJECT_TYPE_REFERENCE = new TypeReference<>() {}; private static final TypeReference TYPE_REFERENCE = new TypeReference<>() {}; private final GroupingEngine groupingEngine; @@ -113,22 +112,6 @@ public GroupByQueryQueryToolChest( ); } - @VisibleForTesting - public GroupByQueryQueryToolChest( - GroupingEngine groupingEngine, - GroupByResourcesReservationPool groupByResourcesReservationPool, - GroupByStatsProvider groupByStatsProvider - ) - { - this( - groupingEngine, - GroupByQueryConfig::new, - DefaultGroupByQueryMetricsFactory.instance(), - groupByResourcesReservationPool, - groupByStatsProvider - ); - } - @Inject public GroupByQueryQueryToolChest( GroupingEngine groupingEngine, @@ -151,7 +134,6 @@ public QueryRunner mergeResults(final QueryRunner runner) return mergeResults(runner, true); } - @Override public QueryRunner mergeResults(final QueryRunner runner, boolean willMergeRunner) { @@ -160,8 +142,7 @@ public QueryRunner mergeResults(final QueryRunner runner, return runner.run(queryPlus, responseContext); } - final GroupByQuery groupByQuery = (GroupByQuery) queryPlus.getQuery(); - return initAndMergeGroupByResults(groupByQuery, runner, responseContext, willMergeRunner); + return initAndMergeGroupByResults(queryPlus, runner, responseContext, willMergeRunner); }; } @@ -178,23 +159,21 @@ public Comparator createResultComparator(Query query) } private Sequence initAndMergeGroupByResults( - final GroupByQuery query, + QueryPlus queryPlus, QueryRunner runner, ResponseContext context, boolean willMergeRunner ) { + final GroupByQuery query = (GroupByQuery) queryPlus.getQuery(); + boolean reportMetricsForEmission = queryPlus.getQueryMetrics() != null; + // Reserve the group by resources (merge buffers) required for executing the query final QueryResourceId queryResourceId = query.context().getQueryResourceId(); - final GroupByStatsProvider.PerQueryStats perQueryStats = - groupByStatsProvider.getPerQueryStatsContainer(query.context().getQueryResourceId()); - groupByResourcesReservationPool.reserve( - queryResourceId, - query, - willMergeRunner, - perQueryStats - ); + long startNs = System.nanoTime(); + groupByResourcesReservationPool.reserve(queryResourceId, query, willMergeRunner); + context.add(GroupByResponseContextKeys.GROUPBY_MERGE_BUFFER_ACQUISITION_TIME_KEY, System.nanoTime() - startNs); final GroupByQueryResources resource = groupByResourcesReservationPool.fetch(queryResourceId); if (resource == null) { @@ -206,18 +185,19 @@ private Sequence initAndMergeGroupByResults( try { Closer closer = Closer.create(); - final Sequence mergedSequence = mergeGroupByResults( - query, - resource, - runner, - context, - closer, - perQueryStats - ); + final Sequence mergedSequence = mergeGroupByResults(query, resource, runner, context, closer); // Clean up the resources reserved during the execution of the query closer.register(() -> groupByResourcesReservationPool.clean(queryResourceId)); - closer.register(() -> groupByStatsProvider.closeQuery(query.context().getQueryResourceId())); + + if (reportMetricsForEmission) { + closer.register(() -> { + GroupByQueryMetrics queryMetrics = (GroupByQueryMetrics) queryPlus.getQueryMetrics(); + populateQueryMetrics(queryMetrics, context); + groupByStatsProvider.aggregateStats(queryMetrics); + }); + } + return Sequences.withBaggage(mergedSequence, closer); } catch (Exception e) { @@ -227,19 +207,39 @@ private Sequence initAndMergeGroupByResults( } } + private void populateQueryMetrics(GroupByQueryMetrics queryMetrics, ResponseContext context) + { + Object bytesSpilledToStorage = context.get(GroupByResponseContextKeys.GROUPBY_BYTES_SPILLED_TO_STORAGE_KEY); + + if (bytesSpilledToStorage != null) { + queryMetrics.bytesSpilledToStorage((Long) bytesSpilledToStorage); + } + + Object mergeDictionarySize = context.get(GroupByResponseContextKeys.GROUPBY_MERGE_DICTIONARY_SIZE_KEY); + + if (mergeDictionarySize != null) { + queryMetrics.mergeDictionarySize((Long) mergeDictionarySize); + } + + Object mergeBufferAcquisitionTime = context.get(GroupByResponseContextKeys.GROUPBY_MERGE_BUFFER_ACQUISITION_TIME_KEY); + + if (mergeBufferAcquisitionTime != null) { + queryMetrics.mergeBufferAcquisitionTime((Long) mergeBufferAcquisitionTime); + } + } + private Sequence mergeGroupByResults( final GroupByQuery query, GroupByQueryResources resource, QueryRunner runner, ResponseContext context, - Closer closer, - GroupByStatsProvider.PerQueryStats perQueryStats + Closer closer ) { if (isNestedQueryPushDown(query)) { - return mergeResultsWithNestedQueryPushDown(query, resource, runner, context, perQueryStats); + return mergeResultsWithNestedQueryPushDown(query, resource, runner, context); } - return mergeGroupByResultsWithoutPushDown(query, resource, runner, context, closer, perQueryStats); + return mergeGroupByResultsWithoutPushDown(query, resource, runner, context, closer); } private Sequence mergeGroupByResultsWithoutPushDown( @@ -247,88 +247,68 @@ private Sequence mergeGroupByResultsWithoutPushDown( GroupByQueryResources resource, QueryRunner runner, ResponseContext context, - Closer closer, - GroupByStatsProvider.PerQueryStats perQueryStats + Closer closer ) { // If there's a subquery, merge subquery results and then apply the aggregator + final DataSource maybeQueryDataSource = query.getDataSource(); - final DataSource dataSource = query.getDataSource(); - - if (dataSource instanceof QueryDataSource) { - final GroupByQuery subquery; - try { - // Inject outer query context keys into subquery if they don't already exist in the subquery context. - // Unlike withOverriddenContext's normal behavior, we want keys present in the subquery to win. - final Map subqueryContext = new TreeMap<>(); - if (query.getContext() != null) { - for (Map.Entry entry : query.getContext().entrySet()) { - if (entry.getValue() != null) { - subqueryContext.put(entry.getKey(), entry.getValue()); - } - } - } - if (((QueryDataSource) dataSource).getQuery().getContext() != null) { - subqueryContext.putAll(((QueryDataSource) dataSource).getQuery().getContext()); - } - subqueryContext.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, false); - subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery().withOverriddenContext(subqueryContext); - - closer.register(() -> groupByStatsProvider.closeQuery(subquery.context().getQueryResourceId())); + if (!(maybeQueryDataSource instanceof QueryDataSource)) { + if (query.getSubtotalsSpec() == null) { + Sequence mergedResults = groupingEngine.mergeResults(runner, query, context); + return groupingEngine.applyPostProcessing(mergedResults, query); + } else { + Sequence mergedResults = groupingEngine.mergeResults(runner, query.withSubtotalsSpec(null), context); + return groupingEngine.processSubtotalsSpec(query, resource, mergedResults, context); } - catch (ClassCastException e) { - throw new UnsupportedOperationException("Subqueries must be of type 'group by'"); + } + + final QueryDataSource dataSource = (QueryDataSource) maybeQueryDataSource; + // Inject outer query context keys into subquery if they don't already exist in the subquery context. + // Unlike withOverriddenContext's normal behavior, we want keys present in the subquery to win. + final Map subqueryContext = new TreeMap<>(); + if (query.getContext() != null) { + for (Map.Entry entry : query.getContext().entrySet()) { + if (entry.getValue() != null) { + subqueryContext.put(entry.getKey(), entry.getValue()); + } } + } + if (dataSource.getQuery().getContext() != null) { + subqueryContext.putAll(dataSource.getQuery().getContext()); + } + subqueryContext.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, false); - final Sequence subqueryResult = mergeGroupByResults( - subquery, - resource, - runner, - context, - closer, - perQueryStats - ); + final GroupByQuery subquery; + try { + subquery = (GroupByQuery) dataSource.getQuery().withOverriddenContext(subqueryContext); + } + catch (ClassCastException e) { + throw new UnsupportedOperationException("Subqueries must be of type 'group by'"); + } - final Sequence finalizingResults = finalizeSubqueryResults(subqueryResult, subquery); - - if (query.getSubtotalsSpec() != null) { - return groupingEngine.processSubtotalsSpec( - query, - resource, - groupingEngine.processSubqueryResult( - subquery, - query, resource, - finalizingResults, - false, - perQueryStats - ), - perQueryStats - ); - } else { - return groupingEngine.applyPostProcessing( - groupingEngine.processSubqueryResult( - subquery, - query, - resource, - finalizingResults, - false, - perQueryStats - ), - query - ); - } + final Sequence subqueryResult = mergeGroupByResults( + subquery, + resource, + runner, + context, + closer + ); + final Sequence finalizingResults = finalizeSubqueryResults(subqueryResult, subquery); + final Sequence processedSubqueryResults = groupingEngine.processSubqueryResult( + subquery, + query, + resource, + context, + finalizingResults, + false + ); + + if (query.getSubtotalsSpec() == null) { + return groupingEngine.applyPostProcessing(processedSubqueryResults, query); } else { - if (query.getSubtotalsSpec() != null) { - return groupingEngine.processSubtotalsSpec( - query, - resource, - groupingEngine.mergeResults(runner, query.withSubtotalsSpec(null), context), - perQueryStats - ); - } else { - return groupingEngine.applyPostProcessing(groupingEngine.mergeResults(runner, query, context), query); - } + return groupingEngine.processSubtotalsSpec(query, resource, processedSubqueryResults, context); } } @@ -336,24 +316,23 @@ private Sequence mergeResultsWithNestedQueryPushDown( GroupByQuery query, GroupByQueryResources resource, QueryRunner runner, - ResponseContext context, - GroupByStatsProvider.PerQueryStats perQueryStats + ResponseContext context ) { Sequence pushDownQueryResults = groupingEngine.mergeResults(runner, query, context); + final Sequence finalizedResults = finalizeSubqueryResults(pushDownQueryResults, query); - GroupByQuery rewrittenQuery = rewriteNestedQueryForPushDown(query); - return groupingEngine.applyPostProcessing( - groupingEngine.processSubqueryResult( - query, - rewrittenQuery, - resource, - finalizedResults, - true, - perQueryStats - ), - query + + Sequence processedSubqueryResult = groupingEngine.processSubqueryResult( + query, + rewriteNestedQueryForPushDown(query), + resource, + context, + finalizedResults, + true ); + + return groupingEngine.applyPostProcessing(processedSubqueryResult, query); } /** @@ -363,7 +342,7 @@ private Sequence mergeResultsWithNestedQueryPushDown( @VisibleForTesting GroupByQuery rewriteNestedQueryForPushDown(GroupByQuery query) { - return query.withAggregatorSpecs(Lists.transform(query.getAggregatorSpecs(), (agg) -> agg.getCombiningFactory())) + return query.withAggregatorSpecs(Lists.transform(query.getAggregatorSpecs(), AggregatorFactory::getCombiningFactory)) .withDimensionSpecs(Lists.transform( query.getDimensions(), (dim) -> new DefaultDimensionSpec( @@ -376,19 +355,14 @@ GroupByQuery rewriteNestedQueryForPushDown(GroupByQuery query) private Sequence finalizeSubqueryResults(Sequence subqueryResult, GroupByQuery subquery) { - final Sequence finalizingResults; if (subquery.context().isFinalize(false)) { - finalizingResults = new MappedSequence<>( + return new MappedSequence<>( subqueryResult, - makePreComputeManipulatorFn( - subquery, - MetricManipulatorFns.finalizing() - )::apply + makePreComputeManipulatorFn(subquery, MetricManipulatorFns.finalizing())::apply ); } else { - finalizingResults = subqueryResult; + return subqueryResult; } - return finalizingResults; } public static boolean isNestedQueryPushDown(GroupByQuery q) @@ -516,7 +490,7 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r dimensionSpecs.add(dimensionSpec); } } - + // TODO: Do I need to aggregate the responseContext for the subquery? return runner.run( queryPlus.withQuery(groupByQuery.withDimensionSpecs(dimensionSpecs)), responseContext diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java index bd585dc7218c..40f1b4b10f4d 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java @@ -115,11 +115,9 @@ public GroupByResourcesReservationPool( public void reserve( QueryResourceId queryResourceId, GroupByQuery groupByQuery, - boolean willMergeRunner, - GroupByStatsProvider.PerQueryStats perQueryStats + boolean willMergeRunner ) { - long startNs = System.nanoTime(); if (queryResourceId == null) { throw DruidException.defensive("Query resource id must be populated"); } @@ -151,8 +149,6 @@ public void reserve( // Resources have been allocated, spot has been reserved. The reference would ALWAYS refer to 'null'. Refer the // allocated resources from it reference.compareAndSet(null, resources); - - perQueryStats.mergeBufferAcquisitionTime(System.nanoTime() - startNs); } /** diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResponseContextKeys.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResponseContextKeys.java new file mode 100644 index 000000000000..101322ab00f1 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResponseContextKeys.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby; + +import org.apache.druid.query.context.ResponseContext; + +import javax.annotation.Nullable; + +/** + * Response context keys for GroupBy query metrics. + * These keys are used to aggregate metrics from parallel query execution threads + * using MAX aggregation (taking the maximum value across all threads). + */ +public class GroupByResponseContextKeys +{ + public static final String GROUPBY_MERGE_BUFFER_ACQUISITION_TIME_NAME = "groupByMergeBufferAcquisitionTime"; + public static final String GROUPBY_BYTES_SPILLED_TO_STORAGE_NAME = "groupByBytesSpilledToStorage"; + public static final String GROUPBY_MERGE_DICTIONARY_SIZE_NAME = "groupByMergeDictionarySize"; + + private static Object mergeMax(@Nullable Object oldValue, @Nullable Object newValue) + { + if (oldValue == null && newValue == null) { + return 0L; + } + + if (oldValue == null) { + return newValue; + } else if (newValue == null) { + return oldValue; + } + + return Math.max((Long) oldValue, (Long) newValue); + } + /** + * Maximum bytes spilled to storage across all parallel threads processing segments. + * This represents the peak disk usage during query execution. + */ + public static final ResponseContext.Key GROUPBY_BYTES_SPILLED_TO_STORAGE_KEY = + new ResponseContext.LongKey(GROUPBY_BYTES_SPILLED_TO_STORAGE_NAME, false) + { + @Override + public Object mergeValues(Object oldValue, Object newValue) + { + return mergeMax(oldValue, newValue); + } + }; + + /** + * Maximum merge dictionary size across all parallel threads processing segments. + * This represents the peak dictionary size used during query execution. + */ + public static final ResponseContext.Key GROUPBY_MERGE_DICTIONARY_SIZE_KEY = + new ResponseContext.LongKey(GROUPBY_MERGE_DICTIONARY_SIZE_NAME, false) + { + @Override + public Object mergeValues(Object oldValue, Object newValue) + { + return mergeMax(oldValue, newValue); + } + }; + + /** + * Maximum merge buffer acquisition time across all parallel threads processing segments. + * This represents the longest time any thread waited to acquire a merge buffer. + */ + public static final ResponseContext.Key GROUPBY_MERGE_BUFFER_ACQUISITION_TIME_KEY = + new ResponseContext.LongKey(GROUPBY_MERGE_BUFFER_ACQUISITION_TIME_NAME, false) + { + @Override + public Object mergeValues(Object oldValue, Object newValue) + { + return mergeMax(oldValue, newValue); + } + }; + + static { + ResponseContext.Keys.instance().registerKeys( + new ResponseContext.Key[]{ + GROUPBY_BYTES_SPILLED_TO_STORAGE_KEY, + GROUPBY_MERGE_DICTIONARY_SIZE_KEY, + GROUPBY_MERGE_BUFFER_ACQUISITION_TIME_KEY + } + ); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java index a5ce31cb5f98..eeb7020060d0 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java @@ -20,42 +20,33 @@ package org.apache.druid.query.groupby; import org.apache.druid.guice.LazySingleton; -import org.apache.druid.query.QueryResourceId; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; /** - * Metrics collector for groupBy queries like spilled bytes, merge buffer acquistion time, dictionary size. + * Aggregates {@link GroupByQueryMetrics} emitted by in-flight groupBy queries and exposes a snapshot that can be + * periodically consumed by {@link org.apache.druid.server.metrics.GroupByStatsMonitor}. The provider keeps track of + * aggregate counters such as merge-buffer acquisition time, spilled bytes, and the dictionary sizes used while + * merging results. */ @LazySingleton public class GroupByStatsProvider { - private final Map perQueryStats; private final AggregateStats aggregateStatsContainer; public GroupByStatsProvider() { - this.perQueryStats = new ConcurrentHashMap<>(); - this.aggregateStatsContainer = new AggregateStats(); - } - - public PerQueryStats getPerQueryStatsContainer(QueryResourceId resourceId) - { - if (resourceId == null) { - return null; - } - return perQueryStats.computeIfAbsent(resourceId, value -> new PerQueryStats()); + this.aggregateStatsContainer = AggregateStats.EMPTY_STATS; } - public synchronized void closeQuery(QueryResourceId resourceId) + /** + * Adds the stats reported by a single query execution to the shared accumulator. Callers are expected to provide + * the {@link GroupByQueryMetrics} associated with the query once all relevant numbers have been recorded on the + * metrics instance. + * + * @param groupByQueryMetrics the query metrics to merge into the aggregate view + */ + public void aggregateStats(GroupByQueryMetrics groupByQueryMetrics) { - if (resourceId == null || !perQueryStats.containsKey(resourceId)) { - return; - } - PerQueryStats container = perQueryStats.remove(resourceId); - aggregateStatsContainer.addQueryStats(container); + aggregateStatsContainer.addQueryStats(groupByQueryMetrics); } public synchronized AggregateStats getStatsSince() @@ -63,17 +54,18 @@ public synchronized AggregateStats getStatsSince() return aggregateStatsContainer.reset(); } + /** + * Immutable snapshot of the aggregated groupBy metrics captured between two {@link #getStatsSince()} calls. + */ public static class AggregateStats { - private long mergeBufferQueries = 0; - private long mergeBufferAcquisitionTimeNs = 0; - private long spilledQueries = 0; - private long spilledBytes = 0; - private long mergeDictionarySize = 0; + private long mergeBufferQueries; + private long mergeBufferAcquisitionTimeNs; + private long spilledQueries; + private long spilledBytes; + private long mergeDictionarySize; - public AggregateStats() - { - } + public static final AggregateStats EMPTY_STATS = new AggregateStats(0L, 0L, 0L, 0L, 0L); public AggregateStats( long mergeBufferQueries, @@ -115,22 +107,22 @@ public long getMergeDictionarySize() return mergeDictionarySize; } - public void addQueryStats(PerQueryStats perQueryStats) + private void addQueryStats(GroupByQueryMetrics groupByQueryMetrics) { - if (perQueryStats.getMergeBufferAcquisitionTimeNs() > 0) { + if (groupByQueryMetrics.getMergeBufferAcquisitionTime() > 0) { mergeBufferQueries++; - mergeBufferAcquisitionTimeNs += perQueryStats.getMergeBufferAcquisitionTimeNs(); + mergeBufferAcquisitionTimeNs += groupByQueryMetrics.getMergeBufferAcquisitionTime(); } - if (perQueryStats.getSpilledBytes() > 0) { + if (groupByQueryMetrics.getSpilledBytes() > 0) { spilledQueries++; - spilledBytes += perQueryStats.getSpilledBytes(); + spilledBytes += groupByQueryMetrics.getSpilledBytes(); } - mergeDictionarySize += perQueryStats.getMergeDictionarySize(); + mergeDictionarySize += groupByQueryMetrics.getMergeDictionarySize(); } - public AggregateStats reset() + private AggregateStats reset() { AggregateStats aggregateStats = new AggregateStats( @@ -150,41 +142,4 @@ public AggregateStats reset() return aggregateStats; } } - - public static class PerQueryStats - { - private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0); - private final AtomicLong spilledBytes = new AtomicLong(0); - private final AtomicLong mergeDictionarySize = new AtomicLong(0); - - public void mergeBufferAcquisitionTime(long delay) - { - mergeBufferAcquisitionTimeNs.addAndGet(delay); - } - - public void spilledBytes(long bytes) - { - spilledBytes.addAndGet(bytes); - } - - public void dictionarySize(long size) - { - mergeDictionarySize.addAndGet(size); - } - - public long getMergeBufferAcquisitionTimeNs() - { - return mergeBufferAcquisitionTimeNs.get(); - } - - public long getSpilledBytes() - { - return spilledBytes.get(); - } - - public long getMergeDictionarySize() - { - return mergeDictionarySize.get(); - } - } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java index 3f85bbf63e1f..c4f6030ad737 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java @@ -120,7 +120,6 @@ public class GroupingEngine private final ObjectMapper jsonMapper; private final ObjectMapper spillMapper; private final QueryWatcher queryWatcher; - private final GroupByStatsProvider groupByStatsProvider; @Inject public GroupingEngine( @@ -129,8 +128,7 @@ public GroupingEngine( @Merging GroupByResourcesReservationPool groupByResourcesReservationPool, @Json ObjectMapper jsonMapper, @Smile ObjectMapper spillMapper, - QueryWatcher queryWatcher, - GroupByStatsProvider groupByStatsProvider + QueryWatcher queryWatcher ) { this.processingConfig = processingConfig; @@ -139,7 +137,6 @@ public GroupingEngine( this.jsonMapper = jsonMapper; this.spillMapper = spillMapper; this.queryWatcher = queryWatcher; - this.groupByStatsProvider = groupByStatsProvider; } /** @@ -454,8 +451,7 @@ public QueryRunner mergeRunners( processingConfig.getNumThreads(), processingConfig.intermediateComputeSizeBytes(), spillMapper, - processingConfig.getTmpDir(), - groupByStatsProvider + processingConfig.getTmpDir() ); } @@ -587,9 +583,9 @@ public Sequence processSubqueryResult( GroupByQuery subquery, GroupByQuery query, GroupByQueryResources resource, + ResponseContext context, Sequence subqueryResult, - boolean wasQueryPushedDown, - GroupByStatsProvider.PerQueryStats perQueryStats + boolean wasQueryPushedDown ) { // Keep a reference to resultSupplier outside the "try" so we can close it if something goes wrong @@ -621,10 +617,10 @@ public Sequence processSubqueryResult( configSupplier.get(), processingConfig, resource, + context, spillMapper, processingConfig.getTmpDir(), - processingConfig.intermediateComputeSizeBytes(), - perQueryStats + processingConfig.intermediateComputeSizeBytes() ); final GroupByRowProcessor.ResultSupplier finalResultSupplier = resultSupplier; @@ -648,6 +644,7 @@ public Sequence processSubqueryResult( * @param query query that has a "subtotalsSpec" * @param resource resources returned by {@link #prepareResource(GroupByQuery, BlockingPool, boolean, GroupByQueryConfig)} * @param queryResult result rows from the main query + * @param context response context for collating query metrics * * @return results for each list of subtotals in the query, concatenated together */ @@ -655,7 +652,7 @@ public Sequence processSubtotalsSpec( GroupByQuery query, GroupByQueryResources resource, Sequence queryResult, - GroupByStatsProvider.PerQueryStats perQueryStats + ResponseContext context ) { // How it works? @@ -704,10 +701,10 @@ public Sequence processSubtotalsSpec( configSupplier.get(), processingConfig, resource, + context, spillMapper, processingConfig.getTmpDir(), - processingConfig.intermediateComputeSizeBytes(), - perQueryStats + processingConfig.intermediateComputeSizeBytes() ); List queryDimNamesInOrder = baseSubtotalQuery.getDimensionNamesInOrder(); @@ -743,8 +740,7 @@ public Sequence processSubtotalsSpec( subtotalQueryLimitSpec = baseSubtotalQuery.getLimitSpec().filterColumns(columns); } - GroupByQuery subtotalQuery = baseSubtotalQuery - .withLimitSpec(subtotalQueryLimitSpec); + GroupByQuery subtotalQuery = baseSubtotalQuery.withLimitSpec(subtotalQueryLimitSpec); final GroupByRowProcessor.ResultSupplier resultSupplierOneFinal = resultSupplierOne; if (Utils.isPrefix(subtotalSpec, queryDimNamesInOrder)) { @@ -767,10 +763,10 @@ public Sequence processSubtotalsSpec( configSupplier.get(), processingConfig, resource, + context, spillMapper, processingConfig.getTmpDir(), - processingConfig.intermediateComputeSizeBytes(), - perQueryStats + processingConfig.intermediateComputeSizeBytes() ); subtotalsResults.add( @@ -849,7 +845,7 @@ private void moveOrReplicateTimestampInRow( private Set getAggregatorAndPostAggregatorNames(GroupByQuery query) { - Set aggsAndPostAggs = new HashSet(); + Set aggsAndPostAggs = new HashSet<>(); if (query.getAggregatorSpecs() != null) { for (AggregatorFactory af : query.getAggregatorSpecs()) { aggsAndPostAggs.add(af.getName()); @@ -1005,10 +1001,7 @@ public static boolean summaryRowPreconditions(GroupByQuery query) if (!query.getDimensions().isEmpty() || query.hasDroppedDimensions()) { return false; } - if (query.getGranularity().isFinerThan(Granularities.ALL)) { - return false; - } - return true; + return !query.getGranularity().isFinerThan(Granularities.ALL); } private static Iterator summaryRowIterator(GroupByQuery q) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index 8242c9d8cf5c..64c68a30b935 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -38,7 +38,6 @@ import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.groupby.GroupByQueryConfig; -import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.segment.ColumnSelectorFactory; @@ -49,6 +48,7 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -95,7 +95,6 @@ public class ConcurrentGrouper implements Grouper @Nullable private final ParallelCombiner parallelCombiner; private final boolean mergeThreadLocal; - private final GroupByStatsProvider.PerQueryStats perQueryStats; private volatile boolean initialized = false; @@ -115,8 +114,7 @@ public ConcurrentGrouper( final ListeningExecutorService executor, final int priority, final boolean hasQueryTimeout, - final long queryTimeoutAt, - final GroupByStatsProvider.PerQueryStats perQueryStats + final long queryTimeoutAt ) { this( @@ -140,8 +138,7 @@ public ConcurrentGrouper( queryTimeoutAt, groupByQueryConfig.getIntermediateCombineDegree(), groupByQueryConfig.getNumParallelCombineThreads(), - groupByQueryConfig.isMergeThreadLocal(), - perQueryStats + groupByQueryConfig.isMergeThreadLocal() ); } @@ -166,8 +163,7 @@ public ConcurrentGrouper( final long queryTimeoutAt, final int intermediateCombineDegree, final int numParallelCombineThreads, - final boolean mergeThreadLocal, - final GroupByStatsProvider.PerQueryStats perQueryStats + final boolean mergeThreadLocal ) { Preconditions.checkArgument(concurrencyHint > 0, "concurrencyHint > 0"); @@ -217,7 +213,6 @@ public ConcurrentGrouper( } this.mergeThreadLocal = mergeThreadLocal; - this.perQueryStats = perQueryStats; } @Override @@ -231,23 +226,7 @@ public void init() for (int i = 0; i < concurrencyHint; i++) { final ByteBuffer slice = Groupers.getSlice(buffer, sliceSize, i); - final SpillingGrouper grouper = new SpillingGrouper<>( - Suppliers.ofInstance(slice), - keySerdeFactory, - columnSelectorFactory, - aggregatorFactories, - bufferGrouperMaxSize, - bufferGrouperMaxLoadFactor, - bufferGrouperInitialBuckets, - temporaryStorage, - spillMapper, - false, - limitSpec, - sortHasNonGroupingFields, - sliceSize, - perQueryStats - ); - grouper.init(); + final SpillingGrouper grouper = generateSpillingGrouperWithBufferSlice(slice, sliceSize); groupers.add(grouper); if (mergeThreadLocal) { @@ -261,6 +240,27 @@ public void init() } } + private SpillingGrouper generateSpillingGrouperWithBufferSlice(ByteBuffer slice, int sliceSize) + { + final SpillingGrouper grouper = new SpillingGrouper<>( + Suppliers.ofInstance(slice), + keySerdeFactory, + columnSelectorFactory, + aggregatorFactories, + bufferGrouperMaxSize, + bufferGrouperMaxLoadFactor, + bufferGrouperInitialBuckets, + temporaryStorage, + spillMapper, + false, + limitSpec, + sortHasNonGroupingFields, + sliceSize + ); + grouper.init(); + return grouper; + } + @Override public boolean isInitialized() { @@ -491,12 +491,20 @@ private List tryMergeDictionary() return ImmutableList.copyOf(mergedDictionary); } + @Override + public Map getQueryMetricsMap() + { + return groupers.stream() + .flatMap(grouper -> grouper.getQueryMetricsMap().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::sum)); + } + @Override public void close() { if (!closed) { closed = true; - groupers.forEach(Grouper::close); + groupers.forEach(SpillingGrouper::close); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java index 43bec66de7c5..8c4c672b289f 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java @@ -59,7 +59,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryResources; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; -import org.apache.druid.query.groupby.GroupByStatsProvider; +import org.apache.druid.query.groupby.GroupByResponseContextKeys; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; @@ -68,6 +68,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -104,7 +105,6 @@ public class GroupByMergingQueryRunner implements QueryRunner private final ObjectMapper spillMapper; private final String processingTmpDir; private final int mergeBufferSize; - private final GroupByStatsProvider groupByStatsProvider; public GroupByMergingQueryRunner( GroupByQueryConfig config, @@ -116,8 +116,7 @@ public GroupByMergingQueryRunner( int concurrencyHint, int mergeBufferSize, ObjectMapper spillMapper, - String processingTmpDir, - GroupByStatsProvider groupByStatsProvider + String processingTmpDir ) { this.config = config; @@ -130,7 +129,6 @@ public GroupByMergingQueryRunner( this.spillMapper = spillMapper; this.processingTmpDir = processingTmpDir; this.mergeBufferSize = mergeBufferSize; - this.groupByStatsProvider = groupByStatsProvider; } @Override @@ -167,9 +165,6 @@ public Sequence run(final QueryPlus queryPlus, final Respo StringUtils.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId()) ); - GroupByStatsProvider.PerQueryStats perQueryStats = - groupByStatsProvider.getPerQueryStatsContainer(query.context().getQueryResourceId()); - final int priority = queryContext.getPriority(); // Figure out timeoutAt time now, so we can apply the timeout to both the mergeBufferPool.take and the actual @@ -191,8 +186,7 @@ public CloseableGrouperIterator make() try { final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage( temporaryStorageDirectory, - querySpecificConfig.getMaxOnDiskStorage().getBytes(), - perQueryStats + querySpecificConfig.getMaxOnDiskStorage().getBytes() ); final ReferenceCountingResourceHolder temporaryStorageHolder = @@ -226,8 +220,7 @@ public CloseableGrouperIterator make() priority, hasTimeout, timeoutAt, - mergeBufferSize, - perQueryStats + mergeBufferSize ); final Grouper grouper = pair.lhs; final Accumulator accumulator = pair.rhs; @@ -306,11 +299,16 @@ public AggregateResult call() waitForFutureCompletion(query, futures, hasTimeout, timeoutAt - System.currentTimeMillis()); } - return RowBasedGrouperHelper.makeGrouperIterator( - grouper, - query, - resources - ); + // Finished the query, so let's collate the metrics! + if (responseContext != null) { + responseContext.add(GroupByResponseContextKeys.GROUPBY_BYTES_SPILLED_TO_STORAGE_KEY, temporaryStorage.currentSize()); + + Map metricsMap = grouper.getQueryMetricsMap(); + responseContext.add(GroupByResponseContextKeys.GROUPBY_MERGE_DICTIONARY_SIZE_KEY, + metricsMap.getOrDefault(GroupByResponseContextKeys.GROUPBY_MERGE_DICTIONARY_SIZE_NAME, 0L)); + } + + return RowBasedGrouperHelper.makeGrouperIterator(grouper, query, resources); } catch (Throwable t) { // Exception caught while setting up the iterator; release resources. diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index e2ca5c7e83b1..38020284654a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -20,6 +20,7 @@ package org.apache.druid.query.groupby.epinephelinae; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.java.util.common.Pair; @@ -30,11 +31,12 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ResourceLimitExceededException; +import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryResources; -import org.apache.druid.query.groupby.GroupByStatsProvider; +import org.apache.druid.query.groupby.GroupByResponseContextKeys; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; @@ -44,6 +46,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; import java.util.UUID; /** @@ -92,10 +95,10 @@ public static ResultSupplier process( final GroupByQueryConfig config, final DruidProcessingConfig processingConfig, final GroupByQueryResources resource, + final ResponseContext context, final ObjectMapper spillMapper, final String processingTmpDir, - final int mergeBufferSize, - final GroupByStatsProvider.PerQueryStats perQueryStats + final int mergeBufferSize ) { final Closer closeOnExit = Closer.create(); @@ -108,8 +111,7 @@ public static ResultSupplier process( final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage( temporaryStorageDirectory, - querySpecificConfig.getMaxOnDiskStorage().getBytes(), - perQueryStats + querySpecificConfig.getMaxOnDiskStorage().getBytes() ); closeOnExit.register(temporaryStorage); @@ -131,10 +133,10 @@ public ByteBuffer get() }, temporaryStorage, spillMapper, - mergeBufferSize, - perQueryStats + mergeBufferSize ); final Grouper grouper = pair.lhs; + Preconditions.checkNotNull(grouper); final Accumulator accumulator = pair.rhs; closeOnExit.register(grouper); @@ -144,6 +146,14 @@ public ByteBuffer get() throw new ResourceLimitExceededException(retVal.getReason()); } + if (context != null) { + context.add(GroupByResponseContextKeys.GROUPBY_BYTES_SPILLED_TO_STORAGE_KEY, temporaryStorage.currentSize()); + + Map metricsMap = grouper.getQueryMetricsMap(); + context.add(GroupByResponseContextKeys.GROUPBY_MERGE_DICTIONARY_SIZE_KEY, + metricsMap.getOrDefault(GroupByResponseContextKeys.GROUPBY_MERGE_DICTIONARY_SIZE_NAME, 0L)); + } + return new ResultSupplier() { @Override diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java index 9ab8f738791e..b77bf777fe62 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -29,6 +30,7 @@ import java.nio.ByteBuffer; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.function.ToIntFunction; /** @@ -93,6 +95,15 @@ default ToIntFunction hashFunction() return Groupers::hashObject; } + /** + * Update the results of the GroupByQueryMetrics. + * Currently only used by {@link ConcurrentGrouper} and {@link SpillingGrouper} + */ + default Map getQueryMetricsMap() + { + return ImmutableMap.of(); + } + /** * Close the grouper and release associated resources. */ diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.java index 23bc2706a2da..a283f1e1bee4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.java @@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.query.groupby.GroupByStatsProvider; import java.io.Closeable; import java.io.File; @@ -48,10 +47,8 @@ public class LimitedTemporaryStorage implements Closeable { private static final Logger log = new Logger(LimitedTemporaryStorage.class); - private final GroupByStatsProvider.PerQueryStats perQueryStatsContainer; - private final File storageDirectory; - private final long maxBytesUsed; + private final long capacity; private final AtomicLong bytesUsed = new AtomicLong(); private final Set files = new TreeSet<>(); @@ -62,13 +59,11 @@ public class LimitedTemporaryStorage implements Closeable public LimitedTemporaryStorage( File storageDirectory, - long maxBytesUsed, - GroupByStatsProvider.PerQueryStats perQueryStatsContainer + long capacity ) { this.storageDirectory = storageDirectory; - this.maxBytesUsed = maxBytesUsed; - this.perQueryStatsContainer = perQueryStatsContainer; + this.capacity = capacity; } /** @@ -82,8 +77,8 @@ public LimitedTemporaryStorage( */ public LimitedOutputStream createFile() throws IOException { - if (bytesUsed.get() >= maxBytesUsed) { - throw new TemporaryStorageFullException(maxBytesUsed); + if (bytesUsed.get() >= capacity) { + throw new TemporaryStorageFullException(capacity); } synchronized (files) { @@ -125,7 +120,7 @@ public void delete(final File file) public long maxSize() { - return maxBytesUsed; + return capacity; } @VisibleForTesting @@ -142,9 +137,6 @@ public void close() return; } closed = true; - - perQueryStatsContainer.spilledBytes(bytesUsed.get()); - bytesUsed.set(0); for (File file : ImmutableSet.copyOf(files)) { @@ -208,8 +200,8 @@ public File getFile() private void grab(int n) throws IOException { - if (bytesUsed.addAndGet(n) > maxBytesUsed) { - throw new TemporaryStorageFullException(maxBytesUsed); + if (bytesUsed.addAndGet(n) > capacity) { + throw new TemporaryStorageFullException(capacity); } } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 6ea439d7a90b..d5235b3920c4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -57,7 +57,6 @@ import org.apache.druid.query.filter.ValueMatcher; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; -import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.epinephelinae.Grouper.BufferComparator; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; @@ -133,8 +132,7 @@ public static Pair, Accumulator final Supplier bufferSupplier, final LimitedTemporaryStorage temporaryStorage, final ObjectMapper spillMapper, - final int mergeBufferSize, - final GroupByStatsProvider.PerQueryStats perQueryStats + final int mergeBufferSize ) { return createGrouperAccumulatorPair( @@ -151,8 +149,7 @@ public static Pair, Accumulator UNKNOWN_THREAD_PRIORITY, false, UNKNOWN_TIMEOUT, - mergeBufferSize, - perQueryStats + mergeBufferSize ); } @@ -170,7 +167,7 @@ public static Pair, Accumulator * and dim filters) are respected, and its aggregators are used in standard (not combining) form. The input * ResultRows are assumed to be results originating from the provided "subquery". * - * @param query query that we are grouping for + * @param query the query that we are grouping for * @param subquery optional subquery that we are receiving results from (see combining vs. subquery * mode above) * @param config groupBy query config @@ -201,8 +198,7 @@ public static Pair, Accumulator final int priority, final boolean hasQueryTimeout, final long queryTimeoutAt, - final int mergeBufferSize, - final GroupByStatsProvider.PerQueryStats perQueryStats + final int mergeBufferSize ) { // concurrencyHint >= 1 for concurrent groupers, -1 for single-threaded @@ -281,8 +277,7 @@ public static Pair, Accumulator true, limitSpec, sortHasNonGroupingFields, - mergeBufferSize, - perQueryStats + mergeBufferSize ); } else { final Grouper.KeySerdeFactory combineKeySerdeFactory = new RowBasedKeySerdeFactory( @@ -311,8 +306,7 @@ public static Pair, Accumulator grouperSorter, priority, hasQueryTimeout, - queryTimeoutAt, - perQueryStats + queryTimeoutAt ); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index fadcfa02c95d..7afa58daec39 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -26,6 +26,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import net.jpountz.lz4.LZ4BlockInputStream; import net.jpountz.lz4.LZ4BlockOutputStream; @@ -37,7 +38,7 @@ import org.apache.druid.query.BaseQuery; import org.apache.druid.query.aggregation.AggregatorAdapters; import org.apache.druid.query.aggregation.AggregatorFactory; -import org.apache.druid.query.groupby.GroupByStatsProvider; +import org.apache.druid.query.groupby.GroupByResponseContextKeys; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.segment.ColumnSelectorFactory; @@ -53,6 +54,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -75,7 +77,6 @@ public class SpillingGrouper implements Grouper private final AggregatorFactory[] aggregatorFactories; private final Comparator> keyObjComparator; private final Comparator> defaultOrderKeyObjComparator; - private final GroupByStatsProvider.PerQueryStats perQueryStats; private final List files = new ArrayList<>(); private final List dictionaryFiles = new ArrayList<>(); @@ -97,8 +98,7 @@ public SpillingGrouper( final boolean spillingAllowed, final DefaultLimitSpec limitSpec, final boolean sortHasNonGroupingFields, - final int mergeBufferSize, - final GroupByStatsProvider.PerQueryStats perQueryStats + final int mergeBufferSize ) { this.keySerde = keySerdeFactory.factorize(); @@ -158,7 +158,6 @@ public SpillingGrouper( this.spillMapper = keySerde.decorateObjectMapper(spillMapper); this.spillingAllowed = spillingAllowed; this.sortHasNonGroupingFields = sortHasNonGroupingFields; - this.perQueryStats = perQueryStats; } @Override @@ -215,10 +214,17 @@ public void reset() deleteFiles(); } + @Override + public Map getQueryMetricsMap() + { + return ImmutableMap.of( + GroupByResponseContextKeys.GROUPBY_MERGE_DICTIONARY_SIZE_NAME, keySerde.getDictionarySize() + ); + } + @Override public void close() { - perQueryStats.dictionarySize(keySerde.getDictionarySize()); grouper.close(); keySerde.reset(); deleteFiles(); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java index 9e4706d44f58..7f223a2640b9 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java @@ -326,8 +326,6 @@ public String getFormatString() final Supplier configSupplier = Suppliers.ofInstance(config); - final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(); - final GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool( mergePool, config @@ -342,8 +340,7 @@ public String getFormatString() groupByResourcesReservationPool, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER, - groupByStatsProvider + NOOP_QUERYWATCHER ); final GroupingEngine tooSmallEngine = new GroupingEngine( @@ -352,8 +349,7 @@ public String getFormatString() tooSmallGroupByResourcesReservationPool, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER, - groupByStatsProvider + NOOP_QUERYWATCHER ); groupByFactory = new GroupByQueryRunnerFactory( diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java index df03020a8acf..5ab4ff05369f 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java @@ -578,7 +578,6 @@ public String getFormatString() }; final Supplier configSupplier = Suppliers.ofInstance(config); - final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(); final GroupByResourcesReservationPool groupByResourcesReservationPoolBroker = new GroupByResourcesReservationPool(mergePoolBroker, config); @@ -593,8 +592,7 @@ public String getFormatString() groupByResourcesReservationPoolBroker, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER, - groupByStatsProvider + NOOP_QUERYWATCHER ); final GroupingEngine groupingEngineHistorical = new GroupingEngine( druidProcessingConfig, @@ -602,8 +600,7 @@ public String getFormatString() groupByResourcesReservationPoolHistorical, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER, - groupByStatsProvider + NOOP_QUERYWATCHER ); final GroupingEngine groupingEngineHistorical2 = new GroupingEngine( druidProcessingConfig, @@ -611,8 +608,7 @@ public String getFormatString() groupByResourcesReservationPoolHistorical2, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER, - groupByStatsProvider + NOOP_QUERYWATCHER ); groupByFactoryBroker = new GroupByQueryRunnerFactory( diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java index bb0cb89d69dd..1039e40ccb9a 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java @@ -241,7 +241,6 @@ public String getFormatString() }; final Supplier configSupplier = Suppliers.ofInstance(config); - final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(); final GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool(mergePool, config); final GroupingEngine groupingEngine = new GroupingEngine( @@ -250,8 +249,7 @@ public String getFormatString() groupByResourcesReservationPool, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER, - groupByStatsProvider + NOOP_QUERYWATCHER ); groupByFactory = new GroupByQueryRunnerFactory( diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java index 775a8c602bbe..77f116aa6e40 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -124,7 +124,6 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory( ) { final Supplier configSupplier = Suppliers.ofInstance(config); - final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(); final GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool(MERGE_BUFFER_POOL, config); @@ -134,8 +133,7 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory( groupByResourcesReservationPool, TestHelper.makeJsonMapper(), mapper, - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - groupByStatsProvider + QueryRunnerTestHelper.NOOP_QUERYWATCHER ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( groupingEngine, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index 80731a244c9a..2cb156edf826 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -1294,7 +1294,6 @@ public String getFormatString() bufferSupplier, processingConfig.getNumMergeBuffers() ); - final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(); final GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool(mergeBufferPool, queryConfig); final GroupingEngine groupingEngine = new GroupingEngine( @@ -1303,8 +1302,7 @@ public String getFormatString() groupByResourcesReservationPool, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - groupByStatsProvider + QueryRunnerTestHelper.NOOP_QUERYWATCHER ); final GroupByQueryQueryToolChest queryToolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool); final ObjectMapper mapper = TestHelper.makeJsonMapper(); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java index 2e52da567b66..13475eef9e82 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -105,7 +105,6 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory( ) { final Supplier configSupplier = Suppliers.ofInstance(config); - final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(); final GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool(MERGE_BUFFER_POOL, config); final GroupingEngine groupingEngine = new GroupingEngine( @@ -114,8 +113,7 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory( groupByResourcesReservationPool, TestHelper.makeJsonMapper(), mapper, - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - groupByStatsProvider + QueryRunnerTestHelper.NOOP_QUERYWATCHER ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index f4e4d1dbd247..f83e78d7f44b 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -398,8 +398,7 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory( groupByResourcesReservationPool, mapper, mapper, - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - statsProvider + QueryRunnerTestHelper.NOOP_QUERYWATCHER ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( groupingEngine, @@ -3551,10 +3550,11 @@ public void testGroupByWithFirstLast() makeRow(query, "2011-04-01", "market", "upfront", "first", 1447L, "last", 780L) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + Iterable results = GroupByQueryRunnerTestHelper.runQueryWithEmitter(factory, runner, query, emitter); TestHelper.assertExpectedObjects(expectedResults, results, "first-last-aggs"); - verifyGroupByMetricsForSmallBufferConfig(); + verifyGroupByMetricsForSmallBufferConfig(emitter); } @Test @@ -6148,10 +6148,11 @@ public void testDifferentGroupingSubqueryMultipleAggregatorsOnSameField() ) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + Iterable results = GroupByQueryRunnerTestHelper.runQueryWithEmitter(factory, runner, query, emitter); TestHelper.assertExpectedObjects(expectedResults, results, "subquery-multiple-aggs"); - verifyGroupByMetricsForSmallBufferConfig(); + verifyGroupByMetricsForSmallBufferConfig(emitter); } @Test @@ -6195,10 +6196,11 @@ public void testDifferentGroupingSubqueryWithFilter() makeRow(query, "2011-04-02", "idx", 2505.0) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + Iterable results = GroupByQueryRunnerTestHelper.runQueryWithEmitter(factory, runner, query, emitter); TestHelper.assertExpectedObjects(expectedResults, results, "subquery-filter"); - verifyGroupByMetricsForSmallBufferConfig(); + verifyGroupByMetricsForSmallBufferConfig(emitter); } @Test @@ -6920,10 +6922,11 @@ public boolean eval(ResultRow row) ); // Subqueries are handled by the ToolChest - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + Iterable results = GroupByQueryRunnerTestHelper.runQueryWithEmitter(factory, runner, query, emitter); TestHelper.assertExpectedObjects(expectedResults, results, "subquery-postaggs"); - verifyGroupByMetricsForSmallBufferConfig(); + verifyGroupByMetricsForSmallBufferConfig(emitter); } @Test @@ -12433,10 +12436,11 @@ public void testGroupByOnNullableDoubleNoLimitPushdown() makeRow(query, "2011-04-01", "nullable", 50.0, "rows", 6L) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + Iterable results = GroupByQueryRunnerTestHelper.runQueryWithEmitter(factory, runner, query, emitter); TestHelper.assertExpectedObjects(expectedResults, results, "groupBy"); - verifyGroupByMetricsForSmallBufferConfig(true); + verifyGroupByMetricsForSmallBufferConfig(emitter, true); } @Test @@ -12504,10 +12508,11 @@ public void testGroupByOnVirtualColumn() makeRow(query, "2011-04-01", "v", 19600000L, "rows", 6L, "twosum", 18L) ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + Iterable results = GroupByQueryRunnerTestHelper.runQueryWithEmitter(factory, runner, query, emitter); TestHelper.assertExpectedObjects(expectedResults, results, "groupBy"); - verifyGroupByMetricsForSmallBufferConfig(true); + verifyGroupByMetricsForSmallBufferConfig(emitter, true); } @Test @@ -13728,7 +13733,7 @@ private void cannotVectorize() } } - private void verifyGroupByMetricsForSmallBufferConfig(boolean skipMergeDictionaryMetric) + private void verifyGroupByMetricsForSmallBufferConfig(StubServiceEmitter emitter, boolean skipMergeDictionaryMetric) { if (!config.toString().equals(V2_SMALL_BUFFER_CONFIG.toString())) { return; @@ -13738,14 +13743,23 @@ private void verifyGroupByMetricsForSmallBufferConfig(boolean skipMergeDictionar Assert.assertTrue(aggregateStats.getSpilledBytes() > 0); Assert.assertEquals(1, aggregateStats.getMergeBufferQueries()); Assert.assertTrue(aggregateStats.getMergeBufferAcquisitionTimeNs() > 0); + if (!skipMergeDictionaryMetric) { Assert.assertTrue(aggregateStats.getMergeDictionarySize() > 0); + if (emitter != null) { + emitter.verifyEmitted("mergeDictionarySize", 1); + } + } + + if (emitter != null) { + emitter.verifyEmitted("query/wait/time", 1); + emitter.verifyEmitted("bytesSpilledToStorage", 1); + emitter.verifyEmitted("mergeBufferAcquisitionTime", 1); } } - private void verifyGroupByMetricsForSmallBufferConfig() + private void verifyGroupByMetricsForSmallBufferConfig(StubServiceEmitter emitter) { - verifyGroupByMetricsForSmallBufferConfig(false); + verifyGroupByMetricsForSmallBufferConfig(emitter, false); } } - diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTestHelper.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTestHelper.java index 049b00ca1159..17db1b6d2902 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTestHelper.java @@ -66,20 +66,30 @@ public static Iterable runQueryWithEmitter( ServiceEmitter serviceEmitter ) { + QueryToolChest toolChest = factory.getToolchest(); MetricsEmittingQueryRunner metricsEmittingQueryRunner = new MetricsEmittingQueryRunner( serviceEmitter, - factory.getToolchest(), + toolChest, runner, - (obj, lng) -> {}, + MetricsEmittingQueryRunner.NOOP_METRIC_REPORTER, (metrics) -> {} ).withWaitMeasuredFromNow(); - QueryToolChest toolChest = factory.getToolchest(); - QueryRunner theRunner = new FinalizeResultsQueryRunner<>( + + QueryRunner finalizeResultsQueryRunner = new FinalizeResultsQueryRunner<>( toolChest.mergeResults(toolChest.preMergeQueryDecoration(metricsEmittingQueryRunner)), toolChest ); + QueryRunner theRunner = new MetricsEmittingQueryRunner<>( + serviceEmitter, + toolChest, + finalizeResultsQueryRunner, + MetricsEmittingQueryRunner.NOOP_METRIC_REPORTER, + (queryMetrics -> { + }) + ); + return theRunner.run(QueryPlus.wrap(populateResourceId(query))).toList(); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java index 85c81151dd3a..67af4d758130 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java @@ -129,8 +129,7 @@ public boolean equals(Object o) groupByResourcesReservationPool.reserve( queryResourceId1, QUERY, - true, - new GroupByStatsProvider.PerQueryStats() + true ); reserveCalledByFirstThread.countDown(); try { @@ -173,8 +172,7 @@ public boolean equals(Object o) groupByResourcesReservationPool.reserve( queryResourceId2, QUERY, - true, - new GroupByStatsProvider.PerQueryStats() + true ); threadsCompleted.countDown(); }); @@ -209,8 +207,7 @@ public void testMultipleSimultaneousAllocationAttemptsFail() groupByResourcesReservationPool.reserve( queryResourceId, QUERY, - true, - new GroupByStatsProvider.PerQueryStats() + true ); Assert.assertThrows( @@ -218,8 +215,7 @@ public void testMultipleSimultaneousAllocationAttemptsFail() () -> groupByResourcesReservationPool.reserve( queryResourceId, QUERY, - true, - new GroupByStatsProvider.PerQueryStats() + true ) ); } @@ -235,8 +231,7 @@ public void testMultipleSequentialAllocationAttemptsSucceed() groupByResourcesReservationPool.reserve( queryResourceId, QUERY, - true, - new GroupByStatsProvider.PerQueryStats() + true ); GroupByQueryResources oldResources = groupByResourcesReservationPool.fetch(queryResourceId); @@ -247,8 +242,7 @@ public void testMultipleSequentialAllocationAttemptsSucceed() groupByResourcesReservationPool.reserve( queryResourceId, QUERY, - true, - new GroupByStatsProvider.PerQueryStats() + true ); GroupByQueryResources newResources = groupByResourcesReservationPool.fetch(queryResourceId); Assert.assertNotNull(newResources); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java index 565a5ab97bc3..a8600e5b1cb9 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java @@ -19,52 +19,53 @@ package org.apache.druid.query.groupby; -import org.apache.druid.query.QueryResourceId; import org.junit.Assert; import org.junit.Test; public class GroupByStatsProviderTest { @Test - public void testMetricCollection() + public void testAggregateStatsFromQueryMetrics() { GroupByStatsProvider statsProvider = new GroupByStatsProvider(); - QueryResourceId id1 = new QueryResourceId("q1"); - GroupByStatsProvider.PerQueryStats stats1 = statsProvider.getPerQueryStatsContainer(id1); + DefaultGroupByQueryMetrics metricsWithAllStats = new DefaultGroupByQueryMetrics(); + metricsWithAllStats.mergeBufferAcquisitionTime(100L); + metricsWithAllStats.bytesSpilledToStorage(2_048L); + metricsWithAllStats.mergeDictionarySize(10L); - stats1.mergeBufferAcquisitionTime(300); - stats1.mergeBufferAcquisitionTime(400); - stats1.spilledBytes(200); - stats1.spilledBytes(400); - stats1.dictionarySize(100); - stats1.dictionarySize(200); + DefaultGroupByQueryMetrics metricsWithPartialStats = new DefaultGroupByQueryMetrics(); + metricsWithPartialStats.bytesSpilledToStorage(1_024L); + metricsWithPartialStats.mergeDictionarySize(5L); - QueryResourceId id2 = new QueryResourceId("q2"); - GroupByStatsProvider.PerQueryStats stats2 = statsProvider.getPerQueryStatsContainer(id2); + statsProvider.aggregateStats(metricsWithAllStats); + statsProvider.aggregateStats(metricsWithPartialStats); - stats2.mergeBufferAcquisitionTime(500); - stats2.mergeBufferAcquisitionTime(600); - stats2.spilledBytes(400); - stats2.spilledBytes(600); - stats2.dictionarySize(300); - stats2.dictionarySize(400); + GroupByStatsProvider.AggregateStats stats = statsProvider.getStatsSince(); - GroupByStatsProvider.AggregateStats aggregateStats = statsProvider.getStatsSince(); - Assert.assertEquals(0L, aggregateStats.getMergeBufferQueries()); - Assert.assertEquals(0L, aggregateStats.getMergeBufferAcquisitionTimeNs()); - Assert.assertEquals(0L, aggregateStats.getSpilledQueries()); - Assert.assertEquals(0L, aggregateStats.getSpilledBytes()); - Assert.assertEquals(0L, aggregateStats.getMergeDictionarySize()); + Assert.assertEquals(1, stats.getMergeBufferQueries()); + Assert.assertEquals(100L, stats.getMergeBufferAcquisitionTimeNs()); + Assert.assertEquals(2, stats.getSpilledQueries()); + Assert.assertEquals(3_072L, stats.getSpilledBytes()); + Assert.assertEquals(15L, stats.getMergeDictionarySize()); + } + + @Test + public void testGetStatsSinceResetsCounters() + { + GroupByStatsProvider statsProvider = new GroupByStatsProvider(); + + DefaultGroupByQueryMetrics metrics = new DefaultGroupByQueryMetrics(); + metrics.bytesSpilledToStorage(512L); + metrics.mergeDictionarySize(7L); + statsProvider.aggregateStats(metrics); - statsProvider.closeQuery(id1); - statsProvider.closeQuery(id2); + Assert.assertEquals(512L, statsProvider.getStatsSince().getSpilledBytes()); - aggregateStats = statsProvider.getStatsSince(); - Assert.assertEquals(2, aggregateStats.getMergeBufferQueries()); - Assert.assertEquals(1800L, aggregateStats.getMergeBufferAcquisitionTimeNs()); - Assert.assertEquals(2L, aggregateStats.getSpilledQueries()); - Assert.assertEquals(1600L, aggregateStats.getSpilledBytes()); - Assert.assertEquals(1000L, aggregateStats.getMergeDictionarySize()); + GroupByStatsProvider.AggregateStats reset = statsProvider.getStatsSince(); + Assert.assertEquals(0L, reset.getMergeBufferQueries()); + Assert.assertEquals(0L, reset.getSpilledQueries()); + Assert.assertEquals(0L, reset.getSpilledBytes()); + Assert.assertEquals(0L, reset.getMergeDictionarySize()); } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java index 5b89ca1c1415..ea452f8750f6 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java @@ -287,7 +287,6 @@ public String getFormatString() }; final Supplier configSupplier = Suppliers.ofInstance(config); - final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(); final GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool(mergePool, config); final GroupByResourcesReservationPool groupByResourcesReservationPool2 = @@ -298,8 +297,7 @@ public String getFormatString() groupByResourcesReservationPool, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER, - groupByStatsProvider + NOOP_QUERYWATCHER ); final GroupingEngine engine2 = new GroupingEngine( druidProcessingConfig, @@ -307,8 +305,7 @@ public String getFormatString() groupByResourcesReservationPool2, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER, - groupByStatsProvider + NOOP_QUERYWATCHER ); groupByFactory = new GroupByQueryRunnerFactory( diff --git a/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java index b220f00b05a3..59b539de3caa 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java @@ -172,7 +172,6 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory( ); } final Supplier configSupplier = Suppliers.ofInstance(config); - GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(); GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool(bufferPools.getMergePool(), config); final GroupingEngine groupingEngine = new GroupingEngine( @@ -181,8 +180,7 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory( groupByResourcesReservationPool, TestHelper.makeJsonMapper(), mapper, - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - groupByStatsProvider + QueryRunnerTestHelper.NOOP_QUERYWATCHER ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java index 50e01b0f60c7..5f2e2208a297 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java @@ -36,7 +36,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.dimension.DimensionSpec; -import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.epinephelinae.Grouper.BufferComparator; import org.apache.druid.query.groupby.epinephelinae.Grouper.Entry; import org.apache.druid.query.groupby.epinephelinae.Grouper.KeySerde; @@ -148,11 +147,9 @@ public ByteBuffer get() @Test() public void testAggregate() throws InterruptedException, ExecutionException, IOException { - GroupByStatsProvider.PerQueryStats perQueryStats = new GroupByStatsProvider.PerQueryStats(); final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage( temporaryFolder.newFolder(), - 1024 * 1024, - perQueryStats + 1024 * 1024 ); final ListeningExecutorService service = MoreExecutors.listeningDecorator(exec); try { @@ -177,8 +174,7 @@ public void testAggregate() throws InterruptedException, ExecutionException, IOE 0, 4, parallelCombineThreads, - mergeThreadLocal, - perQueryStats + mergeThreadLocal ); closer.register(grouper); grouper.init(); @@ -197,7 +193,7 @@ public void testAggregate() throws InterruptedException, ExecutionException, IOE }); } - for (Future eachFuture : futures) { + for (Future eachFuture : futures) { eachFuture.get(); } @@ -231,7 +227,6 @@ public void testGrouperTimeout() throws Exception return; } - GroupByStatsProvider.PerQueryStats perQueryStats = new GroupByStatsProvider.PerQueryStats(); ListeningExecutorService service = MoreExecutors.listeningDecorator(exec); try { final ConcurrentGrouper grouper = new ConcurrentGrouper<>( @@ -244,7 +239,7 @@ public void testGrouperTimeout() throws Exception 1024, 0.7f, 1, - new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024, perQueryStats), + new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024), new DefaultObjectMapper(), concurrencyHint, null, @@ -255,8 +250,7 @@ public void testGrouperTimeout() throws Exception 1, 4, parallelCombineThreads, - mergeThreadLocal, - perQueryStats + mergeThreadLocal ); closer.register(grouper); grouper.init(); @@ -275,7 +269,7 @@ public void testGrouperTimeout() throws Exception }); } - for (Future eachFuture : futures) { + for (Future eachFuture : futures) { eachFuture.get(); } diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java index b19312384ad1..dce4d40aca8a 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java @@ -211,7 +211,7 @@ public void testFullOnSelect() stubServiceEmitter, TOOL_CHEST, runner, - (obj, lng) -> {}, + MetricsEmittingQueryRunner.NOOP_METRIC_REPORTER, (metrics) -> {} ).withWaitMeasuredFromNow(); Iterable results = metricsEmittingQueryRunner.run(QueryPlus.wrap(query)).toList(); diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java index 52cc5b149668..203eb4f0f1a0 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -227,7 +227,7 @@ public void testFullOnTimeseries() stubServiceEmitter, new TimeseriesQueryQueryToolChest(), runner, - (obj, lng) -> {}, + MetricsEmittingQueryRunner.NOOP_METRIC_REPORTER, (metrics) -> {} ).withWaitMeasuredFromNow(); Iterable> results = metricsEmittingQueryRunner.run(QueryPlus.wrap(query)).toList(); diff --git a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java index 6555921fafc8..95d4d79bab27 100644 --- a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java +++ b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java @@ -71,7 +71,6 @@ import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryMetrics; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; -import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; @@ -582,9 +581,7 @@ public CursorFactoryProjectionTest( resourcesReservationPool, TestHelper.makeJsonMapper(), TestHelper.makeSmileMapper(), - (query, future) -> { - }, - new GroupByStatsProvider() + (query, future) -> {} ); this.timeseriesEngine = new TimeseriesQueryEngine(nonBlockingPool); } @@ -2035,8 +2032,7 @@ private void testGroupByQuery( resourcesReservationPool.reserve( new QueryResourceId(String.valueOf(query.hashCode())), finalQuery, - true, - new GroupByStatsProvider.PerQueryStats() + true ); runner = groupingEngine.mergeRunners(DirectQueryProcessingPool.INSTANCE, List.of(runner)); } @@ -2241,6 +2237,10 @@ void assertProjection() private static class ExpectedProjectionGroupBy extends ExpectedProjectionQueryMetrics implements GroupByQueryMetrics { + private long mergeBufferAcquisitionTimeNs; + private long spilledBytes; + private long mergeDictionarySize; + private ExpectedProjectionGroupBy(@Nullable String expectedProjection) { super(expectedProjection); @@ -2249,25 +2249,57 @@ private ExpectedProjectionGroupBy(@Nullable String expectedProjection) @Override public void numDimensions(GroupByQuery query) { - } @Override public void numMetrics(GroupByQuery query) { - } @Override public void numComplexMetrics(GroupByQuery query) { - } @Override public void granularity(GroupByQuery query) { + } + + @Override + public void mergeBufferAcquisitionTime(long mergeBufferAcquisitionTime) + { + this.mergeBufferAcquisitionTimeNs += mergeBufferAcquisitionTime; + } + @Override + public void bytesSpilledToStorage(long bytesSpilledToStorage) + { + this.spilledBytes += bytesSpilledToStorage; + } + + @Override + public void mergeDictionarySize(long mergeDictionarySize) + { + this.mergeDictionarySize += mergeDictionarySize; + } + + @Override + public long getSpilledBytes() + { + return spilledBytes; + } + + @Override + public long getMergeDictionarySize() + { + return mergeDictionarySize; + } + + @Override + public long getMergeBufferAcquisitionTime() + { + return mergeBufferAcquisitionTimeNs; } } diff --git a/processing/src/test/java/org/apache/druid/segment/CursorHolderPreaggTest.java b/processing/src/test/java/org/apache/druid/segment/CursorHolderPreaggTest.java index 646bc97033fd..e3e097187b96 100644 --- a/processing/src/test/java/org/apache/druid/segment/CursorHolderPreaggTest.java +++ b/processing/src/test/java/org/apache/druid/segment/CursorHolderPreaggTest.java @@ -35,7 +35,6 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; -import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.timeseries.TimeseriesQuery; @@ -93,7 +92,6 @@ public void setup() () -> ByteBuffer.allocate(50000), 4 ); - GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(); groupingEngine = new GroupingEngine( new DruidProcessingConfig(), GroupByQueryConfig::new, @@ -103,9 +101,7 @@ public void setup() ), TestHelper.makeJsonMapper(), TestHelper.makeSmileMapper(), - (query, future) -> { - }, - groupByStatsProvider + (query, future) -> {} ); this.cursorFactory = new CursorFactory() diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactoryTest.java index 9a447dd39e0c..1d3460a1680a 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactoryTest.java @@ -55,7 +55,6 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; -import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.topn.TopNQueryBuilder; @@ -175,9 +174,7 @@ public IncrementalIndexCursorFactoryTest(String indexType, boolean sortByDim) ), TestHelper.makeJsonMapper(), TestHelper.makeSmileMapper(), - (query, future) -> { - }, - new GroupByStatsProvider() + (query, future) -> {} ); topnQueryEngine = new TopNQueryEngine(nonBlockingPool); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 0fbff897711d..9d8283effffa 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -44,6 +44,7 @@ import org.apache.druid.query.DefaultQueryMetrics; import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.FinalizeResultsQueryRunner; +import org.apache.druid.query.MetricsEmittingQueryRunner; import org.apache.druid.query.NoopQueryRunner; import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; @@ -365,8 +366,9 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final // 1) Populate resource id to the query // 2) Merge results using the toolChest, finalize if necessary. - // 3) Measure CPU time of that operation. - // 4) Release all sink segment references. + // 3) Emit metrics if necessary. + // 4) Measure CPU time of that operation. + // 5) Release all sink segment references. return new ResourceIdPopulatingQueryRunner<>( QueryRunnerHelper.makeClosingQueryRunner( CPUTimeMetricQueryRunner.safeBuild( @@ -468,7 +470,7 @@ public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final * This class operates in two distinct modes based on whether {@link SinkMetricsEmittingQueryRunner#segmentId} is null or non-null. * When segmentId is non-null, it accumulates the metrics. When segmentId is null, it emits the accumulated metrics. *

- * This class is derived from {@link org.apache.druid.query.MetricsEmittingQueryRunner}. + * This class is derived from {@link MetricsEmittingQueryRunner}. */ private static class SinkMetricsEmittingQueryRunner implements QueryRunner { diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index fe9b052de9b9..ac6cd400d6af 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -48,6 +48,7 @@ import org.apache.druid.query.GenericQueryMetricsFactory; import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.MetricsEmittingQueryRunner; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryContexts; @@ -586,6 +587,15 @@ private QueryRunner decorateClusterRunner(Query query, QueryRunner .applyPreMergeDecoration() .mergeResults(false) .applyPostMergeDecoration() + .map(runner -> + new MetricsEmittingQueryRunner<>( + emitter, + toolChest, + runner, + MetricsEmittingQueryRunner.NOOP_METRIC_REPORTER, + queryMetrics -> {} + ) + ) .emitCPUTimeMetric(emitter) .postProcess( objectMapper.convertValue( diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java index ce453f409dd4..f18cb6c198f9 100644 --- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java @@ -27,6 +27,7 @@ import org.apache.druid.query.DataSource; import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.FluentQueryRunner; +import org.apache.druid.query.MetricsEmittingQueryRunner; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; @@ -116,6 +117,15 @@ public QueryRunner getQueryRunnerForIntervals(final Query query, final .applyPreMergeDecoration() .mergeResults(true) .applyPostMergeDecoration() + .map(runner -> + new MetricsEmittingQueryRunner<>( + emitter, + queryRunnerFactory.getToolchest(), + runner, + MetricsEmittingQueryRunner.NOOP_METRIC_REPORTER, + queryMetrics -> {} + ) + ) .emitCPUTimeMetric(emitter, cpuAccumulator); } diff --git a/server/src/main/java/org/apache/druid/server/ServerManager.java b/server/src/main/java/org/apache/druid/server/ServerManager.java index b4a37dbeeedf..bc0b885d0ea5 100644 --- a/server/src/main/java/org/apache/druid/server/ServerManager.java +++ b/server/src/main/java/org/apache/druid/server/ServerManager.java @@ -649,16 +649,28 @@ public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) cpuTimeAccumulator, cacheKeyPrefix ); + + final QueryRunner finalizeResultsQueryRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(factory.mergeRunners(queryProcessingPool, queryRunners), true), + toolChest + ); + + final QueryRunner finalizedMetricsEmittingQueryRunner = new MetricsEmittingQueryRunner<>( + emitter, + toolChest, + finalizeResultsQueryRunner, + MetricsEmittingQueryRunner.NOOP_METRIC_REPORTER, + metrics -> {} + ); + final QueryRunner queryRunner = CPUTimeMetricQueryRunner.safeBuild( - new FinalizeResultsQueryRunner<>( - toolChest.mergeResults(factory.mergeRunners(queryProcessingPool, queryRunners), true), - toolChest - ), + finalizedMetricsEmittingQueryRunner, toolChest, emitter, cpuTimeAccumulator, true ); + return queryRunner.run(queryPlus, responseContext).withBaggage(closer); } catch (Throwable t) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java index 38ddba751b5d..bd7d72f44df6 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java @@ -71,7 +71,6 @@ import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryMetricsFactory; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; -import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; @@ -961,7 +960,6 @@ private GroupByResourcesReservationPool makeGroupByResourcesReservationPool( private GroupingEngine makeGroupingEngine( final ObjectMapper mapper, final DruidProcessingConfig processingConfig, - final GroupByStatsProvider statsProvider, final GroupByQueryConfig config, final GroupByResourcesReservationPool groupByResourcesReservationPool ) @@ -973,8 +971,7 @@ private GroupingEngine makeGroupingEngine( groupByResourcesReservationPool, mapper, mapper, - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - statsProvider + QueryRunnerTestHelper.NOOP_QUERYWATCHER ); } diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java index 894619b9ea3c..d2e5609e260c 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java @@ -2215,14 +2215,32 @@ private Response postForSyncResponse(SqlQuery query, MockHttpServletRequest req) final Object explicitQueryId = query.getContext().get("queryId"); final Object explicitSqlQueryId = query.getContext().get("sqlQueryId"); + // Set up async context support in case the query execution path uses async processing. + // This is necessary because MetricsEmittingQueryRunner wraps queries in LazySequence, + // which defers execution until the sequence is consumed (after startAsync() is called). + MockHttpServletResponse asyncResponse = MockHttpServletResponse.forRequest(req); final Response response = resource.doPost(query, req); - final Object actualQueryId = getHeader(response, QueryResource.QUERY_ID_RESPONSE_HEADER); - final Object actualSqlQueryId = getHeader(response, SqlResource.SQL_QUERY_ID_RESPONSE_HEADER); - - validateQueryIds(explicitQueryId, explicitSqlQueryId, actualQueryId, actualSqlQueryId); - - return response; + if (response != null) { + // Sync response path - error happened before async processing started + final Object actualQueryId = getHeader(response, QueryResource.QUERY_ID_RESPONSE_HEADER); + final Object actualSqlQueryId = getHeader(response, SqlResource.SQL_QUERY_ID_RESPONSE_HEADER); + validateQueryIds(explicitQueryId, explicitSqlQueryId, actualQueryId, actualSqlQueryId); + return response; + } else { + // Async response path - need to construct a Response from the async response + final Object actualQueryId = asyncResponse.getHeader(QueryResource.QUERY_ID_RESPONSE_HEADER); + final Object actualSqlQueryId = asyncResponse.getHeader(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER); + validateQueryIds(explicitQueryId, explicitSqlQueryId, actualQueryId, actualSqlQueryId); + + Response.ResponseBuilder responseBuilder = Response.status(asyncResponse.getStatus()); + for (String headerName : asyncResponse.getHeaderNames()) { + responseBuilder.header(headerName, asyncResponse.getHeader(headerName)); + } + final byte[] responseBytes = asyncResponse.baos.toByteArray(); + responseBuilder.entity((StreamingOutput) output -> output.write(responseBytes)); + return responseBuilder.build(); + } } private ErrorResponse postSyncForException(String s, int expectedStatus) throws IOException