Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import org.apache.http.util.EntityUtils;

/**
* Test throughput for Pinot.
Expand Down Expand Up @@ -90,8 +90,9 @@ public void run() {
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
while (System.currentTimeMillis() < endTime) {
long startTime = System.currentTimeMillis();
CloseableHttpResponse httpResponse = httpClient.execute(httpPosts[RANDOM.nextInt(numQueries)]);
httpResponse.close();
try (CloseableHttpResponse httpResponse = httpClient.execute(httpPosts[RANDOM.nextInt(numQueries)])) {
EntityUtils.consume(httpResponse.getEntity());
}
long responseTime = System.currentTimeMillis() - startTime;
counter.getAndIncrement();
totalResponseTime.getAndAdd(responseTime);
Expand Down
5 changes: 5 additions & 0 deletions pinot-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,11 @@
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>
<profiles>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.metrics;

import com.github.benmanes.caffeine.cache.Cache;
import com.google.common.annotations.VisibleForTesting;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
Expand All @@ -31,7 +32,7 @@
import org.apache.pinot.common.request.BrokerRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.Caffeine;

/**
* Common code for metrics implementations.
Expand All @@ -40,6 +41,7 @@
public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M extends AbstractMetrics.Meter, G extends AbstractMetrics.Gauge, T extends AbstractMetrics.Timer> {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMetrics.class);
private static final int METRICS_NAME_CACHE_SIZE = 1_000;

protected final String _metricPrefix;

Expand All @@ -51,6 +53,8 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e

protected final boolean _global;

private final Cache<String, MetricName> _metricNameCache;

public AbstractMetrics(String metricPrefix, MetricsRegistry metricsRegistry, Class clazz) {
this(metricPrefix, metricsRegistry, clazz, false);
}
Expand All @@ -60,6 +64,7 @@ public AbstractMetrics(String metricPrefix, MetricsRegistry metricsRegistry, Cla
_metricsRegistry = metricsRegistry;
_clazz = clazz;
_global = global;
_metricNameCache = Caffeine.newBuilder().maximumSize(METRICS_NAME_CACHE_SIZE).build();
}

public interface QueryPhase {
Expand Down Expand Up @@ -88,6 +93,10 @@ public interface Timer {
boolean isGlobal();
}

MetricName getMetricName(String name) {
return _metricNameCache.get(name, n -> new MetricName(_clazz, n));
}

/**
* Logs the timing of a query phase.
*
Expand Down Expand Up @@ -143,9 +152,7 @@ public void addTimedValue(T timer, final long duration, final TimeUnit timeUnit)
* @param timeUnit The log time duration time unit
*/
private void addValueToTimer(String fullTimerName, final long duration, final TimeUnit timeUnit) {
final MetricName metricName = new MetricName(_clazz, fullTimerName);
com.yammer.metrics.core.Timer timer =
MetricsHelper.newTimer(_metricsRegistry, metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
final MetricName metricName = getMetricName(fullTimerName);
MetricsHelper.newTimer(_metricsRegistry, metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
.update(duration, timeUnit);
}
Expand Down Expand Up @@ -212,7 +219,7 @@ public com.yammer.metrics.core.Meter addMeteredGlobalValue(final M meter, final
final String fullMeterName;
String meterName = meter.getMeterName();
fullMeterName = _metricPrefix + meterName;
final MetricName metricName = new MetricName(_clazz, fullMeterName);
final MetricName metricName = getMetricName(fullMeterName);

final com.yammer.metrics.core.Meter newMeter =
MetricsHelper.newMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS);
Expand Down Expand Up @@ -248,7 +255,7 @@ public com.yammer.metrics.core.Meter addMeteredTableValue(final String tableName
final String fullMeterName;
String meterName = meter.getMeterName();
fullMeterName = _metricPrefix + getTableName(tableName) + "." + meterName;
final MetricName metricName = new MetricName(_clazz, fullMeterName);
final MetricName metricName = getMetricName(fullMeterName);

final com.yammer.metrics.core.Meter newMeter =
MetricsHelper.newMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS);
Expand All @@ -261,7 +268,7 @@ public com.yammer.metrics.core.Meter getMeteredTableValue(final String tableName
final String fullMeterName;
String meterName = meter.getMeterName();
fullMeterName = _metricPrefix + getTableName(tableName) + "." + meterName;
final MetricName metricName = new MetricName(_clazz, fullMeterName);
final MetricName metricName = getMetricName(fullMeterName);

return MetricsHelper.newMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS);
}
Expand All @@ -281,7 +288,7 @@ public void addMeteredQueryValue(final BrokerRequest request, final M meter, fin
} else {
fullMeterName = _metricPrefix + meterName;
}
final MetricName metricName = new MetricName(_clazz, fullMeterName);
final MetricName metricName = getMetricName(fullMeterName);
MetricsHelper.newMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS).mark(unitCount);
}

