Skip to content

Commit 4fb6dc5

Browse files
authored
fix consumer instrumentation for kafka 3.7.0+ (#1340)
* fix consumer instrumentation for kafka 3.7.0+ * fmt * use _ not ?
1 parent 09245ac commit 4fb6dc5

File tree

4 files changed

+131
-10
lines changed

4 files changed

+131
-10
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2013-2020 The Kamon Project <https://kamon.io>
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kamon.instrumentation.kafka.client.advisor;
18+
19+
import kamon.Kamon;
20+
import kamon.instrumentation.kafka.client.RecordProcessor;
21+
import kanela.agent.libs.net.bytebuddy.asm.Advice;
22+
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
23+
import org.apache.kafka.clients.consumer.ConsumerRecords;
24+
25+
import java.time.Instant;
26+
import java.util.Optional;
27+
28+
/**
29+
* Consumer Instrumentation
30+
*/
31+
public class PollMethodAdvisor_3_7_0_and_up_Async {
32+
@Advice.OnMethodEnter(suppress = Throwable.class)
33+
public static void onEnter(@Advice.Local("startTime") Instant startTime) {
34+
startTime = Kamon.clock().instant();
35+
}
36+
37+
@Advice.OnMethodExit(suppress = Throwable.class)
38+
public static <K, V> void onExit(
39+
@Advice.Local("startTime") Instant startTime,
40+
@Advice.FieldValue(value = "groupMetadata") Optional<ConsumerGroupMetadata> groupMetadata,
41+
@Advice.FieldValue("clientId") String clientId,
42+
@Advice.Return(readOnly = false) ConsumerRecords<K, V> records) {
43+
44+
records = RecordProcessor.process(startTime, clientId, groupMetadata, records);
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2013-2020 The Kamon Project <https://kamon.io>
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kamon.instrumentation.kafka.client.advisor;
18+
19+
import kamon.Kamon;
20+
import kamon.instrumentation.kafka.client.RecordProcessor;
21+
import kanela.agent.libs.net.bytebuddy.asm.Advice;
22+
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
23+
import org.apache.kafka.clients.consumer.ConsumerRecords;
24+
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
25+
26+
import java.time.Instant;
27+
import java.util.Optional;
28+
29+
/**
30+
* Consumer Instrumentation
31+
*/
32+
public class PollMethodAdvisor_3_7_0_and_up_Legacy {
33+
@Advice.OnMethodEnter(suppress = Throwable.class)
34+
public static void onEnter(@Advice.Local("startTime") Instant startTime) {
35+
startTime = Kamon.clock().instant();
36+
}
37+
38+
@Advice.OnMethodExit(suppress = Throwable.class)
39+
public static <K, V> void onExit(
40+
@Advice.Local("startTime") Instant startTime,
41+
@Advice.FieldValue(value = "coordinator") ConsumerCoordinator coordinator,
42+
@Advice.FieldValue("clientId") String clientId,
43+
@Advice.Return(readOnly = false) ConsumerRecords<K, V> records) {
44+
45+
records = RecordProcessor.process(startTime, clientId, coordinator, records);
46+
}
47+
}

instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/ConsumerInstrumentation.scala

+27-5
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,50 @@
1717
package kamon.instrumentation.kafka.client
1818

1919
import java.time.{Duration, Instant}
20-
2120
import kamon.context.Context
2221
import kamon.instrumentation.kafka.client.ConsumedRecordData.ConsumerInfo
23-
import kamon.instrumentation.kafka.client.advisor.PollMethodAdvisor
22+
import kamon.instrumentation.kafka.client.advisor.{
23+
PollMethodAdvisor,
24+
PollMethodAdvisor_3_7_0_and_up_Async,
25+
PollMethodAdvisor_3_7_0_and_up_Legacy
26+
}
2427
import kanela.agent.api.instrumentation.InstrumentationBuilder
28+
import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.{declaresField, hasType, named}
2529

2630
class ConsumerInstrumentation extends InstrumentationBuilder {
2731

2832
/**
2933
* Instruments org.apache.kafka.clients.consumer.KafkaConsumer::poll(Long)
3034
* Kafka version < 2.3
3135
*/
32-
onType("org.apache.kafka.clients.consumer.KafkaConsumer")
36+
onTypesMatching(named("org.apache.kafka.clients.consumer.KafkaConsumer").and(declaresField(named("groupId"))))
3337
.advise(method("poll").and(withArgument(0, classOf[Long])), classOf[PollMethodAdvisor])
3438

3539
/**
3640
* Instruments org.apache.kafka.clients.consumer.KafkaConsumer::poll(Duration)
37-
* Kafka version >= 2.3
41+
* Kafka version >= 2.3 < 3.7
3842
*/
39-
onType("org.apache.kafka.clients.consumer.KafkaConsumer")
43+
onTypesMatching(named("org.apache.kafka.clients.consumer.KafkaConsumer").and(declaresField(named("groupId"))))
4044
.advise(method("poll").and(withArgument(0, classOf[Duration])), classOf[PollMethodAdvisor])
4145

46+
/**
47+
* Instruments org.apache.kafka.clients.consumer.KafkaConsumer::poll(Duration)
48+
* Kafka version >= 3.7
49+
*/
50+
onTypesMatching(
51+
named("org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer").and(declaresField(named("groupMetadata")))
52+
)
53+
.advise(method("poll").and(withArgument(0, classOf[Duration])), classOf[PollMethodAdvisor_3_7_0_and_up_Async])
54+
55+
/**
56+
* Instruments org.apache.kafka.clients.consumer.KafkaConsumer::poll(Duration)
57+
* Kafka version >= 3.7
58+
*/
59+
onTypesMatching(
60+
named("org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer").and(declaresField(named("coordinator")))
61+
)
62+
.advise(method("poll").and(withArgument(0, classOf[Duration])), classOf[PollMethodAdvisor_3_7_0_and_up_Legacy])
63+
4264
/**
4365
* Instruments org.apache.kafka.clients.consumer.ConsumerRecord with the HasSpan mixin in order
4466
* to make the span available as parent for down stream operations

instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala

+11-5
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ package kamon.instrumentation.kafka.client
1818

1919
import java.time.Instant
2020
import java.util.Optional
21-
2221
import kamon.Kamon
2322
import kamon.context.Context
2423
import kamon.instrumentation.kafka.client.ConsumedRecordData.ConsumerInfo
25-
import org.apache.kafka.clients.consumer.ConsumerRecords
24+
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
25+
import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, ConsumerRecords}
2626

2727
private[kafka] object RecordProcessor {
2828

@@ -61,8 +61,14 @@ private[kafka] object RecordProcessor {
6161
* KafkaConsumer which versions < 2.5 relies on internal groupId: String and higher versions in Optional[String].
6262
*/
6363
private def resolve(groupId: AnyRef): Option[String] = groupId match {
64-
case opt: Optional[String] => if (opt.isPresent) Some(opt.get()) else None
65-
case value: String => Option(value)
66-
case _ => None
64+
case opt: Optional[_] =>
65+
if (opt.isPresent) opt.get() match {
66+
case s: String => Some(s)
67+
case meta: ConsumerGroupMetadata => Some(meta.groupId())
68+
}
69+
else None
70+
case value: String => Option(value)
71+
case coord: ConsumerCoordinator => Some(coord.groupMetadata().groupId())
72+
case _ => None
6773
}
6874
}

0 commit comments

Comments
 (0)