Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,25 @@ This whole mechanism **does not ensure message ordering**. When a message is
sent to be retried, another message processing from the same topic is still
unblocked.

### Message splitting

This library also supports sending large messages to topics which don't have
the capacity to process these messages as a whole. To do this, producers can
be configured to automatically split messages according to message size.

This feature is custom and is therefore turned off by default. The only place
where this feature is always enabled is retry topics, which are meant to be
consumed only by clients using this library.

The chunked messages have 3 additional headers:

* Group ID (uuid4 value)
* Chunk ID (serial number of the chunk within group, starting with 0)
* Number of chunks (always equal to the last chunk ID + 1)

The message is deserialized and processed only once all expected chunks have
been received.

## Contributing guidelines

To check contributing guidelines, please check `CONTRIBUTING.md` in the
Expand Down
2 changes: 2 additions & 0 deletions src/retriable_kafka_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .orchestrate import consume_topics, ConsumerThread
from .producer import BaseProducer
from .health import HealthCheckClient
from .error import SendError

__all__ = (
"BaseConsumer",
Expand All @@ -16,4 +17,5 @@
"ProducerConfig",
"ConsumeTopicConfig",
"HealthCheckClient",
"SendError",
)
139 changes: 139 additions & 0 deletions src/retriable_kafka_client/chunking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""Module for tracking chunked messages"""

from datetime import datetime, timedelta, timezone
import logging
import sys
import uuid

from confluent_kafka import Message

from retriable_kafka_client.headers import (
CHUNK_GROUP_HEADER,
NUMBER_OF_CHUNKS_HEADER,
CHUNK_ID_HEADER,
deserialize_number_from_bytes,
get_header_value,
)
from retriable_kafka_client.kafka_utils import MessageGroup, TrackingInfo

LOGGER = logging.getLogger(__name__)


def generate_group_id() -> bytes:
    """Create a fresh, random identifier for a chunk group.

    Returns:
        The 16 raw bytes of a newly generated version-4 UUID.
    """
    random_uuid = uuid.uuid4()
    return random_uuid.bytes
Comment thread
bclindner marked this conversation as resolved.


def calculate_header_size(
headers: dict[str, str | bytes] | list[tuple[str, str | bytes]] | None,
) -> int:
"""Approximate the space needed for headers within a message."""
if not headers:
return 0
result = 0
if not isinstance(headers, dict):
headers = dict(headers)
for header_key, header_value in headers.items():
result += len(header_key) + sys.getsizeof(header_value) + 8
# Two 32-bit numbers specifying lengths of fields are also
# present, therefore adding 8
return result


class MessageGroupBuilder:
    """Class for gathering data from message chunks.

    One builder instance accumulates the chunks of a single split message
    (one chunk group). Chunks may arrive in any order; the group is
    complete once every chunk id in ``range(full_length)`` has been seen.
    """

    def __init__(self, max_wait_time: timedelta) -> None:
        """
        Args:
            max_wait_time: Maximal allowed time since the most recently
                added chunk before the builder is considered stale
                (see is_still_valid).
        """
        self._last_update_time = datetime.now(tz=timezone.utc)
        self._max_wait_time = max_wait_time
        self._messages: list[Message] = []
        # Topic/partition taken from the first added chunk.
        self.partition_info: TrackingInfo | None = None
        # Group id taken from the first added chunk; None until then.
        self.group_id: bytes | None = None
        # Expected total number of chunks, from the first chunk's header.
        self.full_length: int = 0
        # Chunk ids seen so far; used to detect completeness and ignore
        # the effect of duplicate deliveries on the count.
        self._present_chunks: set[int] = set()

    @staticmethod
    def get_group_id_from_message(message: Message) -> bytes | None:
        """Get the group id from the message, or None if the header is absent."""
        return get_header_value(message, CHUNK_GROUP_HEADER)

    @staticmethod
    def get_chunk_id_from_message(message: Message) -> int | None:
        """Get the chunk id from the message, or None if the header is absent."""
        if header_value := get_header_value(message, CHUNK_ID_HEADER):
            return deserialize_number_from_bytes(header_value)
        return None

    @staticmethod
    def get_number_of_chunks_from_message(message: Message) -> int | None:
        """Get the number of chunks from the message, or None if absent."""
        if header_value := get_header_value(message, NUMBER_OF_CHUNKS_HEADER):
            return deserialize_number_from_bytes(header_value)
        return None

    def add(self, message: Message) -> None:
        """
        Add a message to the message group builder.

        Args:
            message: The Kafka message to add.

        Raises:
            ValueError: If the message is missing any of the required chunk
                headers, or carries a group id different from the chunks
                already gathered by this builder.
        """
        if (
            (new_group_id := self.get_group_id_from_message(message)) is None
            or (new_number_of_chunks := self.get_number_of_chunks_from_message(message))
            is None
            or ((new_chunk_id := self.get_chunk_id_from_message(message)) is None)
        ):
            raise ValueError("The new message is missing required chunk headers!")
        if self.group_id is None:
            # First chunk seen: adopt its group id, chunk count and origin.
            self.group_id = new_group_id
            self.full_length = new_number_of_chunks
            self.partition_info = TrackingInfo(
                message.topic(),  # type: ignore[arg-type]
                message.partition(),  # type: ignore[arg-type]
            )
        elif new_group_id != self.group_id:
            # Guard against caller mix-ups: silently merging chunks from
            # different groups would produce a corrupted payload.
            raise ValueError("The new message belongs to a different chunk group!")
        self._last_update_time = datetime.now(tz=timezone.utc)
        self._messages.append(message)
        self._present_chunks.add(new_chunk_id)

    @property
    def is_complete(self) -> bool:
        """Does this builder contain all the needed chunks?"""
        if not self._messages:
            return False
        return all(i in self._present_chunks for i in range(self.full_length))

    @property
    def offsets(self) -> tuple[int, ...]:
        """Return the offsets of the chunks gathered so far."""
        return tuple(message.offset() for message in self._messages)  # type: ignore[misc]

    def is_still_valid(self) -> bool:
        """Isn't this builder stale? Useful for discarding corrupted data."""
        return (
            datetime.now(tz=timezone.utc) - self._last_update_time < self._max_wait_time
        )

    def get_message_group(self, allow_incomplete: bool = False) -> MessageGroup | None:
        """
        Generate the message group object from the builder.

        Args:
            allow_incomplete: If true, this will return a MessageGroup
                object even if not all chunks have been gathered.

        Returns: MessageGroup object if all required data is available,
            None otherwise.
        """
        if (
            (not allow_incomplete and not self.is_complete)
            or not self._messages
            or not self.partition_info
        ):
            return None
        return MessageGroup(
            topic=self.partition_info.topic,
            partition=self.partition_info.partition,
            # Duplicate deliveries of the same chunk id collapse here:
            # the last received message for a chunk id wins.
            messages={
                self.get_chunk_id_from_message(message): message  # type: ignore[misc]
                for message in self._messages
            },
            group_id=self.group_id,
        )
