From 5dab07b4f0d5ae2e97c566adced828c49f2e862c Mon Sep 17 00:00:00 2001 From: Nikita Veselenko Date: Fri, 15 Aug 2025 16:29:51 +0300 Subject: [PATCH 1/5] fix: add replacing non-utf-8 bytes on decoding headers --- faststream/confluent/parser.py | 4 +++- faststream/kafka/parser.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/faststream/confluent/parser.py b/faststream/confluent/parser.py index fed9ae72fe..be9f0bb406 100644 --- a/faststream/confluent/parser.py +++ b/faststream/confluent/parser.py @@ -96,4 +96,6 @@ async def decode_message_batch( def _parse_msg_headers( headers: Sequence[tuple[str, bytes | str]], ) -> dict[str, str]: - return {i: j if isinstance(j, str) else j.decode() for i, j in headers} + return { + i: j if isinstance(j, str) else j.decode(errors="replace") for i, j in headers + } diff --git a/faststream/kafka/parser.py b/faststream/kafka/parser.py index cb3e956bb1..c2e31f17ef 100644 --- a/faststream/kafka/parser.py +++ b/faststream/kafka/parser.py @@ -38,7 +38,7 @@ async def parse_message( message: Union["ConsumerRecord", "KafkaRawMessage"], ) -> "StreamMessage[ConsumerRecord]": """Parses a Kafka message.""" - headers = {i: j.decode() for i, j in message.headers} + headers = {i: j.decode(errors="replace") for i, j in message.headers} return self.msg_class( body=message.value or b"", From 1d30220e78cc3e42833b72601f755803a61e3f84 Mon Sep 17 00:00:00 2001 From: Nikita Veselenko Date: Sat, 16 Aug 2025 11:57:35 +0300 Subject: [PATCH 2/5] draft: add support for binary Kafka headers (confluent only) --- faststream/confluent/parser.py | 23 +++++++++-------------- faststream/message/message.py | 19 +++++++++++++++---- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/faststream/confluent/parser.py b/faststream/confluent/parser.py index be9f0bb406..fc39129991 100644 --- a/faststream/confluent/parser.py +++ b/faststream/confluent/parser.py @@ -1,4 +1,3 @@ -from collections.abc import Sequence from typing import TYPE_CHECKING, Any from faststream.message import StreamMessage, decode_message @@ -28,7 +27,7 @@ async def parse_message( message: "Message", ) -> KafkaMessage: """Parses a Kafka message.""" - headers = _parse_msg_headers(message.headers() or ()) + headers = dict(message.headers() or ()) body = message.value() or b"" offset = message.offset() @@ -38,9 +37,13 @@ async def parse_message( body=body, headers=headers, reply_to=headers.get("reply_to", ""), - content_type=headers.get("content-type"), + content_type=headers.get("content-type").decode() + if "content-type" in headers + else None, message_id=f"{offset}-{timestamp}", - correlation_id=headers.get("correlation_id"), + correlation_id=headers.get("correlation_id").decode() + if "correlation_id" in headers + else None, raw_message=message, consumer=self._consumer, is_manual=self.is_manual, @@ -52,14 +55,14 @@ async def parse_message_batch( ) -> KafkaMessage: """Parses a batch of messages from a Kafka consumer.""" body: list[Any] = [] - batch_headers: list[dict[str, str]] = [] + batch_headers: list[dict[str, bytes]] = [] first = message[0] last = message[-1] for m in message: body.append(m.value() or b"") - batch_headers.append(_parse_msg_headers(m.headers() or ())) + batch_headers.append(dict(m.headers() or ())) headers = next(iter(batch_headers), {}) @@ -91,11 +94,3 @@ async def decode_message_batch( ) -> "DecodedMessage": """Decode a batch of messages.""" return [decode_message(await self.parse_message(m)) for m in msg.raw_message] - - -def _parse_msg_headers( - headers: Sequence[tuple[str, bytes | str]], -) -> dict[str, str]: - return { - i: j if isinstance(j, str) else j.decode(errors="replace") for i, j in headers - } diff --git a/faststream/message/message.py b/faststream/message/message.py index 8a73da3aa4..b768cd7cfa 100644 --- a/faststream/message/message.py +++ b/faststream/message/message.py @@ -31,9 +31,9 @@ def __init__( raw_message: "MsgType", body: bytes | Any, *, - headers: dict[str, Any] | None = None, + headers: dict[str, bytes] | None = None, reply_to: str = "", - batch_headers: list[dict[str, Any]] | None = None, + batch_headers: list[dict[str, bytes]] | None = None, path: dict[str, Any] | None = None, content_type: str | None = None, correlation_id: str | None = None, @@ -46,7 +46,7 @@ def __init__( self.content_type = content_type self.source_type = source_type - self.headers = headers or {} + self.raw_headers = headers or {} self.batch_headers = batch_headers or [] self.path = path or {} self.correlation_id = correlation_id or str(uuid4()) @@ -78,7 +78,7 @@ def __repr__(self) -> str: f"message_id={self.message_id}", f"correlation_id={self.correlation_id}", f"reply_to={self.reply_to}" if self.reply_to else "", - f"headers={self.headers}", + f"headers={self.raw_headers}", f"path={self.path}", f"committed={self.committed}", f"raw_message={self.raw_message}", @@ -112,3 +112,14 @@ async def nack(self) -> None: async def reject(self) -> None: if self.committed is None: self.committed = AckStatus.REJECTED + + @property + def headers(self) -> dict[str, str]: + """Parse raw_headers values to str. + + Returns ... + """ + return { + header: value if isinstance(value, str) else value.decode() + for header, value in self.raw_headers.items() + } From 00144599de98b5e6cafd0e3c98904466686a84d0 Mon Sep 17 00:00:00 2001 From: Nikita Veselenko Date: Wed, 20 Aug 2025 18:50:54 +0300 Subject: [PATCH 3/5] refactor: Bytes decoding has been changed to 'replace' undecodable characters to avoid errors --- faststream/message/message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faststream/message/message.py b/faststream/message/message.py index b768cd7cfa..d7241b3b4f 100644 --- a/faststream/message/message.py +++ b/faststream/message/message.py @@ -120,6 +120,6 @@ def headers(self) -> dict[str, str]: Returns ... """ return { - header: value if isinstance(value, str) else value.decode() + header: value if isinstance(value, str) else value.decode(errors="replace") for header, value in self.raw_headers.items() } From fe0f8bd1980f8318c52c390c88ff349ff621a641 Mon Sep 17 00:00:00 2001 From: Nikita Veselenko Date: Fri, 22 Aug 2025 18:56:29 +0300 Subject: [PATCH 4/5] draft: remove headers decoding for confluent --- faststream/confluent/parser.py | 16 ++++++++++++---- faststream/message/message.py | 15 ++------------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/faststream/confluent/parser.py b/faststream/confluent/parser.py index fc39129991..5afb8a5ed7 100644 --- a/faststream/confluent/parser.py +++ b/faststream/confluent/parser.py @@ -36,7 +36,9 @@ async def parse_message( return KafkaMessage( body=body, headers=headers, - reply_to=headers.get("reply_to", ""), + reply_to=headers.get("reply_to").decode() + if "content-type" in headers + else None, content_type=headers.get("content-type").decode() if "content-type" in headers else None, @@ -72,10 +74,16 @@ async def parse_message_batch( body=body, headers=headers, batch_headers=batch_headers, - reply_to=headers.get("reply_to", ""), - content_type=headers.get("content-type"), + reply_to=headers.get("reply_to").decode() + if "content-type" in headers + else None, + content_type=headers.get("content-type").decode() + if "content-type" in headers + else None, message_id=f"{first.offset()}-{last.offset()}-{first_timestamp}", - correlation_id=headers.get("correlation_id"), + correlation_id=headers.get("correlation_id").decode() + if "correlation_id" in headers + else None, raw_message=message, consumer=self._consumer, is_manual=self.is_manual, diff --git a/faststream/message/message.py b/faststream/message/message.py index d7241b3b4f..e90a1f570b 100644 --- a/faststream/message/message.py +++ b/faststream/message/message.py @@ -46,7 +46,7 @@ def __init__( self.content_type = content_type self.source_type = source_type - self.raw_headers = headers or {} + self.headers = headers or {} self.batch_headers = batch_headers or [] self.path = path or {} self.correlation_id = correlation_id or str(uuid4()) @@ -78,7 +78,7 @@ def __repr__(self) -> str: f"message_id={self.message_id}", f"correlation_id={self.correlation_id}", f"reply_to={self.reply_to}" if self.reply_to else "", - f"headers={self.raw_headers}", + f"headers={self.headers}", f"path={self.path}", f"committed={self.committed}", f"raw_message={self.raw_message}", @@ -112,14 +112,3 @@ async def nack(self) -> None: async def reject(self) -> None: if self.committed is None: self.committed = AckStatus.REJECTED - - @property - def headers(self) -> dict[str, str]: - """Parse raw_headers values to str. - - Returns ... - """ - return { - header: value if isinstance(value, str) else value.decode(errors="replace") - for header, value in self.raw_headers.items() - } From d0ee2c6911627934e78d958536e428ebb2cb1ec6 Mon Sep 17 00:00:00 2001 From: Nikita Veselenko Date: Mon, 25 Aug 2025 14:39:41 +0300 Subject: [PATCH 5/5] feat: refactored the decoding of key headers --- faststream/confluent/parser.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/faststream/confluent/parser.py b/faststream/confluent/parser.py index 5afb8a5ed7..fe88fc88cd 100644 --- a/faststream/confluent/parser.py +++ b/faststream/confluent/parser.py @@ -28,6 +28,14 @@ async def parse_message( ) -> KafkaMessage: """Parses a Kafka message.""" headers = dict(message.headers() or ()) + decoded_headers = { + header: value.decode(errors="replace") + for header, value in ( + headers.get("reply_to"), + headers.get("content-type"), + headers.get("correlation_id"), + ) + } body = message.value() or b"" offset = message.offset() @@ -36,16 +44,10 @@ async def parse_message( return KafkaMessage( body=body, headers=headers, - reply_to=headers.get("reply_to").decode() - if "content-type" in headers - else None, - content_type=headers.get("content-type").decode() - if "content-type" in headers - else None, + reply_to=decoded_headers.get("reply_to") or None, + content_type=decoded_headers.get("content-type") or None, message_id=f"{offset}-{timestamp}", - correlation_id=headers.get("correlation_id").decode() - if "correlation_id" in headers - else None, + correlation_id=decoded_headers.get("correlation_id") or None, raw_message=message, consumer=self._consumer, is_manual=self.is_manual,