Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,25 @@ This whole mechanism **does not ensure message ordering**. When a message is
sent to be retried, another message processing from the same topic is still
unblocked.

### Message splitting

This library also supports sending large messages to topics which don't have
the capacity to process these messages as whole. To do this, producers are
configurable to automatically split messages according to message size.

This feature is custom and is therefore turned off by default. The only
place where this feature is always enabled is retry topics, which are meant
to be consumed only by clients using this library.

The chunked messages have 3 additional headers:

* Group ID (uuid4 value)
* Chunk ID (serial number of the chunk within group, starting with 0)
* Number of chunks (always one greater than the last chunk ID)

A message is deserialized and processed only once all of its expected chunks
have been received.

## Contributing guidelines

To check contributing guidelines, please check `CONTRIBUTING.md` in the
Expand Down
102 changes: 102 additions & 0 deletions src/retriable_kafka_client/chunking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Module for tracking chunked messages"""

import logging
import sys
import uuid
from collections import defaultdict

from confluent_kafka import Message

from retriable_kafka_client.headers import (
CHUNK_GROUP_HEADER,
NUMBER_OF_CHUNKS_HEADER,
CHUNK_ID_HEADER,
deserialize_number_from_bytes,
get_header_value,
)
from retriable_kafka_client.kafka_utils import MessageGroup

LOGGER = logging.getLogger(__name__)


def generate_group_id() -> bytes:
    """Generate a random group id (the 16 raw bytes of a version-4 UUID)."""
    new_uuid = uuid.uuid4()
    return new_uuid.bytes
Comment thread
bclindner marked this conversation as resolved.


def calculate_header_size(
headers: dict[str, str | bytes] | list[tuple[str, str | bytes]] | None,
) -> int:
"""Approximate the space needed for headers within a message."""
if not headers:
return 0
result = 0
if not isinstance(headers, dict):
headers = dict(headers)
for header_key, header_value in headers.items():
result += len(header_key) + sys.getsizeof(header_value) + 8
# Two 32-bit numbers specifying lengths of fields are also
# present, therefore adding 8
return result


class ChunkingCache:
    # pylint: disable=too-few-public-methods
    """Class for storing information about received message fragments."""

    def __init__(self):
        # Chunks are keyed by (group ID, topic, partition) instead of the
        # group ID alone, in case of a group ID collision (which could mean
        # an attack attempt).  If the same consumer were used for different
        # topics, an adversary might try to override split messages to
        # execute different operations; including topic and partition in
        # the key makes that attack impossible.
        self._message_chunks: dict[tuple[bytes, str, int], dict[int, Message]] = (
            defaultdict(dict)
        )

    def receive(self, message: Message) -> MessageGroup | None:
        """
        Receive a message. If the message is whole, or it is
        the last fragment, returns the whole message group
        and flushes cache of this group. Otherwise, returns None
        and the message fragment shall not be processed.
        """
        topic: str = message.topic()  # type: ignore[assignment]
        partition: int = message.partition()  # type: ignore[assignment]
        chunk_headers: list[bytes] = []
        for header_name in (
            CHUNK_GROUP_HEADER,
            NUMBER_OF_CHUNKS_HEADER,
            CHUNK_ID_HEADER,
        ):
            header_value = get_header_value(message, header_name)
            if header_value is None:
                # No chunking headers: the message was never split, so it
                # forms a complete single-chunk group all by itself.
                return MessageGroup(topic, partition, {0: message}, None)
            chunk_headers.append(header_value)
        group_id, number_of_chunks_raw, chunk_id_raw = chunk_headers
        number_of_chunks = deserialize_number_from_bytes(number_of_chunks_raw)
        chunk_id = deserialize_number_from_bytes(chunk_id_raw)
        identifier = (group_id, topic, partition)
        seen_chunks = set(self._message_chunks.get(identifier, {}))
        seen_chunks.add(chunk_id)
        if not seen_chunks.issuperset(range(number_of_chunks)):
            LOGGER.debug(
                "Received a message chunk, waiting for the other chunks..."
            )
            self._message_chunks[identifier][chunk_id] = message
            return None
        LOGGER.debug(
            "Received all message chunks, assembling group %s composed of %s messages.",
            group_id.hex(),
            number_of_chunks,
        )
        # Drop the cached fragments and reassemble; the cache entry may not
        # exist when the whole value fit into a single message.
        group_messages = self._message_chunks.pop(identifier, {})
        group_messages[chunk_id] = message
        return MessageGroup(topic, partition, group_messages, group_id)
1 change: 1 addition & 0 deletions src/retriable_kafka_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class ProducerConfig(CommonConfig):
retries: int = field(default=3)
fallback_factor: float = field(default=2.0)
fallback_base: float = field(default=5.0)
split_messages: bool = field(default=False)


@dataclass
Expand Down
27 changes: 13 additions & 14 deletions src/retriable_kafka_client/consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Base Kafka Consumer module"""

import json
import logging
import sys
from concurrent.futures import Executor, Future
Expand All @@ -9,8 +8,9 @@

from confluent_kafka import Consumer, Message, KafkaException, TopicPartition

from .chunking import ChunkingCache
from .health import perform_healthcheck_using_client
from .kafka_utils import message_to_partition
from .kafka_utils import message_to_partition, MessageGroup
from .kafka_settings import KafkaOptions, DEFAULT_CONSUMER_SETTINGS
from .consumer_tracking import TrackingManager
from .config import ConsumerConfig
Expand Down Expand Up @@ -91,6 +91,8 @@ def __init__(
self.__retry_manager = RetryManager(config)
# Store information about pending retried messages
self.__schedule_cache = RetryScheduleCache()
# Store chunking information
self.__chunk_tracker = ChunkingCache()

@property
def _consumer(self) -> Consumer:
Expand Down Expand Up @@ -140,7 +142,7 @@ def __on_revoke(self, _: Consumer, partitions: list[TopicPartition]) -> None:
self.__tracking_manager.register_revoke(partitions)
self.__perform_commits()

def __ack_message(self, message: Message, finished_future: Future) -> None:
def __ack_message(self, message: MessageGroup, finished_future: Future) -> None:
"""
Private method only ever intended to be used from within
_process_message(). It commits offsets and releases
Expand All @@ -155,7 +157,7 @@ def __ack_message(self, message: Message, finished_future: Future) -> None:
if problem := finished_future.exception():
LOGGER.error(
"Message could not be processed! Message: %s.",
message.value(),
message.deserialize(),
exc_info=problem,
)
self.__retry_manager.resend_message(message)
Expand Down Expand Up @@ -212,20 +214,17 @@ def _process_message(self, message: Message) -> Future[Any] | None:
Returns: Future of the target execution if the message can be processed.
None otherwise.
"""
message_value = message.value()
if not message_value:
# Discard empty messages
message_group = self.__chunk_tracker.receive(message)
if not message_group:
return None
try:
message_data = json.loads(message_value)
except json.decoder.JSONDecodeError:
# This message cannot be deserialized, just log and discard it
LOGGER.exception("Decoding error: not a valid JSON: %s", message.value())
message_data = message_group.deserialize()
if not message_data:
self.__tracking_manager.schedule_commit(message_group)
return None
future = self._executor.submit(self._config.target, message_data)
self.__tracking_manager.process_message(message, future)
self.__tracking_manager.process_message(message_group, future)
# The semaphore is released within this callback
future.add_done_callback(lambda res: self.__ack_message(message, res))
future.add_done_callback(lambda res: self.__ack_message(message_group, res))
return future

### Public methods ###
Expand Down
Loading
Loading