Skip to content

Commit f7dcaa8

Browse files
committed
fix kafka decoding error blocking consumption
For some reason we received a non utf-8 message from Kafka topic, and the application is getting stuck with this error: ``` Process Process-27: Traceback (most recent call last): File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap self.run() File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/usr/lib/python3/dist-packages/napalm_logs/listener_proc.py", line 102, in start log_message, log_source = self.listener.receive() ^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/lib/python3/dist-packages/napalm_logs/listener/kafka.py", line 83, in receive log_source = log_source.decode() ^^^^^^^^^^^^^^^^^^^ UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa2 in position 1: invalid start byte ``` This fix try catch on the initial key decoding and return and follow the same error handling pattern as the message json decoding below.
1 parent 54e7ea5 commit f7dcaa8

File tree

1 file changed

+8
-2
lines changed

1 file changed

+8
-2
lines changed

napalm_logs/listener/kafka.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,20 @@ def receive(self):
7171
raise ListenerException(error)
7272
log_source = msg.key
7373
if isinstance(log_source, bytes):
74-
log_source = log_source.decode()
74+
try:
75+
log_source = log_source.decode()
76+
except UnicodeDecodeError:
77+
log.error("Unable to decode log source: %s", msg.key)
78+
return "", ""
7579
try:
7680
decoded = json.loads(msg.value.decode("utf-8"))
7781
except ValueError:
7882
log.error("Not in json format: %s", msg.value.decode("utf-8"))
7983
return "", ""
8084
log_message = decoded.get("message")
81-
log.debug("[%s] Received from kafka %s from %s", log_message, log_source, time.time())
85+
log.debug(
86+
"[%s] Received from kafka %s from %s", log_message, log_source, time.time()
87+
)
8288
return log_message, log_source
8389

8490
def stop(self):

0 commit comments

Comments
 (0)