Skip to content

Commit db617eb

Browse files
andremmxrmx
andauthored
Improve confluent-kafka instrumentation examples (#3369)
* Improve confluent-kafka instrumentation example * Improve confluent-kafka tracer provider example * Apply suggestions from code review --------- Co-authored-by: Riccardo Magliocchetti <[email protected]>
1 parent 3c60b62 commit db617eb

File tree

1 file changed

+10
-5
lines changed
  • instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka

1 file changed

+10
-5
lines changed

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

+10-5
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
# report a span of type consumer with the default settings
3535
consumer = Consumer(conf2)
3636
37+
def msg_process(msg):
38+
print(msg)
39+
3740
def basic_consume_loop(consumer, topics):
3841
try:
3942
consumer.subscribe(topics)
@@ -44,7 +47,7 @@ def basic_consume_loop(consumer, topics):
4447
if msg.error():
4548
if msg.error().code() == KafkaError._PARTITION_EOF:
4649
# End of partition event
47-
sys.stderr.write(f"{msg.topic() [{msg.partition()}] reached end at offset {msg.offset()}}")
50+
sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}")
4851
elif msg.error():
4952
raise KafkaException(msg.error())
5053
else:
@@ -53,7 +56,7 @@ def basic_consume_loop(consumer, topics):
5356
# Close down consumer to commit final offsets.
5457
consumer.close()
5558
56-
basic_consume_loop(consumer, "my-topic")
59+
basic_consume_loop(consumer, ["my-topic"])
5760
5861
The _instrument method accepts the following keyword args:
5962
tracer_provider (TracerProvider) - an optional tracer provider
@@ -72,14 +75,16 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
7275
.. code:: python
7376
7477
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
78+
from opentelemetry.trace import get_tracer_provider
7579
7680
from confluent_kafka import Producer, Consumer
7781
7882
inst = ConfluentKafkaInstrumentor()
83+
tracer_provider = get_tracer_provider()
7984
80-
p = confluent_kafka.Producer({'bootstrap.servers': 'localhost:29092'})
81-
c = confluent_kafka.Consumer({
82-
'bootstrap.servers': 'localhost:29092',
85+
p = Producer({'bootstrap.servers': 'localhost:9092'})
86+
c = Consumer({
87+
'bootstrap.servers': 'localhost:9092',
8388
'group.id': 'mygroup',
8489
'auto.offset.reset': 'earliest'
8590
})

0 commit comments

Comments
 (0)