KAFKA-19139 Plugin#wrapInstance should use LinkedHashMap instead of Map #19519

Status: Open · wants to merge 5 commits into base: trunk
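The change follows directly from the title: a plain Map (a HashMap, or the immutable maps returned by Map.of and Collections.singletonMap) gives no guarantee about the order in which its entries are iterated, while a LinkedHashMap iterates in insertion order, so the tags attached to a plugin metric stay in a deterministic order. A small standalone illustration of the difference, using only the JDK (not part of the patch):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class TagOrderDemo {
    public static void main(String[] args) {
        Map<String, String> hashTags = new HashMap<>();
        Map<String, String> linkedTags = new LinkedHashMap<>();
        for (String key : new String[]{"connector", "task", "node-id"}) {
            hashTags.put(key, "v");
            linkedTags.put(key, "v");
        }
        // HashMap iteration order depends on hashing and may differ from insertion order.
        System.out.println(hashTags.keySet());
        // LinkedHashMap always iterates in insertion order: [connector, task, node-id]
        System.out.println(linkedTags.keySet());
    }
}
```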
----------------------------------------------------------------
@@ -173,7 +173,7 @@ public static class MonitorableCustomQuotaCallback extends CustomQuotaCallback i

@Override
public void withPluginMetrics(PluginMetrics metrics) {
MetricName metricName = metrics.metricName(METRIC_NAME, METRIC_DESCRIPTION, Map.of());
MetricName metricName = metrics.metricName(METRIC_NAME, METRIC_DESCRIPTION, new LinkedHashMap<>());
metrics.addMetric(metricName, (Gauge<Integer>) (config, now) -> 1);
}

----------------------------------------------------------------
@@ -18,7 +18,7 @@

import org.apache.kafka.common.MetricName;

import java.util.Map;
import java.util.LinkedHashMap;

/**
* This allows plugins to register metrics and sensors.
@@ -35,7 +35,7 @@ public interface PluginMetrics {
* @param tags Additional tags for the metric
* @throws IllegalArgumentException if any of the tag names collide with the default tags for the plugin
*/
MetricName metricName(String name, String description, Map<String, String> tags);
MetricName metricName(String name, String description, LinkedHashMap<String, String> tags);

/**
* Add a metric to monitor an object that implements {@link MetricValueProvider}. This metric won't be associated with any
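For plugin authors, the visible effect of the interface change above is at the call site: the extra tags are now passed as a LinkedHashMap, so they are reported in the order they were added. A minimal sketch of a Monitorable plugin registering a gauge under the updated contract; the class name, metric name and tag values are illustrative, and the import locations assume the usual org.apache.kafka.common.metrics packages:

```java
import java.util.LinkedHashMap;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;

public class ExampleMonitorablePlugin implements Monitorable {

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        // Tags go into a LinkedHashMap so their order in the metric name is deterministic.
        LinkedHashMap<String, String> tags = new LinkedHashMap<>();
        tags.put("source", "example");

        MetricName metricName = metrics.metricName("requests-total", "Example gauge registered by the plugin", tags);
        metrics.addMetric(metricName, (Gauge<Long>) (config, now) -> 42L);
    }
}
```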
----------------------------------------------------------------
@@ -45,7 +45,7 @@ public PluginMetricsImpl(Metrics metrics, Map<String, String> tags) {
}

@Override
public MetricName metricName(String name, String description, Map<String, String> tags) {
public MetricName metricName(String name, String description, LinkedHashMap<String, String> tags) {
if (closing) throw new IllegalStateException("This PluginMetrics instance is closed");
for (String tagName : tags.keySet()) {
if (this.tags.containsKey(tagName)) {
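The implementation above still rejects caller-supplied tag names that clash with the default tags the runtime attaches to the plugin, throwing the IllegalArgumentException documented on the interface. A rough sketch of both outcomes; the default tag "connector" and the package location of PluginMetricsImpl (org.apache.kafka.common.metrics.internals) are assumptions for the example:

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;

public class TagCollisionSketch {
    public static void main(String[] args) {
        // The runtime decides the default tags; "connector" is only an example.
        PluginMetricsImpl pluginMetrics = new PluginMetricsImpl(new Metrics(), Map.of("connector", "c1"));

        LinkedHashMap<String, String> ok = new LinkedHashMap<>();
        ok.put("topic", "orders");
        pluginMetrics.metricName("records", "records handled", ok); // fine, no clash with the default tags

        LinkedHashMap<String, String> clash = new LinkedHashMap<>();
        clash.put("connector", "other");
        // Throws IllegalArgumentException because "connector" is already a default tag.
        pluginMetrics.metricName("records", "records handled", clash);
    }
}
```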
----------------------------------------------------------------
@@ -3746,9 +3746,13 @@ private MetricName expectedMetricName(String clientId, String config, Class<?> c

private static final String NAME = "name";
private static final String DESCRIPTION = "description";
private static final Map<String, String> TAGS = Collections.singletonMap("k", "v");
private static final LinkedHashMap<String, String> TAGS = new LinkedHashMap<>();
private static final double VALUE = 123.0;

static {
TAGS.put("t1", "v1");
}

public static class MonitorableDeserializer extends MockDeserializer implements Monitorable {

@Override
----------------------------------------------------------------
@@ -236,14 +236,7 @@ public void testOverwriteAcksAndRetriesForIdempotentProducers() {

@Test
public void testAcksAndIdempotenceForIdempotentProducers() {
Properties baseProps = new Properties() {{
setProperty(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
setProperty(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
setProperty(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
}};
Properties baseProps = baseProperties();

Properties validProps = new Properties() {{
putAll(baseProps);
@@ -346,11 +339,7 @@ public void testAcksAndIdempotenceForIdempotentProducers() {

@Test
public void testRetriesAndIdempotenceForIdempotentProducers() {
Properties baseProps = new Properties() {{
setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
}};
Properties baseProps = baseProperties();

Properties validProps = new Properties() {{
putAll(baseProps);
@@ -412,13 +401,18 @@ public void testRetriesAndIdempotenceForIdempotentProducers() {
"Must set retries to non-zero when using the transactional producer.");
}

@Test
public void testInflightRequestsAndIdempotenceForIdempotentProducers() {
private static Properties baseProperties() {
Properties baseProps = new Properties() {{
setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
}};
return baseProps;
}

@Test
public void testInflightRequestsAndIdempotenceForIdempotentProducers() {
Properties baseProps = baseProperties();

Properties validProps = new Properties() {{
putAll(baseProps);
@@ -1535,7 +1529,7 @@ public void testMeasureAbortTransactionDuration() {
}

@Test
public void testCommitTransactionWithRecordTooLargeException() throws Exception {
public void testCommitTransactionWithRecordTooLargeException() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
@@ -1565,7 +1559,7 @@ public void testCommitTransactionWithRecordTooLargeException() throws Exception
}

@Test
public void testCommitTransactionWithMetadataTimeoutForMissingTopic() throws Exception {
public void testCommitTransactionWithMetadataTimeoutForMissingTopic() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
@@ -1602,7 +1596,7 @@ public void testCommitTransactionWithMetadataTimeoutForMissingTopic() throws Exc
}

@Test
public void testCommitTransactionWithMetadataTimeoutForPartitionOutOfRange() throws Exception {
public void testCommitTransactionWithMetadataTimeoutForPartitionOutOfRange() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
@@ -1639,7 +1633,7 @@ public void testCommitTransactionWithMetadataTimeoutForPartitionOutOfRange() thr
}

@Test
public void testCommitTransactionWithSendToInvalidTopic() throws Exception {
public void testCommitTransactionWithSendToInvalidTopic() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
@@ -2075,7 +2069,7 @@ public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
}

@Test
public void testSendToInvalidTopic() throws Exception {
public void testSendToInvalidTopic() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
@@ -2881,9 +2875,13 @@ private MetricName expectedMetricName(String clientId, String config, Class<?> c

private static final String NAME = "name";
private static final String DESCRIPTION = "description";
private static final Map<String, String> TAGS = Collections.singletonMap("k", "v");
private static final LinkedHashMap<String, String> TAGS = new LinkedHashMap<>();
private static final double VALUE = 123.0;

static {
TAGS.put("t1", "v1");
}

public static class MonitorableSerializer extends MockSerializer implements Monitorable {

@Override
----------------------------------------------------------------
@@ -26,7 +26,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -129,7 +129,7 @@ public void testUsePluginMetricsAfterClose() throws Exception {
Plugin<SomeMonitorablePlugin> plugin = Plugin.wrapInstance(new SomeMonitorablePlugin(), METRICS, CONFIG);
PluginMetrics pluginMetrics = plugin.get().pluginMetrics;
plugin.close();
assertThrows(IllegalStateException.class, () -> pluginMetrics.metricName("", "", Collections.emptyMap()));
assertThrows(IllegalStateException.class, () -> pluginMetrics.metricName("", "", new LinkedHashMap<>()));
assertThrows(IllegalStateException.class, () -> pluginMetrics.addMetric(null, null));
assertThrows(IllegalStateException.class, () -> pluginMetrics.removeMetric(null));
assertThrows(IllegalStateException.class, () -> pluginMetrics.addSensor(""));
----------------------------------------------------------------
@@ -27,7 +27,6 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

@@ -36,11 +35,15 @@

public class PluginMetricsImplTest {

private final Map<String, String> extraTags = Collections.singletonMap("my-tag", "my-value");
private static final LinkedHashMap<String, String> EXTRA_TAGS = new LinkedHashMap<>();
private Map<String, String> tags;
private Metrics metrics;
private int initialMetrics;

static {
EXTRA_TAGS.put("my-tag", "my-value");
}

@BeforeEach
void setup() {
metrics = new Metrics();
@@ -53,26 +56,28 @@ void setup() {
@Test
void testMetricName() {
PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
MetricName metricName = pmi.metricName("name", "description", extraTags);
MetricName metricName = pmi.metricName("name", "description", EXTRA_TAGS);
assertEquals("name", metricName.name());
assertEquals("plugins", metricName.group());
assertEquals("description", metricName.description());
Map<String, String> expectedTags = new LinkedHashMap<>(tags);
expectedTags.putAll(extraTags);
expectedTags.putAll(EXTRA_TAGS);
assertEquals(expectedTags, metricName.tags());
}

@Test
void testDuplicateTagName() {
PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
LinkedHashMap<String, String> tags = new LinkedHashMap<>();
tags.put("k1", "value");
assertThrows(IllegalArgumentException.class,
() -> pmi.metricName("name", "description", Collections.singletonMap("k1", "value")));
() -> pmi.metricName("name", "description", tags));
}

@Test
void testAddRemoveMetrics() {
PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
MetricName metricName = pmi.metricName("name", "description", extraTags);
MetricName metricName = pmi.metricName("name", "description", EXTRA_TAGS);
pmi.addMetric(metricName, (Measurable) (config, now) -> 0.0);
assertEquals(initialMetrics + 1, metrics.metrics().size());

@@ -88,7 +93,7 @@ void testAddRemoveMetrics() {
void testAddRemoveSensor() {
PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
String sensorName = "my-sensor";
MetricName metricName = pmi.metricName("name", "description", extraTags);
MetricName metricName = pmi.metricName("name", "description", EXTRA_TAGS);
Sensor sensor = pmi.addSensor(sensorName);
assertEquals(initialMetrics, metrics.metrics().size());
sensor.add(metricName, new Rate());
@@ -107,10 +112,10 @@ void testAddRemoveSensor() {
void testClose() throws IOException {
PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
String sensorName = "my-sensor";
MetricName metricName1 = pmi.metricName("name1", "description", extraTags);
MetricName metricName1 = pmi.metricName("name1", "description", EXTRA_TAGS);
Sensor sensor = pmi.addSensor(sensorName);
sensor.add(metricName1, new Rate());
MetricName metricName2 = pmi.metricName("name2", "description", extraTags);
MetricName metricName2 = pmi.metricName("name2", "description", EXTRA_TAGS);
pmi.addMetric(metricName2, (Measurable) (config, now) -> 1.0);

assertEquals(initialMetrics + 2, metrics.metrics().size());
----------------------------------------------------------------
@@ -24,6 +24,7 @@
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

public class MonitorableSinkConnector extends TestableSinkConnector {
@@ -35,7 +36,7 @@ public class MonitorableSinkConnector extends TestableSinkConnector {
public void start(Map<String, String> props) {
super.start(props);
PluginMetrics pluginMetrics = context.pluginMetrics();
metricsName = pluginMetrics.metricName("start", "description", Map.of());
metricsName = pluginMetrics.metricName("start", "description", new LinkedHashMap<>());
pluginMetrics.addMetric(metricsName, (Gauge<Object>) (config, now) -> VALUE);
}

@@ -53,7 +54,7 @@ public static class MonitorableSinkTask extends TestableSinkTask {
public void start(Map<String, String> props) {
super.start(props);
PluginMetrics pluginMetrics = context.pluginMetrics();
metricsName = pluginMetrics.metricName("put", "description", Map.of());
metricsName = pluginMetrics.metricName("put", "description", new LinkedHashMap<>());
pluginMetrics.addMetric(metricsName, (Measurable) (config, now) -> count);
}

----------------------------------------------------------------
@@ -23,6 +23,7 @@
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@@ -35,7 +36,7 @@ public class MonitorableSourceConnector extends TestableSourceConnector {
public void start(Map<String, String> props) {
super.start(props);
PluginMetrics pluginMetrics = context.pluginMetrics();
metricsName = pluginMetrics.metricName("start", "description", Map.of());
metricsName = pluginMetrics.metricName("start", "description", new LinkedHashMap<>());
pluginMetrics.addMetric(metricsName, (Gauge<Object>) (config, now) -> VALUE);
}

@@ -53,7 +54,7 @@ public static class MonitorableSourceTask extends TestableSourceTask {
public void start(Map<String, String> props) {
super.start(props);
PluginMetrics pluginMetrics = context.pluginMetrics();
metricsName = pluginMetrics.metricName("poll", "description", Map.of());
metricsName = pluginMetrics.metricName("poll", "description", new LinkedHashMap<>());
pluginMetrics.addMetric(metricsName, (Measurable) (config, now) -> count);
}

----------------------------------------------------------------
@@ -78,6 +78,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -799,7 +800,7 @@ protected boolean isAllowed(ConfigValue configValue) {

@Override
public void withPluginMetrics(PluginMetrics metrics) {
metricName = metrics.metricName("name", "description", Map.of());
metricName = metrics.metricName("name", "description", new LinkedHashMap<>());
metrics.addMetric(metricName, (Measurable) (config, now) -> count);
}
}
----------------------------------------------------------------
@@ -64,10 +64,13 @@ public class ConnectMetricsTest {
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter",
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
private static final ConnectorTaskId CONNECTOR_TASK_ID = new ConnectorTaskId("connector", 0);
private static final Map<String, String> TAGS = Map.of("t1", "v1");

private static final LinkedHashMap<String, String> TAGS = new LinkedHashMap<>();
private ConnectMetrics metrics;


static {
TAGS.put("t1", "v1");
}

@BeforeEach
public void setUp() {
metrics = new ConnectMetrics("worker1", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), new MockTime(), "cluster-1");
----------------------------------------------------------------
@@ -64,6 +64,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@@ -402,7 +403,7 @@ public void register(ConnectRestExtensionContext restPluginContext) {

@Override
public void withPluginMetrics(PluginMetrics metrics) {
metricName = metrics.metricName("name", "description", Map.of());
metricName = metrics.metricName("name", "description", new LinkedHashMap<>());
metrics.addMetric(metricName, (Gauge<Boolean>) (config, now) -> called);
}
}
----------------------------------------------------------------
@@ -39,6 +39,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -230,21 +231,21 @@ private class AuthorizerMetrics {
private AuthorizerMetrics(PluginMetrics metrics) {
authorizationAllowedSensor = metrics.addSensor("authorizer-authorization-allowed");
authorizationAllowedSensor.add(
metrics.metricName("authorization-allowed-rate-per-minute", "The number of authorization allowed per minute", Map.of()),
metrics.metricName("authorization-allowed-rate-per-minute", "The number of authorization allowed per minute", new LinkedHashMap<>()),
new Rate(TimeUnit.MINUTES, new WindowedCount()));

authorizationDeniedSensor = metrics.addSensor("authorizer-authorization-denied");
authorizationDeniedSensor.add(
metrics.metricName("authorization-denied-rate-per-minute", "The number of authorization denied per minute", Map.of()),
metrics.metricName("authorization-denied-rate-per-minute", "The number of authorization denied per minute", new LinkedHashMap<>()),
new Rate(TimeUnit.MINUTES, new WindowedCount()));

authorizationRequestSensor = metrics.addSensor("authorizer-authorization-request");
authorizationRequestSensor.add(
metrics.metricName("authorization-request-rate-per-minute", "The number of authorization request per minute", Map.of()),
metrics.metricName("authorization-request-rate-per-minute", "The number of authorization request per minute", new LinkedHashMap<>()),
new Rate(TimeUnit.MINUTES, new WindowedCount()));

metrics.addMetric(
metrics.metricName("acls-total-count", "The number of acls defined", Map.of()),
metrics.metricName("acls-total-count", "The number of acls defined", new LinkedHashMap<>()),
(Gauge<Integer>) (config, now) -> aclCount());
}
