Skip to content

Commit 4ca7357

Browse files
feat(ISV-7020): add chunking capabilities
* chunking enabled only in retry topics * tests assisted with Claude Signed-off-by: Marek Szymutko <mszymutk@redhat.com> Assisted-by: Claude-4.6-Opus
1 parent 9e0da43 commit 4ca7357

18 files changed

+832
-283
lines changed
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
"""Module for tracking chunked messages"""
2+
3+
import logging
4+
import sys
5+
import uuid
6+
from collections import defaultdict
7+
8+
from confluent_kafka import Message
9+
10+
from retriable_kafka_client.headers import (
11+
CHUNK_GROUP_HEADER,
12+
NUMBER_OF_CHUNKS_HEADER,
13+
CHUNK_ID_HEADER,
14+
deserialize_number_from_bytes,
15+
get_header_value,
16+
)
17+
from retriable_kafka_client.kafka_utils import MessageGroup
18+
19+
LOGGER = logging.getLogger(__name__)
20+
21+
22+
def generate_group_id() -> bytes:
    """Generate a random group id as the 16 raw bytes of a UUID4."""
    group_uuid = uuid.uuid4()
    return group_uuid.bytes
25+
26+
27+
def calculate_header_size(
28+
headers: dict[str, str | bytes] | list[tuple[str, str | bytes]] | None,
29+
) -> int:
30+
"""Approximate the space needed for headers within a message."""
31+
if not headers:
32+
return 0
33+
result = 0
34+
if not isinstance(headers, dict):
35+
headers = dict(headers)
36+
for header_key, header_value in headers.items():
37+
result += len(header_key) + sys.getsizeof(header_value) + 8
38+
# Two 32-bit numbers specifying lengths of fields are also
39+
# present, therefore adding 8
40+
return result
41+
42+
43+
class ChunkingCache:
    # pylint: disable=too-few-public-methods
    """Class for storing information about received message fragments."""

    def __init__(self):
        # Maps (group id, topic, partition) -> {chunk id -> message fragment}.
        # Keyed per topic/partition so identical group ids on different
        # partitions never mix.
        # NOTE(review): incomplete groups are never evicted — if a producer
        # dies mid-group the fragments stay cached; confirm this is acceptable.
        self._message_chunks: dict[tuple[bytes, str, int], dict[int, Message]] = (
            defaultdict(dict)
        )

    def receive(self, message: Message) -> MessageGroup | None:
        """
        Receive a message. If the message is whole, or it is
        the last fragment, returns the whole message group
        and flushes cache of this group. Otherwise, returns None
        and the message fragment shall not be processed.
        """
        topic: str = message.topic()  # type: ignore[assignment]
        partition: int = message.partition()  # type: ignore[assignment]
        # A message is treated as a chunk only when ALL three chunking
        # headers are present; anything else falls through to the
        # single-message path at the bottom.
        if (
            (group_id := get_header_value(message, CHUNK_GROUP_HEADER)) is not None
            and (
                number_of_chunks_raw := get_header_value(
                    message, NUMBER_OF_CHUNKS_HEADER
                )
            )
            is not None
            and (chunk_id_raw := get_header_value(message, CHUNK_ID_HEADER)) is not None
        ):
            number_of_chunks = deserialize_number_from_bytes(number_of_chunks_raw)
            chunk_id = deserialize_number_from_bytes(chunk_id_raw)
            identifier = (group_id, topic, partition)
            # Completeness check: the already-cached chunk ids plus the one
            # just received must cover 0..number_of_chunks-1 (chunk ids are
            # assumed to be zero-based, per the range() below).
            # .get() (not the defaultdict lookup) avoids creating an empty
            # entry when the group completes in a single message.
            stored_message_ids_from_group = set(
                self._message_chunks.get(identifier, {}).keys()
            )
            stored_message_ids_from_group.add(chunk_id)
            if not all(
                i in stored_message_ids_from_group for i in range(number_of_chunks)
            ):
                # Still missing fragments: cache this one and tell the
                # caller not to process anything yet.
                LOGGER.debug(
                    "Received a message chunk, waiting for the other chunks..."
                )
                self._message_chunks[identifier][chunk_id] = message
                return None
            LOGGER.debug(
                "Received all message chunks, assembling group %s composed of %s messages.",
                group_id.hex(),
                number_of_chunks,
            )
            # Clear cache and reassemble, cache can be empty if
            # this is just one-message-sized value
            messages = self._message_chunks.pop(identifier, {})
            messages[chunk_id] = message
            return MessageGroup(topic, partition, messages, group_id)
        # Non-chunked message: wrap it as a group of one with no group id.
        return MessageGroup(topic, partition, {0: message}, None)

