Skip to content

Commit c19a075

Browse files
committed
Merge remote-tracking branch 'upstream/main' into codex-sql-read-parquet-ignore-corrupt-files
2 parents e9a2c6c + 98901dd commit c19a075

24 files changed

Lines changed: 2223 additions & 2074 deletions

File tree

Cargo.lock

Lines changed: 78 additions & 606 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,6 @@ exclude = [
243243

244244
[workspace.dependencies]
245245
arrow = "57.1.0"
246-
half = {version = "2.7.1", features = ["num-traits", "bytemuck", "serde"]}
247246
arrow-array = {version = "57.1.0", features = ["chrono-tz"]}
248247
arrow-buffer = "57.1.0"
249248
arrow-csv = "57.1.0"
@@ -312,6 +311,7 @@ daft-sql = {path = "src/daft-sql"}
312311
dashmap = "6.1.0"
313312
educe = "0.6.0"
314313
futures = "0.3.30"
314+
half = {version = "2.7.1", features = ["num-traits", "bytemuck", "serde"]}
315315
hashbrown = "0.16"
316316
html-escape = "0.2.13"
317317
image = {version = "0.25.10", default-features = false}

daft/dataframe/dataframe.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
from daft.catalog.__unity._client import UnityCatalogTable
7878
from daft.checkpoint import IdempotentCommit
7979
from daft.convert import ArrowStreamExportable
80+
from daft.dataframe.to_torch import DaftTorchDataLoader
8081
from daft.execution.metadata import ExecutionMetadata
8182
from daft.io import DataSink
8283
from daft.io.sink import WriteResultType
@@ -621,7 +622,7 @@ def iter_partitions(
621622
) -> Iterator[Union[MicroPartition, "ray.ObjectRef"]]:
622623
"""Begin executing this dataframe and return an iterator over the partitions.
623624
624-
Each partition will be returned as a daft.recordbatch object (if using Python runner backend)
625+
Each partition will be returned as a daft.MicroPartition object (if using Python runner backend)
625626
or a ray ObjectRef (if using Ray runner backend).
626627
627628
Args:
@@ -5859,6 +5860,58 @@ def to_torch_iter_dataset(
58595860

58605861
return DaftTorchIterableDataset(df)
58615862

5863+
@DataframePublicAPI
5864+
def to_torch_dataloader(
5865+
self,
5866+
batch_size: int = 1,
5867+
*,
5868+
pin_memory: bool = False,
5869+
pin_memory_device: str = "",
5870+
prefetch_count: int = 0,
5871+
) -> "DaftTorchDataLoader":
5872+
"""Return a DataLoader-like iterator that streams batched partitions for PyTorch training.
5873+
5874+
Begins execution of the DataFrame when iterated. Each yielded batch is a dict mapping column
5875+
names to `torch.Tensor` values (or Python lists for non-numeric columns).
5876+
5877+
For row-level shuffling, use [``shuffle``][daft.DataFrame.shuffle] or
5878+
[``sample``][daft.DataFrame.sample] on the DataFrame before calling this method.
5879+
5880+
Note:
5881+
Batch sizing is best-effort. Batches may be smaller than `batch_size`.
5882+
5883+
Args:
5884+
batch_size: Target number of rows per batch.
5885+
pin_memory: If `True`, pin memory on returned tensors for faster GPU transfer.
5886+
pin_memory_device: Optional device for pinned memory (PyTorch 2.x).
5887+
prefetch_count: Number of batches loaded in advance. This will increase memory usage, but can
5888+
improve throughput.
5889+
5890+
Returns:
5891+
DaftTorchDataLoader: Iterable over batch dicts for use as
5892+
`for batch in df.to_torch_dataloader(batch_size): ...`
5893+
5894+
Examples:
5895+
>>> import daft
5896+
>>> import torch # doctest: +SKIP
5897+
>>> df = daft.from_pydict({"x": [1, 2, 3, 4], "y": [5, 6, 7, 8]})
5898+
>>> for batch in df.to_torch_dataloader(batch_size=2): # doctest: +SKIP
5899+
... assert batch["x"].shape == (2,)
5900+
5901+
Tip:
5902+
For the PyTorch `IterableDataset` + `DataLoader` composition, see
5903+
[``to_torch_iter_dataset``][daft.DataFrame.to_torch_iter_dataset].
5904+
"""
5905+
from daft.dataframe.to_torch import DaftTorchDataLoader
5906+
5907+
return DaftTorchDataLoader(
5908+
self,
5909+
batch_size,
5910+
pin_memory=pin_memory,
5911+
pin_memory_device=pin_memory_device,
5912+
prefetch_count=prefetch_count,
5913+
)
5914+
58625915
@DataframePublicAPI
58635916
def to_ray_dataset(self) -> "ray.data.dataset.DataSet":
58645917
"""Converts the current DataFrame to a [Ray Dataset](https://docs.ray.io/en/latest/data/api/dataset.html#ray.data.Dataset) which is useful for running distributed ML model training in Ray.

daft/dataframe/to_torch.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,15 @@
33
import logging
44
from typing import TYPE_CHECKING, Any
55

6+
from daft.dependencies import np, torch
7+
from daft.runners.partitioning import MaterializedResult
8+
69
if TYPE_CHECKING:
710
from collections.abc import Iterable, Iterator
811

12+
from daft.dataframe.dataframe import DataFrame
13+
from daft.recordbatch import MicroPartition
14+
915
logger = logging.getLogger(__name__)
1016

1117
try:
@@ -47,3 +53,84 @@ def __init__(self, iterable: Iterable[dict[str, Any]]):
4753

4854
def __iter__(self) -> Iterator[dict[str, Any]]:
4955
return iter(self.iterable)
56+
57+
58+
class DaftTorchDataLoader:
59+
"""Streams batched partitions from a Daft DataFrame and yields PyTorch-ready batch dicts.
60+
61+
Note:
62+
This simulates the behavior of a PyTorch DataLoader, but does not use the DataLoader class itself.
63+
If the underlying DataFrame is already materialized, it will reuse the existing data.
64+
"""
65+
66+
def __init__(
67+
self,
68+
df: DataFrame,
69+
batch_size: int = 1,
70+
*,
71+
pin_memory: bool = False,
72+
pin_memory_device: str = "",
73+
prefetch_count: int = 0,
74+
# TODO: Add support for drop_last when we have strict into_batches
75+
) -> None:
76+
if batch_size <= 0:
77+
raise ValueError("batch_size must be greater than 0")
78+
79+
self._batch_size = batch_size
80+
self._pin_memory = pin_memory
81+
self._pin_memory_device = pin_memory_device if pin_memory_device else None
82+
self._prefetch_count = prefetch_count
83+
84+
self._batched_df = df.into_batches(batch_size)
85+
86+
def __iter__(self) -> Iterator[dict[str, Any]]:
87+
from daft.runners import get_or_create_runner
88+
89+
results = self._batched_df._result
90+
if results is not None:
91+
for _, mat_result in results.items():
92+
batch = self._to_torch_batch(mat_result.micropartition())
93+
if batch is not None:
94+
yield batch
95+
else:
96+
buffer_size = self._prefetch_count if self._prefetch_count > 0 else None
97+
partitions_iter: Iterator[MaterializedResult[Any]] = get_or_create_runner().run_iter(
98+
self._batched_df._builder, results_buffer_size=buffer_size
99+
)
100+
for mat_result in partitions_iter:
101+
batch = self._to_torch_batch(mat_result.micropartition())
102+
if batch is not None:
103+
yield batch
104+
105+
def _to_torch_batch(self, batch: MicroPartition) -> dict[str, Any]:
106+
return {key: self._column_to_tensor(values) for key, values in batch.to_pydict().items()}
107+
108+
def _column_to_tensor(self, values: list[Any]) -> Any:
109+
if len(values) == 0:
110+
return self._pin(torch.tensor([]))
111+
112+
first = values[0]
113+
114+
if isinstance(first, torch.Tensor):
115+
return self._pin(torch.stack(values))
116+
if hasattr(first, "__array__") and not isinstance(first, (str, bytes)):
117+
if isinstance(first, np.ndarray) and first.ndim > 0:
118+
return self._pin(torch.stack([torch.as_tensor(v) for v in values]))
119+
return self._pin(torch.as_tensor(values))
120+
if isinstance(first, (bool, int, float)):
121+
return self._pin(torch.as_tensor(values))
122+
123+
return values
124+
125+
def _pin(self, tensor: torch.Tensor) -> torch.Tensor:
126+
if not self._pin_memory:
127+
return tensor
128+
129+
# Pinned host memory is only used for async CPU -> CUDA copies.
130+
if not torch.cuda.is_available():
131+
return tensor
132+
133+
# If a specific device is provided, use it. Otherwise, use the default device.
134+
if self._pin_memory_device:
135+
return tensor.pin_memory(device=self._pin_memory_device)
136+
return tensor.pin_memory()

examples/hello/src/lib.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{ffi::CStr, sync::Arc};
22

3-
use arrow_array::{Array, ArrayRef, builder::StringBuilder, cast::AsArray};
3+
use arrow_array::{Array, ArrayRef};
44
use arrow_schema::{DataType, Field};
55
use daft_ext::{daft_extension, prelude::*};
66

@@ -18,18 +18,9 @@ impl DaftExtension for HelloExtension {
1818

1919
// ── Scalar Function ────────────────────────────────────────────────
2020

21-
#[daft_func_batch(return_dtype = DataType::Utf8)]
22-
fn greet(input: ArrayRef) -> DaftResult<ArrayRef> {
23-
let names = input.as_string::<i64>();
24-
let mut builder = StringBuilder::with_capacity(names.len(), names.len() * 16);
25-
for i in 0..names.len() {
26-
if names.is_null(i) {
27-
builder.append_null();
28-
} else {
29-
builder.append_value(format!("Hello, {}!", names.value(i)));
30-
}
31-
}
32-
Ok(Arc::new(builder.finish()))
21+
#[daft_func]
22+
fn greet(name: &str) -> String {
23+
format!("Hello, {}!", name)
3324
}
3425

3526
// ── Aggregate Function ─────────────────────────────────────────────

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ openai = ["openai<2.39.0", "numpy<2.4.0", "pillow==12.2.0"]
4747
pandas = ["pandas<2.4.0"]
4848
postgres = ["psycopg[binary]<3.4.0", "pgvector<0.5.0", "sqlglot<30.9.0", "connectorx>=0.4.4,<0.5.0"]
4949
ray = [
50+
# floor is 2.11: flotilla uses ray.exceptions.ActorUnavailableError, added in ray 2.11
5051
# Inherit existing Ray version. Get the "default" extra for the Ray dashboard.
51-
'ray[data, client]<2.56.0,>=2.11.0; platform_system != "Windows"',
52-
'ray[data, client]>=2.11.0,<2.56.0; platform_system == "Windows"' # floor is 2.11: flotilla uses ray.exceptions.ActorUnavailableError, added in ray 2.11
52+
'ray[data, client]<2.56.0,>=2.11.0'
5353
]
5454
transformers = ["transformers<5.10.0", "sentence-transformers<5.6.0", "torch<2.13.0", "torchvision<0.28.0", "pillow==12.2.0"]
5555
# connectorx 0.4.4 is needed for pgvector support.

src/common/io-config/src/python.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1548,6 +1548,30 @@ impl HuggingFaceConfig {
15481548
}
15491549
}
15501550

1551+
impl TosConfig {
1552+
const MULTIPART_SETTING_IGNORED_MSG: &str = "TosConfig multipart settings are no longer used; TOS I/O is now handled by OpenDAL, which manages multipart uploads internally.";
1553+
1554+
fn warn_multipart_settings(
1555+
multipart_size: Option<u64>,
1556+
multipart_max_concurrency: Option<u32>,
1557+
) {
1558+
if let Some(multipart_size) = multipart_size {
1559+
log::warn!(
1560+
"{} Ignoring multipart_size={}.",
1561+
Self::MULTIPART_SETTING_IGNORED_MSG,
1562+
multipart_size
1563+
);
1564+
}
1565+
if let Some(multipart_max_concurrency) = multipart_max_concurrency {
1566+
log::warn!(
1567+
"{} Ignoring multipart_max_concurrency={}.",
1568+
Self::MULTIPART_SETTING_IGNORED_MSG,
1569+
multipart_max_concurrency
1570+
);
1571+
}
1572+
}
1573+
}
1574+
15511575
#[pymethods]
15521576
impl TosConfig {
15531577
#[allow(clippy::too_many_arguments)]
@@ -1584,6 +1608,8 @@ impl TosConfig {
15841608
multipart_size: Option<u64>,
15851609
multipart_max_concurrency: Option<u32>,
15861610
) -> PyResult<Self> {
1611+
Self::warn_multipart_settings(multipart_size, multipart_max_concurrency);
1612+
15871613
let def = crate::TosConfig::default();
15881614
Ok(Self {
15891615
config: crate::TosConfig {
@@ -1644,6 +1670,8 @@ impl TosConfig {
16441670
multipart_size: Option<u64>,
16451671
multipart_max_concurrency: Option<u32>,
16461672
) -> PyResult<Self> {
1673+
Self::warn_multipart_settings(multipart_size, multipart_max_concurrency);
1674+
16471675
Ok(Self {
16481676
config: crate::TosConfig {
16491677
region: region.or_else(|| self.config.region.clone()),
@@ -1742,10 +1770,28 @@ impl TosConfig {
17421770
Ok(self.config.multipart_size)
17431771
}
17441772

1773+
#[setter]
1774+
pub fn set_multipart_size(&mut self, multipart_size: u64) {
1775+
log::warn!(
1776+
"{} Ignoring multipart_size={multipart_size}.",
1777+
Self::MULTIPART_SETTING_IGNORED_MSG
1778+
);
1779+
self.config.multipart_size = multipart_size;
1780+
}
1781+
17451782
#[getter]
17461783
pub fn multipart_max_concurrency(&self) -> PyResult<u32> {
17471784
Ok(self.config.multipart_max_concurrency)
17481785
}
1786+
1787+
#[setter]
1788+
pub fn set_multipart_max_concurrency(&mut self, multipart_max_concurrency: u32) {
1789+
log::warn!(
1790+
"{} Ignoring multipart_max_concurrency={multipart_max_concurrency}.",
1791+
Self::MULTIPART_SETTING_IGNORED_MSG
1792+
);
1793+
self.config.multipart_max_concurrency = multipart_max_concurrency;
1794+
}
17491795
}
17501796

17511797
#[pymethods]

0 commit comments

Comments
 (0)