
Conversation


@everySympathy everySympathy commented Jan 7, 2026

Changes Made

Added a bounded Kafka batch read API via daft.read_kafka. Start/end bounds can be expressed as any of the following (see the usage sketch after this list):

  • "earliest" / "latest"
  • timestamp_ms (int), datetime, and ISO-8601 strings
  • per-partition offset maps ({partition: offset} for single-topic; {topic: {partition: offset}} for multi-topic)
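
For illustration, a minimal usage sketch (the broker address and topic name are hypothetical, and the exact signature may differ; parameter names follow the API described above):

    import datetime

    import daft

    # Bounded read from a datetime start up to the latest available offsets.
    df = daft.read_kafka(
        bootstrap_servers="localhost:9092",  # hypothetical broker
        topics="events",                     # hypothetical topic
        start=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc),
        end="latest",
    )

    # Single-topic, per-partition offset bounds ({partition: offset}).
    df_offsets = daft.read_kafka(
        bootstrap_servers="localhost:9092",
        topics="events",
        start={0: 100, 1: 250},
        end={0: 500, 1: 900},
    )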

Related Issues

Closes #4603

@github-actions github-actions bot added the feat label Jan 7, 2026

greptile-apps bot commented Jan 7, 2026

Greptile Summary

This PR implements a bounded Kafka datasource that enables batch-style reads from Apache Kafka topics, addressing issue #4603. The implementation spans Rust (scan operator and streaming consumer) and Python (public API with comprehensive bound normalization).

Key Changes:

  • Adds KafkaScanOperator in src/daft-scan/src/kafka.rs with support for multiple bound types (earliest/latest/timestamp/explicit offsets)
  • Implements streaming Kafka consumer in src/daft-local-execution/src/sources/scan_task.rs with proper limit pushdown and timeout handling
  • Provides an ergonomic Python API in daft/io/_kafka.py with datetime/ISO-8601 support and Spark-style partition offset syntax (multi-topic sketch after this list)
  • Includes broker-free unit tests that validate API contracts without requiring Kafka infrastructure
  • Adds rdkafka 0.36 dependency with tokio and cmake-build features
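
A sketch of the multi-topic, Spark-style offset syntax referenced above (topic names are hypothetical, and passing topics as a list is an assumption about the API surface):

    import daft

    # Multi-topic bounds use the nested {topic: {partition: offset}} form;
    # the end bound here is an ISO-8601 string resolved via timestamp lookup.
    df = daft.read_kafka(
        bootstrap_servers="localhost:9092",
        topics=["clicks", "impressions"],
        start={"clicks": {0: 0}, "impressions": {0: 0}},
        end="2026-01-07T00:00:00Z",
    )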

Implementation Quality:

  • Proper error handling with external error wrapping
  • Comprehensive bound validation and normalization in Python layer
  • Limit pushdown correctly implemented in streaming consumer
  • Schema is fixed (topic, partition, offset, timestamp_ms, key, value) with appropriate types
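
A sketch of what the fixed schema means for downstream queries (column names are taken from the review above; the query itself is illustrative):

    import daft

    # Each row is one Kafka record; key and value are the raw message bytes.
    df = daft.read_kafka(
        bootstrap_servers="localhost:9092",
        topics="events",
        start="earliest",
        end="latest",
    )
    df.select("topic", "partition", "offset", "timestamp_ms", "key", "value").show()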

Issue Found:

  • can_absorb_limit() returns false in kafka.rs:189, but limit pushdown is actually implemented in the streaming consumer (scan_task.rs:555-625); as a result, the query optimizer cannot push limits down to the scan operator (see the sketch below)
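
For context, an illustrative query where the flag matters: with can_absorb_limit() returning false, the planner keeps the limit as a separate operator above the scan, even though the streaming consumer can already stop early once the limit is reached.

    import daft

    # The limit below is not pushed into the Kafka scan by the optimizer,
    # so the scan is planned as if all rows within the bounds were needed.
    df = daft.read_kafka(
        bootstrap_servers="localhost:9092",
        topics="events",
        start="earliest",
        end="latest",
    )
    df.limit(100).collect()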

Confidence Score: 4/5

  • Safe to merge with one logical issue that affects query optimization but not correctness
  • The implementation is well-structured with proper error handling, comprehensive validation, and good test coverage. The limit pushdown functionality is correctly implemented in the consumer but incorrectly advertised in the scan operator capability flag, which affects performance optimization but not correctness. All other aspects (API design, bound resolution, schema handling) are sound.
  • Pay close attention to src/daft-scan/src/kafka.rs - the can_absorb_limit() return value needs correction

Important Files Changed

src/daft-scan/src/kafka.rs: New Kafka scan operator with bounded read support; implements offset/timestamp resolution and scan task generation; can_absorb_limit() returns false even though limit pushdown is implemented in scan_task.rs
src/daft-local-execution/src/sources/scan_task.rs: Adds a Kafka streaming consumer with limit pushdown, timeout handling, and batch building; refactors existing file-based streaming into an else branch
daft/io/_kafka.py: Python API for bounded Kafka reads with comprehensive bound normalization (timestamps, offsets, earliest/latest) and thorough validation of topics and partition offsets
src/daft-scan/src/python.rs: Adds the Python binding for kafka_scan_bounded with all necessary parameters; exposes KafkaScanOperator to Python
tests/test_kafka_read.py: Broker-free unit tests for API validation; tests export, validation, and normalization without requiring actual Kafka infrastructure

Sequence Diagram

sequenceDiagram
    participant User
    participant Python API
    participant KafkaScanOperator
    participant BaseConsumer
    participant StreamConsumer
    participant Kafka Broker
    
    User->>Python API: read_kafka(bootstrap_servers, topics, start, end)
    Python API->>Python API: Normalize bounds (timestamps, offsets)
    Python API->>Python API: Validate topics, partitions, timeouts
    Python API->>KafkaScanOperator: kafka_scan_bounded()
    
    KafkaScanOperator->>BaseConsumer: create(config)
    BaseConsumer->>Kafka Broker: connect
    
    loop For each topic
        KafkaScanOperator->>BaseConsumer: fetch_metadata(topic)
        BaseConsumer->>Kafka Broker: metadata request
        Kafka Broker-->>BaseConsumer: partition list
        
        loop For each partition
            KafkaScanOperator->>BaseConsumer: fetch_watermarks(topic, partition)
            BaseConsumer->>Kafka Broker: watermark request
            Kafka Broker-->>BaseConsumer: low/high offsets
            
            alt timestamp_ms bound
                KafkaScanOperator->>BaseConsumer: offsets_for_times(timestamp)
                BaseConsumer->>Kafka Broker: offset lookup
                Kafka Broker-->>BaseConsumer: resolved offset
            end
            
            KafkaScanOperator->>KafkaScanOperator: resolve_bound(start/end)
            KafkaScanOperator->>KafkaScanOperator: create ScanTask
        end
    end
    
    KafkaScanOperator-->>User: ScanTasks (DataFrame plan)
    
    User->>User: .collect() / .show()
    
    loop For each ScanTask
        User->>StreamConsumer: create(config)
        StreamConsumer->>StreamConsumer: assign(topic, partition)
        StreamConsumer->>StreamConsumer: seek(start_offset)
        
        loop While not reached end_offset or limit
            StreamConsumer->>Kafka Broker: recv()
            Kafka Broker-->>StreamConsumer: message
            StreamConsumer->>StreamConsumer: check offset bounds
            StreamConsumer->>StreamConsumer: build batch (chunk_size)
            
            alt Limit reached or end_offset
                StreamConsumer->>StreamConsumer: stop streaming
            end
        end
        
        StreamConsumer-->>User: RecordBatch stream
    end
    
    User->>User: MicroPartition results


@greptile-apps greptile-apps bot left a comment


Additional Comments (1)

  1. src/daft-scan/src/kafka.rs, lines 189-191

    logic: can_absorb_limit() returns false, but the Kafka consumer in scan_task.rs:555-625 implements limit pushdown by tracking the remaining row count and stopping early. It should return true to enable query optimization.

12 files reviewed, 1 comment



codecov bot commented Jan 7, 2026

Codecov Report

❌ Patch coverage is 41.96676% with 419 lines in your changes missing coverage. Please review.
✅ Project coverage is 72.78%. Comparing base (fb45faf) to head (97bc1e0).
⚠️ Report is 13 commits behind head on main.

Files with missing lines Patch % Lines
src/daft-scan/src/kafka.rs 20.00% 196 Missing ⚠️
src/daft-local-execution/src/sources/scan_task.rs 41.63% 164 Missing ⚠️
src/daft-scan/src/lib.rs 3.77% 51 Missing ⚠️
daft/io/_kafka.py 93.20% 7 Missing ⚠️
src/daft-scan/src/python.rs 97.43% 1 Missing ⚠️
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #5970      +/-   ##
==========================================
+ Coverage   72.63%   72.78%   +0.15%     
==========================================
  Files         970      972       +2     
  Lines      126562   127542     +980     
==========================================
+ Hits        91924    92829     +905     
- Misses      34638    34713      +75     
Files with missing lines Coverage Δ
daft/__init__.py 86.66% <ø> (ø)
daft/io/__init__.py 100.00% <100.00%> (ø)
src/daft-scan/src/python.rs 64.52% <97.43%> (+2.78%) ⬆️
daft/io/_kafka.py 93.20% <93.20%> (ø)
src/daft-scan/src/lib.rs 70.94% <3.77%> (-3.59%) ⬇️
src/daft-local-execution/src/sources/scan_task.rs 58.64% <41.63%> (-16.57%) ⬇️
src/daft-scan/src/kafka.rs 20.00% <20.00%> (ø)

... and 71 files with indirect coverage changes


@everySympathy everySympathy marked this pull request as draft January 8, 2026 02:47
@everySympathy everySympathy force-pushed the kafka-bounded-read branch 2 times, most recently from 083a34d to 3174030 on January 8, 2026 03:34
@everySympathy everySympathy marked this pull request as ready for review January 8, 2026 13:03
@kevinzwang
Member

@desmondcheongzx tagging you on this one!


Development

Successfully merging this pull request may close these issues.

Add Kafka Data Source Support for Streaming Data Processing
