-
Notifications
You must be signed in to change notification settings - Fork 381
feat(udf): support ray_options and resource overrides in UDF v2 #5982
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
a04abeb to
a0b146d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
This PR adds ray_options support and resource override capabilities to UDF v2, along with a new Lance update_columns feature.
Key Changes:
UDF v2 Enhancements:
- Added
ray_optionsparameter to@daft.func,@daft.func.batch, and@daft.clsdecorators to pass Ray-specific options (e.g.,scheduling_strategy, custom resources) - Added
cpusandmemory_bytesfields toFuncdataclass and Rust UDF structs (RowWisePyFn,BatchPyFn) to support explicit CPU and memory resource requests - Implemented
override_options()method to allow runtime modification of UDF resource requirements (CPUs, GPUs, memory, concurrency, and Ray options) - Implemented
with_concurrency()convenience method for overriding max_concurrency - Updated resource extraction logic in
__call__to prioritizenum_cpusandmemoryfromray_optionsif present - Updated Rust
UDFPropertiesextraction to use new resource fields
Lance Integration:
- Added new
update_columnsAPI for row-level column updates in Lance datasets (distinct frommerge_columnswhich adds new columns) - Implemented
lance_update_column.pymodule withGroupFragmentUpdateUDFfor fragment-level processing - Added comprehensive tests for Lance update functionality with schema evolution scenarios
Issues Found:
- Redundant
num_gpuscheck inoverride_optionsmethod (line 362-363) - already handled at line 346-347
Confidence Score: 4/5
- This PR is safe to merge with one minor logic issue
- The implementation is well-structured and comprehensive with thorough test coverage. One redundant check was found in the
override_optionsmethod (checkingnum_gpustwice), which doesn't cause incorrect behavior but is unnecessary code. The changes are backward compatible, maintain consistency across Python and Rust layers, and include proper documentation and tests - daft/udf/udf_v2.py - contains redundant num_gpus check in override_options method
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| daft/udf/udf_v2.py | 5/5 | Added ray_options, cpus, and memory_bytes parameters to Func dataclass and override_options/with_concurrency methods for resource customization |
| daft/udf/init.py | 5/5 | Added ray_options parameter to @daft.func, @daft.func.batch, and @daft.cls decorators with proper documentation |
| src/daft-dsl/src/python_udf/batch.rs | 5/5 | Added cpus, memory_bytes, and ray_options fields to BatchPyFn struct to support resource overrides |
| src/daft-dsl/src/python_udf/row_wise.rs | 5/5 | Added cpus, memory_bytes, and ray_options fields to RowWisePyFn struct to support resource overrides |
| src/daft-dsl/src/functions/python/mod.rs | 5/5 | Updated UDFProperties extraction to use cpus, memory_bytes, and ray_options from UDF structs |
| src/daft-dsl/src/python.rs | 5/5 | Added cpus, memory_bytes, and ray_options parameters to row_wise_udf and batch_udf Python bindings |
| daft/io/lance/lance_update_column.py | 5/5 | New file implementing Lance column update functionality using UDF framework with fragment-level processing |
| daft/io/lance/_lance.py | 5/5 | Added update_columns public API function for row-level column updates in Lance datasets |
Sequence Diagram
sequenceDiagram
participant User as User Code
participant Decorator as @daft.func/@daft.cls
participant Func as Func (Python)
participant PyBinding as Python Bindings
participant Rust as Rust UDF Structs
participant Executor as Ray Executor
User->>Decorator: @daft.func(ray_options={...})
Decorator->>Func: Create Func with ray_options, cpus, memory_bytes
User->>Func: my_udf(df["col"])
Func->>Func: Extract cpus/memory from ray_options
Note over Func: if ray_options contains<br/>'num_cpus' or 'memory',<br/>override cpus/memory_bytes
Func->>PyBinding: row_wise_udf() or batch_udf()<br/>with cpus, memory_bytes, ray_options
PyBinding->>Rust: Create RowWisePyFn/BatchPyFn<br/>with resource parameters
Rust->>Rust: Store cpus, memory_bytes,<br/>ray_options in struct
Note over User,Func: Resource Override Flow
User->>Func: my_udf.override_options(<br/>num_cpus=2, ray_options={...})
Func->>Func: Create new Func with<br/>updated resources
Func->>User: Return new Func instance
Note over Rust,Executor: Execution Phase
Rust->>Rust: Extract UDFProperties<br/>from RowWisePyFn/BatchPyFn
Rust->>Executor: Pass ray_options, cpus,<br/>memory_bytes to Ray
Executor->>Executor: Allocate resources<br/>based on options
| if num_gpus is not None: | ||
| new_ray_options["num_gpus"] = num_gpus |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
redundant check - num_gpus is already added to new_ray_options when it's not None (first checked at line 346)
| if num_gpus is not None: | |
| new_ray_options["num_gpus"] = num_gpus | |
| if new_ray_options: |
This PR adds support for passing `ray_options` and overriding resource requirements in Daft UDF v2 (`@daft.func` and `@daft.cls`). Key changes: - Update `row_wise_udf` and `batch_udf` bindings to accept `ray_options`, `memory_bytes`, and `cpus`. - Add `override_options` and `with_concurrency` methods to `Func` class in `daft/udf/udf_v2.py` for dynamic configuration. - Propagate these options from Python to the Rust Logical Plan. - Update type stubs (`.pyi`) to match new Rust signatures. - Add integration tests verifying both `explain()` output and execution results. Verified with `pre-commit` and new test cases.
a0b146d to
2608a75
Compare
|
@kevinzwang Please take a look at the content of this PR. Thank you. |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #5982 +/- ##
==========================================
+ Coverage 72.63% 72.69% +0.05%
==========================================
Files 970 970
Lines 126562 126597 +35
==========================================
+ Hits 91924 92025 +101
+ Misses 34638 34572 -66
🚀 New features to boost your workflow:
|
|
Hi @Jay-ju could you resolve the merge conflicts? Thanks |
Changes Made
This PR adds support for passing
ray_optionsand overriding resourcerequirements in Daft UDF v2 (
@daft.funcand@daft.cls).Key changes:
row_wise_udfandbatch_udfbindings to acceptray_optionsandcpus.override_optionsandwith_concurrencymethods toFuncclassin
daft/udf/udf_v2.pyfor dynamic configuration..pyi) to match new Rust signatures.Related Issues