Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/kernel/arrow/engine_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub(crate) struct ScanMetadataArrow {
/// - `None`: No transformation is needed; the data is already in the correct logical form.
///
/// Note: This vector can be indexed by row number.
pub scan_file_transforms: Vec<Option<ExpressionRef>>,
scan_file_transforms: Vec<Option<ExpressionRef>>,
}

/// Internal extension traits to the Kernel Snapshot.
Expand Down
128 changes: 128 additions & 0 deletions crates/core/src/operations/generate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
//!
//! The generate supports the fairly simple "GENERATE" operation which produces a
//! [symlink_format_manifest](https://docs.delta.io/delta-utility/#generate-a-manifest-file) file
//! when needed for an external engine such as Presto or BigQuery.
//!
use bytes::{BufMut, BytesMut};
use futures::future::BoxFuture;
use std::sync::Arc;

use object_store::path::Path;
use tracing::log::*;

use super::{CustomExecuteHandler, Operation};
use crate::kernel::{resolve_snapshot, EagerSnapshot};
use crate::logstore::object_store::PutPayload;
use crate::logstore::LogStoreRef;
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTable};

/// Simple builder to generate the manifest
#[derive(Clone)]
pub struct GenerateBuilder {
/// A snapshot of the table state to be generated
snapshot: Option<EagerSnapshot>,
log_store: LogStoreRef,
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}

impl GenerateBuilder {
/// Create a new [GenerateBuilder]
///
/// Currently only one mode is supported: [GenerateMode::SymlinkFormatManifest]
pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
Self {
snapshot,
log_store,
custom_execute_handler: None,
}
}
}

impl super::Operation for GenerateBuilder {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

impl std::future::IntoFuture for GenerateBuilder {
type Output = DeltaResult<DeltaTable>;
type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;
Box::pin(async move {
let snapshot = resolve_snapshot(this.log_store(), this.snapshot.clone(), true).await?;
let mut payload = BytesMut::new();

for add in this
.snapshot
.clone()
.expect("A GenerateBuilder with no snapshot is not a valid state!")
.log_data()
.into_iter()
{
let uri = this.log_store().to_uri(&add.object_store_path());
trace!("Prepare {uri} for the symlink_format_manifest");
payload.put(uri.as_bytes());
payload.put_u8(b'\n');
}
debug!("Generate manifest {} bytes prepared", payload.len());
let payload = PutPayload::from(payload.freeze());
this.log_store()
.object_store(None)
.put(&Path::from("_symlink_format_manifest/manifest"), payload)
.await?;
Ok(DeltaTable::new_with_state(
this.log_store().clone(),
DeltaTableState::new(snapshot),
))
})
}
}

#[cfg(test)]
mod tests {
use super::*;

use crate::kernel::schema::{DataType, PrimitiveType};
use crate::kernel::Action;
use crate::DeltaOps;

use futures::StreamExt;

#[tokio::test]
async fn test_generate() -> DeltaResult<()> {
use crate::kernel::Add;
let actions = vec![Action::Add(Add {
path: "some-files.parquet".into(),
..Default::default()
})];
let table = DeltaOps::new_in_memory()
.create()
.with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
.with_actions(actions)
.await?;

let generate = GenerateBuilder::new(table.log_store(), table.state.map(|s| s.snapshot));
let table = generate.await?;

let store = table.log_store().object_store(None);
let mut stream = store.list(None);
let mut found = false;
while let Some(meta) = stream.next().await.transpose().unwrap() {
// Printing out the files so the failed assertion below will include the actual
// contents of the table's prefix in the log
println!("Name: {}, size: {}", meta.location, meta.size);
if meta.location == Path::from("_symlink_format_manifest/manifest") {
found = true;
break;
}
}
assert!(found, "The _symlink_format_manifest/manifest was not found in the Delta table's object store prefix");
Ok(())
}
}
7 changes: 7 additions & 0 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use self::{
};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::logstore::LogStoreRef;
use crate::operations::generate::GenerateBuilder;
use crate::table::builder::{ensure_table_uri, DeltaTableBuilder};
use crate::table::config::{TablePropertiesExt as _, DEFAULT_NUM_INDEX_COLS};
use crate::DeltaTable;
Expand All @@ -43,6 +44,7 @@ pub mod convert_to_delta;
pub mod create;
pub mod drop_constraints;
pub mod filesystem_check;
pub mod generate;
pub mod restore;
pub mod update_field_metadata;
pub mod update_table_metadata;
Expand Down Expand Up @@ -222,6 +224,11 @@ impl DeltaOps {
CreateBuilder::default().with_log_store(self.0.log_store)
}

/// Generate a symlink_format_manifest for other engines
pub fn generate(self) -> GenerateBuilder {
GenerateBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
}

/// Load data from a DeltaTable
#[cfg(feature = "datafusion")]
#[must_use]
Expand Down
9 changes: 9 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,15 @@ def protocol(self) -> ProtocolVersions:
"""
return ProtocolVersions(*self._table.protocol_versions())

def generate(self) -> None:
"""
Generate symlink manifest for engines that cannot read native Delta Lake tables.

Creates symbolic links to Delta table data files, enabling compatibility with
processing engines that require direct file access instead of Delta protocol support.
"""
return self._table.generate()

def history(self, limit: int | None = None) -> list[dict[str, Any]]:
"""
Run the history command on the DeltaTable.
Expand Down
8 changes: 8 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,14 @@ impl RawDeltaTable {
Ok(())
}

#[pyo3()]
pub fn generate(&self, _py: Python) -> PyResult<()> {
let table = self._table.lock().map_err(to_rt_err)?.clone();
rt().block_on(async { DeltaOps(table).generate().await })
.map_err(PythonError::from)?;
Ok(())
}

#[pyo3(signature = (
starting_version = None,
ending_version = None,
Expand Down