diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 94289dadef..6f1e722fc1 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -331,6 +331,11 @@ def files( ("z", "not in", ["a","b"]) ``` """ + warnings.warn( + "Method `files` is deprecated, Use DeltaTable.file_uris(predicate) instead.", + category=DeprecationWarning, + stacklevel=2, + ) return self._table.files(self._stringify_partition_values(partition_filters)) def file_uris( @@ -467,13 +472,17 @@ def schema(self) -> DeltaSchema: """ return self._table.schema + @deprecated( + version="1.2.1", + reason="Not compatible with modern Delta features (e.g. shallow clones). Use `file_uris` instead.", + ) def files_by_partitions(self, partition_filters: PartitionFilterType) -> list[str]: """ Get the files for each partition """ warnings.warn( - "files_by_partitions is deprecated, please use DeltaTable.files() instead.", + "Method `files_by_partitions` is deprecated, please use DeltaTable.file_uris() instead.", category=DeprecationWarning, stacklevel=2, ) @@ -882,6 +891,24 @@ def to_pyarrow_dataset( "but these are not yet supported by the deltalake reader." ) + if ( + table_protocol.reader_features + and "columnMapping" in table_protocol.reader_features + ): + raise DeltaProtocolError( + "The table requires reader feature 'columnMapping' " + "but this is not supported using pyarrow Datasets." + ) + + if ( + table_protocol.reader_features + and "deletionVectors" in table_protocol.reader_features + ): + raise DeltaProtocolError( + "The table requires reader feature 'deletionVectors' " + "but this is not supported using pyarrow Datasets." + ) + import pyarrow import pyarrow.fs as pa_fs diff --git a/python/src/lib.rs b/python/src/lib.rs index 3679511810..894973ccff 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -114,19 +114,13 @@ struct RawDeltaTable { _config: FsConfig, } -#[pyclass(frozen)] +#[pyclass(frozen, get_all)] struct RawDeltaTableMetaData { - #[pyo3(get)] id: String, - #[pyo3(get)] name: Option, - #[pyo3(get)] description: Option, - #[pyo3(get)] partition_columns: Vec, - #[pyo3(get)] created_time: Option, - #[pyo3(get)] configuration: HashMap, } @@ -481,8 +475,15 @@ impl RawDeltaTable { /// Run the Vacuum command on the Delta Table: list and delete files no longer referenced /// by the Delta table and are older than the retention threshold. - #[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true, - commit_properties=None, post_commithook_properties=None, full = false, keep_versions = None))] + #[pyo3(signature = ( + dry_run, + retention_hours = None, + enforce_retention_duration = true, + commit_properties=None, + post_commithook_properties=None, + full = false, + keep_versions = None, + ))] #[allow(clippy::too_many_arguments)] pub fn vacuum( &self, @@ -539,7 +540,14 @@ impl RawDeltaTable { } /// Run the UPDATE command on the Delta Table - #[pyo3(signature = (updates, predicate=None, writer_properties=None, safe_cast = false, commit_properties = None, post_commithook_properties=None))] + #[pyo3(signature = ( + updates, + predicate=None, + writer_properties=None, + safe_cast = false, + commit_properties = None, + post_commithook_properties=None, + ))] #[allow(clippy::too_many_arguments)] pub fn update( &self, @@ -659,7 +667,8 @@ impl RawDeltaTable { min_commit_interval = None, writer_properties=None, commit_properties=None, - post_commithook_properties=None))] + post_commithook_properties=None, + ))] pub fn z_order_optimize( &self, py: Python, @@ -750,7 +759,9 @@ impl RawDeltaTable { Ok(()) } - #[pyo3(signature = (feature, allow_protocol_versions_increase, commit_properties=None, post_commithook_properties=None))] + #[pyo3(signature = ( + feature, allow_protocol_versions_increase, commit_properties=None, post_commithook_properties=None + ))] pub fn add_feature( &self, py: Python, @@ -847,7 +858,15 @@ impl RawDeltaTable { Ok(()) } - #[pyo3(signature = (starting_version = None, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, predicate = None, allow_out_of_range = false))] + #[pyo3(signature = ( + starting_version = None, + ending_version = None, + starting_timestamp = None, + ending_timestamp = None, + columns = None, + predicate = None, + allow_out_of_range = false, + ))] #[allow(clippy::too_many_arguments)] pub fn load_cdf( &self, @@ -988,7 +1007,9 @@ impl RawDeltaTable { } // Run the restore command on the Delta Table: restore table to a given version or datetime - #[pyo3(signature = (target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, commit_properties=None))] + #[pyo3(signature = ( + target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, commit_properties=None + ))] pub fn restore( &self, target: Option<&Bound<'_, PyAny>>, @@ -1028,7 +1049,8 @@ impl RawDeltaTable { Ok(serde_json::to_string(&metrics).unwrap()) } - /// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table. + /// Run the History command on the Delta Table: Returns provenance information, + /// including the operation, user, and so on, for each write to a table. #[pyo3(signature = (limit=None))] pub fn history(&self, limit: Option) -> PyResult> { #[allow(clippy::await_holding_lock)] @@ -1220,7 +1242,15 @@ impl RawDeltaTable { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (add_actions, mode, partition_by, schema, partitions_filters=None, commit_properties=None, post_commithook_properties=None))] + #[pyo3(signature = ( + add_actions, + mode, + partition_by, + schema, + partitions_filters=None, + commit_properties=None, + post_commithook_properties=None, + ))] fn create_write_transaction( &self, py: Python, @@ -1467,7 +1497,9 @@ impl RawDeltaTable { } /// Run the delete command on the delta table: delete records following a predicate and return the delete metrics. - #[pyo3(signature = (predicate = None, writer_properties=None, commit_properties=None, post_commithook_properties=None))] + #[pyo3(signature = ( + predicate = None, writer_properties=None, commit_properties=None, post_commithook_properties=None + ))] pub fn delete( &self, py: Python, @@ -1665,7 +1697,21 @@ impl RawDeltaTable { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (data, batch_schema, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))] + #[pyo3(signature = ( + data, + batch_schema, + mode, + schema_mode=None, + partition_by=None, + predicate=None, + target_file_size=None, + name=None, + description=None, + configuration=None, + writer_properties=None, + commit_properties=None, + post_commithook_properties=None + ))] fn write( &self, py: Python, @@ -2173,14 +2219,6 @@ fn shutdown_tracing() -> PyResult<()> { Ok(()) } -fn current_timestamp() -> i64 { - let start = SystemTime::now(); - let since_the_epoch = start - .duration_since(UNIX_EPOCH) - .expect("Time went backwards"); - since_the_epoch.as_millis().try_into().unwrap() -} - #[derive(FromPyObject)] pub struct PyAddAction { path: String, @@ -2244,14 +2282,11 @@ pub struct PyPostCommitHookProperties { } #[derive(Clone)] -#[pyclass(name = "Transaction", module = "deltalake._internal")] +#[pyclass(name = "Transaction", module = "deltalake._internal", get_all)] pub struct PyTransaction { - #[pyo3(get)] - pub app_id: String, - #[pyo3(get)] - pub version: i64, - #[pyo3(get)] - pub last_updated: Option, + app_id: String, + version: i64, + last_updated: Option, } #[pymethods] @@ -2306,7 +2341,23 @@ pub struct PyCommitProperties { #[pyfunction] #[allow(clippy::too_many_arguments)] -#[pyo3(signature = (table_uri, data, batch_schema, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))] +#[pyo3(signature = ( + table_uri, + data, + batch_schema, + mode, + schema_mode=None, + partition_by=None, + predicate=None, + target_file_size=None, + name=None, + description=None, + configuration=None, + storage_options=None, + writer_properties=None, + commit_properties=None, + post_commithook_properties=None, +))] fn write_to_deltalake( py: Python, table_uri: String, @@ -2365,7 +2416,19 @@ fn write_to_deltalake( #[pyfunction] #[allow(clippy::too_many_arguments)] -#[pyo3(signature = (table_uri, schema, partition_by, mode, raise_if_key_not_exists, name=None, description=None, configuration=None, storage_options=None, commit_properties=None, post_commithook_properties=None))] +#[pyo3(signature = ( + table_uri, + schema, + partition_by, + mode, + raise_if_key_not_exists, + name=None, + description=None, + configuration=None, + storage_options=None, + commit_properties=None, + post_commithook_properties=None, +))] fn create_deltalake( py: Python, table_uri: String, @@ -2432,7 +2495,19 @@ fn create_deltalake( #[pyfunction] #[allow(clippy::too_many_arguments)] -#[pyo3(signature = (table_uri, schema, add_actions, mode, partition_by, name=None, description=None, configuration=None, storage_options=None, commit_properties=None, post_commithook_properties=None))] +#[pyo3(signature = ( + table_uri, + schema, + add_actions, + mode, + partition_by, + name=None, + description=None, + configuration=None, + storage_options=None, + commit_properties=None, + post_commithook_properties=None +))] fn create_table_with_add_actions( py: Python, table_uri: String, @@ -2498,7 +2573,17 @@ fn create_table_with_add_actions( #[pyfunction] #[allow(clippy::too_many_arguments)] -#[pyo3(signature = (uri, partition_schema=None, partition_strategy=None, name=None, description=None, configuration=None, storage_options=None, commit_properties=None, post_commithook_properties=None))] +#[pyo3(signature = ( + uri, + partition_schema=None, + partition_strategy=None, + name=None, + description=None, + configuration=None, + storage_options=None, + commit_properties=None, + post_commithook_properties=None, +))] fn convert_to_deltalake( py: Python, uri: String, diff --git a/python/src/reader.rs b/python/src/reader.rs index 3ae3a8617a..6305ab5c39 100644 --- a/python/src/reader.rs +++ b/python/src/reader.rs @@ -1,16 +1,15 @@ use arrow_schema::{ArrowError, SchemaRef}; use deltalake::arrow::array::RecordBatchReader; use deltalake::arrow::record_batch::RecordBatch; -use deltalake::datafusion::execution::RecordBatchStream; +use deltalake::datafusion::execution::SendableRecordBatchStream; use futures::StreamExt; -use std::pin::Pin; use crate::utils::rt; /// A lazy adapter to convert an async RecordBatchStream into a sync RecordBatchReader struct StreamToReaderAdapter { schema: SchemaRef, - stream: Pin>, + stream: SendableRecordBatchStream, } impl Iterator for StreamToReaderAdapter { @@ -28,9 +27,9 @@ impl RecordBatchReader for StreamToReaderAdapter { } } -/// Converts a RecordBatchStream into a lazy RecordBatchReader +/// Converts a [`SendableRecordBatchStream`] into a lazy RecordBatchReader pub(crate) fn convert_stream_to_reader( - stream: Pin>, + stream: SendableRecordBatchStream, ) -> Box { Box::new(StreamToReaderAdapter { schema: stream.schema(),