diff --git a/crates/core/src/kernel/arrow/engine_ext.rs b/crates/core/src/kernel/arrow/engine_ext.rs index 6614e5a8e4..e89ee341da 100644 --- a/crates/core/src/kernel/arrow/engine_ext.rs +++ b/crates/core/src/kernel/arrow/engine_ext.rs @@ -41,7 +41,7 @@ pub(crate) struct ScanMetadataArrow { /// - `None`: No transformation is needed; the data is already in the correct logical form. /// /// Note: This vector can be indexed by row number. - pub scan_file_transforms: Vec>, + scan_file_transforms: Vec>, } /// Internal extension traits to the Kernel Snapshot. diff --git a/crates/core/src/operations/generate.rs b/crates/core/src/operations/generate.rs new file mode 100644 index 0000000000..b3baa77261 --- /dev/null +++ b/crates/core/src/operations/generate.rs @@ -0,0 +1,128 @@ +//! +//! The generate supports the fairly simple "GENERATE" operation which produces a +//! [symlink_format_manifest](https://docs.delta.io/delta-utility/#generate-a-manifest-file) file +//! when needed for an external engine such as Presto or BigQuery. +//! +use bytes::{BufMut, BytesMut}; +use futures::future::BoxFuture; +use std::sync::Arc; + +use object_store::path::Path; +use tracing::log::*; + +use super::{CustomExecuteHandler, Operation}; +use crate::kernel::{resolve_snapshot, EagerSnapshot}; +use crate::logstore::object_store::PutPayload; +use crate::logstore::LogStoreRef; +use crate::table::state::DeltaTableState; +use crate::{DeltaResult, DeltaTable}; + +/// Simple builder to generate the manifest +#[derive(Clone)] +pub struct GenerateBuilder { + /// A snapshot of the table state to be generated + snapshot: Option, + log_store: LogStoreRef, + custom_execute_handler: Option>, +} + +impl GenerateBuilder { + /// Create a new [GenerateBuilder] + /// + /// Currently only one mode is supported: [GenerateMode::SymlinkFormatManifest] + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { + Self { + snapshot, + log_store, + custom_execute_handler: None, + } + } +} + +impl super::Operation for GenerateBuilder { + fn log_store(&self) -> &LogStoreRef { + &self.log_store + } + fn get_custom_execute_handler(&self) -> Option> { + self.custom_execute_handler.clone() + } +} + +impl std::future::IntoFuture for GenerateBuilder { + type Output = DeltaResult; + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + let this = self; + Box::pin(async move { + let snapshot = resolve_snapshot(this.log_store(), this.snapshot.clone(), true).await?; + let mut payload = BytesMut::new(); + + for add in this + .snapshot + .clone() + .expect("A GenerateBuilder with no snapshot is not a valid state!") + .log_data() + .into_iter() + { + let uri = this.log_store().to_uri(&add.object_store_path()); + trace!("Prepare {uri} for the symlink_format_manifest"); + payload.put(uri.as_bytes()); + payload.put_u8(b'\n'); + } + debug!("Generate manifest {} bytes prepared", payload.len()); + let payload = PutPayload::from(payload.freeze()); + this.log_store() + .object_store(None) + .put(&Path::from("_symlink_format_manifest/manifest"), payload) + .await?; + Ok(DeltaTable::new_with_state( + this.log_store().clone(), + DeltaTableState::new(snapshot), + )) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::kernel::schema::{DataType, PrimitiveType}; + use crate::kernel::Action; + use crate::DeltaOps; + + use futures::StreamExt; + + #[tokio::test] + async fn test_generate() -> DeltaResult<()> { + use crate::kernel::Add; + let actions = vec![Action::Add(Add { + path: "some-files.parquet".into(), + ..Default::default() + })]; + let table = DeltaOps::new_in_memory() + .create() + .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None) + .with_actions(actions) + .await?; + + let generate = GenerateBuilder::new(table.log_store(), table.state.map(|s| s.snapshot)); + let table = generate.await?; + + let store = table.log_store().object_store(None); + let mut stream = store.list(None); + let mut found = false; + while let Some(meta) = stream.next().await.transpose().unwrap() { + // Printing out the files so the failed assertion below will include the actual + // contents of the table's prefix in the log + println!("Name: {}, size: {}", meta.location, meta.size); + if meta.location == Path::from("_symlink_format_manifest/manifest") { + found = true; + break; + } + } + assert!(found, "The _symlink_format_manifest/manifest was not found in the Delta table's object store prefix"); + Ok(()) + } +} diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 20284447a6..e122ce5584 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -33,6 +33,7 @@ use self::{ }; use crate::errors::{DeltaResult, DeltaTableError}; use crate::logstore::LogStoreRef; +use crate::operations::generate::GenerateBuilder; use crate::table::builder::{ensure_table_uri, DeltaTableBuilder}; use crate::table::config::{TablePropertiesExt as _, DEFAULT_NUM_INDEX_COLS}; use crate::DeltaTable; @@ -43,6 +44,7 @@ pub mod convert_to_delta; pub mod create; pub mod drop_constraints; pub mod filesystem_check; +pub mod generate; pub mod restore; pub mod update_field_metadata; pub mod update_table_metadata; @@ -222,6 +224,11 @@ impl DeltaOps { CreateBuilder::default().with_log_store(self.0.log_store) } + /// Generate a symlink_format_manifest for other engines + pub fn generate(self) -> GenerateBuilder { + GenerateBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) + } + /// Load data from a DeltaTable #[cfg(feature = "datafusion")] #[must_use] diff --git a/python/deltalake/table.py b/python/deltalake/table.py index d5e2f4e12f..903db36698 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -507,6 +507,15 @@ def protocol(self) -> ProtocolVersions: """ return ProtocolVersions(*self._table.protocol_versions()) + def generate(self) -> None: + """ + Generate symlink manifest for engines that cannot read native Delta Lake tables. + + Creates symbolic links to Delta table data files, enabling compatibility with + processing engines that require direct file access instead of Delta protocol support. + """ + return self._table.generate() + def history(self, limit: int | None = None) -> list[dict[str, Any]]: """ Run the history command on the DeltaTable. diff --git a/python/src/lib.rs b/python/src/lib.rs index 02e6ad67f9..8dc9158bfa 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -848,6 +848,14 @@ impl RawDeltaTable { Ok(()) } + #[pyo3()] + pub fn generate(&self, _py: Python) -> PyResult<()> { + let table = self._table.lock().map_err(to_rt_err)?.clone(); + rt().block_on(async { DeltaOps(table).generate().await }) + .map_err(PythonError::from)?; + Ok(()) + } + #[pyo3(signature = ( starting_version = None, ending_version = None,