src/retriable_kafka_client/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class ProducerConfig(CommonConfig):
4242
retries: int = field(default=3)
4343
fallback_factor: float = field(default=2.0)
4444
fallback_base: float = field(default=5.0)
45+
max_message_size: int = field(default=1000000)
46+
split_messages: bool = field(default=False)
4547

4648

4749
@dataclass

src/retriable_kafka_client/consumer.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
"""Base Kafka Consumer module"""
22

3-
import json
43
import logging
54
import sys
65
from concurrent.futures import Executor, Future
@@ -9,8 +8,9 @@
98

109
from confluent_kafka import Consumer, Message, KafkaException, TopicPartition
1110

11+
from .chunking import ChunkingCache
1212
from .health import perform_healthcheck_using_client
13-
from .kafka_utils import message_to_partition
13+
from .kafka_utils import message_to_partition, MessageGroup
1414
from .kafka_settings import KafkaOptions, DEFAULT_CONSUMER_SETTINGS
1515
from .consumer_tracking import TrackingManager
1616
from .config import ConsumerConfig
@@ -91,6 +91,8 @@ def __init__(
9191
self.__retry_manager = RetryManager(config)
9292
# Store information about pending retried messages
9393
self.__schedule_cache = RetryScheduleCache()
94+
# Store chunking information
95+
self.__chunk_tracker = ChunkingCache()
9496

9597
@property
9698
def _consumer(self) -> Consumer:
@@ -140,7 +142,7 @@ def __on_revoke(self, _: Consumer, partitions: list[TopicPartition]) -> None:
140142
self.__tracking_manager.register_revoke(partitions)
141143
self.__perform_commits()
142144

143-
def __ack_message(self, message: Message, finished_future: Future) -> None:
145+
def __ack_message(self, message: MessageGroup, finished_future: Future) -> None:
144146
"""
145147
Private method only ever intended to be used from within
146148
_process_message(). It commits offsets and releases
@@ -155,7 +157,7 @@ def __ack_message(self, message: Message, finished_future: Future) -> None:
155157
if problem := finished_future.exception():
156158
LOGGER.error(
157159
"Message could not be processed! Message: %s.",
158-
message.value(),
160+
message.deserialize(),
159161
exc_info=problem,
160162
)
161163
self.__retry_manager.resend_message(message)
@@ -212,20 +214,17 @@ def _process_message(self, message: Message) -> Future[Any] | None:
212214
Returns: Future of the target execution if the message can be processed.
213215
None otherwise.
214216
"""
215-
message_value = message.value()
216-
if not message_value:
217-
# Discard empty messages
217+
message_group = self.__chunk_tracker.receive(message)
218+
if not message_group:
218219
return None
219-
try:
220-
message_data = json.loads(message_value)
221-
except json.decoder.JSONDecodeError:
222-
# This message cannot be deserialized, just log and discard it
223-
LOGGER.exception("Decoding error: not a valid JSON: %s", message.value())
220+
message_data = message_group.deserialize()
221+
if not message_data:
222+
self.__tracking_manager.schedule_commit(message_group)
224223
return None
225224
future = self._executor.submit(self._config.target, message_data)
226-
self.__tracking_manager.process_message(message, future)
225+
self.__tracking_manager.process_message(message_group, future)
227226
# The semaphore is released within this callback
228-
future.add_done_callback(lambda res: self.__ack_message(message, res))
227+
future.add_done_callback(lambda res: self.__ack_message(message_group, res))
229228
return future
230229

231230
### Public methods ###

src/retriable_kafka_client/consumer_tracking.py

Lines changed: 39 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -3,51 +3,19 @@
33
import logging
44
from collections import defaultdict
55
from concurrent.futures import Future
6+
from itertools import chain
67
from threading import Lock, Semaphore
7-
from typing import NamedTuple, Any
8+
from typing import Any, Iterable
89

9-
from confluent_kafka import Message, TopicPartition
10+
from confluent_kafka import TopicPartition
11+
12+
from retriable_kafka_client.kafka_utils import TrackingInfo, MessageGroup
1013

1114
LOGGER = logging.getLogger(__name__)
1215

1316

14-
class _PartitionInfo(NamedTuple):
15-
"""
16-
Consistently hashable dataclass for storing information about a partition,
17-
namely offset information. Can be used as keys in a dictionary.
18-
"""
19-
20-
topic: str
21-
partition: int
22-
23-
@staticmethod
24-
def from_message(message: Message) -> "_PartitionInfo":
25-
"""
26-
Create a PartitionInfo from a Kafka message.
27-
Args:
28-
message: Kafka message object
29-
Returns: hashable info about a partition
30-
"""
31-
message_topic = message.topic()
32-
message_partition = message.partition()
33-
# This should never happen with polled messages. Polled messages need
34-
# the information asserted to be valid Kafka messages. This can
35-
# happen only for custom-created messages objects, which this
36-
# method is not intended to be used for
37-
assert message_topic is not None and message_partition is not None, (
38-
"Invalid message cannot be converted to partition info"
39-
)
40-
return _PartitionInfo(message_topic, message_partition)
41-
42-
def to_offset_info(self, offset: int) -> TopicPartition:
43-
"""
44-
Create a Kafka-committable object using the provided offset.
45-
Args:
46-
offset: The offset to be committed. Make sure to commit
47-
offset one higher than the latest processed message.
48-
Returns: The committable Kafka object
49-
"""
50-
return TopicPartition(topic=self.topic, partition=self.partition, offset=offset)
17+
def _flatten_offsets(done_offsets: Iterable[tuple[int, ...]]) -> list[int]:
18+
return list(chain(*done_offsets))
5119

5220

5321
class TrackingManager:
@@ -85,8 +53,10 @@ class TrackingManager:
8553
"""
8654

8755
def __init__(self, concurrency: int, cancel_wait_time: float):
88-
self.__to_process: dict[_PartitionInfo, dict[int, Future]] = defaultdict(dict)
89-
self.__to_commit: dict[_PartitionInfo, set[int]] = defaultdict(set)
56+
self.__to_process: dict[TrackingInfo, dict[tuple[int, ...], Future]] = (
57+
defaultdict(dict)
58+
)
59+
self.__to_commit: dict[TrackingInfo, set[tuple[int, ...]]] = defaultdict(set)
9060
self.__access_lock = Lock() # For handling multithreaded access to this object
9161
self.__semaphore = Semaphore(concurrency)
9262
self.__cancel_wait_time = cancel_wait_time
@@ -105,15 +75,15 @@ def pop_committable(self) -> list[TopicPartition]:
10575
"""
10676
to_commit = []
10777
with self.__access_lock:
108-
for partition_info, pending_to_commit in self.__to_commit.items():
109-
if not pending_to_commit:
78+
for partition_info, tuples_pending_to_commit in self.__to_commit.items():
79+
if not tuples_pending_to_commit:
11080
# Nothing to commit
11181
continue
11282

113-
pending_to_process = self.__to_process.get(partition_info, None)
114-
if not pending_to_process:
83+
tuples_pending_to_process = self.__to_process.get(partition_info, None)
84+
if not tuples_pending_to_process:
11585
# Nothing is blocking the committing
116-
max_to_commit = max(pending_to_commit)
86+
max_to_commit = max(_flatten_offsets(tuples_pending_to_commit))
11787
to_commit.append(
11888
TopicPartition(
11989
topic=partition_info.topic,
@@ -123,17 +93,18 @@ def pop_committable(self) -> list[TopicPartition]:
12393
)
12494
self.__to_commit[partition_info] = set()
12595
continue
126-
127-
min_pending_to_process = min(pending_to_process)
96+
min_pending_to_process = min(
97+
_flatten_offsets(tuples_pending_to_process)
98+
)
12899
commit_candidates = {
129-
offset
130-
for offset in pending_to_commit
131-
if offset < min_pending_to_process
100+
offset_tuple
101+
for offset_tuple in tuples_pending_to_commit
102+
if all(offset < min_pending_to_process for offset in offset_tuple)
132103
}
133104
if not commit_candidates:
134105
# Nothing to commit
135106
continue
136-
max_to_commit = max(commit_candidates)
107+
max_to_commit = max(_flatten_offsets(commit_candidates))
137108
to_commit.append(
138109
TopicPartition(
139110
topic=partition_info.topic,
@@ -157,11 +128,11 @@ def reschedule_uncommittable(
157128
failed_committable: list of data that failed to be committed
158129
"""
159130
for failed in failed_committable:
160-
self.__to_commit.setdefault(
161-
_PartitionInfo(topic=failed.topic, partition=failed.partition), set()
162-
).add(failed.offset)
131+
self.__to_commit[
132+
TrackingInfo(topic=failed.topic, partition=failed.partition)
133+
].add((failed.offset,))
163134

164-
def process_message(self, message: Message, future: Future[Any]) -> None:
135+
def process_message(self, message: MessageGroup, future: Future[Any]) -> None:
165136
"""
166137
Mark message as pending for processing.
167138
Args:
@@ -170,15 +141,16 @@ def process_message(self, message: Message, future: Future[Any]) -> None:
170141
"""
171142
# We cannot really use context manager, the semaphore is released in
172143
# future's callback or when the future is cancelled
144+
173145
self.__semaphore.acquire() # pylint: disable=consider-using-with
174-
message_offset: int = message.offset() # type: ignore[assignment]
146+
message_offsets = message.offsets
175147
with self.__access_lock:
176148
# Mark the message as being processed
177-
self.__to_process[_PartitionInfo.from_message(message)][
178-
message_offset + 1
149+
self.__to_process[TrackingInfo.from_message_group(message)][
150+
tuple(message_offset + 1 for message_offset in message_offsets)
179151
] = future
180152

181-
def schedule_commit(self, message: Message) -> bool:
153+
def schedule_commit(self, message: MessageGroup) -> bool:
182154
"""
183155
Mark message as pending for committing when its processing is fully done.
184156
Args:
@@ -188,12 +160,12 @@ def schedule_commit(self, message: Message) -> bool:
188160
as pending for processing), False otherwise
189161
"""
190162
self.__semaphore.release()
191-
partition_info = _PartitionInfo.from_message(message)
192-
message_offset: int = message.offset() # type: ignore[assignment]
193-
stored_offset = message_offset + 1
163+
partition_info = TrackingInfo.from_message_group(message)
164+
message_offsets = message.offsets
165+
stored_offsets = tuple(message_offset + 1 for message_offset in message_offsets)
194166
with self.__access_lock:
195-
self.__to_process[partition_info].pop(stored_offset, None)
196-
self.__to_commit.setdefault(partition_info, set()).add(stored_offset)
167+
self.__to_process[partition_info].pop(stored_offsets, None)
168+
self.__to_commit.setdefault(partition_info, set()).add(stored_offsets)
197169
self._cleanup()
198170
return True
199171

@@ -212,7 +184,7 @@ def _cleanup(self) -> None:
212184
cache_to_clean.pop(key, None)
213185

214186
def _revoke_processing(
215-
self, revoked_partitions: set[_PartitionInfo]
187+
self, revoked_partitions: set[TrackingInfo]
216188
) -> list[Future[Any]]:
217189
"""
218190
Cancel all pending tracked futures related to the given partitions.
@@ -253,7 +225,7 @@ def register_revoke(self, partitions: list[TopicPartition] | None = None) -> Non
253225
revoked_partition_keys = set(self.__to_process.keys())
254226
else:
255227
revoked_partition_keys = {
256-
_PartitionInfo(partition=partition.partition, topic=partition.topic)
228+
TrackingInfo(partition=partition.partition, topic=partition.topic)
257229
for partition in partitions
258230
}
259231
pending_futures = self._revoke_processing(revoked_partition_keys)

0 commit comments

Comments
 (0)