diff --git a/pyproject.toml b/pyproject.toml
index 624d7f7..7a2823c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -38,6 +38,7 @@ dependencies = [
     "fsspec >= 2023.12.2",
     "ray[default] >= 2.10.0",
     "graphviz >= 0.19.1",
+    "deltalake >= 0.15.0",
 ]
 
 [project.optional-dependencies]
diff --git a/smallpond/dataframe.py b/smallpond/dataframe.py
index 9d8fd0f..e1b8a15 100644
--- a/smallpond/dataframe.py
+++ b/smallpond/dataframe.py
@@ -12,6 +12,7 @@
 import ray
 import ray.exceptions
+from deltalake import DeltaTable
 from loguru import logger
 
 from smallpond.execution.task import Task
 from smallpond.io.filesystem import remove_path
@@ -56,6 +57,29 @@ def read_parquet(
         dataset = ParquetDataSet(paths, columns=columns, union_by_name=union_by_name, recursive=recursive)
         plan = DataSourceNode(self._ctx, dataset)
         return DataFrame(self, plan)
+
+    def read_deltalake(
+        self,
+        path: str,
+        recursive: bool = False,
+        columns: Optional[List[str]] = None,
+        union_by_name: bool = False,
+    ) -> DataFrame:
+        """
+        Create a DataFrame from the Parquet data files of a DeltaLake table.
+
+        `path` is the root of a DeltaLake table; S3 and GS locations are
+        rejected with a `ValueError`. `recursive`, `columns` and `union_by_name`
+        are passed through to the underlying `ParquetDataSet`.
+        """
+        if path.startswith(("s3://", "gs://")):
+            raise ValueError("DeltaLake on S3 and GS is not supported yet.")
+        # `files()` lists paths relative to the table root, which only resolve when the
+        # working directory is the table itself; `file_uris()` lists absolute locations.
+        paths = DeltaTable(path).file_uris()
+        dataset = ParquetDataSet(paths, columns=columns, union_by_name=union_by_name, recursive=recursive)
+        plan = DataSourceNode(self._ctx, dataset)
+        return DataFrame(self, plan)
 
     def read_json(self, paths: Union[str, List[str]], schema: Dict[str, str]) -> DataFrame:
         """
diff --git a/tests/test_dataframe.py b/tests/test_dataframe.py
index 6d9b085..523037f 100644
--- a/tests/test_dataframe.py
+++ b/tests/test_dataframe.py
@@ -3,6 +3,7 @@
 import pandas as pd
 import pyarrow as pa
 import pytest
+from deltalake import write_deltalake
 
 from smallpond.dataframe import Session
 
@@ -39,6 +40,16 @@ def test_parquet(sp: Session):
     df = sp.read_parquet("tests/data/mock_urls/*.parquet")
     assert df.count() == 1000
 
 
+def test_read_deltalake(sp: Session, tmp_path):
+    # use a throwaway directory so the table neither pollutes tests/data nor
+    # makes a second run fail because the table already exists
+    table_path = str(tmp_path / "delta_table")
+    sample = pd.DataFrame({"x": [1, 2, 3]})
+    write_deltalake(table_path, sample)
+    df = sp.read_deltalake(table_path)
+    assert df.count() == 3
+
+
 def test_take(sp: Session):
     df = sp.from_pandas(pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}))