Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ def files(
("z", "not in", ["a","b"])
```
"""
warnings.warn(
"Method `files` is deprecated, Use DeltaTable.file_uris(predicate) instead.",
category=DeprecationWarning,
stacklevel=2,
)
return self._table.files(self._stringify_partition_values(partition_filters))

def file_uris(
Expand Down Expand Up @@ -467,13 +472,17 @@ def schema(self) -> DeltaSchema:
"""
return self._table.schema

@deprecated(
version="1.2.1",
reason="Not compatible with modern Delta features (e.g. shallow clones). Use `file_uris` instead.",
)
def files_by_partitions(self, partition_filters: PartitionFilterType) -> list[str]:
"""
Get the files for each partition

"""
warnings.warn(
"files_by_partitions is deprecated, please use DeltaTable.files() instead.",
"Method `files_by_partitions` is deprecated, please use DeltaTable.file_uris() instead.",
category=DeprecationWarning,
stacklevel=2,
)
Expand Down Expand Up @@ -882,6 +891,24 @@ def to_pyarrow_dataset(
"but these are not yet supported by the deltalake reader."
)

if (
table_protocol.reader_features
and "columnMapping" in table_protocol.reader_features
):
raise DeltaProtocolError(
"The table requires reader feature 'columnMapping' "
"but this is not supported using pyarrow Datasets."
)

if (
table_protocol.reader_features
and "deletionVectors" in table_protocol.reader_features
):
raise DeltaProtocolError(
"The table requires reader feature 'deletionVectors' "
"but this is not supported using pyarrow Datasets."
)

import pyarrow
import pyarrow.fs as pa_fs

Expand Down
159 changes: 122 additions & 37 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,13 @@ struct RawDeltaTable {
_config: FsConfig,
}

#[pyclass(frozen)]
#[pyclass(frozen, get_all)]
struct RawDeltaTableMetaData {
#[pyo3(get)]
id: String,
#[pyo3(get)]
name: Option<String>,
#[pyo3(get)]
description: Option<String>,
#[pyo3(get)]
partition_columns: Vec<String>,
#[pyo3(get)]
created_time: Option<i64>,
#[pyo3(get)]
configuration: HashMap<String, String>,
}

Expand Down Expand Up @@ -481,8 +475,15 @@ impl RawDeltaTable {

/// Run the Vacuum command on the Delta Table: list and delete files no longer referenced
/// by the Delta table and are older than the retention threshold.
#[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true,
commit_properties=None, post_commithook_properties=None, full = false, keep_versions = None))]
#[pyo3(signature = (
dry_run,
retention_hours = None,
enforce_retention_duration = true,
commit_properties=None,
post_commithook_properties=None,
full = false,
keep_versions = None,
))]
#[allow(clippy::too_many_arguments)]
pub fn vacuum(
&self,
Expand Down Expand Up @@ -539,7 +540,14 @@ impl RawDeltaTable {
}

/// Run the UPDATE command on the Delta Table
#[pyo3(signature = (updates, predicate=None, writer_properties=None, safe_cast = false, commit_properties = None, post_commithook_properties=None))]
#[pyo3(signature = (
updates,
predicate=None,
writer_properties=None,
safe_cast = false,
commit_properties = None,
post_commithook_properties=None,
))]
#[allow(clippy::too_many_arguments)]
pub fn update(
&self,
Expand Down Expand Up @@ -659,7 +667,8 @@ impl RawDeltaTable {
min_commit_interval = None,
writer_properties=None,
commit_properties=None,
post_commithook_properties=None))]
post_commithook_properties=None,
))]
pub fn z_order_optimize(
&self,
py: Python,
Expand Down Expand Up @@ -750,7 +759,9 @@ impl RawDeltaTable {
Ok(())
}

#[pyo3(signature = (feature, allow_protocol_versions_increase, commit_properties=None, post_commithook_properties=None))]
#[pyo3(signature = (
feature, allow_protocol_versions_increase, commit_properties=None, post_commithook_properties=None
))]
pub fn add_feature(
&self,
py: Python,
Expand Down Expand Up @@ -847,7 +858,15 @@ impl RawDeltaTable {
Ok(())
}

#[pyo3(signature = (starting_version = None, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, predicate = None, allow_out_of_range = false))]
#[pyo3(signature = (
starting_version = None,
ending_version = None,
starting_timestamp = None,
ending_timestamp = None,
columns = None,
predicate = None,
allow_out_of_range = false,
))]
#[allow(clippy::too_many_arguments)]
pub fn load_cdf(
&self,
Expand Down Expand Up @@ -988,7 +1007,9 @@ impl RawDeltaTable {
}

// Run the restore command on the Delta Table: restore table to a given version or datetime
#[pyo3(signature = (target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, commit_properties=None))]
#[pyo3(signature = (
target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, commit_properties=None
))]
pub fn restore(
&self,
target: Option<&Bound<'_, PyAny>>,
Expand Down Expand Up @@ -1028,7 +1049,8 @@ impl RawDeltaTable {
Ok(serde_json::to_string(&metrics).unwrap())
}

/// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
/// Run the History command on the Delta Table: Returns provenance information,
/// including the operation, user, and so on, for each write to a table.
#[pyo3(signature = (limit=None))]
pub fn history(&self, limit: Option<usize>) -> PyResult<Vec<String>> {
#[allow(clippy::await_holding_lock)]
Expand Down Expand Up @@ -1220,7 +1242,15 @@ impl RawDeltaTable {
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (add_actions, mode, partition_by, schema, partitions_filters=None, commit_properties=None, post_commithook_properties=None))]
#[pyo3(signature = (
add_actions,
mode,
partition_by,
schema,
partitions_filters=None,
commit_properties=None,
post_commithook_properties=None,
))]
fn create_write_transaction(
&self,
py: Python,
Expand Down Expand Up @@ -1467,7 +1497,9 @@ impl RawDeltaTable {
}

/// Run the delete command on the delta table: delete records following a predicate and return the delete metrics.
#[pyo3(signature = (predicate = None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
#[pyo3(signature = (
predicate = None, writer_properties=None, commit_properties=None, post_commithook_properties=None
))]
pub fn delete(
&self,
py: Python,
Expand Down Expand Up @@ -1665,7 +1697,21 @@ impl RawDeltaTable {
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (data, batch_schema, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
#[pyo3(signature = (
data,
batch_schema,
mode,
schema_mode=None,
partition_by=None,
predicate=None,
target_file_size=None,
name=None,
description=None,
configuration=None,
writer_properties=None,
commit_properties=None,
post_commithook_properties=None
))]
fn write(
&self,
py: Python,
Expand Down Expand Up @@ -2173,14 +2219,6 @@ fn shutdown_tracing() -> PyResult<()> {
Ok(())
}

/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch, or if
/// the millisecond count does not fit in an `i64`.
fn current_timestamp() -> i64 {
    let elapsed_millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards")
        .as_millis();
    // `as_millis` yields a `u128`; narrow it with a checked conversion rather than `as`.
    elapsed_millis.try_into().unwrap()
}

#[derive(FromPyObject)]
pub struct PyAddAction {
path: String,
Expand Down Expand Up @@ -2244,14 +2282,11 @@ pub struct PyPostCommitHookProperties {
}

#[derive(Clone)]
#[pyclass(name = "Transaction", module = "deltalake._internal")]
#[pyclass(name = "Transaction", module = "deltalake._internal", get_all)]
pub struct PyTransaction {
#[pyo3(get)]
pub app_id: String,
#[pyo3(get)]
pub version: i64,
#[pyo3(get)]
pub last_updated: Option<i64>,
app_id: String,
version: i64,
last_updated: Option<i64>,
}

#[pymethods]
Expand Down Expand Up @@ -2306,7 +2341,23 @@ pub struct PyCommitProperties {

#[pyfunction]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (table_uri, data, batch_schema, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
#[pyo3(signature = (
table_uri,
data,
batch_schema,
mode,
schema_mode=None,
partition_by=None,
predicate=None,
target_file_size=None,
name=None,
description=None,
configuration=None,
storage_options=None,
writer_properties=None,
commit_properties=None,
post_commithook_properties=None,
))]
fn write_to_deltalake(
py: Python,
table_uri: String,
Expand Down Expand Up @@ -2365,7 +2416,19 @@ fn write_to_deltalake(

#[pyfunction]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (table_uri, schema, partition_by, mode, raise_if_key_not_exists, name=None, description=None, configuration=None, storage_options=None, commit_properties=None, post_commithook_properties=None))]
#[pyo3(signature = (
table_uri,
schema,
partition_by,
mode,
raise_if_key_not_exists,
name=None,
description=None,
configuration=None,
storage_options=None,
commit_properties=None,
post_commithook_properties=None,
))]
fn create_deltalake(
py: Python,
table_uri: String,
Expand Down Expand Up @@ -2432,7 +2495,19 @@ fn create_deltalake(

#[pyfunction]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (table_uri, schema, add_actions, mode, partition_by, name=None, description=None, configuration=None, storage_options=None, commit_properties=None, post_commithook_properties=None))]
#[pyo3(signature = (
table_uri,
schema,
add_actions,
mode,
partition_by,
name=None,
description=None,
configuration=None,
storage_options=None,
commit_properties=None,
post_commithook_properties=None
))]
fn create_table_with_add_actions(
py: Python,
table_uri: String,
Expand Down Expand Up @@ -2498,7 +2573,17 @@ fn create_table_with_add_actions(

#[pyfunction]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (uri, partition_schema=None, partition_strategy=None, name=None, description=None, configuration=None, storage_options=None, commit_properties=None, post_commithook_properties=None))]
#[pyo3(signature = (
uri,
partition_schema=None,
partition_strategy=None,
name=None,
description=None,
configuration=None,
storage_options=None,
commit_properties=None,
post_commithook_properties=None,
))]
fn convert_to_deltalake(
py: Python,
uri: String,
Expand Down
9 changes: 4 additions & 5 deletions python/src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use arrow_schema::{ArrowError, SchemaRef};
use deltalake::arrow::array::RecordBatchReader;
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::datafusion::execution::RecordBatchStream;
use deltalake::datafusion::execution::SendableRecordBatchStream;
use futures::StreamExt;
use std::pin::Pin;

use crate::utils::rt;

/// A lazy adapter to convert an async RecordBatchStream into a sync RecordBatchReader
struct StreamToReaderAdapter {
schema: SchemaRef,
stream: Pin<Box<dyn RecordBatchStream + Send>>,
stream: SendableRecordBatchStream,
}

impl Iterator for StreamToReaderAdapter {
Expand All @@ -28,9 +27,9 @@ impl RecordBatchReader for StreamToReaderAdapter {
}
}

/// Converts a RecordBatchStream into a lazy RecordBatchReader
/// Converts a [`SendableRecordBatchStream`] into a lazy RecordBatchReader
pub(crate) fn convert_stream_to_reader(
stream: Pin<Box<dyn RecordBatchStream + Send>>,
stream: SendableRecordBatchStream,
) -> Box<dyn RecordBatchReader + Send> {
Box::new(StreamToReaderAdapter {
schema: stream.schema(),
Expand Down
Loading