1616# under the License.
1717
1818"""kafka operate"""
19+ import inspect
1920import json
2021from kafka import KafkaProducer , KafkaConsumer
2122from kafka .errors import KafkaError
23+ from opentelemetry import trace
2224from ..config .config import KAFKA_CLUSTER
2325from ..milvus import milvus
2426from ..milvus .operators import do_upload
2527
28+ tracer = trace .get_tracer (__name__ )
29+
2630
2731class KafkaHelper :
2832 """
@@ -39,25 +43,27 @@ def connect_producer(self):
3943 """
4044 connect kafka producer
4145 """
42- try :
43- self .producer = KafkaProducer (
44- bootstrap_servers = self .bootstrap_servers )
45- print ("Connected to Kafka producer successfully." )
46- except KafkaError as e :
47- print (f"Failed to connect to Kafka producer: { e } " )
46+ with tracer .start_as_current_span (inspect .getframeinfo (inspect .currentframe ()).function ):
47+ try :
48+ self .producer = KafkaProducer (
49+ bootstrap_servers = self .bootstrap_servers )
50+ print ("Connected to Kafka producer successfully." )
51+ except KafkaError as e :
52+ print (f"Failed to connect to Kafka producer: { e } " )
4853
4954 def connect_consumer (self , topic ):
5055 """
5156 connect kafka consumer
5257 """
53- try :
54- self .consumer = KafkaConsumer (
55- topic , bootstrap_servers = self .bootstrap_servers )
56- print (
57- f"Connected to Kafka consumer successfully. Listening to topic: { topic } "
58- )
59- except KafkaError as e :
60- print (f"Failed to connect to Kafka consumer: { e } " )
58+ with tracer .start_as_current_span (inspect .getframeinfo (inspect .currentframe ()).function ):
59+ try :
60+ self .consumer = KafkaConsumer (
61+ topic , bootstrap_servers = self .bootstrap_servers )
62+ print (
63+ f"Connected to Kafka consumer successfully. Listening to topic: { topic } "
64+ )
65+ except KafkaError as e :
66+ print (f"Failed to connect to Kafka consumer: { e } " )
6167
6268 def send_message (self , topic , msg ):
6369 """
@@ -88,14 +94,15 @@ def consume_messages_store_milvus(self, milvus_table):
8894 """
8995 consume messages from kafka and store in milvus
9096 """
91- if not self .consumer :
92- print ("No Kafka consumer connected." )
93- return
94- print ("Consuming messages..." )
95- for msg in self .consumer :
96- data = json .loads (msg .value .decode ('utf-8' ))
97- do_upload (milvus_table , int (data ["doc_id" ]), data ["title" ],
98- data ["body" ], self .milvus_client )
97+ with tracer .start_as_current_span (inspect .getframeinfo (inspect .currentframe ()).function ):
98+ if not self .consumer :
99+ print ("No Kafka consumer connected." )
100+ return
101+ print ("Consuming messages..." )
102+ for msg in self .consumer :
103+ data = json .loads (msg .value .decode ('utf-8' ))
104+ do_upload (milvus_table , int (data ["doc_id" ]), data ["title" ],
105+ data ["body" ], self .milvus_client )
99106
100107 def on_send_success (self , record_metadata ):
101108 """
0 commit comments