Rust-based, Arrow-powered Python Kafka client for high-throughput data pipelines.
Python's Global Interpreter Lock (GIL) and memory management create bottlenecks when consuming high-volume Kafka streams. Traditional Python Kafka clients process messages one at a time, requiring serialization/deserialization overhead for each message and limiting throughput.
kafkars solves this by:
- Rust core: All Kafka operations (polling, buffering, ordering) happen in Rust, bypassing the GIL
- Batch processing: Messages are accumulated and returned as Apache Arrow RecordBatches, not individual Python objects
- Zero-copy where possible: Arrow's columnar format enables efficient data transfer between Rust and Python
- Vectorized operations: Process thousands of messages at once with pandas, polars, or any Arrow-compatible library
This architecture is ideal for:
- Real-time analytics pipelines
- ML feature stores consuming from Kafka
- High-volume event processing
- Data lake ingestion
kafkars does not commit offsets. It is designed for analytics and high-throughput batch processing, not transactional workloads.
- No exactly-once semantics: Messages may be reprocessed if your application restarts
- No offset tracking: You control where to start reading via offset policies
- Stateless consumers: Each consumer instance starts fresh based on the configured policy
If you need exactly-once processing, transactional guarantees, or automatic offset management, use a traditional Kafka client like confluent-kafka-python.
- Ordered delivery: Messages released in timestamp order across all partitions
- Flexible offset policies: Start from earliest, latest, or any timestamp
- Backpressure management: Automatically pauses fast partitions to prevent memory overflow
- Arrow-native output: Returns PyArrow RecordBatch for efficient downstream processing
pip install kafkarsfrom kafkars import ConsumerManager, SourceTopic
# Define source topics with offset policies
topics = [
SourceTopic.from_earliest("events"),
SourceTopic.from_relative_time("metrics", 3600_000), # 1 hour ago
]
# Create consumer
manager = ConsumerManager(
config={"bootstrap.servers": "localhost:9092"},
topics=topics,
)
# Poll returns PyArrow RecordBatch
while True:
batch = manager.poll(timeout_ms=1000)
if batch.num_rows > 0:
# Convert to pandas/polars for processing
df = batch.to_pandas()
print(f"Received {len(df)} messages")
if manager.is_live():
print("Caught up to real-time")
breakEach poll returns a RecordBatch with the following schema:
| Column | Type | Description |
|---|---|---|
key |
binary |
Message key (nullable) |
value |
binary |
Message payload (nullable) |
topic |
utf8 |
Source topic name |
partition |
int32 |
Partition number |
offset |
int64 |
Message offset |
timestamp |
timestamp[ms, UTC] |
Message timestamp |
| Policy | Description |
|---|---|
from_earliest(topic) |
Start from the beginning |
from_latest(topic) |
Start from the end (new messages only) |
from_relative_time(topic, ms) |
Start from N milliseconds ago |
from_absolute_time(topic, ms) |
Start from specific Unix timestamp |
Full documentation is available at kafkars.readthedocs.io.
Apache 2.0