8 changes: 8 additions & 0 deletions src/retriable_kafka_client/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Types used in this library"""

from dataclasses import dataclass, field
from datetime import timedelta
from typing import Callable, Any

from confluent_kafka import Message
Expand Down Expand Up @@ -42,6 +43,7 @@ class ProducerConfig(CommonConfig):
retries: int = field(default=3)
fallback_factor: float = field(default=2.0)
fallback_base: float = field(default=5.0)
split_messages: bool = field(default=False)


@dataclass
Expand Down Expand Up @@ -84,10 +86,16 @@ class ConsumerConfig(CommonConfig):
Returns True if the message will be processed
or False if skipped. In case False or exception is returned,
message will be committed without processing.
max_chunk_reassembly_wait_time: Maximal time to wait for all the chunks
of the message to arrive. Has any effect only if chunking is enabled.
If some chunks are still waiting for reassembly after this threshold,
they are deleted and a warning is logged. This happens if the producer
crashed during producing of the chunked message, data cannot be salvaged.
"""

group_id: str
target: Callable[[dict[str, Any]], Any]
topics: list[ConsumeTopicConfig] = field(default_factory=list)
cancel_future_wait_time: float = field(default=30.0)
filter_function: Callable[[Message], bool] | None = field(default=None)
max_chunk_reassembly_wait_time: timedelta = field(default=timedelta(minutes=15))
33 changes: 17 additions & 16 deletions src/retriable_kafka_client/consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Base Kafka Consumer module"""

import json
import logging
import sys
from concurrent.futures import Executor, Future
Expand All @@ -10,7 +9,7 @@
from confluent_kafka import Consumer, Message, KafkaException, TopicPartition

from .health import perform_healthcheck_using_client
from .kafka_utils import message_to_partition
from .kafka_utils import message_to_partition, MessageGroup
from .kafka_settings import KafkaOptions, DEFAULT_CONSUMER_SETTINGS
from .consumer_tracking import TrackingManager
from .config import ConsumerConfig
Expand Down Expand Up @@ -85,7 +84,9 @@ def __init__(
self.__stop_flag: bool = False
# Store information about offsets and tasks
self.__tracking_manager = TrackingManager(
max_concurrency, config.cancel_future_wait_time
max_concurrency,
config.cancel_future_wait_time,
self._config.max_chunk_reassembly_wait_time,
)
# Manage re-sending messages to retry topics
self.__retry_manager = RetryManager(config)
Expand Down Expand Up @@ -140,7 +141,7 @@ def __on_revoke(self, _: Consumer, partitions: list[TopicPartition]) -> None:
self.__tracking_manager.register_revoke(partitions)
self.__perform_commits()

def __ack_message(self, message: Message, finished_future: Future) -> None:
def __ack_message(self, message: MessageGroup, finished_future: Future) -> None:
"""
Private method only ever intended to be used from within
_process_message(). It commits offsets and releases
Expand All @@ -155,12 +156,12 @@ def __ack_message(self, message: Message, finished_future: Future) -> None:
if problem := finished_future.exception():
LOGGER.error(
"Message could not be processed! Message: %s.",
message.value(),
message.deserialize(),
exc_info=problem,
)
self.__retry_manager.resend_message(message)
finally:
self.__tracking_manager.schedule_commit(message)
self.__tracking_manager.schedule_commit(message, release_semaphore=True)

def __graceful_shutdown(self) -> None:
"""
Expand Down Expand Up @@ -212,20 +213,20 @@ def _process_message(self, message: Message) -> Future[Any] | None:
Returns: Future of the target execution if the message can be processed.
None otherwise.
"""
message_value = message.value()
if not message_value:
# Discard empty messages
message_group = self.__tracking_manager.receive(message)
if not message_group:
return None
try:
message_data = json.loads(message_value)
except json.decoder.JSONDecodeError:
# This message cannot be deserialized, just log and discard it
LOGGER.exception("Decoding error: not a valid JSON: %s", message.value())
message_data = message_group.deserialize()
if not message_data:
# Semaphore was not acquired
self.__tracking_manager.schedule_commit(
message_group, release_semaphore=False
)
return None
future = self._executor.submit(self._config.target, message_data)
self.__tracking_manager.process_message(message, future)
self.__tracking_manager.process_message(message_group, future)
# The semaphore is released within this callback
future.add_done_callback(lambda res: self.__ack_message(message, res))
future.add_done_callback(lambda res: self.__ack_message(message_group, res))
return future

### Public methods ###
Expand Down
Loading
Loading