diff --git a/napalm_logs/listener/kafka.py b/napalm_logs/listener/kafka.py index 14545cd4..a910a56e 100644 --- a/napalm_logs/listener/kafka.py +++ b/napalm_logs/listener/kafka.py @@ -71,14 +71,20 @@ def receive(self): raise ListenerException(error) log_source = msg.key if isinstance(log_source, bytes): - log_source = log_source.decode() + try: + log_source = log_source.decode() + except UnicodeDecodeError: + log.error("Unable to decode log source: %s", msg.key) + return "", "" try: decoded = json.loads(msg.value.decode("utf-8")) except ValueError: log.error("Not in json format: %s", msg.value.decode("utf-8")) return "", "" log_message = decoded.get("message") - log.debug("[%s] Received from kafka %s from %s", log_message, log_source, time.time()) + log.debug( + "[%s] Received from kafka %s from %s", log_message, log_source, time.time() + ) return log_message, log_source def stop(self):