
Conversation

@universalmind303
Member

Changes Made

I was trying to use async UDFs the other day and noticed that there's no way to enable dynamic batching for them. This PR adds that support.

Related Issues

@greptile-apps
Contributor

greptile-apps bot commented Jan 5, 2026

Greptile Summary

This PR adds dynamic batching configuration support to async UDFs by changing the StreamingSink::batching_strategy() trait method to return a DaftResult, allowing implementations to conditionally use dynamic batching based on execution context configuration.

Key Changes:

  • Modified batching_strategy() trait method signature from returning Self::BatchingStrategy to DaftResult<Self::BatchingStrategy> in base.rs
  • Updated AsyncUdfSink to use the DynBatchingStrategy type and conditionally select between LatencyConstrainedBatchingStrategy and StaticBatchingStrategy based on config (see the sketch after this list)
  • Updated all other streaming sink implementations to wrap their static strategies in Ok() to match the new signature
  • Added proper error context handling in base.rs:357 using PipelineExecutionSnafu
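
Below is a minimal, self-contained sketch of the shape of this change. The types here are stand-ins for illustration only; in the actual code, DynBatchingStrategy, LatencyConstrainedBatchingStrategy, and StaticBatchingStrategy are distinct daft-local-execution types, and the flag comes from the execution context's config rather than a struct field.

```rust
// Stand-ins for illustration; not the real daft-local-execution types.
type DaftResult<T> = Result<T, String>;

#[derive(Debug)]
enum DynBatchingStrategy {
    LatencyConstrained { min: usize, max: usize },
    Static { batch_size: usize },
}

trait StreamingSink {
    type BatchingStrategy;

    // Previously this returned Self::BatchingStrategy directly; returning a
    // DaftResult lets an implementation fail (or branch) based on config.
    fn batching_strategy(&self) -> DaftResult<Self::BatchingStrategy>;
}

struct AsyncUdfSink {
    // In the real sink this flag would come from the execution config.
    enable_dynamic_batching: bool,
    batch_size: usize,
}

impl StreamingSink for AsyncUdfSink {
    type BatchingStrategy = DynBatchingStrategy;

    fn batching_strategy(&self) -> DaftResult<Self::BatchingStrategy> {
        if self.enable_dynamic_batching {
            // Placeholder bounds; the PR derives them from the Flexible morsel size requirement.
            Ok(DynBatchingStrategy::LatencyConstrained { min: 1, max: self.batch_size })
        } else {
            Ok(DynBatchingStrategy::Static { batch_size: self.batch_size })
        }
    }
}

fn main() -> Result<(), String> {
    let sink = AsyncUdfSink { enable_dynamic_batching: true, batch_size: 1024 };
    println!("{:?}", sink.batching_strategy()?);
    Ok(())
}
```

The other streaming sinks only need to wrap their existing static strategy in Ok(...) to satisfy the new signature.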

Issues Found:

  • The dynamic batching logic in async_udf.rs will fail for scalar UDFs (which have a Strict(1) requirement) and UDFs with an explicit batch_size (which have a Strict(batch_size) requirement), as these don't match the Flexible pattern required for dynamic batching. This severely limits the feature's applicability.

Confidence Score: 2/5

  • This PR contains a critical logic error that will cause runtime failures for common UDF use cases when dynamic batching is enabled
  • The implementation has a significant bug where scalar UDFs and UDFs with explicit batch sizes will throw errors when dynamic batching is enabled. Given that scalar UDFs are common (lines 374-376 show special handling for them), this will likely break existing workloads. The TODO comment suggests the author was aware of the limitation, but the error-throwing behavior makes it a hard failure rather than a graceful degradation.
  • src/daft-local-execution/src/streaming_sink/async_udf.rs requires immediate attention to fix the dynamic batching logic for scalar UDFs and batch-size-constrained UDFs

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/daft-local-execution/src/streaming_sink/async_udf.rs | Added dynamic batching support for async UDFs, but contains a critical logic error that will prevent scalar UDFs and batch-size-constrained UDFs from using dynamic batching |
| src/daft-local-execution/src/streaming_sink/base.rs | Changed batching_strategy() signature to return DaftResult and added proper error context handling |

Sequence Diagram

sequenceDiagram
    participant Client as Client Code
    participant Node as StreamingSinkNode
    participant Sink as AsyncUdfSink
    participant Config as ExecutionConfig
    participant BatchMgr as BatchManager
    participant Worker as Worker Thread

    Client->>Node: start()
    Node->>Sink: batching_strategy()
    Sink->>Config: get_context().execution_config()
    Config-->>Sink: {enable_dynamic_batching, dynamic_batching_strategy}
    
    alt enable_dynamic_batching = true
        Sink->>Sink: morsel_size_requirement()
        alt Returns Flexible(min, max)
            Sink-->>Node: Ok(LatencyConstrainedBatchingStrategy)
        else Returns Strict(size)
            Sink-->>Node: Err("cannot use strict batch size...")
        end
    else enable_dynamic_batching = false
        Sink-->>Node: Ok(StaticBatchingStrategy)
    end
    
    Node->>BatchMgr: new(strategy)
    Node->>Worker: spawn_workers()
    Worker->>BatchMgr: get batches according to strategy
    Worker->>Sink: execute(input, state)
    Sink-->>Worker: StreamingSinkOutput


@greptile-apps greptile-apps bot left a comment


10 files reviewed, 1 comment


Comment on lines +390 to +397
let reqs = self.morsel_size_requirement().unwrap_or_default();
let MorselSizeRequirement::Flexible(min_batch_size, max_batch_size) = reqs else {
    return Err(DaftError::ValueError(
        "cannot use strict batch size requirement with dynamic batching"
            .to_string(),
    ));
};

logic: This logic will reject scalar UDFs and UDFs with explicit batch sizes when dynamic batching is enabled. Looking at morsel_size_requirement() (lines 368-381), it returns Strict(1) for scalar UDFs and Strict(batch_size) when the user specifies a batch size. These will fail the Flexible pattern match and throw an error.

This means dynamic batching cannot be used with:

  • Scalar UDFs (which return Strict(1))
  • UDFs with explicit batch_size parameter (which return Strict(batch_size))

Only non-scalar UDFs without a batch_size will work with dynamic batching. Is this intentional? The TODO comment on line 389 suggests this limitation was known, but the error message makes it seem like a hard failure rather than a graceful fallback.
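
To make the limitation concrete, here is an illustrative stand-in for the three cases described above. It is not the actual morsel_size_requirement() implementation, and the Flexible bounds are made up:

```rust
// Illustrative stand-in; not the real daft types or morsel_size_requirement().
#[derive(Debug, Clone, Copy)]
enum MorselSizeRequirement {
    Strict(usize),
    Flexible(usize, usize), // (min, max)
}

fn morsel_size_requirement(is_scalar: bool, batch_size: Option<usize>) -> MorselSizeRequirement {
    match (is_scalar, batch_size) {
        (true, _) => MorselSizeRequirement::Strict(1),            // scalar UDFs
        (_, Some(n)) => MorselSizeRequirement::Strict(n),         // explicit batch_size
        (false, None) => MorselSizeRequirement::Flexible(1, 128), // placeholder bounds
    }
}

fn main() {
    for (scalar, bs) in [(true, None), (false, Some(64)), (false, None)] {
        let req = morsel_size_requirement(scalar, bs);
        // Only the Flexible case survives the let-else guard shown in the diff;
        // both Strict cases would hit the ValueError when dynamic batching is on.
        let ok = matches!(req, MorselSizeRequirement::Flexible(..));
        println!("scalar={scalar}, batch_size={bs:?} -> {req:?}, dynamic batching ok: {ok}");
    }
}
```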


@codecov

codecov bot commented Jan 5, 2026

Codecov Report

❌ Patch coverage is 50.98039% with 25 lines in your changes missing coverage. Please review.
✅ Project coverage is 72.60%. Comparing base (e06cbec) to head (27ec86c).
⚠️ Report is 8 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...ft-local-execution/src/streaming_sink/async_udf.rs | 18.18% | 18 Missing ⚠️ |
| ...rc/daft-local-execution/src/streaming_sink/vllm.rs | 0.00% | 4 Missing ⚠️ |
| ...cution/src/streaming_sink/sort_merge_join_probe.rs | 0.00% | 3 Missing ⚠️ |
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #5919      +/-   ##
==========================================
+ Coverage   72.52%   72.60%   +0.08%     
==========================================
  Files         970      970              
  Lines      126303   126526     +223     
==========================================
+ Hits        91598    91866     +268     
+ Misses      34705    34660      -45     
| Files with missing lines | Coverage Δ |
| --- | --- |
| ...on/src/streaming_sink/anti_semi_hash_join_probe.rs | 88.20% <100.00%> (+0.05%) ⬆️ |
| ...rc/daft-local-execution/src/streaming_sink/base.rs | 92.27% <100.00%> (+0.06%) ⬆️ |
| .../daft-local-execution/src/streaming_sink/concat.rs | 91.42% <100.00%> (+0.25%) ⬆️ |
| ...c/daft-local-execution/src/streaming_sink/limit.rs | 98.75% <100.00%> (+0.01%) ⬆️ |
| .../src/streaming_sink/monotonically_increasing_id.rs | 94.93% <100.00%> (+0.06%) ⬆️ |
| ...cution/src/streaming_sink/outer_hash_join_probe.rs | 93.39% <100.00%> (+0.01%) ⬆️ |
| .../daft-local-execution/src/streaming_sink/sample.rs | 93.17% <100.00%> (+0.05%) ⬆️ |
| ...cution/src/streaming_sink/sort_merge_join_probe.rs | 0.00% <0.00%> (ø) |
| ...rc/daft-local-execution/src/streaming_sink/vllm.rs | 0.00% <0.00%> (ø) |
| ...ft-local-execution/src/streaming_sink/async_udf.rs | 81.29% <18.18%> (-5.95%) ⬇️ |

... and 14 files with indirect coverage changes


@universalmind303
Member Author

So I think we actually need a different batching strategy for async UDFs, since if they are scalar we set the morsel size to 1. cc @colin-ho, any suggestions here?

@colin-ho
Collaborator

colin-ho commented Jan 6, 2026

Oh oops, I didn't see your comment.

But I think it's fine actually. I think the cases are:

  • Scalar async: no ability to set a batch size anyway, so use dynamic batching with min = 1 and max = default_batch_size.
  • Batch async: use dynamic batching if a batch size is not specified, with min = 1 and max = default_batch_size.

For scalar UDFs we previously always fixed the batch size to 1; with dynamic batching we just start at 1 and can increase it to hit the latency target.
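
A minimal sketch of that selection rule, with placeholder names and a made-up default batch size rather than the actual daft-local-execution implementation:

```rust
// Sketch of the strategy selection described above; placeholder names and
// default, not the actual daft implementation.
#[derive(Debug)]
enum BatchingStrategy {
    LatencyConstrained { min: usize, max: usize },
    Static { batch_size: usize },
}

const DEFAULT_BATCH_SIZE: usize = 1024; // placeholder default

fn async_udf_batching_strategy(
    enable_dynamic_batching: bool,
    explicit_batch_size: Option<usize>,
) -> BatchingStrategy {
    match (enable_dynamic_batching, explicit_batch_size) {
        // Scalar async UDFs and batch UDFs without a user-specified batch size:
        // start at 1 and let the latency target grow the batch size.
        (true, None) => BatchingStrategy::LatencyConstrained { min: 1, max: DEFAULT_BATCH_SIZE },
        // A user-specified batch size stays fixed instead of erroring out.
        (_, Some(n)) => BatchingStrategy::Static { batch_size: n },
        // Dynamic batching disabled: keep the existing static behaviour.
        (false, None) => BatchingStrategy::Static { batch_size: DEFAULT_BATCH_SIZE },
    }
}

fn main() {
    println!("{:?}", async_udf_batching_strategy(true, None));     // scalar, or batch without a size
    println!("{:?}", async_udf_batching_strategy(true, Some(64))); // explicit batch size stays static
    println!("{:?}", async_udf_batching_strategy(false, None));    // dynamic batching disabled
}
```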
