Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

dependencies {
implementation(project(":agent-bridge"))
implementation("org.apache.kafka:kafka-clients:0.11.0.0")
}

jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.kafka-clients-cluster-metrics-0.11.0.0',
'Implementation-Title-Alias': 'kafka-clients-cluster-metrics' }
}

verifyInstrumentation {
passesOnly 'org.apache.kafka:kafka-clients:[0.11.0.0,2.0.0)'
}

site {
title 'Kafka Clients Cluster Metrics'
type 'Kafka'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2025 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package com.nr.instrumentation.kafka;

import com.newrelic.api.agent.NewRelic;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.logging.Level;

public final class ClusterIdHelper {

private ClusterIdHelper() {}

public static String fromProducer(Object producer) {
return fromMetadataField(producer);
}

public static String fromConsumer(Object consumer) {
return fromMetadataField(consumer);
}

private static String fromMetadataField(Object obj) {
try {
Field metaField = findField(obj.getClass(), "metadata");
if (metaField == null) return null;
metaField.setAccessible(true);
Object meta = metaField.get(obj);
if (meta == null) return null;
Method fetchMethod = meta.getClass().getMethod("fetch");
Object cluster = fetchMethod.invoke(meta);
if (cluster == null) return null;
Method crMethod = cluster.getClass().getMethod("clusterResource");
Object cr = crMethod.invoke(cluster);
if (cr != null) {
Method clusterIdMethod = cr.getClass().getMethod("clusterId");
String id = (String) clusterIdMethod.invoke(cr);
if (id != null && !id.isEmpty()) {
return id;
}
}
} catch (Exception e) { NewRelic.getAgent().getLogger().log(Level.FINEST, e, "NR Kafka cluster ID fetch failed"); }
return null;
}

private static Field findField(Class<?> cls, String name) {
while (cls != null && cls != Object.class) {
try {
return cls.getDeclaredField(name);
} catch (NoSuchFieldException e) {
cls = cls.getSuperclass();
}
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka;

public class Utils {
public static final String KAFKA_CLUSTER_METRIC_PREFIX = "MessageBroker/Kafka/Cluster/";
public static final String KAFKA_CLUSTER_TOPIC_SEGMENT = "/Topic/";
public static final String KAFKA_CLUSTER_PRODUCE_SUFFIX = "/Produce";
public static final String KAFKA_CLUSTER_CONSUME_SUFFIX = "/Consume";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.apache.kafka.clients.consumer;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.kafka.ClusterIdHelper;
import com.nr.instrumentation.kafka.Utils;

@Weave(originalName = "org.apache.kafka.clients.consumer.KafkaConsumer")
public class KafkaConsumer_Instrumentation<K, V> {

@NewField
private volatile String nrClusterId;

public ConsumerRecords<K, V> poll(final long timeoutMs) {
final ConsumerRecords<K, V> records = Weaver.callOriginal();
if (nrClusterId == null) {
String id = ClusterIdHelper.fromConsumer(this);
if (id != null) {
nrClusterId = id;
}
}
if (records != null && !records.isEmpty() && nrClusterId != null) {
final String clusterId = nrClusterId;
final java.util.Map<String, Integer> topicCounts = new java.util.HashMap<>();
for (ConsumerRecord<?, ?> record : records) {
String topic = record.topic();
topicCounts.put(topic, topicCounts.getOrDefault(topic, 0) + 1);
}
for (java.util.Map.Entry<String, Integer> entry : topicCounts.entrySet()) {
NewRelic.getAgent().getMetricAggregator().recordMetric(
Utils.KAFKA_CLUSTER_METRIC_PREFIX + clusterId
+ Utils.KAFKA_CLUSTER_TOPIC_SEGMENT + entry.getKey()
+ Utils.KAFKA_CLUSTER_CONSUME_SUFFIX,
entry.getValue().floatValue());
}
}
return records;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.apache.kafka.clients.producer;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.kafka.ClusterIdHelper;
import com.nr.instrumentation.kafka.Utils;
import java.util.concurrent.Future;

@Weave(originalName = "org.apache.kafka.clients.producer.KafkaProducer")
public class KafkaProducer_Instrumentation<K, V> {

@NewField
private volatile String nrClusterId;

private Future<RecordMetadata> doSend(ProducerRecord record, Callback callback) {
if (nrClusterId == null) {
String id = ClusterIdHelper.fromProducer(this);
if (id != null) {
nrClusterId = id;
}
}

final Future<RecordMetadata> result = Weaver.callOriginal();

if (nrClusterId != null) {
NewRelic.getAgent().getMetricAggregator().recordMetric(
Utils.KAFKA_CLUSTER_METRIC_PREFIX + nrClusterId
+ Utils.KAFKA_CLUSTER_TOPIC_SEGMENT + record.topic()
+ Utils.KAFKA_CLUSTER_PRODUCE_SUFFIX,
1.0f);
}

return result;
}
}
19 changes: 19 additions & 0 deletions instrumentation/kafka-clients-cluster-metrics-2.0.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

dependencies {
implementation(project(":agent-bridge"))
implementation("org.apache.kafka:kafka-clients:2.0.0")
}

jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.kafka-clients-cluster-metrics-2.0.0',
'Implementation-Title-Alias': 'kafka-clients-cluster-metrics' }
}

verifyInstrumentation {
passesOnly 'org.apache.kafka:kafka-clients:[2.0.0,3.7.0)'
}

site {
title 'Kafka Clients Cluster Metrics'
type 'Kafka'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2025 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package com.nr.instrumentation.kafka;

import com.newrelic.api.agent.NewRelic;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.logging.Level;

public final class ClusterIdHelper {

private ClusterIdHelper() {}

public static String fromProducer(Object producer) {
return fromMetadataField(producer);
}

public static String fromConsumer(Object consumer) {
return fromMetadataField(consumer);
}

private static String fromMetadataField(Object obj) {
try {
Field metaField = findField(obj.getClass(), "metadata");
if (metaField == null) return null;
metaField.setAccessible(true);
Object meta = metaField.get(obj);
if (meta == null) return null;
Method fetchMethod = meta.getClass().getMethod("fetch");
Object cluster = fetchMethod.invoke(meta);
if (cluster == null) return null;
Method crMethod = cluster.getClass().getMethod("clusterResource");
Object cr = crMethod.invoke(cluster);
if (cr != null) {
Method clusterIdMethod = cr.getClass().getMethod("clusterId");
String id = (String) clusterIdMethod.invoke(cr);
if (id != null && !id.isEmpty()) {
return id;
}
}
} catch (Exception e) { NewRelic.getAgent().getLogger().log(Level.FINEST, e, "NR Kafka cluster ID fetch failed"); }
return null;
}

private static Field findField(Class<?> cls, String name) {
while (cls != null && cls != Object.class) {
try {
return cls.getDeclaredField(name);
} catch (NoSuchFieldException e) {
cls = cls.getSuperclass();
}
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka;

public class Utils {
public static final String KAFKA_CLUSTER_METRIC_PREFIX = "MessageBroker/Kafka/Cluster/";
public static final String KAFKA_CLUSTER_TOPIC_SEGMENT = "/Topic/";
public static final String KAFKA_CLUSTER_PRODUCE_SUFFIX = "/Produce";
public static final String KAFKA_CLUSTER_CONSUME_SUFFIX = "/Consume";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.apache.kafka.clients.consumer;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.kafka.ClusterIdHelper;
import com.nr.instrumentation.kafka.Utils;
import java.time.Duration;

@Weave(originalName = "org.apache.kafka.clients.consumer.KafkaConsumer")
public class KafkaConsumer_Instrumentation<K, V> {

@NewField
private volatile String nrClusterId;

public ConsumerRecords<K, V> poll(final Duration timeout) {
final ConsumerRecords<K, V> records = Weaver.callOriginal();
if (nrClusterId == null) {
String id = ClusterIdHelper.fromConsumer(this);
if (id != null) {
nrClusterId = id;
}
}
if (records != null && !records.isEmpty() && nrClusterId != null) {
nrRecordClusterMetrics(records, nrClusterId);
}
return records;
}

public ConsumerRecords<K, V> poll(final long timeoutMs) {
final ConsumerRecords<K, V> records = Weaver.callOriginal();
if (nrClusterId == null) {
String id = ClusterIdHelper.fromConsumer(this);
if (id != null) {
nrClusterId = id;
}
}
if (records != null && !records.isEmpty() && nrClusterId != null) {
nrRecordClusterMetrics(records, nrClusterId);
}
return records;
}

private static void nrRecordClusterMetrics(ConsumerRecords<?, ?> records, String clusterId) {
final java.util.Map<String, Integer> topicCounts = new java.util.HashMap<>();
for (ConsumerRecord<?, ?> record : records) {
String topic = record.topic();
topicCounts.put(topic, topicCounts.getOrDefault(topic, 0) + 1);
}
for (java.util.Map.Entry<String, Integer> entry : topicCounts.entrySet()) {
NewRelic.getAgent().getMetricAggregator().recordMetric(
Utils.KAFKA_CLUSTER_METRIC_PREFIX + clusterId
+ Utils.KAFKA_CLUSTER_TOPIC_SEGMENT + entry.getKey()
+ Utils.KAFKA_CLUSTER_CONSUME_SUFFIX,
entry.getValue().floatValue());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.apache.kafka.clients.producer;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.kafka.ClusterIdHelper;
import com.nr.instrumentation.kafka.Utils;
import java.util.concurrent.Future;

@Weave(originalName = "org.apache.kafka.clients.producer.KafkaProducer")
public class KafkaProducer_Instrumentation<K, V> {

@NewField
private volatile String nrClusterId;

private Future<RecordMetadata> doSend(ProducerRecord record, Callback callback) {
if (nrClusterId == null) {
String id = ClusterIdHelper.fromProducer(this);
if (id != null) {
nrClusterId = id;
}
}

final Future<RecordMetadata> result = Weaver.callOriginal();

if (nrClusterId != null) {
NewRelic.getAgent().getMetricAggregator().recordMetric(
Utils.KAFKA_CLUSTER_METRIC_PREFIX + nrClusterId
+ Utils.KAFKA_CLUSTER_TOPIC_SEGMENT + record.topic()
+ Utils.KAFKA_CLUSTER_PRODUCE_SUFFIX,
1.0f);
}

return result;
}
}
Loading