diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 0fbff897711d..2b5a6210b2c3 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.client.CachingQueryRunner; import org.apache.druid.client.cache.Cache; @@ -90,7 +89,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.ObjLongConsumer; import java.util.stream.Collectors; /** @@ -107,13 +105,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME ); - private static final Map>> METRICS_TO_REPORT = - ImmutableMap.of( - DefaultQueryMetrics.QUERY_SEGMENT_TIME, QueryMetrics::reportSegmentTime, - DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME, QueryMetrics::reportSegmentAndCacheTime, - DefaultQueryMetrics.QUERY_WAIT_TIME, QueryMetrics::reportWaitTime - ); - private final String dataSource; // Maintain a timeline of ids and Sinks for all the segments including the base and upgraded versions @@ -539,17 +530,11 @@ public void after(boolean isDone, Throwable thrown) for (Map.Entry segmentAndMetrics : segmentMetricsAccumulator.entrySet()) { queryMetrics.segment(segmentAndMetrics.getKey()); - for (Map.Entry>> reportMetric : METRICS_TO_REPORT.entrySet()) { - final String metricName = reportMetric.getKey(); - switch (metricName) { - case DefaultQueryMetrics.QUERY_SEGMENT_TIME: - reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getSegmentTime()); - case DefaultQueryMetrics.QUERY_WAIT_TIME: - reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getWaitTime()); - case DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME: - reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getSegmentAndCacheTime()); - } - } + final SegmentMetrics segmentMetrics = segmentAndMetrics.getValue(); + + queryMetrics.reportSegmentTime(segmentMetrics.getSegmentTime()); + queryMetrics.reportWaitTime(segmentMetrics.getWaitTime()); + queryMetrics.reportSegmentAndCacheTime(segmentMetrics.getSegmentAndCacheTime()); try { queryMetrics.emit(emitter); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index 6f9030178023..904d99b7cde0 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.PendingSegmentRecord; +import org.apache.druid.query.DefaultQueryMetrics; import org.apache.druid.query.Druids; import org.apache.druid.query.Order; import org.apache.druid.query.QueryPlus; @@ -2296,14 +2297,14 @@ public void testQueryBySegments() throws Exception private void verifySinkMetrics(StubServiceEmitter emitter, Set segmentIds) { int segments = segmentIds.size(); - emitter.verifyEmitted("query/cpu/time", 1); - Assert.assertEquals(segments, emitter.getMetricEvents("query/segment/time").size()); - Assert.assertEquals(segments, emitter.getMetricEvents("query/segmentAndCache/time").size()); - Assert.assertEquals(segments, emitter.getMetricEvents("query/wait/time").size()); + emitter.verifyEmitted(DefaultQueryMetrics.QUERY_CPU_TIME, 1); + Assert.assertEquals(segments, emitter.getMetricEvents(DefaultQueryMetrics.QUERY_SEGMENT_TIME).size()); + Assert.assertEquals(segments, emitter.getMetricEvents(DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME).size()); + Assert.assertEquals(segments, emitter.getMetricEvents(DefaultQueryMetrics.QUERY_WAIT_TIME).size()); for (String id : segmentIds) { - Assert.assertTrue(emitter.getMetricEvents("query/segment/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); - Assert.assertTrue(emitter.getMetricEvents("query/segmentAndCache/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); - Assert.assertTrue(emitter.getMetricEvents("query/wait/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); + Assert.assertTrue(emitter.getMetricEvents(DefaultQueryMetrics.QUERY_SEGMENT_TIME).stream().anyMatch(value -> value.getUserDims().containsValue(id))); + Assert.assertTrue(emitter.getMetricEvents(DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME).stream().anyMatch(value -> value.getUserDims().containsValue(id))); + Assert.assertTrue(emitter.getMetricEvents(DefaultQueryMetrics.QUERY_WAIT_TIME).stream().anyMatch(value -> value.getUserDims().containsValue(id))); } }