Skip to content

Commit bb506a4

Browse files
jaychiaJay Chia
andauthored
[FEAT] Daft Catalog API (Eventual-Inc#3036)
Adds a `DaftCatalog` API to help cement Daft's catalog data access patterns. Here is the intended UX: ```python import daft ### # Registering external catalogs # # We recognize `PyIceberg.Catalog`s and `UnityCatalog` for now # TODO: This should be configurable via a Daft catalog config file (.toml or .yaml) ### from pyiceberg.catalog import load_catalog catalog = load_catalog(...) daft.catalog.register_python_catalog(catalog) ### # Adding named tables ### df = daft.register_table(df, "foo") ### # Reading tables ### df1 = daft.read_table("foo") # first priority is named tables df2 = daft.read_table("x.y.z") # next priority is the registered default catalog df2 = daft.read_table("default.x.y.z") # equivalent to the previous call df3 = daft.read_table("my_other_catalog.x.y.z") # Supports named catalogs other than default one ``` Other APIs which will be nice as follow-ons: - [ ] Integrate this with the SQLCatalog API that our SQL stuff uses - [ ] Detection of catalog from a YAML `~/.daft.yaml` config file - [ ] Allow for configuring table access (e.g. `daft.read_table("iceberg_table", options=daft.catalog.IcebergReadOptions(...))`) - [ ] Implementations for other catalogs that isn't a Python catalog, and can support other table types (e.g. Hive and Delta): - [ ] `daft.catalog.register_aws_glue()` - [ ] `daft.catalog.register_hive_metastore()` - [ ] `df.write_table("table_name", mode="overwrite|append", create_if_missing=True)` - [ ] `df.upsert("table_name", match_columns={...}, update_columns={...}, insert_columns={...})` - [ ] DDL: allow for easy creation of tables, erroring out if the selected backend does not support a given table format - [ ] `daft.catalog.create_table_parquet(...)` - [ ] `daft.catalog.create_table_iceberg(...)` - [ ] `daft.catalog.create_table_deltalake(...)` - [ ] `daft.catalog.list_tables(...)` --------- Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
1 parent 84db665 commit bb506a4

25 files changed

+1021
-79
lines changed

Cargo.lock

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ common-scan-info = {path = "src/common/scan-info", default-features = false}
99
common-system-info = {path = "src/common/system-info", default-features = false}
1010
common-tracing = {path = "src/common/tracing", default-features = false}
1111
common-version = {path = "src/common/version", default-features = false}
12+
daft-catalog = {path = "src/daft-catalog", default-features = false}
13+
daft-catalog-python-catalog = {path = "src/daft-catalog/python-catalog", optional = true}
1214
daft-compression = {path = "src/daft-compression", default-features = false}
1315
daft-connect = {path = "src/daft-connect", optional = true}
1416
daft-core = {path = "src/daft-core", default-features = false}
@@ -52,6 +54,8 @@ python = [
5254
"common-display/python",
5355
"common-resource-request/python",
5456
"common-system-info/python",
57+
"daft-catalog/python",
58+
"daft-catalog-python-catalog/python",
5559
"daft-connect/python",
5660
"daft-core/python",
5761
"daft-csv/python",
@@ -75,6 +79,11 @@ python = [
7579
"daft-writers/python",
7680
"daft-table/python",
7781
"dep:daft-connect",
82+
"common-daft-config/python",
83+
"common-system-info/python",
84+
"common-display/python",
85+
"common-resource-request/python",
86+
"dep:daft-catalog-python-catalog",
7887
"dep:pyo3",
7988
"dep:pyo3-log"
8089
]
@@ -140,6 +149,7 @@ members = [
140149
"src/common/scan-info",
141150
"src/common/system-info",
142151
"src/common/treenode",
152+
"src/daft-catalog",
143153
"src/daft-core",
144154
"src/daft-csv",
145155
"src/daft-dsl",

daft/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def refresh_logger() -> None:
5858
# Daft top-level imports
5959
###
6060

61+
from daft.catalog import read_table, register_table
6162
from daft.context import set_execution_config, set_planning_config, execution_config_ctx, planning_config_ctx
6263
from daft.convert import (
6364
from_arrow,
@@ -129,6 +130,8 @@ def refresh_logger() -> None:
129130
"set_execution_config",
130131
"planning_config_ctx",
131132
"execution_config_ctx",
133+
"read_table",
134+
"register_table",
132135
"sql",
133136
"sql_expr",
134137
"to_struct",

daft/catalog/__init__.py

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
"""The `daft.catalog` module contains functionality for Data Catalogs.
2+
3+
A Data Catalog can be understood as a system/service for users to discover, access and query their data.
4+
Most commonly, users' data is represented as a "table". Some more modern Data Catalogs such as Unity Catalog
5+
also expose other types of data including files, ML models, registered functions and more.
6+
7+
Examples of Data Catalogs include AWS Glue, Hive Metastore, Apache Iceberg REST and Unity Catalog.
8+
9+
Daft manages Data Catalogs by registering them in an internal meta-catalog, called the "DaftMetaCatalog". This
10+
is simple a collection of data catalogs, which Daft will attempt to detect from a users' current environment.
11+
12+
**Data Catalog**
13+
14+
Daft recognizes a default catalog which it will attempt to use when no specific catalog name is provided.
15+
16+
```python
17+
# This will hit the default catalog
18+
daft.read_table("my_db.my_namespace.my_table")
19+
```
20+
21+
**Named Tables**
22+
23+
Daft allows also the registration of named tables, which have no catalog associated with them.
24+
25+
Note that named tables take precedence over the default catalog's table names when resolving names.
26+
27+
```python
28+
df = daft.from_pydict({"foo": [1, 2, 3]})
29+
30+
daft.catalog.register_named_table(
31+
"my_table",
32+
df,
33+
)
34+
35+
# Your table is now accessible from Daft-SQL, or Daft's `read_table`
36+
df1 = daft.read_table("my_table")
37+
df2 = daft.sql("SELECT * FROM my_table")
38+
```
39+
"""
40+
41+
from __future__ import annotations
42+
43+
from daft.daft import catalog as native_catalog
44+
from daft.logical.builder import LogicalPlanBuilder
45+
46+
from daft.dataframe import DataFrame
47+
48+
_PYICEBERG_AVAILABLE = False
49+
try:
50+
from pyiceberg.catalog import Catalog as PyIcebergCatalog
51+
52+
_PYICEBERG_AVAILABLE = True
53+
except ImportError:
54+
pass
55+
56+
_UNITY_AVAILABLE = False
57+
try:
58+
from daft.unity_catalog import UnityCatalog
59+
60+
_UNITY_AVAILABLE = True
61+
except ImportError:
62+
pass
63+
64+
__all__ = [
65+
"read_table",
66+
"register_python_catalog",
67+
"unregister_catalog",
68+
"register_table",
69+
]
70+
71+
# Forward imports from the native catalog which don't require Python wrappers
72+
unregister_catalog = native_catalog.unregister_catalog
73+
74+
75+
def read_table(name: str) -> DataFrame:
76+
"""Finds a table with the specified name and reads it as a DataFrame
77+
78+
The provided name can be any of the following, and Daft will return them with the following order of priority:
79+
80+
1. Name of a registered dataframe/SQL view (manually registered using `daft.register_table`): `"my_registered_table"`
81+
2. Name of a table within the default catalog (without inputting the catalog name) for example: `"my.table.name"`
82+
3. Name of a fully-qualified table path with the catalog name for example: `"my_catalog.my.table.name"`
83+
84+
Args:
85+
name: The identifier for the table to read
86+
87+
Returns:
88+
A DataFrame containing the data from the specified table.
89+
"""
90+
native_logical_plan_builder = native_catalog.read_table(name)
91+
return DataFrame(LogicalPlanBuilder(native_logical_plan_builder))
92+
93+
94+
def register_table(name: str, dataframe: DataFrame) -> str:
95+
"""Register a DataFrame as a named table.
96+
97+
This function registers a DataFrame as a named table, making it accessible
98+
via Daft-SQL or Daft's `read_table` function.
99+
100+
Args:
101+
name (str): The name to register the table under.
102+
dataframe (daft.DataFrame): The DataFrame to register as a table.
103+
104+
Returns:
105+
str: The name of the registered table.
106+
107+
Example:
108+
>>> df = daft.from_pydict({"foo": [1, 2, 3]})
109+
>>> daft.catalog.register_table("my_table", df)
110+
>>> daft.read_table("my_table")
111+
"""
112+
return native_catalog.register_table(name, dataframe._builder._builder)
113+
114+
115+
def register_python_catalog(catalog: PyIcebergCatalog | UnityCatalog, name: str | None = None) -> str:
116+
"""Registers a Python catalog with Daft
117+
118+
Currently supports:
119+
120+
* [PyIceberg Catalogs](https://py.iceberg.apache.org/api/)
121+
* [Unity Catalog](https://www.getdaft.io/projects/docs/en/latest/user_guide/integrations/unity-catalog.html)
122+
123+
Args:
124+
catalog (PyIcebergCatalog | UnityCatalog): The Python catalog to register.
125+
name (str | None, optional): The name to register the catalog under. If None, this catalog is registered as the default catalog.
126+
127+
Returns:
128+
str: The name of the registered catalog.
129+
130+
Raises:
131+
ValueError: If an unsupported catalog type is provided.
132+
133+
Example:
134+
>>> from pyiceberg.catalog import load_catalog
135+
>>> catalog = load_catalog("my_catalog")
136+
>>> daft.catalog.register_python_catalog(catalog, "my_daft_catalog")
137+
138+
"""
139+
python_catalog: PyIcebergCatalog
140+
if _PYICEBERG_AVAILABLE and isinstance(catalog, PyIcebergCatalog):
141+
from daft.catalog.pyiceberg import PyIcebergCatalogAdaptor
142+
143+
python_catalog = PyIcebergCatalogAdaptor(catalog)
144+
elif _UNITY_AVAILABLE and isinstance(catalog, UnityCatalog):
145+
from daft.catalog.unity import UnityCatalogAdaptor
146+
147+
python_catalog = UnityCatalogAdaptor(catalog)
148+
else:
149+
raise ValueError(f"Unsupported catalog type: {type(catalog)}")
150+
151+
return native_catalog.register_python_catalog(python_catalog, name)

daft/catalog/pyiceberg.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING
4+
5+
if TYPE_CHECKING:
6+
from pyiceberg.catalog import Catalog as PyIcebergCatalog
7+
from pyiceberg.table import Table as PyIcebergTable
8+
9+
from daft.dataframe import DataFrame
10+
11+
from daft.catalog.python_catalog import PythonCatalog, PythonCatalogTable
12+
13+
14+
class PyIcebergCatalogAdaptor(PythonCatalog):
15+
def __init__(self, pyiceberg_catalog: PyIcebergCatalog):
16+
self._catalog = pyiceberg_catalog
17+
18+
def list_tables(self, prefix: str) -> list[str]:
19+
return [".".join(tup) for tup in self._catalog.list_tables(prefix)]
20+
21+
def load_table(self, name: str) -> PyIcebergTableAdaptor:
22+
return PyIcebergTableAdaptor(self._catalog.load_table(name))
23+
24+
25+
class PyIcebergTableAdaptor(PythonCatalogTable):
26+
def __init__(self, pyiceberg_table: PyIcebergTable):
27+
self._table = pyiceberg_table
28+
29+
def to_dataframe(self) -> DataFrame:
30+
import daft
31+
32+
return daft.read_iceberg(self._table)

daft/catalog/python_catalog.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from __future__ import annotations
2+
3+
from abc import abstractmethod
4+
from typing import TYPE_CHECKING
5+
6+
if TYPE_CHECKING:
7+
from daft.dataframe import DataFrame
8+
9+
10+
class PythonCatalog:
11+
"""Wrapper class for various Python implementations of Data Catalogs"""
12+
13+
@abstractmethod
14+
def list_tables(self, prefix: str) -> list[str]: ...
15+
16+
@abstractmethod
17+
def load_table(self, name: str) -> PythonCatalogTable: ...
18+
19+
20+
class PythonCatalogTable:
21+
"""Wrapper class for various Python implementations of Data Catalog Tables"""
22+
23+
@abstractmethod
24+
def to_dataframe(self) -> DataFrame: ...

daft/catalog/unity.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING
4+
5+
if TYPE_CHECKING:
6+
from daft.dataframe import DataFrame
7+
from daft.unity_catalog import UnityCatalog, UnityCatalogTable
8+
9+
from daft.catalog.python_catalog import PythonCatalog, PythonCatalogTable
10+
11+
12+
class UnityCatalogAdaptor(PythonCatalog):
13+
def __init__(self, unity_catalog: UnityCatalog):
14+
self._catalog = unity_catalog
15+
16+
def list_tables(self, prefix: str) -> list[str]:
17+
num_namespaces = prefix.count(".")
18+
if prefix == "":
19+
return [
20+
tbl
21+
for cat in self._catalog.list_catalogs()
22+
for schema in self._catalog.list_schemas(cat)
23+
for tbl in self._catalog.list_tables(schema)
24+
]
25+
elif num_namespaces == 0:
26+
catalog_name = prefix
27+
return [
28+
tbl for schema in self._catalog.list_schemas(catalog_name) for tbl in self._catalog.list_tables(schema)
29+
]
30+
elif num_namespaces == 1:
31+
schema_name = prefix
32+
return [tbl for tbl in self._catalog.list_tables(schema_name)]
33+
else:
34+
raise ValueError(
35+
f"Unrecognized catalog name or schema name, expected a '.'-separated namespace but received: {prefix}"
36+
)
37+
38+
def load_table(self, name: str) -> UnityTableAdaptor:
39+
return UnityTableAdaptor(self._catalog.load_table(name))
40+
41+
42+
class UnityTableAdaptor(PythonCatalogTable):
43+
def __init__(self, unity_table: UnityCatalogTable):
44+
self._table = unity_table
45+
46+
def to_dataframe(self) -> DataFrame:
47+
import daft
48+
49+
return daft.read_deltalake(self._table)

daft/daft/catalog.pyi

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from typing import TYPE_CHECKING
2+
3+
from daft.daft import LogicalPlanBuilder as PyLogicalPlanBuilder
4+
5+
if TYPE_CHECKING:
6+
from daft.catalog.python_catalog import PythonCatalog
7+
8+
def read_table(name: str) -> PyLogicalPlanBuilder: ...
9+
def register_table(name: str, plan_builder: PyLogicalPlanBuilder) -> str: ...
10+
def register_python_catalog(catalog: PythonCatalog, catalog_name: str | None) -> str: ...
11+
def unregister_catalog(catalog_name: str | None) -> bool: ...

daft/io/catalog.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ class DataCatalogTable:
3434
table_name: str
3535
catalog_id: Optional[str] = None
3636

37+
def __post_init__(self):
38+
import warnings
39+
40+
warnings.warn(
41+
"This API will soon be deprecated. Users should use the new functionality in daft.catalog.",
42+
DeprecationWarning,
43+
stacklevel=2,
44+
)
45+
3746
def table_uri(self, io_config: IOConfig) -> str:
3847
"""
3948
Get the URI of the table in the data catalog.

src/common/error/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ pub enum DaftError {
5050
FromUtf8Error(#[from] std::string::FromUtf8Error),
5151
#[error("Not Yet Implemented: {0}")]
5252
NotImplemented(String),
53+
#[error("DaftError::CatalogError {0}")]
54+
CatalogError(String),
5355
}
5456

5557
impl DaftError {

0 commit comments

Comments
 (0)