Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1d05985
fix(flotilla): wire min_cpu_per_task into TaskResourceRequest
XiaoHongbo-Hope Jun 13, 2026
034ec2b
test: cover min_cpu_per_task wiring; clarify field doc
XiaoHongbo-Hope Jun 13, 2026
a0f07e5
fix(flotilla): keep min_cpu_per_task default at 1.0 (no-regression)
XiaoHongbo-Hope Jun 14, 2026
2b7ce31
test(flotilla): reuse builder's min_cpu_per_task in with_resource_req…
XiaoHongbo-Hope Jun 15, 2026
ab1e67a
fix(flotilla): apply min_cpu_per_task as floor and reject non-positiv…
XiaoHongbo-Hope Jun 17, 2026
48a4d2a
fix(clippy): collapse let-chain in min_cpu_per_task env guard
XiaoHongbo-Hope Jun 17, 2026
ebdf031
Merge branch 'main' into fix/min-cpu-per-task-wiring
XiaoHongbo-Hope Jun 19, 2026
c36c458
fix(plan): leave num_cpus unset so min_cpu_per_task can lower task CPU
XiaoHongbo-Hope Jun 19, 2026
cd6ea1f
docs: align min_cpu_per_task docstring with fallback (not floor) sema…
XiaoHongbo-Hope Jun 20, 2026
710c932
fix(autoscale): use TaskResourceRequest wrappers so min_cpu_per_task …
XiaoHongbo-Hope Jun 20, 2026
f0ee20c
fix(flotilla): send fractional CPU to Ray autoscaler and reject non-f…
XiaoHongbo-Hope Jun 20, 2026
ad2f528
fix(flotilla): aggregate fractional CPU into integer Ray autoscaler b…
XiaoHongbo-Hope Jun 20, 2026
4620acb
fix(flotilla): keep multi-CPU tasks as individual autoscaler bundles
XiaoHongbo-Hope Jun 20, 2026
5303372
fix(flotilla): track post-aggregation request as autoscaler high-wate…
XiaoHongbo-Hope Jun 20, 2026
eb091a3
fix(resource-request): reject non-finite and negative num_cpus/num_gpus
XiaoHongbo-Hope Jun 21, 2026
d6f11a1
fix(flotilla): pack fractional GPU into integer autoscaler bundles
XiaoHongbo-Hope Jun 21, 2026
467f0b7
fix(flotilla): carry GPU tasks' CPU into packed autoscaler bundles
XiaoHongbo-Hope Jun 21, 2026
5f25d91
refactor(daft-config): share min_cpu_per_task validity rule
XiaoHongbo-Hope Jun 21, 2026
953b6be
fix(flotilla): emit unit GPU autoscaler bundles, not oversized shapes
XiaoHongbo-Hope Jun 21, 2026
1483cee
docs(flotilla): tighten autoscaler bundle comments
XiaoHongbo-Hope Jun 21, 2026
27f15c2
fix(flotilla): honor explicit num_cpus=0 in autoscaler bundles
XiaoHongbo-Hope Jun 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def set_execution_config(
pre_shuffle_merge_partition_threshold: Number of partitions threshold to enable pre-shuffle merge when shuffle_algorithm is "auto". Defaults to 200.
scantask_max_parallel: Set the max parallelism for running scan tasks simultaneously. Currently, this only works for Native Runner. If set to 0, all available CPUs will be used. Defaults to 8.
native_parquet_writer: Whether to use the native parquet writer vs the pyarrow parquet writer. Defaults to `True`.
min_cpu_per_task: Minimum CPU per task in the Ray runner. Defaults to 0.5.
min_cpu_per_task: Default CPU per task when a plan does not specify num_cpus. Explicit num_cpus on a plan passes through unchanged. Used by the flotilla scheduler for autoscaler bundle requests. Must be > 0. Defaults to 1.0.
actor_udf_ready_timeout: Timeout for UDF actors to be ready. Defaults to 120 seconds.
maintain_order: Whether to maintain order during execution. Defaults to True. Some blocking sink operators (e.g. write_parquet) won't respect this flag and will always keep maintain_order as false, and propagate to child operators. It's useful to set this to False for running df.collect() when no ordering is required.
enable_dynamic_batching: Whether to enable dynamic batching. Defaults to False.
Expand Down
46 changes: 42 additions & 4 deletions src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl Default for DaftExecutionConfig {
pre_shuffle_merge_partition_threshold: 200,
scantask_max_parallel: 8,
native_parquet_writer: true,
min_cpu_per_task: 0.5,
min_cpu_per_task: 1.0,
actor_udf_ready_timeout: 120,
maintain_order: true,
enable_dynamic_batching: false,
Expand All @@ -209,6 +209,11 @@ impl DaftExecutionConfig {
const ENV_DAFT_SCANTASK_MAX_PARALLEL: &'static str = "DAFT_SCANTASK_MAX_PARALLEL";
const ENV_DAFT_NATIVE_PARQUET_WRITER: &'static str = "DAFT_NATIVE_PARQUET_WRITER";
const ENV_DAFT_MIN_CPU_PER_TASK: &'static str = "DAFT_MIN_CPU_PER_TASK";

/// Single validity rule (finite, positive) shared by the env and Python setter.
pub(crate) fn is_valid_min_cpu_per_task(value: f64) -> bool {
value.is_finite() && value > 0.0
}
const ENV_DAFT_ACTOR_UDF_READY_TIMEOUT: &'static str = "DAFT_ACTOR_UDF_READY_TIMEOUT";
const ENV_PARQUET_INFLATION_FACTOR: &'static str = "DAFT_PARQUET_INFLATION_FACTOR";
const ENV_CSV_INFLATION_FACTOR: &'static str = "DAFT_CSV_INFLATION_FACTOR";
Expand Down Expand Up @@ -241,7 +246,16 @@ impl DaftExecutionConfig {
if let Some(val) =
parse_number_from_env(Self::ENV_DAFT_MIN_CPU_PER_TASK, cfg.min_cpu_per_task)
{
cfg.min_cpu_per_task = val;
if Self::is_valid_min_cpu_per_task(val) {
cfg.min_cpu_per_task = val;
} else {
eprintln!(
"Invalid {} value: {}, must be a finite number > 0, using default {}",
Self::ENV_DAFT_MIN_CPU_PER_TASK,
val,
cfg.min_cpu_per_task
);
}
}

if let Some(val) = parse_number_from_env(
Expand Down Expand Up @@ -513,7 +527,7 @@ mod tests {
// ENV_DAFT_MIN_CPU_PER_TASK
{
let cfg = DaftExecutionConfig::from_env();
assert_eq!(cfg.min_cpu_per_task, 0.5);
assert_eq!(cfg.min_cpu_per_task, 1.0);

unsafe {
std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "0.1");
Expand All @@ -525,7 +539,31 @@ mod tests {
std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "invalid");
}
let cfg = DaftExecutionConfig::from_env();
assert_eq!(cfg.min_cpu_per_task, 0.5);
assert_eq!(cfg.min_cpu_per_task, 1.0);

unsafe {
std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "0");
}
let cfg = DaftExecutionConfig::from_env();
assert_eq!(cfg.min_cpu_per_task, 1.0);

unsafe {
std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "-0.5");
}
let cfg = DaftExecutionConfig::from_env();
assert_eq!(cfg.min_cpu_per_task, 1.0);

unsafe {
std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "nan");
}
let cfg = DaftExecutionConfig::from_env();
assert_eq!(cfg.min_cpu_per_task, 1.0);

unsafe {
std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "inf");
}
let cfg = DaftExecutionConfig::from_env();
assert_eq!(cfg.min_cpu_per_task, 1.0);

unsafe {
std::env::remove_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK);
Expand Down
5 changes: 5 additions & 0 deletions src/common/daft-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ impl PyDaftExecutionConfig {
}

if let Some(min_cpu_per_task) = min_cpu_per_task {
if !DaftExecutionConfig::is_valid_min_cpu_per_task(min_cpu_per_task) {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
"min_cpu_per_task must be a finite number > 0, got {min_cpu_per_task}"
)));
}
config.min_cpu_per_task = min_cpu_per_task;
}

