feat: experimental vllm provider #5443
Conversation
Greptile Overview
Greptile Summary
This PR adds an experimental vllm_prompt() function that enables optimized LLM inference using vLLM's prefix caching capabilities through a new execution path.
Key Changes
- New execution path: Adds `PyExpr.vllm()` that bypasses the standard UDF execution, creating a dedicated `VLLMProject` logical plan node and a `VLLMSink` streaming sink for optimized batch processing
- Python layer: New `VLLMExecutor` class manages an `AsyncLLMEngine` in a dedicated thread with async task submission and polling (see the sketch after this list)
- Rust infrastructure: Comprehensive integration including new expression types (`VLLMExpr`), optimization rules (`SplitVLLM`), and streaming sink refactoring to support iterative finalization
- API surface: Adds a `prompt()` function with vLLM provider support featuring configurable concurrency, buffer management, and batch sizing
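A minimal sketch of the dedicated event-loop-thread pattern described in the Python-layer bullet above. This is not the PR's `VLLMExecutor`; the class and method names are illustrative, and the real executor wraps prompts into vLLM `AsyncLLMEngine` generation coroutines before submitting them.

```python
import asyncio
import threading

class BackgroundLoopExecutor:
    """Illustrative only: run an asyncio event loop in a dedicated thread and
    accept work from synchronous callers (e.g. a streaming sink driving inference)."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        # The loop lives in its own thread so submitting work never blocks the caller.
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()
        self._futures = []

    def submit(self, coro):
        # Schedule a coroutine on the background loop from the caller's thread.
        fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
        self._futures.append(fut)
        return fut

    def poll(self):
        # Collect results from tasks that have finished; leave the rest running.
        done = [f for f in self._futures if f.done()]
        self._futures = [f for f in self._futures if not f.done()]
        return [f.result() for f in done]
```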
Critical Issues Found
- Data loss bug in `VLLMSink.finalize()` (src/daft-local-execution/src/streaming_sink/vllm.rs:297): Only processes the first worker state when `max_concurrency > 1`, silently dropping buffered data and running tasks from other workers
- Race condition in `VLLMExecutor.num_running_tasks()` (daft/execution/vllm.py:106): Reads `running_task_count` without lock protection despite concurrent modifications (a lock-protected sketch follows this list)
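For the second issue, the standard fix is to take the same lock for reads as for writes of the shared counter. A minimal sketch with assumed names, not the PR's code:

```python
import threading

class TaskCounter:
    """Illustrative counter shared between the caller thread and the engine thread."""

    def __init__(self):
        self._lock = threading.Lock()
        self._running = 0

    def task_started(self):
        with self._lock:
            self._running += 1

    def task_finished(self):
        with self._lock:
            self._running -= 1

    def num_running_tasks(self) -> int:
        # Reading under the same lock keeps the count consistent with
        # concurrent increments/decrements from the engine thread.
        with self._lock:
            return self._running
```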
Confidence Score: 1/5
- This PR has critical bugs that will cause data loss and race conditions in production
- Score reflects two critical logic errors: the `VLLMSink.finalize()` method only processes the first worker state when concurrency > 1 (causing silent data loss), and `VLLMExecutor.num_running_tasks()` has an unprotected read of shared state (race condition). Both issues will cause incorrect behavior in production workloads.
- Pay close attention to `src/daft-local-execution/src/streaming_sink/vllm.rs` (the finalize method must handle all states) and `daft/execution/vllm.py` (thread safety fix needed)
Important Files Changed
File Analysis
| Filename | Score | Overview | 
|---|---|---|
| src/daft-local-execution/src/streaming_sink/vllm.rs | 1/5 | New VLLMSink implementation with critical bug in finalize() that loses data from concurrent workers | 
| daft/execution/vllm.py | 2/5 | New VLLMExecutor with async task management; has race condition in num_running_tasks() | 
| daft/functions/ai/__init__.py | 4/5 | Added prompt() vLLM support with PyExpr.vllm() optimization path; has inline import style issues | 
| src/daft-logical-plan/src/ops/vllm.rs | 5/5 | New VLLMProject logical plan node with schema generation and display methods | 
| src/daft-logical-plan/src/optimization/rules/split_vllm.rs | 5/5 | New optimization rule to extract VLLM expressions into separate nodes | 
| src/daft-local-execution/src/streaming_sink/base.rs | 4/5 | Refactored finalize to support iterative output via StreamingSinkFinalizeOutput enum | 
Sequence Diagram
sequenceDiagram
    participant User
    participant PromptFunc as prompt()
    participant PyExpr as PyExpr.vllm()
    participant LogicalPlan as VLLMProject
    participant Optimizer as SplitVLLM Rule
    participant LocalExec as VLLMSink
    participant VLLMExec as VLLMExecutor
    participant vLLM as AsyncLLMEngine
    User->>PromptFunc: prompt(messages, provider="vllm-prefix-cached")
    PromptFunc->>PromptFunc: Resolve VLLMPrefixCachedPrompterDescriptor
    PromptFunc->>PyExpr: vllm(model, concurrency, args...)
    PyExpr->>LogicalPlan: Create VLLMProject node
    LogicalPlan->>Optimizer: Optimization pass
    Optimizer->>Optimizer: SplitVLLM extracts VLLM expr to separate node
    Optimizer->>LocalExec: Translate to VLLMSink
    
    LocalExec->>VLLMExec: make_state() creates VLLMExecutor
    VLLMExec->>vLLM: Initialize AsyncLLMEngine in new thread
    
    loop For each input batch
        LocalExec->>LocalExec: Buffer input (max_buffer_size)
        LocalExec->>VLLMExec: submit(prompts, rows)
        VLLMExec->>vLLM: asyncio.run_coroutine_threadsafe(_generate)
        LocalExec->>VLLMExec: poll() for completed tasks
        VLLMExec-->>LocalExec: Return completed (outputs, rows)
        LocalExec-->>User: Stream results
    end
    
    LocalExec->>LocalExec: finalize(states) - drain remaining
    LocalExec->>VLLMExec: poll() until all tasks complete
    VLLMExec-->>LocalExec: Final results
    LocalExec-->>User: Final output batch
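The per-batch loop in the diagram (buffer, submit, poll, stream) can be roughly illustrated in Python. This is a hand-written sketch, not code from the PR; the `submit(prompts, rows)` / `poll()` interface is taken from the diagram, and everything else (names, buffer accounting) is an assumption.

```python
def run_batches(batches, executor, max_buffer_size=32):
    """Sketch of the per-batch flow: buffer (prompts, rows) pairs, hand full
    buffers to the executor, and stream back whatever has completed so far."""
    buffer = []
    for prompts, rows in batches:
        buffer.append((prompts, rows))
        if len(buffer) >= max_buffer_size:
            for p, r in buffer:
                executor.submit(p, r)  # non-blocking: tasks run on the engine thread
            buffer = []
        # Emit any generations that finished while we were buffering.
        for outputs, out_rows in executor.poll():
            yield outputs, out_rows
    # Anything still buffered is handled by the finalize/drain phase.
```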
Additional Comments (2)
- `daft/functions/ai/__init__.py`, line 263: style: move import to top of file per custom style guide
  Context Used: Rule from dashboard - Import statements should be placed at the top of the file rather than inline within functions or met... (source)
- `daft/functions/ai/__init__.py`, line 275: style: move import to top of file per custom style guide
  Context Used: Rule from dashboard - Import statements should be placed at the top of the file rather than inline within functions or met... (source)
46 files reviewed, 4 comments
Greptile Overview
Greptile Summary
This PR introduces experimental VLLM provider support for daft.functions.ai.prompt() with async batching, currently only for the local executor. The implementation bypasses the standard UDF execution path by routing VLLM expressions through a custom streaming sink operator.
Major changes:
- New expression type: `Expr::VLLM` added to the DSL with a `VLLMExpr` struct containing model config, concurrency settings, and buffer parameters
- Streaming sink API enhancements: Modified the `StreamingSink` trait to support iterative finalization via a `StreamingSinkFinalizeOutput` enum (allowing sinks to return `HasMoreOutput` for async processing) and made `make_state()` fallible
- New VLLMProject plan node: Added throughout the logical plan, local plan, and pipeline layers with proper optimizer rule integration
- Python executor: `VLLMExecutor` manages a dedicated event loop thread for the async vLLM engine with proper locking
- Execution restrictions: Correctly blocked for Ray/distributed execution with `todo!()` and `NotImplemented` errors
API Impact:
All existing streaming sinks updated to the new trait signature (`make_state()` returns `DaftResult`, `finalize()` returns `StreamingSinkFinalizeOutput`). Changes are mechanical and maintain existing behavior.
Known limitations (per PR description):
- Only one VLLM expression per projection supported
- Local executor only
- Prefix bucketing/routing not yet implemented
 
Confidence Score: 4/5
- Safe to merge with minor caveats - streaming sink API changes are well-structured and the VLLM implementation is properly isolated
- The streaming sink API changes are cleanly implemented and all existing sinks have been updated correctly. The VLLM implementation is experimental and properly restricted to local execution. One point deducted because the SplitVLLM optimization rule's hardcoded column name could cause conflicts, and the interaction with the SplitUDFs rule needs verification
- src/daft-logical-plan/src/optimization/rules/split_vllm.rs - verify interaction with the SplitUDFs rule and potential column name conflicts with "daft_vllm_output"
 
Important Files Changed
File Analysis
| Filename | Score | Overview | 
|---|---|---|
| src/daft-local-execution/src/streaming_sink/base.rs | 5/5 | Enhanced streaming sink API to support iterative finalization with StreamingSinkFinalizeOutput enum and made make_state() fallible | 
| src/daft-local-execution/src/streaming_sink/vllm.rs | 4/5 | New VLLM streaming sink implementation with async batching and buffer management, correctly set max_concurrency() to 1 | 
| daft/execution/vllm.py | 5/5 | Python VLLMExecutor with dedicated event loop thread, proper locking on shared state, and async batch submission | 
| src/daft-dsl/src/expr/mod.rs | 5/5 | Added new VLLM expression variant with proper semantic ID, display, and visitor integration | 
| src/daft-logical-plan/src/ops/vllm.rs | 5/5 | New VLLMProject logical plan node with proper schema handling and stats state management | 
| src/daft-logical-plan/src/optimization/rules/split_vllm.rs | 4/5 | Optimizer rule to extract VLLM expressions from projections into dedicated VLLMProject nodes, currently supports one VLLM expr per project | 
| src/daft-logical-plan/src/logical_plan.rs | 5/5 | Integrated VLLMProject into logical plan enum with proper schema, stats, and child handling | 
| daft/functions/ai/__init__.py | 5/5 | Updated prompt() to detect VLLMPrefixCachedPrompterDescriptor and route to PyExpr.vllm() instead of UDF execution | 
Sequence Diagram
sequenceDiagram
    participant User
    participant prompt() as daft.functions.ai.prompt()
    participant PyExpr as PyExpr.vllm()
    participant Optimizer as Logical Plan Optimizer
    participant Pipeline as Pipeline Builder
    participant VLLMSink as VLLMSink (Rust)
    participant VLLMExecutor as VLLMExecutor (Python)
    participant AsyncEngine as vLLM AsyncLLMEngine
    User->>prompt(): prompt(messages, provider="vllm-prefix-cached")
    prompt()->>prompt(): Detect VLLMPrefixCachedPrompterDescriptor
    prompt()->>PyExpr: .vllm(model, concurrency, buffer_size, ...)
    PyExpr->>Optimizer: Create Expr::VLLM in logical plan
    
    Optimizer->>Optimizer: SplitVLLM rule extracts VLLM expr
    Optimizer->>Optimizer: Create VLLMProject logical plan node
    
    Pipeline->>VLLMSink: Translate to StreamingSink
    
    loop For each input batch
        VLLMSink->>VLLMSink: Buffer input until max_buffer_size
        VLLMSink->>VLLMExecutor: submit(prompts, rows)
        VLLMExecutor->>AsyncEngine: asyncio.run_coroutine_threadsafe(_generate())
        AsyncEngine-->>VLLMExecutor: Stream completions to completed_tasks queue
        VLLMSink->>VLLMExecutor: poll() for completed tasks
        VLLMExecutor-->>VLLMSink: Return (outputs, rows) or None
        VLLMSink-->>Pipeline: Return output with NeedMoreInput/HasMoreOutput
    end
    
    Pipeline->>VLLMSink: finalize(states)
    loop Until all tasks complete
        VLLMSink->>VLLMSink: Submit remaining buffered tasks
        VLLMSink->>VLLMExecutor: poll() for results
        alt Tasks still running
            VLLMSink-->>Pipeline: HasMoreOutput with partial results
        else All complete
            VLLMSink-->>Pipeline: Finished with final results
        end
    end
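The finalize loop at the end of the diagram can be sketched the same way: flush the remaining buffer, then keep polling and emitting partial results (`HasMoreOutput`) until nothing is running. Again a hand-written sketch with an assumed executor interface, not the PR's Rust code.

```python
import time

def finalize(executor, buffer):
    """Drain phase sketch: submit leftover buffered work, then poll until all
    tasks complete, yielding partial results each round instead of blocking."""
    for prompts, rows in buffer:
        executor.submit(prompts, rows)  # flush whatever is still buffered
    while executor.num_running_tasks() > 0:
        completed = executor.poll()
        if completed:
            yield completed             # "HasMoreOutput": partial results
        else:
            time.sleep(0.01)            # brief wait while tasks finish
    remaining = executor.poll()
    if remaining:
        yield remaining                 # "Finished": final results
```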
46 files reviewed, 1 comment
Thoughts on just leveraging async udfs instead of making dedicated logical + physical ops for vllm?
This PR here #5451 makes a streaming sink for async UDFs that I think can also work with vllm. The idea is the same as what you have here, but it uses a JoinSet as the async task pool. The only thing missing is that you have max_buffer_size and max_running_tasks params here, but you can also just control that with the JoinSet, i.e. if the limit is reached, force a join_next().await
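For context, a rough asyncio analogue of this suggestion (the PR and #5451 use Rust/tokio; names and the cap value here are illustrative only):

```python
import asyncio

async def bounded_gather(coros, max_running_tasks=8):
    """Keep at most max_running_tasks in flight; when the cap is reached, wait
    for one task to finish before admitting more work (the JoinSet idea)."""
    pending, results = set(), []
    for coro in coros:
        if len(pending) >= max_running_tasks:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            results.extend(t.result() for t in done)
        pending.add(asyncio.ensure_future(coro))
    while pending:
        done, pending = await asyncio.wait(pending)
        results.extend(t.result() for t in done)
    return results
```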
          
It's true that what's implemented here can probably be done with async UDFs, but this is just the first step in our work on prefix routing for vllm. We will need to add additional logic to the swordfish side to allow for bucketing the buffer by prefix before emitting output, which is why I have this buffer. There's also additional work on the Flotilla side for routing that will require a distributed operator.
    
Overall LGTM, just a couple of nits.
Greptile Overview
Greptile Summary
This PR adds experimental support for vLLM-based LLM inference with prefix caching and async batching optimization. When using VLLMPrefixCachedProvider, the prompt function creates a VLLMExpr instead of a UDF, which gets optimized into a custom streaming sink operator.
Key Changes:
- New `VLLMSink` streaming sink with prefix bucketing and async batching
- `SplitVLLM` optimizer rule extracts VLLM expressions into dedicated `VLLMProject` nodes
- `LocalVLLMExecutor` and `RemoteVLLMExecutor` handle local and distributed (Ray) execution
- `PrefixRouter` load balances requests across multiple Ray actors based on prefix similarity
- Integration in `daft.functions.ai.prompt()` detects the vLLM provider and bypasses the standard UDF path
Architecture:
The implementation uses a streaming sink pattern where prompts are buffered, sorted by prefix similarity, bucketed together, and submitted to vLLM's async engine. Results are polled and returned incrementally. For distributed mode, multiple Ray actors are spawned with a prefix-aware router.
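As a rough illustration of the buffer-sort-bucket step described above (a sketch only; `prefix_len` and `max_bucket_size` are made-up parameters, not the PR's configuration):

```python
from itertools import groupby

def bucket_by_prefix(prompts, prefix_len=64, max_bucket_size=32):
    """Group prompts that share a leading prefix so the engine can reuse
    cached prefix KV blocks within each bucket."""
    # Sorting first puts prompts with a common prefix next to each other.
    prompts = sorted(prompts, key=lambda p: p[:prefix_len])
    buckets = []
    for _, group in groupby(prompts, key=lambda p: p[:prefix_len]):
        group = list(group)
        # Split oversized groups so each submission stays a manageable batch.
        for i in range(0, len(group), max_bucket_size):
            buckets.append(group[i : i + max_bucket_size])
    return buckets
```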
Confidence Score: 4/5
- Safe to merge as an experimental feature with minor style improvements needed
- The implementation is well-structured with proper async handling, and the critical `max_concurrency` bug from previous comments is fixed. One inline import violates project style guidelines. The experimental nature is clearly documented.
- `daft/execution/vllm.py` has an inline import that should be moved to the top of the file
 
Important Files Changed
File Analysis
| Filename | Score | Overview | 
|---|---|---|
| daft/execution/vllm.py | 4/5 | Implements VLLMExecutor classes for local, blocking, and distributed execution with async batching and prefix routing | 
| src/daft-local-execution/src/streaming_sink/vllm.rs | 5/5 | VLLMSink implementation with prefix bucketing logic, correctly sets max_concurrency to 1 | 
| daft/ai/vllm/provider.py | 5/5 | Simple provider class for vLLM prefix caching | 
| daft/ai/vllm/protocols/prompter.py | 5/5 | PrompterDescriptor configuration for vLLM with prefix caching parameters | 
| src/daft-logical-plan/src/ops/vllm.rs | 5/5 | VLLMProject logical plan node definition | 
| src/daft-logical-plan/src/optimization/rules/split_vllm.rs | 5/5 | Optimizer rule to extract VLLM expressions from projections into dedicated VLLMProject nodes | 
| src/daft-distributed/src/pipeline_node/vllm.rs | 5/5 | Distributed execution node for VLLM with Ray actors initialization | 
| daft/functions/ai/__init__.py | 5/5 | Updated prompt function to detect and use vLLM provider via PyExpr.vllm() instead of UDF path | 
Sequence Diagram
sequenceDiagram
    participant User as User Code
    participant Prompt as daft.functions.ai.prompt()
    participant Optimizer as SplitVLLM Rule
    participant Sink as VLLMSink
    participant Executor as LocalVLLMExecutor
    participant VLLM as vLLM AsyncEngine
    
    User->>Prompt: prompt(col("text"), provider=vllm)
    Prompt->>Prompt: Detect VLLMPrefixCachingPrompterDescriptor
    Prompt->>Prompt: Create VLLMExpr (not UDF)
    Prompt-->>User: Return Expression
    
    User->>User: Execute dataframe operation
    
    Note over Optimizer: Logical Plan Optimization
    Optimizer->>Optimizer: Extract VLLMExpr from Project
    Optimizer->>Optimizer: Create VLLMProject node
    
    Note over Sink: Physical Execution
    Sink->>Sink: Buffer incoming data
    Sink->>Sink: Sort by prefix similarity
    Sink->>Sink: Bucket prompts by prefix
    Sink->>Executor: submit(prefix, prompts, rows)
    Executor->>VLLM: Generate async (streaming)
    VLLM-->>Executor: Yield outputs
    Executor->>Executor: Store completed results
    Sink->>Executor: poll()
    Executor-->>Sink: Return (outputs, rows)
    Sink-->>User: Yield results incrementally
58 files reviewed, 1 comment
Changes Made
Adds the experimental `VLLMPrefixCachedProvider` for `daft.functions.ai.prompt`. Does async batching and prefix routing.
When using the `VLLMPrefixCachedProvider`, `prompt` will create a `VLLMExpr` instead of a UDF, which Daft will turn into a custom VLLM operator. This operator is implemented as a streaming sink, and I had to make some minor changes to our streaming sink APIs to make the async batching mechanism work.
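For reference, a hedged usage sketch pieced together from the calls shown in the review sequence diagrams; the exact argument names and provider string may differ in the final API:

```python
import daft
from daft.functions.ai import prompt

df = daft.from_pydict({"text": ["Summarize: ...", "Summarize: ..."]})
# Provider string as shown in the sequence diagrams; other keyword arguments
# (model, concurrency, buffer sizes) are omitted here and may be required.
df = df.select(prompt(daft.col("text"), provider="vllm-prefix-cached"))
df.collect()
```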
Related Issues
Checklist
- [ ] If adding a new documentation page, doc is added to `docs/mkdocs.yml` navigation