Expand Down Expand Up @@ -468,7 +475,7 @@ public void initializeGlobalMeters() {
* @param valueCallback The callback function used to retrieve the value of the gauge
*/
public void addCallbackGauge(final String metricName, final Callable<Long> valueCallback) {
MetricsHelper.newGauge(_metricsRegistry, new MetricName(_clazz, _metricPrefix + metricName),
MetricsHelper.newGauge(_metricsRegistry, getMetricName(_metricPrefix + metricName),
new com.yammer.metrics.core.Gauge<Long>() {
@Override
public Long value() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
*/
package org.apache.pinot.common.metrics;

import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.core.*;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.MapConfiguration;
import org.testng.annotations.Test;

import static junit.framework.Assert.assertNotNull;
import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_METRICS_PREFIX;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;


Expand Down Expand Up @@ -74,4 +78,79 @@ public void testMetricsHelperRegistration() {
assertTrue(listenerOneOkay);
assertTrue(listenerTwoOkay);
}

@Test
public void testMetricNameGetHelper() {
final String METRIC_NAME = "dummy";

MetricName newlyAllocated = new MetricName(ServerMetrics.class, METRIC_NAME);
AbstractMetrics metrics = new ServerMetrics(new MetricsRegistry());
MetricName fromHelper = metrics.getMetricName(METRIC_NAME);
// Direct metrics creation must match what the helper does
assertEquals(fromHelper, newlyAllocated);
// Make sure we have a different object here, we only used the helper once
assertTrue(newlyAllocated != fromHelper);
MetricName fromHelperCached = metrics.getMetricName(METRIC_NAME);
// Identity test has to work because we are caching the names
assertTrue(fromHelper == fromHelperCached);
}

@Test
public void testMetricsApisTiming() {
final String TABLE_NAME = "customers";

MetricsRegistry registry = new MetricsRegistry();
AbstractMetrics metrics = new ServerMetrics(registry);

metrics.addPhaseTiming(TABLE_NAME, BrokerQueryPhase.QUERY_EXECUTION, 10);
metrics.addPhaseTiming(TABLE_NAME, BrokerQueryPhase.QUERY_EXECUTION, 15);

Map<MetricName, Metric> allMetrics = registry.allMetrics();
assertTrue(allMetrics.size() == 1);
Timer timer = (Timer)allMetrics.values().iterator().next();
// The sum should be the two query executions added together
assertEquals(timer.sum(), 25.0/1_000_000);
}

@Test
public void testMetricsApisMeteredTableValues() {
final String TABLE_NAME = "customers";

MetricsRegistry registry = new MetricsRegistry();
AbstractMetrics metrics = new ServerMetrics(registry);

metrics.addMeteredTableValue(TABLE_NAME, ServerMeter.NUM_SEGMENTS_MATCHED, 10);
Meter meter = metrics.getMeteredTableValue(TABLE_NAME, ServerMeter.NUM_SEGMENTS_MATCHED);

assertNotNull(meter);
}

@Test
public void testMetricsApisMeteredQueryValues() {
final String TABLE_NAME = "customers";

MetricsRegistry registry = new MetricsRegistry();
AbstractMetrics metrics = new ServerMetrics(registry);

metrics.addMeteredQueryValue(null, ServerMeter.NUM_SEGMENTS_MATCHED, 10);

Map<MetricName, Metric> allMetrics = registry.allMetrics();
assertTrue(allMetrics.size() == 1);
}

@Test
public void testMetricsCallBackGauge() {
final String METRIC_NAME = "test";

MetricsRegistry registry = new MetricsRegistry();
AbstractMetrics metrics = new ServerMetrics(registry);

metrics.addCallbackGauge(METRIC_NAME, () -> ServerGauge.DOCUMENT_COUNT);

Map<MetricName, Metric> allMetrics = registry.allMetrics();
assertTrue(allMetrics.size() == 1);
MetricName fromHelper = metrics.getMetricName(DEFAULT_METRICS_PREFIX + METRIC_NAME);
// Identity test has to work because we are caching the names
assertTrue(fromHelper == allMetrics.keySet().iterator().next());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
private void addInstanceGroupTagIfNeeded() {
InstanceConfig instanceConfig = getHelixInstanceConfig(_instanceId);
assert instanceConfig != null;
if (!instanceConfig.containsTag(Helix.CONTROLLER_INSTANCE)) {
if (instanceConfig != null && !instanceConfig.containsTag(Helix.CONTROLLER_INSTANCE)) {
LOGGER.info("Controller: {} doesn't contain group tag: {}. Adding one.", _instanceId, Helix.CONTROLLER_INSTANCE);
instanceConfig.addTag(Helix.CONTROLLER_INSTANCE);
HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
Expand Down