10 changes: 9 additions & 1 deletion crates/core/src/delta_datafusion/find_files.rs
@@ -2,6 +2,7 @@
use std::fmt::Debug;
use std::sync::Arc;

+use arrow::compute::concat_batches;
use arrow_array::{Array, RecordBatch, StringArray};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Schema as ArrowSchema};
use datafusion::catalog::Session;
@@ -12,6 +13,7 @@
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::limit::LocalLimitExec;
use datafusion::physical_plan::ExecutionPlan;
+use futures::TryStreamExt;
use itertools::Itertools;
use tracing::*;

@@ -286,10 +288,16 @@
let actions = snapshot
.log_data()
.iter()
    .map(|f| f.add_action())
    .collect_vec();

-let batch = snapshot.add_actions_table(true)?;
+let batches: Vec<RecordBatch> = snapshot
+    .add_actions_table(true)
+    .try_collect()
+    .await?;
+
+let batch = concat_batches(&batches[0].schema(), &batches)?;

let mut arrays = Vec::new();
let mut fields = Vec::new();

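The hunk above shows the consumer-side pattern this PR moves to: `add_actions_table` now yields a stream of record batches rather than a single batch, so callers drain the stream and concatenate. A minimal sketch of that pattern, assuming the new streaming API and the delta-rs paths named below (`EagerSnapshot`, `DeltaResult`); the helper name is illustrative, not part of the crate:

```rust
use arrow::compute::concat_batches;
use arrow_array::RecordBatch;
use deltalake_core::kernel::EagerSnapshot;
use deltalake_core::DeltaResult;
use futures::TryStreamExt; // provides `try_collect` on fallible streams

// Hypothetical helper: drain the batch stream, then merge into one batch.
async fn collect_add_actions(snapshot: &EagerSnapshot) -> DeltaResult<RecordBatch> {
    // Propagates the first stream error, mirroring the `?` in the hunk above.
    let batches: Vec<RecordBatch> = snapshot.add_actions_table(true).try_collect().await?;
    // Note: like the call sites in this PR, this indexes batches[0] and so
    // assumes the stream produced at least one batch.
    Ok(concat_batches(&batches[0].schema(), &batches)?)
}
```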
4 changes: 3 additions & 1 deletion crates/core/src/lib.rs
@@ -91,12 +91,13 @@
pub mod test_utils;

#[cfg(feature = "datafusion")]
pub mod delta_datafusion;

pub mod writer;

use std::collections::HashMap;
use std::sync::OnceLock;
use url::Url;
+use futures::TryStreamExt;

[CI warning, multiple jobs] unused import: `futures::TryStreamExt`

pub use self::data_catalog::{DataCatalog, DataCatalogError};
pub use self::errors::*;
@@ -334,10 +335,11 @@
Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"),
Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
]
);

assert_eq!(table.snapshot().unwrap().log_data().num_files(), 2);

-let stats = table.snapshot().unwrap().add_actions_table(true).unwrap();
+let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();
+let stats = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();

let num_records = stats.column_by_name("num_records").unwrap();
let num_records = num_records
29 changes: 20 additions & 9 deletions crates/core/src/protocol/mod.rs
@@ -1,12 +1,13 @@
//! Actions included in Delta table transaction logs

#![allow(non_camel_case_types)]

use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::mem::take;
use std::str::FromStr;
+use futures::TryStreamExt;

[CI warning, multiple jobs] unused import: `futures::TryStreamExt`

use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -736,10 +737,12 @@
async fn test_with_partitions() {
// test table with partitions
let path = "../test/tests/data/delta-0.8.0-null-partition";
let table_uri =
    Url::from_directory_path(std::fs::canonicalize(Path::new(path)).unwrap()).unwrap();
let table = crate::open_table(table_uri).await.unwrap();
-let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
+let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();

[CI failure: Unit Tests] no method named `try_collect` found for struct `Pin<Box<dyn Stream<Item = Result<RecordBatch, ...>> + Send>>`; this method takes 2 arguments but 1 argument was supplied

+let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();

let expected_columns: Vec<(&str, ArrayRef)> = vec![
("path", Arc::new(array::StringArray::from(vec![
@@ -780,10 +783,11 @@
#[ignore = "enable when deletion vector is supported"]
async fn test_with_deletion_vector() {
// test table with partitions
let path = "../test/tests/data/table_with_deletion_logs";
let table_uri = Url::from_directory_path(Path::new(path)).unwrap();
let table = crate::open_table(table_uri).await.unwrap();
-let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
+let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();

[CI failure: Unit Tests] same `try_collect` and argument-count errors as above

+let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
let actions = actions
.project(&[
@@ -838,10 +842,11 @@
),
];
let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();

assert_eq!(expected, actions);

-let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
+let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(false).try_collect().await.unwrap();

[CI failure: Unit Tests] same `try_collect` and argument-count errors as above

+let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
let actions = actions
.project(&[
@@ -889,10 +894,11 @@
// test table without partitions
let path = "../test/tests/data/simple_table";
let table_uri =
    Url::from_directory_path(std::fs::canonicalize(Path::new(path)).unwrap()).unwrap();
let table = crate::open_table(table_uri).await.unwrap();

-let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
+let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();

[CI failure: Unit Tests] same `try_collect` and argument-count errors as above

+let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();

let expected_columns: Vec<(&str, ArrayRef)> = vec![
@@ -969,7 +975,8 @@
let path = "../test/tests/data/table_with_column_mapping";
let table_uri = Url::from_directory_path(Path::new(path)).unwrap();
let table = crate::open_table(table_uri).await.unwrap();
-let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
+let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();

[CI failure: Unit Tests] same `try_collect` and argument-count errors as above

+let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
let expected_columns: Vec<(&str, ArrayRef)> = vec![
(
"path",
@@ -1045,7 +1052,8 @@
let table_uri =
Url::from_directory_path(std::fs::canonicalize(Path::new(path)).unwrap()).unwrap();
let table = crate::open_table(table_uri).await.unwrap();
-let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
+let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();
+let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();

let expected_columns: Vec<(&str, ArrayRef)> = vec![
@@ -1087,7 +1095,8 @@
Url::from_directory_path(std::fs::canonicalize(Path::new(path)).unwrap()).unwrap();
let mut table = crate::open_table(table_uri).await.unwrap();
table.load().await.unwrap();
-let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
+let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();
+let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
// get column-0 path, and column-4 num_records, and column_5 null_count.integer
let expected_path: ArrayRef = Arc::new(array::StringArray::from(vec![
@@ -1137,7 +1146,8 @@
let mut table = crate::open_table(table_uri).await.unwrap();
table.load_version(1).await.unwrap();

-let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
+let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(true).try_collect().await.unwrap();
+let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();

let expected_columns: Vec<(&str, ArrayRef)> = vec![
(
@@ -1306,7 +1316,8 @@
);
assert_eq!(expected, actions);

-let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
+let batches: Vec<_> = table.snapshot().unwrap().add_actions_table(false).try_collect().await.unwrap();
+let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
// For brevity, just checking a few nested columns in stats

assert_eq!(
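Both Unit Test failures repeated above appear to come from the call sites rather than the stream design: `DeltaTableState::add_actions_table` now takes a log store ahead of `flatten` (hence "takes 2 arguments but 1 argument was supplied"), and the `try_collect` errors suggest `futures::TryStreamExt` is not in scope inside the test module even though the import was added at the top of the file. A hedged sketch of what these test calls would presumably become, inside the async tests above; `table.log_store()` is assumed to hand back the `LogStoreRef` the new signature wants:

```rust
use futures::TryStreamExt; // `try_collect` is an extension method from this trait

// Assumed shape of an updated test call against the new signature in
// crates/core/src/table/state.rs (log store first, then `flatten`):
let batches: Vec<_> = table
    .snapshot()
    .unwrap()
    .add_actions_table(table.log_store(), true)
    .try_collect()
    .await
    .unwrap();
let actions = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
```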
104 changes: 68 additions & 36 deletions crates/core/src/table/state.rs
@@ -2,30 +2,30 @@

use std::sync::Arc;

use arrow::compute::concat_batches;

[CI warning, multiple jobs] unused import: `arrow::compute::concat_batches`

use chrono::Utc;
use delta_kernel::engine::arrow_conversion::TryIntoKernel;
use delta_kernel::expressions::column_expr_ref;
use delta_kernel::schema::{SchemaRef as KernelSchemaRef, StructField};
use delta_kernel::table_properties::TableProperties;
use delta_kernel::{EvaluationHandler, Expression};
-use futures::stream::BoxStream;
+use futures::stream::{self, BoxStream};
use futures::{future::ready, StreamExt as _, TryStreamExt as _};
use object_store::path::Path;
use serde::{Deserialize, Serialize};

-use super::DeltaTableConfig;
+use super::{builder, DeltaTableConfig};

[CI warning, multiple jobs] unused import: `builder`

use crate::kernel::arrow::engine_ext::{ExpressionEvaluatorExt, SnapshotExt};
#[cfg(test)]
use crate::kernel::Action;
use crate::kernel::{
Add, DataType, EagerSnapshot, LogDataHandler, LogicalFileView, Metadata, Protocol,
TombstoneView, ARROW_HANDLER,
};
-use crate::logstore::LogStore;
+use crate::logstore::{LogStore, LogStoreRef};
use crate::partitions::PartitionFilter;
use crate::table::config::TablePropertiesExt;
-use crate::{DeltaResult, DeltaTableError};
+use crate::{open_table, DeltaResult, DeltaTableError};

[CI warning, multiple jobs] unused import: `open_table`

/// State snapshot currently held by the Delta Table instance.
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -266,9 +266,22 @@
/// partition the file corresponds to.
pub fn add_actions_table(
    &self,
+    log_store: LogStoreRef,
    flatten: bool,
-) -> Result<arrow::record_batch::RecordBatch, DeltaTableError> {
-    self.snapshot.add_actions_table(flatten)
+) -> BoxStream<'static, DeltaResult<arrow::record_batch::RecordBatch>> {
+    let version = self.version();
+    let config = self.load_config().clone();
+
+    Box::pin(stream::once(async move {
+        let snapshot = EagerSnapshot::try_new(
+            log_store.as_ref(),
+            config,
+            Some(version)
+        ).await?;
+
+        Ok::<_, DeltaTableError>(snapshot.add_actions_table(flatten))
+    })
+    .try_flatten())
}
}

@@ -302,7 +315,7 @@
pub fn add_actions_table(
&self,
flatten: bool,
-) -> Result<arrow::record_batch::RecordBatch, DeltaTableError> {
+) -> BoxStream<'static, DeltaResult<arrow::record_batch::RecordBatch>> {
let mut expressions = vec![
column_expr_ref!("path"),
column_expr_ref!("size"),
@@ -314,13 +327,21 @@
StructField::not_null("modification_time", DataType::LONG),
];

-let stats_schema = self.snapshot().inner.stats_schema()?;
-let num_records_field = stats_schema
-    .field("numRecords")
-    .ok_or_else(|| DeltaTableError::SchemaMismatch {
-        msg: "numRecords field not found".to_string(),
-    })?
-    .with_name("num_records");
+let stats_schema = match self.snapshot().inner.stats_schema() {
+    Ok(schema) => schema,
+    Err(e) => return stream::once(ready(Err(e.into()))).boxed(),
+};
+
+let num_records_field = match stats_schema.field("numRecords") {
+    Some(field) => field.with_name("num_records"),
+    None => {
+        return stream::once(ready(Err(DeltaTableError::SchemaMismatch {
+            msg: "numRecords field not found".to_string(),
+        })))
+        .boxed()
+    }
+};

expressions.push(column_expr_ref!("stats_parsed.numRecords"));
fields.push(num_records_field);
@@ -343,34 +364,45 @@
fields.push(max_values_field);
}

-if let Some(partition_schema) = self.snapshot().inner.partitions_schema()? {
-    fields.push(StructField::nullable(
-        "partition",
-        DataType::try_struct_type(partition_schema.fields().cloned())?,
-    ));
-    expressions.push(column_expr_ref!("partitionValues_parsed"));
+if let Some(partition_schema) = self.snapshot().inner.partitions_schema().ok().flatten() {
+    match DataType::try_struct_type(partition_schema.fields().cloned()) {
+        Ok(partition_type) => {
+            fields.push(StructField::nullable("partition", partition_type));
+            expressions.push(column_expr_ref!("partitionValues_parsed"));
+        }
+        Err(e) => return stream::once(ready(Err(e.into()))).boxed(),
+    }
}

let expression = Expression::Struct(expressions);
-let table_schema = DataType::try_struct_type(fields)?;
+let table_schema = match DataType::try_struct_type(fields) {
+    Ok(schema) => schema,
+    Err(e) => return stream::once(ready(Err(e.into()))).boxed(),
+};

-let input_schema = self.snapshot().inner.scan_row_parsed_schema_arrow()?;
-let input_schema = Arc::new(input_schema.as_ref().try_into_kernel()?);
-let evaluator =
-    ARROW_HANDLER.new_expression_evaluator(input_schema, expression.into(), table_schema);
+let input_schema = match self.snapshot().inner.scan_row_parsed_schema_arrow() {
+    Ok(schema) => schema,
+    Err(e) => return stream::once(ready(Err(e.into()))).boxed(),
+};

-let results = self
-    .files
-    .iter()
-    .map(|file| evaluator.evaluate_arrow(file.clone()))
-    .collect::<Result<Vec<_>, _>>()?;
+let input_schema = match input_schema.as_ref().try_into_kernel() {
+    Ok(schema) => Arc::new(schema),
+    Err(e) => return stream::once(ready(Err(e.into()))).boxed(),
+};

-let result = concat_batches(results[0].schema_ref(), &results)?;
+let evaluator =
+    ARROW_HANDLER.new_expression_evaluator(input_schema, expression.into(), table_schema);

-if flatten {
-    Ok(result.normalize(".", None)?)
-} else {
-    Ok(result)
-}
+let files = self.files.clone(); // race condition exists here
+stream::iter(files)
+    .map(move |file| {
+        let batch = evaluator.evaluate_arrow(file)?;
+        if flatten {
+            Ok(batch.normalize(".", None)?)
+        } else {
+            Ok(batch)
+        }
+    })
+    .boxed()
}
}
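The rewritten `state.rs` functions return a `BoxStream` instead of a `Result`, which rules out `?` during setup: each fallible step short-circuits by returning a one-element stream that carries the error, and the `DeltaTableState` wrapper defers its async snapshot load with `stream::once(async { ... }).try_flatten()` so setup errors flow into the same output stream. A minimal self-contained sketch of both patterns; all names here are illustrative, not delta-rs API:

```rust
use futures::stream::{self, BoxStream};
use futures::{future::ready, StreamExt as _, TryStreamExt as _};

fn setup(fail: bool) -> Result<u32, String> {
    if fail { Err("setup failed".into()) } else { Ok(1) }
}

// Pattern 1: no `?` available, so a failed setup step becomes the single
// element of the returned stream.
fn values(fail: bool) -> BoxStream<'static, Result<u32, String>> {
    let seed = match setup(fail) {
        Ok(s) => s,
        Err(e) => return stream::once(ready(Err(e))).boxed(),
    };
    stream::iter((seed..=3).map(Ok)).boxed()
}

// Pattern 2: defer (possibly async) setup inside `stream::once`, yield the
// inner stream, then `try_flatten` so its items and any setup error share
// one output stream.
fn deferred(fail: bool) -> BoxStream<'static, Result<u32, String>> {
    Box::pin(
        stream::once(async move {
            let inner = values(fail); // an `.await`-ing setup step would sit here
            Ok::<_, String>(inner)
        })
        .try_flatten(),
    )
}
```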
4 changes: 2 additions & 2 deletions python/deltalake/_internal.pyi
@@ -12,7 +12,7 @@ from typing import (

from arro3.core import DataType as ArrowDataType
from arro3.core import Field as ArrowField
-from arro3.core import RecordBatch, RecordBatchReader
+from arro3.core import RecordBatchReader
from arro3.core import Schema as ArrowSchema
from arro3.core.types import ArrowSchemaExportable

@@ -191,7 +191,7 @@ class RawDeltaTable:
partition_filters: FilterConjunctionType | None,
) -> list[Any]: ...
def create_checkpoint(self) -> None: ...
-def get_add_actions(self, flatten: bool) -> RecordBatch: ...
+def get_add_actions(self, flatten: bool) -> RecordBatchReader: ...
def delete(
self,
predicate: str | None,
8 changes: 5 additions & 3 deletions python/deltalake/table.py
@@ -15,7 +15,7 @@
Union,
)

-from arro3.core import RecordBatch, RecordBatchReader
+from arro3.core import RecordBatchReader
from arro3.core.types import (
ArrowArrayExportable,
ArrowSchemaExportable,
@@ -542,7 +542,9 @@ def count(self) -> int:
"""
total_rows = 0

-for value in self.get_add_actions().column("num_records").to_pylist():
+for value in (
+    self.get_add_actions().read_all().column("num_records").to_pylist()
+):
# Add action file statistics are optional and so while most modern
# tables are _likely_ to have this information it is not
# guaranteed.
@@ -1018,7 +1020,7 @@ def _stringify_partition_values(
out.append((field, op, str_value))
return out

-def get_add_actions(self, flatten: bool = False) -> RecordBatch:
+def get_add_actions(self, flatten: bool = False) -> RecordBatchReader:
"""
Return a dataframe with all current add actions.
