diff --git a/contrib/pinot-druid-benchmark/src/main/java/org/apache/pinotdruidbenchmark/PinotThroughput.java b/contrib/pinot-druid-benchmark/src/main/java/org/apache/pinotdruidbenchmark/PinotThroughput.java index 1ac1bf255ba3..3d7c3ee8c038 100644 --- a/contrib/pinot-druid-benchmark/src/main/java/org/apache/pinotdruidbenchmark/PinotThroughput.java +++ b/contrib/pinot-druid-benchmark/src/main/java/org/apache/pinotdruidbenchmark/PinotThroughput.java @@ -33,7 +33,7 @@ import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; - +import org.apache.http.util.EntityUtils; /** * Test throughput for Pinot. @@ -90,8 +90,9 @@ public void run() { try (CloseableHttpClient httpClient = HttpClients.createDefault()) { while (System.currentTimeMillis() < endTime) { long startTime = System.currentTimeMillis(); - CloseableHttpResponse httpResponse = httpClient.execute(httpPosts[RANDOM.nextInt(numQueries)]); - httpResponse.close(); + try (CloseableHttpResponse httpResponse = httpClient.execute(httpPosts[RANDOM.nextInt(numQueries)])) { + EntityUtils.consume(httpResponse.getEntity()); + } long responseTime = System.currentTimeMillis() - startTime; counter.getAndIncrement(); totalResponseTime.getAndAdd(responseTime); diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml index 63132ea977c4..f89428caf1f3 100644 --- a/pinot-common/pom.xml +++ b/pinot-common/pom.xml @@ -248,6 +248,11 @@ org.glassfish.jersey.core jersey-server + + com.github.ben-manes.caffeine + caffeine + 2.8.0 + diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java index bf4a853689d2..ba52e5b5b321 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.common.metrics; +import com.github.benmanes.caffeine.cache.Cache; import com.google.common.annotations.VisibleForTesting; import com.yammer.metrics.core.MetricName; import com.yammer.metrics.core.MetricsRegistry; @@ -31,7 +32,7 @@ import org.apache.pinot.common.request.BrokerRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import com.github.benmanes.caffeine.cache.Caffeine; /** * Common code for metrics implementations. @@ -40,6 +41,7 @@ public abstract class AbstractMetrics { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMetrics.class); + private static final int METRICS_NAME_CACHE_SIZE = 1_000; protected final String _metricPrefix; @@ -51,6 +53,8 @@ public abstract class AbstractMetrics _metricNameCache; + public AbstractMetrics(String metricPrefix, MetricsRegistry metricsRegistry, Class clazz) { this(metricPrefix, metricsRegistry, clazz, false); } @@ -60,6 +64,7 @@ public AbstractMetrics(String metricPrefix, MetricsRegistry metricsRegistry, Cla _metricsRegistry = metricsRegistry; _clazz = clazz; _global = global; + _metricNameCache = Caffeine.newBuilder().maximumSize(METRICS_NAME_CACHE_SIZE).build(); } public interface QueryPhase { @@ -88,6 +93,10 @@ public interface Timer { boolean isGlobal(); } + MetricName getMetricName(String name) { + return _metricNameCache.get(name, n -> new MetricName(_clazz, n)); + } + /** * Logs the timing of a query phase. * @@ -143,9 +152,7 @@ public void addTimedValue(T timer, final long duration, final TimeUnit timeUnit) * @param timeUnit The log time duration time unit */ private void addValueToTimer(String fullTimerName, final long duration, final TimeUnit timeUnit) { - final MetricName metricName = new MetricName(_clazz, fullTimerName); - com.yammer.metrics.core.Timer timer = - MetricsHelper.newTimer(_metricsRegistry, metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS); + final MetricName metricName = getMetricName(fullTimerName); MetricsHelper.newTimer(_metricsRegistry, metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS) .update(duration, timeUnit); } @@ -212,7 +219,7 @@ public com.yammer.metrics.core.Meter addMeteredGlobalValue(final M meter, final final String fullMeterName; String meterName = meter.getMeterName(); fullMeterName = _metricPrefix + meterName; - final MetricName metricName = new MetricName(_clazz, fullMeterName); + final MetricName metricName = getMetricName(fullMeterName); final com.yammer.metrics.core.Meter newMeter = MetricsHelper.newMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS); @@ -248,7 +255,7 @@ public com.yammer.metrics.core.Meter addMeteredTableValue(final String tableName final String fullMeterName; String meterName = meter.getMeterName(); fullMeterName = _metricPrefix + getTableName(tableName) + "." + meterName; - final MetricName metricName = new MetricName(_clazz, fullMeterName); + final MetricName metricName = getMetricName(fullMeterName); final com.yammer.metrics.core.Meter newMeter = MetricsHelper.newMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS); @@ -261,7 +268,7 @@ public com.yammer.metrics.core.Meter getMeteredTableValue(final String tableName final String fullMeterName; String meterName = meter.getMeterName(); fullMeterName = _metricPrefix + getTableName(tableName) + "." + meterName; - final MetricName metricName = new MetricName(_clazz, fullMeterName); + final MetricName metricName = getMetricName(fullMeterName); return MetricsHelper.newMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS); } @@ -281,7 +288,7 @@ public void addMeteredQueryValue(final BrokerRequest request, final M meter, fin } else { fullMeterName = _metricPrefix + meterName; } - final MetricName metricName = new MetricName(_clazz, fullMeterName); + final MetricName metricName = getMetricName(fullMeterName); MetricsHelper.newMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS).mark(unitCount); } @@ -468,7 +475,7 @@ public void initializeGlobalMeters() { * @param valueCallback The callback function used to retrieve the value of the gauge */ public void addCallbackGauge(final String metricName, final Callable valueCallback) { - MetricsHelper.newGauge(_metricsRegistry, new MetricName(_clazz, _metricPrefix + metricName), + MetricsHelper.newGauge(_metricsRegistry, getMetricName(_metricPrefix + metricName), new com.yammer.metrics.core.Gauge() { @Override public Long value() { diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/MetricsHelperTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/MetricsHelperTest.java index 3819cef6709d..0bad91b18b82 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/MetricsHelperTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/MetricsHelperTest.java @@ -18,15 +18,19 @@ */ package org.apache.pinot.common.metrics; -import com.yammer.metrics.core.MetricName; -import com.yammer.metrics.core.MetricsRegistry; +import com.yammer.metrics.core.*; + import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; + import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.MapConfiguration; import org.testng.annotations.Test; +import static junit.framework.Assert.assertNotNull; +import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_METRICS_PREFIX; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -74,4 +78,79 @@ public void testMetricsHelperRegistration() { assertTrue(listenerOneOkay); assertTrue(listenerTwoOkay); } + + @Test + public void testMetricNameGetHelper() { + final String METRIC_NAME = "dummy"; + + MetricName newlyAllocated = new MetricName(ServerMetrics.class, METRIC_NAME); + AbstractMetrics metrics = new ServerMetrics(new MetricsRegistry()); + MetricName fromHelper = metrics.getMetricName(METRIC_NAME); + // Direct metrics creation must match what the helper does + assertEquals(fromHelper, newlyAllocated); + // Make sure we have a different object here, we only used the helper once + assertTrue(newlyAllocated != fromHelper); + MetricName fromHelperCached = metrics.getMetricName(METRIC_NAME); + // Identity test has to work because we are caching the names + assertTrue(fromHelper == fromHelperCached); + } + + @Test + public void testMetricsApisTiming() { + final String TABLE_NAME = "customers"; + + MetricsRegistry registry = new MetricsRegistry(); + AbstractMetrics metrics = new ServerMetrics(registry); + + metrics.addPhaseTiming(TABLE_NAME, BrokerQueryPhase.QUERY_EXECUTION, 10); + metrics.addPhaseTiming(TABLE_NAME, BrokerQueryPhase.QUERY_EXECUTION, 15); + + Map allMetrics = registry.allMetrics(); + assertTrue(allMetrics.size() == 1); + Timer timer = (Timer)allMetrics.values().iterator().next(); + // The sum should be the two query executions added together + assertEquals(timer.sum(), 25.0/1_000_000); + } + + @Test + public void testMetricsApisMeteredTableValues() { + final String TABLE_NAME = "customers"; + + MetricsRegistry registry = new MetricsRegistry(); + AbstractMetrics metrics = new ServerMetrics(registry); + + metrics.addMeteredTableValue(TABLE_NAME, ServerMeter.NUM_SEGMENTS_MATCHED, 10); + Meter meter = metrics.getMeteredTableValue(TABLE_NAME, ServerMeter.NUM_SEGMENTS_MATCHED); + + assertNotNull(meter); + } + + @Test + public void testMetricsApisMeteredQueryValues() { + final String TABLE_NAME = "customers"; + + MetricsRegistry registry = new MetricsRegistry(); + AbstractMetrics metrics = new ServerMetrics(registry); + + metrics.addMeteredQueryValue(null, ServerMeter.NUM_SEGMENTS_MATCHED, 10); + + Map allMetrics = registry.allMetrics(); + assertTrue(allMetrics.size() == 1); + } + + @Test + public void testMetricsCallBackGauge() { + final String METRIC_NAME = "test"; + + MetricsRegistry registry = new MetricsRegistry(); + AbstractMetrics metrics = new ServerMetrics(registry); + + metrics.addCallbackGauge(METRIC_NAME, () -> ServerGauge.DOCUMENT_COUNT); + + Map allMetrics = registry.allMetrics(); + assertTrue(allMetrics.size() == 1); + MetricName fromHelper = metrics.getMetricName(DEFAULT_METRICS_PREFIX + METRIC_NAME); + // Identity test has to work because we are caching the names + assertTrue(fromHelper == allMetrics.keySet().iterator().next()); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 1477396337cd..886c2d2a73a8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -259,7 +259,7 @@ public ZkHelixPropertyStore getPropertyStore() { private void addInstanceGroupTagIfNeeded() { InstanceConfig instanceConfig = getHelixInstanceConfig(_instanceId); assert instanceConfig != null; - if (!instanceConfig.containsTag(Helix.CONTROLLER_INSTANCE)) { + if (instanceConfig != null && !instanceConfig.containsTag(Helix.CONTROLLER_INSTANCE)) { LOGGER.info("Controller: {} doesn't contain group tag: {}. Adding one.", _instanceId, Helix.CONTROLLER_INSTANCE); instanceConfig.addTag(Helix.CONTROLLER_INSTANCE); HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();