diff --git a/awswrangler/s3/__init__.py b/awswrangler/s3/__init__.py
index 3c53e0034..30b8320f2 100644
--- a/awswrangler/s3/__init__.py
+++ b/awswrangler/s3/__init__.py
@@ -13,7 +13,7 @@
 from awswrangler.s3._select import select_query
 from awswrangler.s3._upload import upload
 from awswrangler.s3._wait import wait_objects_exist, wait_objects_not_exist
-from awswrangler.s3._write_deltalake import to_deltalake
+from awswrangler.s3._write_deltalake import to_deltalake, to_deltalake_streaming
 from awswrangler.s3._write_excel import to_excel
 from awswrangler.s3._write_orc import to_orc
 from awswrangler.s3._write_parquet import store_parquet_metadata, to_parquet
@@ -49,6 +49,7 @@
     "to_csv",
     "to_json",
     "to_deltalake",
+    "to_deltalake_streaming",
     "to_excel",
     "read_excel",
     "download",
diff --git a/awswrangler/s3/_write_deltalake.py b/awswrangler/s3/_write_deltalake.py
index bc5e23e26..cfb76056e 100644
--- a/awswrangler/s3/_write_deltalake.py
+++ b/awswrangler/s3/_write_deltalake.py
@@ -2,7 +2,7 @@
 
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Any, Literal
+from typing import TYPE_CHECKING, Any, Iterable, Iterator, Literal
 
 import boto3
 import pandas as pd
@@ -30,6 +30,7 @@ def _set_default_storage_options_kwargs(
     defaults = {key.upper(): value for key, value in _utils.boto3_to_primitives(boto3_session=boto3_session).items()}
     defaults["AWS_REGION"] = defaults.pop("REGION_NAME")
     defaults["AWS_SESSION_TOKEN"] = "" if defaults["AWS_SESSION_TOKEN"] is None else defaults["AWS_SESSION_TOKEN"]
+
     s3_additional_kwargs = s3_additional_kwargs or {}
 
     s3_lock_arguments = {}
@@ -133,3 +134,97 @@
         schema_mode=schema_mode,
         storage_options=storage_options,
     )
+
+
+def _df_iter_to_record_batch_reader(
+    df_iter: Iterable[pd.DataFrame],
+    *,
+    index: bool,
+    dtype: dict[str, str],
+    target_schema: pa.Schema | None = None,
+    batch_size: int | None = None,
+) -> tuple[pa.RecordBatchReader, pa.Schema]:
+    it = iter(df_iter)
+
+    first_df: pd.DataFrame | None = None
+    for df in it:
+        if not df.empty:
+            first_df = df
+            break
+
+    if first_df is None:
+        empty_schema = pa.schema([])
+        empty_reader = pa.RecordBatchReader.from_batches(empty_schema, [])
+        return empty_reader, empty_schema
+
+    schema = target_schema or _data_types.pyarrow_schema_from_pandas(
+        df=first_df, index=index, ignore_cols=None, dtype=dtype
+    )
+
+    def batches() -> Iterator[pa.RecordBatch]:
+        first_tbl: pa.Table = _df_to_table(first_df, schema, index, dtype)
+        for b in first_tbl.to_batches(batch_size) if batch_size is not None else first_tbl.to_batches():
+            yield b
+
+        for df in it:
+            if df.empty:
+                continue
+            tbl: pa.Table = _df_to_table(df, schema, index, dtype)
+            for b in tbl.to_batches(batch_size) if batch_size is not None else tbl.to_batches():
+                yield b
+
+    reader = pa.RecordBatchReader.from_batches(schema, batches())
+    return reader, schema
+
+
+@_utils.check_optional_dependency(deltalake, "deltalake")
+@Experimental
+def to_deltalake_streaming(
+    *,
+    dfs: Iterable[pd.DataFrame],
+    path: str,
+    index: bool = False,
+    mode: Literal["error", "append", "overwrite", "ignore"] = "append",
+    dtype: dict[str, str] | None = None,
+    partition_cols: list[str] | None = None,
+    schema_mode: Literal["overwrite", "merge"] | None = None,
+    lock_dynamodb_table: str | None = None,
+    s3_allow_unsafe_rename: bool = False,
+    boto3_session: boto3.Session | None = None,
+    s3_additional_kwargs: dict[str, str] | None = None,
+    batch_size: int | None = None,
+    max_open_files: int | None = None,
+    max_rows_per_file: int | None = None,
+    target_file_size: int | None = None,
+) -> None:
+    dtype = dtype or {}
+
+    storage_options = _set_default_storage_options_kwargs(
+        boto3_session=boto3_session,
+        s3_additional_kwargs=s3_additional_kwargs,
+        s3_allow_unsafe_rename=s3_allow_unsafe_rename,
+        lock_dynamodb_table=lock_dynamodb_table,
+    )
+
+    reader, schema = _df_iter_to_record_batch_reader(
+        df_iter=dfs,
+        index=index,
+        dtype=dtype,
+        target_schema=None,
+        batch_size=batch_size,
+    )
+
+    if len(schema) == 0:
+        return
+
+    deltalake.write_deltalake(
+        table_or_uri=path,
+        data=reader,
+        partition_by=partition_cols,
+        mode=mode,
+        schema_mode=schema_mode,
+        storage_options=storage_options,
+        max_open_files=max_open_files,
+        max_rows_per_file=max_rows_per_file,
+        target_file_size=target_file_size,
+    )
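
Usage sketch (illustrative, not part of the patch): a minimal, hypothetical example of feeding the new wr.s3.to_deltalake_streaming entry point from a chunked Parquet read, so only one DataFrame chunk is materialized at a time while everything is written in a single Delta commit. The bucket paths, the "dt" partition column, and the "delta-lock" DynamoDB table name are placeholders, not values taken from this diff.

import awswrangler as wr

# Placeholder locations -- replace with real buckets/prefixes.
SOURCE_PATH = "s3://my-bucket/raw/events/"
TARGET_PATH = "s3://my-bucket/delta/events/"

# read_parquet with an integer `chunked` value yields an iterator of DataFrames;
# to_deltalake_streaming consumes it lazily through a single RecordBatchReader.
dfs = wr.s3.read_parquet(path=SOURCE_PATH, chunked=100_000)

wr.s3.to_deltalake_streaming(
    dfs=dfs,
    path=TARGET_PATH,
    mode="append",
    partition_cols=["dt"],              # assumes the data has a "dt" column
    lock_dynamodb_table="delta-lock",   # or s3_allow_unsafe_rename=True for a single writer
    max_rows_per_file=1_000_000,
)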