diff --git a/crates/core/src/delta_datafusion/find_files.rs b/crates/core/src/delta_datafusion/find_files.rs
index 509691efd3..a6a3faac88 100644
--- a/crates/core/src/delta_datafusion/find_files.rs
+++ b/crates/core/src/delta_datafusion/find_files.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
 
+use arrow::compute::concat_batches;
 use arrow_array::{Array, RecordBatch, StringArray};
 use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Schema as ArrowSchema};
 use datafusion::catalog::Session;
@@ -12,6 +13,7 @@ use datafusion::logical_expr::{col, Expr, Volatility};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::limit::LocalLimitExec;
 use datafusion::physical_plan::ExecutionPlan;
+use futures::TryStreamExt;
 use itertools::Itertools;
 use tracing::*;
 
@@ -289,7 +291,13 @@ async fn scan_memory_table(snapshot: &EagerSnapshot, predicate: &Expr) -> DeltaR
         .map(|f| f.add_action())
         .collect_vec();
 
-    let batch = snapshot.add_actions_table(true)?;
+    let batches: Vec<RecordBatch> = snapshot
+        .add_actions_table(true)
+        .try_collect()
+        .await?;
+
+    let batch = concat_batches(&batches[0].schema(), &batches)?;
+
     let mut arrays = Vec::new();
     let mut fields = Vec::new();
 
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index e6599dc1d5..b8b5b98487 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -97,6 +97,7 @@ pub mod writer;
 use std::collections::HashMap;
 use std::sync::OnceLock;
 use url::Url;
+use futures::TryStreamExt;
 
 pub use self::data_catalog::{DataCatalog, DataCatalogError};
 pub use self::errors::*;
@@ -337,7 +338,8 @@ mod tests {
         );
         assert_eq!(table.snapshot().unwrap().log_data().num_files(), 2);
 
-        let stats = table.snapshot().unwrap().add_actions_table(true).unwrap();
+        let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();
+        let stats = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
 
         let num_records = stats.column_by_name("num_records").unwrap();
         let num_records = num_records
diff --git a/crates/core/src/protocol/mod.rs b/crates/core/src/protocol/mod.rs
index d4b7bb8b28..b9ffd8abda 100644
--- a/crates/core/src/protocol/mod.rs
+++ b/crates/core/src/protocol/mod.rs
@@ -7,6 +7,7 @@ use std::collections::HashMap;
 use std::hash::{Hash, Hasher};
 use std::mem::take;
 use std::str::FromStr;
