File tree Expand file tree Collapse file tree 1 file changed +8
-2
lines changed Expand file tree Collapse file tree 1 file changed +8
-2
lines changed Original file line number Diff line number Diff line change 1010 GROUP_ID_SUGGESTIONS_ENGINE ,
1111 POLL_TIMEOUT_SECS
1212)
13+ from confluent_kafka import KafkaException
1314from ros .lib import consume
1415from ros .lib .app import app
1516from ros .extensions import db
@@ -68,15 +69,18 @@ def run(self):
6869 if message is None :
6970 continue
7071
72+ if message .error ():
73+ logging .error (f"{ self .service } - Consumer error: { message .error ()} " )
74+ raise KafkaException (message .error ())
75+
7176 try :
7277 payload = json .loads (message .value ().decode ('utf-8' ))
7378 event_type = payload .get ('type' )
7479
7580 if event_type != 'delete' :
7681 continue
7782
78- host = payload .get ('host' )
79- host_id = host .get ('id' )
83+ host_id = payload .get ('id' )
8084
8185 logging .debug (
8286 f"{ self .service } - Received a message for system with inventory_id { host_id } "
@@ -89,6 +93,8 @@ def run(self):
8993 logging .error (f"{ self .service } - { self .event } - Failed to decode message: { error } " )
9094 except Exception as error :
9195 logging .error (f"{ self .service } - { self .event } - Error processing message: { error } " )
96+ finally :
97+ self .consumer .commit ()
9298
9399 except Exception as error :
94100 logging .error (f"{ self .service } - { self .event } - error: { error } " )
You can’t perform that action at this time.
0 commit comments