Expand Down
36 changes: 34 additions & 2 deletions src/common/resource-request/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,19 @@ impl ResourceRequest {
num_gpus: Option<f64>,
memory_bytes: Option<usize>,
) -> DaftResult<Self> {
if let Some(num_cpus) = num_cpus
&& !(num_cpus.is_finite() && num_cpus >= 0.0)
{
return Err(DaftError::ValueError(format!(
"ResourceRequest num_cpus must be a finite, nonnegative number, got {}",
num_cpus
)));
}

if let Some(num_gpus) = num_gpus {
if num_gpus < 0.0 {
if !(num_gpus.is_finite() && num_gpus >= 0.0) {
return Err(DaftError::ValueError(format!(
"ResourceRequest num_gpus must be nonnegative, got {}",
"ResourceRequest num_gpus must be a finite, nonnegative number, got {}",
num_gpus
)));
}
Expand Down Expand Up @@ -275,3 +284,26 @@ pub fn register_modules(parent: &Bound<PyModule>) -> PyResult<()> {
parent.add_class::<ResourceRequest>()?;
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn rejects_non_finite_or_negative_cpus() {
for bad in [f64::NAN, f64::INFINITY, f64::NEG_INFINITY, -0.5] {
assert!(ResourceRequest::try_new_internal(Some(bad), None, None).is_err());
}
assert!(ResourceRequest::try_new_internal(Some(0.0), None, None).is_ok());
assert!(ResourceRequest::try_new_internal(Some(0.25), None, None).is_ok());
}

#[test]
fn rejects_non_finite_or_negative_gpus() {
for bad in [f64::NAN, f64::INFINITY, -1.0] {
assert!(ResourceRequest::try_new_internal(None, Some(bad), None).is_err());
}
assert!(ResourceRequest::try_new_internal(None, Some(0.5), None).is_ok());
assert!(ResourceRequest::try_new_internal(None, Some(2.0), None).is_ok());
}
}
93 changes: 40 additions & 53 deletions src/daft-distributed/src/python/ray/worker_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use pyo3::prelude::*;
use super::{task::RayTaskResultHandle, worker::RaySwordfishWorker};
use crate::scheduling::{
scheduler::WorkerSnapshot,
task::{SwordfishTask, TaskContext, TaskResourceRequest},
task::{SwordfishTask, TaskContext, TaskResourceRequest, next_autoscale_request},
worker::{Worker, WorkerId, WorkerManager},
};

Expand Down Expand Up @@ -262,57 +262,38 @@ impl WorkerManager for RayWorkerManager {
.unwrap_or(0)
.max(cluster_memory_bytes);

// 3. Accumulate bundles one at a time until the running total surpasses the
// high-water mark in any resource dimension (CPU, GPU, or memory). This ensures
// each cycle's request is exactly one bundle larger than the previous max —
// gradual enough to avoid exceeding an unknown cluster capacity limit.
let mut cpu_sum = 0.0;
let mut gpu_sum = 0.0;
let mut memory_sum = 0;
let mut surpassed = false;
let mut selected_bundles = Vec::new();
for bundle in &bundles {
cpu_sum += bundle.resource_request.num_cpus().unwrap_or(0.0);
gpu_sum += bundle.resource_request.num_gpus().unwrap_or(0.0);
memory_sum += bundle.resource_request.memory_bytes().unwrap_or(0);
selected_bundles.push(bundle);
if cpu_sum > high_water_mark_cpus
|| gpu_sum > high_water_mark_gpus
|| memory_sum > high_water_mark_memory
{
surpassed = true;
break;
}
}

// 4. If we went through all pending bundles without surpassing the high-water mark,
// the remaining demand is smaller than what we previously requested. Skip this
// cycle — Ray still holds our previous (larger) request, so no downscale occurs.
if !surpassed {
// 3. Select the next request: one bundle larger than the previous high-water
// mark, so the cluster ramps up gradually. None means demand hasn't grown
// past the last request, so skip — Ray still holds that (larger) request.
let Some(ray_bundles) = next_autoscale_request(
&bundles,
high_water_mark_cpus,
high_water_mark_gpus,
high_water_mark_memory,
) else {
return Ok(());
}

// 5. Send the selected bundles to Ray's autoscaler via request_resources().
// Strip zero-valued GPU/memory keys so Ray doesn't interpret them as a demand
// for zero-resource bundles on specialized nodes.
let python_bundles = selected_bundles
.iter()
.map(|bundle| {
let mut dict = HashMap::new();
dict.insert("CPU", bundle.num_cpus().ceil() as i64);
let gpu = bundle.num_gpus().ceil() as i64;
if gpu > 0 {
dict.insert("GPU", gpu);
}
let memory = bundle.memory_bytes() as i64;
if memory > 0 {
dict.insert("memory", memory);
}
dict
})
.collect::<Vec<_>>();
};

// 4. Send the bundles to Ray's autoscaler. Zero-valued GPU/memory keys are
// omitted so Ray doesn't demand zero-resource bundles on specialized nodes.
Python::attach(|py| -> DaftResult<()> {
let python_bundles = ray_bundles
.iter()
.map(|bundle| {
let dict = pyo3::types::PyDict::new(py);
if bundle.cpu > 0 {
dict.set_item("CPU", bundle.cpu)?;
}
if let Some(gpu) = bundle.gpu {
dict.set_item("GPU", gpu)?;
}
if let Some(memory) = bundle.memory {
dict.set_item("memory", memory)?;
}
Ok::<_, PyErr>(dict)
})
.collect::<Result<Vec<_>, _>>()?;

let flotilla_module = py.import(pyo3::intern!(py, "daft.runners.flotilla"))?;
flotilla_module.call_method1(pyo3::intern!(py, "try_autoscale"), (python_bundles,))?;
Ok(())
Expand All @@ -323,10 +304,16 @@ impl WorkerManager for RayWorkerManager {
state.pending_release_blacklist.clear();
state.last_refresh = None;

// 6. Record this request as the new high-water mark so the next cycle will
// request exactly one bundle more, and so we never send a smaller request.
state.max_resources_requested =
ResourceRequest::try_new_internal(Some(cpu_sum), Some(gpu_sum), Some(memory_sum))?;
// 5. Record the new high-water mark as the aggregated integer totals just
// requested — same units Ray sees, so each cycle escalates by a whole unit.
let requested_cpus: i64 = ray_bundles.iter().map(|b| b.cpu).sum();
let requested_gpus: i64 = ray_bundles.iter().filter_map(|b| b.gpu).sum();
let requested_memory: i64 = ray_bundles.iter().filter_map(|b| b.memory).sum();
state.max_resources_requested = ResourceRequest::try_new_internal(
Some(requested_cpus as f64),
Some(requested_gpus as f64),
Some(requested_memory as usize),
)?;
state.last_autoscale_request_time = Some(Instant::now());

Ok(())
Expand Down
Loading
Loading