+use futures::TryStreamExt;
 
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -739,7 +740,9 @@ mod tests {
         let table_uri =
             Url::from_directory_path(std::fs::canonicalize(Path::new(path)).unwrap()).unwrap();
         let table = crate::open_table(table_uri).await.unwrap();
-        let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
+
+        let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();
+        let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
 
         let expected_columns: Vec<(&str, ArrayRef)> = vec![
             ("path", Arc::new(array::StringArray::from(vec![
@@ -783,7 +786,8 @@ mod tests {
         let path = "../test/tests/data/table_with_deletion_logs";
         let table_uri = Url::from_directory_path(Path::new(path)).unwrap();
         let table = crate::open_table(table_uri).await.unwrap();
-        let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
+        let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();
+        let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
         let actions = sort_batch_by(&actions, "path").unwrap();
         let actions = actions
             .project(&[
@@ -841,7 +845,8 @@ mod tests {
 
         assert_eq!(expected, actions);
 
-        let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
+        let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(false).try_collect().await.unwrap();
+        let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
         let actions = sort_batch_by(&actions, "path").unwrap();
         let actions = actions
             .project(&[
@@ -892,7 +897,8 @@ mod tests {
             Url::from_directory_path(std::fs::canonicalize(Path::new(path)).unwrap()).unwrap();
         let table = crate::open_table(table_uri).await.unwrap();
 
-        let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
+        let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();
+        let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
         let actions = sort_batch_by(&actions, "path").unwrap();
 
         let expected_columns: Vec<(&str, ArrayRef)> = vec![
@@ -969,7 +975,8 @@ mod tests {
         let path = "../test/tests/data/table_with_column_mapping";
         let table_uri = Url::from_directory_path(Path::new(path)).unwrap();
         let table = crate::open_table(table_uri).await.unwrap();
-        let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
+        let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();
+        let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
         let expected_columns: Vec<(&str, ArrayRef)> = vec![
             (
                 "path",
@@ -1045,7 +1052,8 @@ mod tests {
         let table_uri =
             Url::from_directory_path(std::fs::canonicalize(Path::new(path)).unwrap()).unwrap();
         let table = crate::open_table(table_uri).await.unwrap();
-        let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
+        let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();
+        let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
         let actions = sort_batch_by(&actions, "path").unwrap();
 
         let expected_columns: Vec<(&str, ArrayRef)> = vec![
@@ -1087,7 +1095,8 @@ mod tests {
             Url::from_directory_path(std::fs::canonicalize(Path::new(path)).unwrap()).unwrap();
         let mut table = crate::open_table(table_uri).await.unwrap();
         table.load().await.unwrap();
-        let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
+        let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();
+        let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
         let actions = sort_batch_by(&actions, "path").unwrap();
         // get column-0 path, and column-4 num_records, and column_5 null_count.integer
         let expected_path: ArrayRef = Arc::new(array::StringArray::from(vec![
@@ -1137,7 +1146,8 @@ mod tests {
 
         let mut table = crate::open_table(table_uri).await.unwrap();
         table.load_version(1).await.unwrap();
-        let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
+        let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();
+        let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
 
         let expected_columns: Vec<(&str, ArrayRef)> = vec![
             (
@@ -1306,7 +1316,8 @@ mod tests {
         );
         assert_eq!(expected, actions);
 
-        let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
+        let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(false).try_collect().await.unwrap();
+        let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
 
         // For brevity, just checking a few nested columns in stats
         assert_eq!(
diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs
index a648784d01..820e175e16 100644
--- a/crates/core/src/table/state.rs
+++ b/crates/core/src/table/state.rs
@@ -9,12 +9,12 @@ use delta_kernel::expressions::column_expr_ref;
 use delta_kernel::schema::{SchemaRef as KernelSchemaRef, StructField};
 use delta_kernel::table_properties::TableProperties;
 use delta_kernel::{EvaluationHandler, Expression};
-use futures::stream::BoxStream;
+use futures::stream::{self, BoxStream};
 use futures::{future::ready, StreamExt as _, TryStreamExt as _};
 use object_store::path::Path;
 use serde::{Deserialize, Serialize};
 
-use super::DeltaTableConfig;
+use super::{builder, DeltaTableConfig};
 use crate::kernel::arrow::engine_ext::{ExpressionEvaluatorExt, SnapshotExt};
 #[cfg(test)]
 use crate::kernel::Action;
@@ -22,10 +22,10 @@ use crate::kernel::{
     Add, DataType, EagerSnapshot, LogDataHandler, LogicalFileView, Metadata, Protocol,
     TombstoneView, ARROW_HANDLER,
 };
-use crate::logstore::LogStore;
+use crate::logstore::{LogStore, LogStoreRef};
 use crate::partitions::PartitionFilter;
 use crate::table::config::TablePropertiesExt;
-use crate::{DeltaResult, DeltaTableError};
+use crate::{open_table, DeltaResult, DeltaTableError};
 
 /// State snapshot currently held by the Delta Table instance.
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -266,9 +266,22 @@ impl DeltaTableState {
     /// partition the file corresponds to.
     pub fn add_actions_table(
         &self,
+        log_store: LogStoreRef,
         flatten: bool,
-    ) -> Result<RecordBatch, DeltaTableError> {
-        self.snapshot.add_actions_table(flatten)
+    ) -> BoxStream<'static, DeltaResult<RecordBatch>> {
+        let version = self.version();
+        let config = self.load_config().clone();
+
+        Box::pin(stream::once(async move {
+            let snapshot = EagerSnapshot::try_new(
+                log_store.as_ref(),
+                config,
+                Some(version)
+            ).await?;
+
+            Ok::<_, DeltaTableError>(snapshot.add_actions_table(flatten))
+        })
+        .try_flatten())
     }
 }
 
@@ -302,7 +315,7 @@ impl EagerSnapshot {
     pub fn add_actions_table(
         &self,
         flatten: bool,
-    ) -> Result<RecordBatch, DeltaTableError> {
+    ) -> BoxStream<'static, DeltaResult<RecordBatch>> {
         let mut expressions = vec![
             column_expr_ref!("path"),
             column_expr_ref!("size"),
@@ -314,13 +327,21 @@ impl EagerSnapshot {
             StructField::not_null("modification_time", DataType::LONG),
         ];
 
-        let stats_schema = self.snapshot().inner.stats_schema()?;
-        let num_records_field = stats_schema
-            .field("numRecords")
-            .ok_or_else(|| DeltaTableError::SchemaMismatch {
-                msg: "numRecords field not found".to_string(),
-            })?
-            .with_name("num_records");
+        let stats_schema = match self.snapshot().inner.stats_schema() {
+            Ok(schema) => schema,
+            Err(e) => return stream::once(ready(Err(e.into()))).boxed(),
+        };
+
+        let num_records_field = match stats_schema.field("numRecords") {
+            Some(field) => field.with_name("num_records"),
+            None => {
+                return stream::once(ready(Err(DeltaTableError::SchemaMismatch {
+                    msg: "numRecords field not found".to_string(),
+                })))
+                .boxed()
+            }
+        };
+
         expressions.push(column_expr_ref!("stats_parsed.numRecords"));
         fields.push(num_records_field);
 
@@ -343,34 +364,45 @@ impl EagerSnapshot {
             fields.push(max_values_field);
         }
 
-        if let Some(partition_schema) = self.snapshot().inner.partitions_schema()? {
-            fields.push(StructField::nullable(
-                "partition",
-                DataType::try_struct_type(partition_schema.fields().cloned())?,
-            ));
-            expressions.push(column_expr_ref!("partitionValues_parsed"));
+        if let Some(partition_schema) = self.snapshot().inner.partitions_schema().ok().flatten() {
+            match DataType::try_struct_type(partition_schema.fields().cloned()) {
+                Ok(partition_type) => {
+                    fields.push(StructField::nullable("partition", partition_type));
+                    expressions.push(column_expr_ref!("partitionValues_parsed"));
+                }
+                Err(e) => return stream::once(ready(Err(e.into()))).boxed(),
+            }
         }
 
         let expression = Expression::Struct(expressions);
-        let table_schema = DataType::try_struct_type(fields)?;
+        let table_schema = match DataType::try_struct_type(fields) {
+            Ok(schema) => schema,
+            Err(e) => return stream::once(ready(Err(e.into()))).boxed(),
+        };
 
-        let input_schema = self.snapshot().inner.scan_row_parsed_schema_arrow()?;
-        let input_schema = Arc::new(input_schema.as_ref().try_into_kernel()?);
-        let evaluator =
-            ARROW_HANDLER.new_expression_evaluator(input_schema, expression.into(), table_schema);
+        let input_schema = match self.snapshot().inner.scan_row_parsed_schema_arrow() {
+            Ok(schema) => schema,
+            Err(e) => return stream::once(ready(Err(e.into()))).boxed(),
+        };
 
-        let results = self
-            .files
-            .iter()
-            .map(|file| evaluator.evaluate_arrow(file.clone()))
-            .collect::<Result<Vec<_>, _>>()?;
+        let input_schema = match input_schema.as_ref().try_into_kernel() {
+            Ok(schema) => Arc::new(schema),
+            Err(e) => return stream::once(ready(Err(e.into()))).boxed(),
+        };
 
-        let result = concat_batches(results[0].schema_ref(), &results)?;
+        let evaluator =
+            ARROW_HANDLER.new_expression_evaluator(input_schema, expression.into(), table_schema);
 
-        if flatten {
-            Ok(result.normalize(".", None)?)
-        } else {
-            Ok(result)
-        }
+        let files = self.files.clone(); // race condition exists here
+        stream::iter(files)
+            .map(move |file| {
+                let batch = evaluator.evaluate_arrow(file)?;
+                if flatten {
+                    Ok(batch.normalize(".", None)?)
+                } else {
+                    Ok(batch)
+                }
+            })
+            .boxed()
     }
 }
diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi
index a2aad56a98..49ceca38e6 100644
--- a/python/deltalake/_internal.pyi
+++ b/python/deltalake/_internal.pyi
@@ -12,7 +12,7 @@ from typing import (
 
 from arro3.core import DataType as ArrowDataType
 from arro3.core import Field as ArrowField
-from arro3.core import RecordBatch, RecordBatchReader
+from arro3.core import RecordBatchReader
 from arro3.core import Schema as ArrowSchema
 from arro3.core.types import ArrowSchemaExportable
 
@@ -191,7 +191,7 @@ class RawDeltaTable:
         partition_filters: FilterConjunctionType | None,
     ) -> list[Any]: ...
     def create_checkpoint(self) -> None: ...
-    def get_add_actions(self, flatten: bool) -> RecordBatch: ...
+    def get_add_actions(self, flatten: bool) -> RecordBatchReader: ...
     def delete(
         self,
         predicate: str | None,
diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index 94289dadef..5cd8627065 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -15,7 +15,7 @@ from typing import (
     Union,
 )
 
-from arro3.core import RecordBatch, RecordBatchReader
+from arro3.core import RecordBatchReader
 from arro3.core.types import (
     ArrowArrayExportable,
     ArrowSchemaExportable,
@@ -542,7 +542,9 @@ def count(self) -> int:
         """
         total_rows = 0
 
-        for value in self.get_add_actions().column("num_records").to_pylist():
+        for value in (
+            self.get_add_actions().read_all().column("num_records").to_pylist()
+        ):
             # Add action file statistics are optional and so while most modern
             # tables are _likely_ to have this information it is not
             # guaranteed.
@@ -1018,7 +1020,7 @@ def _stringify_partition_values(
             out.append((field, op, str_value))
         return out
 
-    def get_add_actions(self, flatten: bool = False) -> RecordBatch:
+    def get_add_actions(self, flatten: bool = False) -> RecordBatchReader:
         """
         Return a dataframe with all current add actions.
 
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 3679511810..ab70f7ce08 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -17,7 +17,7 @@ use datafusion_ffi::table_provider::FFI_TableProvider;
 use delta_kernel::expressions::Scalar;
 use delta_kernel::schema::{MetadataValue, StructField};
 use delta_kernel::table_properties::DataSkippingNumIndexedCols;
-use deltalake::arrow::{self, datatypes::Schema as ArrowSchema};
+use deltalake::arrow::{self, array::RecordBatch, datatypes::Schema as ArrowSchema};
 use deltalake::checkpoints::{cleanup_metadata, create_checkpoint};
 use deltalake::datafusion::catalog::TableProvider;
 use deltalake::datafusion::datasource::provider_as_source;
@@ -60,7 +60,7 @@ use deltalake::protocol::{DeltaOperation, SaveMode};
 use deltalake::table::config::TablePropertiesExt as _;
 use deltalake::table::state::DeltaTableState;
 use deltalake::{init_client_version, DeltaOps, DeltaResult, DeltaTableBuilder};
-use futures::TryStreamExt;
+use futures::{StreamExt, TryStreamExt};
 use pyo3::exceptions::{PyRuntimeError, PyValueError};
 use pyo3::pybacked::PyBackedStr;
 use pyo3::types::{PyCapsule, PyDict, PyFrozenSet};
@@ -86,7 +86,7 @@ use crate::features::TableFeatures;
 use crate::filesystem::FsConfig;
 use crate::merge::PyMergeBuilder;
 use crate::query::PyQueryBuilder;
-use crate::reader::convert_stream_to_reader;
+use crate::reader::{convert_boxstream_to_reader, convert_stream_to_reader};
 use crate::schema::{schema_to_pyobject, Field};
 use crate::utils::rt;
 use crate::writer::to_lazy_table;
@@ -1435,18 +1435,38 @@ impl RawDeltaTable {
         Ok(())
     }
 
-    pub fn get_add_actions(&self, flatten: bool) -> PyResult<PyRecordBatch> {
-        // replace with Arro3RecordBatch once new release is done for arro3.core
+    pub fn get_add_actions(&self, flatten: bool) -> PyResult<PyRecordBatchReader> {
         if !self.has_files()? {
             return Err(DeltaError::new_err("Table is instantiated without files."));
         }
-        let batch = self.with_table(|t| {
-            Ok(t.snapshot()
-                .map_err(PythonError::from)?
-                .add_actions_table(flatten)
-                .map_err(PythonError::from)?)
-        })?;
-        Ok(batch.into())
+
+        self.with_table(|t| {
+            let log_store = t.log_store();
+            let snapshot = t.snapshot()
+                .map_err(PythonError::from)
+                .map_err(PyErr::from)?;
+
+            let mut stream = snapshot.add_actions_table(log_store, flatten);
+
+            // peek at first batch to get the schema
+            let first_batch = rt()
+                .block_on(async { stream.try_next().await })
+                .map_err(PythonError::from)
+                .map_err(PyErr::from)?;
+
+            let schema = match &first_batch {
+                Some(batch) => batch.schema(),
+                None => return Err(DeltaError::new_err("No add actions found").into()),
+            };
+
+            // chain it back to the stream
+            let full_stream = futures::stream::iter(first_batch.into_iter().map(Ok))
+                .chain(stream)
+                .boxed();
+
+            let reader = convert_boxstream_to_reader(full_stream, schema);
+            Ok(reader.into())
+        })
     }
 
     pub fn get_add_file_sizes(&self) -> PyResult<HashMap<String, i64>> {
diff --git a/python/src/reader.rs b/python/src/reader.rs
index 3ae3a8617a..d2c7c600b4 100644
--- a/python/src/reader.rs
+++ b/python/src/reader.rs
@@ -2,6 +2,8 @@ use arrow_schema::{ArrowError, SchemaRef};
 use deltalake::arrow::array::RecordBatchReader;
 use deltalake::arrow::record_batch::RecordBatch;
 use deltalake::datafusion::execution::RecordBatchStream;
+use deltalake::DeltaResult;
+use futures::stream::BoxStream;
 use futures::StreamExt;
 use std::pin::Pin;
 
@@ -37,3 +39,36 @@ pub(crate) fn convert_stream_to_reader(
         stream,
     })
 }
+
+/// A lazy adapter to convert a BoxStream of RecordBatches into a sync RecordBatchReader
+struct BoxStreamToReaderAdapter {
+    schema: SchemaRef,
+    stream: Pin<Box<BoxStream<'static, DeltaResult<RecordBatch>>>>,
+}
+
+impl Iterator for BoxStreamToReaderAdapter {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        rt().block_on(self.stream.next())
+            .map(|b| b.map_err(|e| ArrowError::ExternalError(Box::new(e))))
+    }
+}
+
+impl RecordBatchReader for BoxStreamToReaderAdapter {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+/// Converts a BoxStream of RecordBatches into a lazy RecordBatchReader
+pub(crate) fn convert_boxstream_to_reader(
+    stream: BoxStream<'static, DeltaResult<RecordBatch>>,
+    schema: SchemaRef,
+) -> Box<dyn RecordBatchReader + Send> {
+    Box::new(BoxStreamToReaderAdapter {
+        schema,
+        stream: Box::pin(stream),
+    })
+}
+
diff --git a/python/tests/test_delete.py b/python/tests/test_delete.py
index db7ca6b74a..4bddf7b01b 100644
--- a/python/tests/test_delete.py
+++ b/python/tests/test_delete.py
@@ -97,7 +97,7 @@ def test_delete_stats_columns_stats_provided(tmp_path: pathlib.Path):
         configuration={"delta.dataSkippingStatsColumns": "foo,baz"},
     )
     dt = DeltaTable(tmp_path)
-    add_actions_table = dt.get_add_actions(flatten=True)
+    add_actions_table = dt.get_add_actions(flatten=True).read_all()
 
     def get_value(name: str):
         return add_actions_table.column(name)[0].as_py()
@@ -116,7 +116,7 @@ def get_value(name: str):
     dt.delete("bar == 3")
 
     dt = DeltaTable(tmp_path)
-    add_actions_table = dt.get_add_actions(flatten=True)
+    add_actions_table = dt.get_add_actions(flatten=True).read_all()
 
     assert dt.version() == 1
 
diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py
index 0e8fb9918d..a4c760500d 100644
--- a/python/tests/test_merge.py
+++ b/python/tests/test_merge.py
@@ -1865,7 +1865,7 @@ def test_merge_stats_columns_stats_provided(tmp_path: pathlib.Path, streaming: b
         configuration={"delta.dataSkippingStatsColumns": "foo,baz"},
     )
     dt = DeltaTable(tmp_path)
-    add_actions_table = dt.get_add_actions(flatten=True)
+    add_actions_table = dt.get_add_actions(flatten=True).read_all()
 
     def get_value(name: str):
         return add_actions_table.column(name)[0].as_py()
@@ -1907,7 +1907,7 @@ def get_value(name: str):
     ).when_matched_update_all().execute()
 
     dt = DeltaTable(tmp_path)
-    add_actions_table = dt.get_add_actions(flatten=True)
+    add_actions_table = dt.get_add_actions(flatten=True).read_all()
 
     assert dt.version() == 1
 
@@ -2220,7 +2220,7 @@ def test_merge_when_wrong_but_castable_type_passed_while_merge(
     ).when_not_matched_insert_all().execute()
 
     table_schema = pq.read_table(
-        tmp_path / dt.get_add_actions().column(0)[0].as_py()
+        tmp_path / dt.get_add_actions().read_all().column(0)[0].as_py()
     ).schema
     assert table_schema.field("price").type == sample_table["price"].type
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index cc1bcffd13..2c48fe266a 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -225,7 +225,7 @@ def test_read_simple_table_update_incremental():
 def test_read_simple_table_file_sizes_failure(mocker):
     table_path = "../crates/test/tests/data/simple_table"
     dt = DeltaTable(table_path)
-    add_actions = dt.get_add_actions()
+    add_actions = dt.get_add_actions().read_all()
 
     # set all sizes to -1, the idea is to break the reading, to check
     # that input file sizes are actually used
@@ -496,7 +496,7 @@ def test_add_actions_table(flatten: bool):
     table_path = "../crates/test/tests/data/delta-0.8.0-partitioned"
     dt = DeltaTable(table_path)
 
-    actions_df = dt.get_add_actions(flatten)
+    actions_df = dt.get_add_actions(flatten).read_all()
     # RecordBatch doesn't have a sort_by method yet
     actions_df = pa.table(actions_df).sort_by("path").to_batches()[0]
 
diff --git a/python/tests/test_update.py b/python/tests/test_update.py
index 72a67da2bf..48ca6fbb6c 100644
--- a/python/tests/test_update.py
+++ b/python/tests/test_update.py
@@ -296,7 +296,7 @@ def test_update_stats_columns_stats_provided(tmp_path: pathlib.Path):
         configuration={"delta.dataSkippingStatsColumns": "foo,baz"},
     )
     dt = DeltaTable(tmp_path)
-    add_actions_table = dt.get_add_actions(flatten=True)
+    add_actions_table = dt.get_add_actions(flatten=True).read_all()
 
     def get_value(name: str):
         return add_actions_table.column(name)[0].as_py()
@@ -315,7 +315,7 @@ def get_value(name: str):
     dt.update({"foo": "'hello world'"})
 
     dt = DeltaTable(tmp_path)
-    add_actions_table = dt.get_add_actions(flatten=True)
+    add_actions_table = dt.get_add_actions(flatten=True).read_all()
 
     def get_value(name: str):
         return add_actions_table.column(name)[0].as_py()
diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py
index 6ebdff1251..5b15b3e4e9 100644
--- a/python/tests/test_writer.py
+++ b/python/tests/test_writer.py
@@ -1658,7 +1658,7 @@ def test_float_values(tmp_path: pathlib.Path):
         == data
     )
 
-    actions = dt.get_add_actions()
+    actions = dt.get_add_actions().read_all()
 
     def get_value(name: str):
         return actions.column(name)[0].as_py()
@@ -1807,7 +1807,7 @@ def test_rust_decimal_cast(tmp_path: pathlib.Path):
 
 def test_write_stats_column_idx(tmp_path: pathlib.Path):
     def _check_stats(dt: DeltaTable):
-        add_actions_table = dt.get_add_actions(flatten=True)
+        add_actions_table = dt.get_add_actions(flatten=True).read_all()
 
         def get_value(name: str):
             return add_actions_table.column(name)[0].as_py()
@@ -1859,7 +1859,7 @@ def get_value(name: str):
 
 def test_write_stats_columns_stats_provided(tmp_path: pathlib.Path):
     def _check_stats(dt: DeltaTable):
-        add_actions_table = dt.get_add_actions(flatten=True)
+        add_actions_table = dt.get_add_actions(flatten=True).read_all()
 
         def get_value(name: str):
             return add_actions_table.column(name)[0].as_py()
@@ -2416,7 +2416,7 @@ def test_write_binary_col_without_dssc(tmp_path: pathlib.Path):
     assert len(df_with_bin_col.rows()) == 4
 
     dt = DeltaTable(tmp_path)
-    stats = dt.get_add_actions(flatten=True)
+    stats = dt.get_add_actions(flatten=True).read_all()
     assert stats["null_count.x"].to_pylist() == [None]
     assert stats["min.x"].to_pylist() == [None]
     assert stats["max.x"].to_pylist() == [None]
@@ -2451,7 +2451,7 @@ def test_write_binary_col_with_dssc(tmp_path: pathlib.Path):
     assert len(df_with_bin_col.rows()) == 4
 
     dt = DeltaTable(tmp_path)
-    stats = dt.get_add_actions(flatten=True)
+    stats = dt.get_add_actions(flatten=True).read_all()
     assert stats["null_count.x"].to_pylist() == [None]
     assert stats["min.x"].to_pylist() == [None]
     assert stats["max.x"].to_pylist() == [None]