Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip [skip ci] #21986

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 87 additions & 16 deletions py-polars/polars/io/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import polars._reexport as pl
from polars._utils.convert import to_py_date, to_py_datetime
from polars.dependencies import pyiceberg
from polars.io.plugins import register_io_source

if TYPE_CHECKING:
from datetime import date, datetime

from pyiceberg.table import Table
from typing import Iterator

from polars import DataFrame, LazyFrame, Series
from polars import DataFrame, LazyFrame, Series, Expr

__all__ = ["scan_iceberg"]

Expand Down Expand Up @@ -136,23 +138,92 @@ def scan_iceberg(
>>> snapshot_id = 7051579356916758811
>>> pl.scan_iceberg(table_path, snapshot_id=snapshot_id).collect() # doctest: +SKIP
"""
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.table import StaticTable

if isinstance(source, str):
source = StaticTable.from_metadata(
metadata_location=source, properties=storage_options or {}
)

if snapshot_id is not None:
snapshot = source.snapshot_by_id(snapshot_id)
if snapshot is None:
msg = f"Snapshot ID not found: {snapshot_id}"
raise ValueError(msg)

func = partial(_scan_pyarrow_dataset_impl, source, snapshot_id=snapshot_id)
arrow_schema = schema_to_pyarrow(source.schema())
return pl.LazyFrame._scan_python_function(arrow_schema, func, pyarrow=True)
def schema_fn() -> pl.Schema:
from pyiceberg.io.pyarrow import schema_to_pyarrow
import pyarrow as pa
from polars import from_arrow
if isinstance(source, str):
tbl = StaticTable.from_metadata(
metadata_location=source, properties=storage_options or {}
)
else:
tbl = source
arrow_schema = schema_to_pyarrow(tbl.schema())
polars_schema = from_arrow(pa.Table.from_pylist([], arrow_schema)).schema # type: ignore[union-attr]
return polars_schema

def io_source(
with_columns: list[str] | None,
predicate: Expr | None,
n_rows: int | None,
batch_size: int | None,
) -> Iterator[DataFrame]:
if isinstance(source, str):
tbl = StaticTable.from_metadata(
metadata_location=source, properties=storage_options or {}
)
else:
tbl = source


print(tbl)
iceberg_scans = tbl.scan(limit=n_rows, snapshot_id=snapshot_id).plan_files()
import polars as _pl

# print("iceberg scans", iceberg_scans)
for task in iceberg_scans:
file = task.file
path = file.file_path
record_count = file.record_count
file_format = file.file_format
if file_format != "PARQUET":
raise NotImplementedError(f"{file_format} for iceberg not implemented!")

delete_files = [f.file_path for f in task.delete_files]

# print("file format", file_format)
# print("file:", file, "path:", path, "delete files:", delete_files)

lf = _pl.scan_parquet(path)

if with_columns is not None:
lf = lf.with_columns(lf)
if predicate is not None:
print(predicate)
lf = lf.filter(predicate)

yield lf.collect()

# lf = function().lazy()
# if with_columns is not None:
# lf = lf.select(with_columns)
# if predicate is not None:
# lf = lf.filter(predicate)
# if n_rows is not None:
# lf = lf.limit(n_rows)
# yield lf.collect()

return register_io_source(io_source, schema=schema_fn, validate_schema=False)

# print(source)
# if isinstance(source, str):
# source = StaticTable.from_metadata(
# metadata_location=source, properties=storage_options or {}
# )
#
# print(dir(source))
#
# if snapshot_id is not None:
# snapshot = source.snapshot_by_id(snapshot_id)
# if snapshot is None:
# msg = f"Snapshot ID not found: {snapshot_id}"
# raise ValueError(msg)
#
# func = partial(_scan_pyarrow_dataset_impl, source, snapshot_id=snapshot_id)
# arrow_schema = schema_to_pyarrow(source.schema())
# return pl.LazyFrame._scan_python_function(arrow_schema, func, pyarrow=True)


def _scan_pyarrow_dataset_impl(
Expand Down
3 changes: 3 additions & 0 deletions py-polars/tests/unit/io/test_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def iceberg_path(io_files_path: Path) -> str:
return f"file://{iceberg_path.resolve()}"


def test_scan_iceberg_2(iceberg_path: str) -> None:
print(pl.scan_iceberg(iceberg_path, snapshot_id=7051579356916758811).collect())

@pytest.mark.slow
@pytest.mark.write_disk
@pytest.mark.filterwarnings(
Expand Down
Loading