Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ static LOG_ADD_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
)]))
});

/// Lazily built schema with a single nullable top-level column, named by `REMOVE_NAME`, whose
/// type is the schema produced by `Remove::to_schema()`.
static LOG_REMOVE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    let remove_column = StructField::nullable(REMOVE_NAME, Remove::to_schema());
    let log_schema = StructType::new_unchecked([remove_column]);
    Arc::new(log_schema)
});

static LOG_COMMIT_INFO_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked([StructField::nullable(
COMMIT_INFO_NAME,
Expand Down Expand Up @@ -120,6 +127,10 @@ pub(crate) fn get_log_add_schema() -> &'static SchemaRef {
&LOG_ADD_SCHEMA
}

/// Returns a reference to the lazily initialized `LOG_REMOVE_SCHEMA` static, i.e. the schema
/// with a single nullable top-level column holding the `Remove` action.
pub(crate) fn get_log_remove_schema() -> &'static SchemaRef {
&LOG_REMOVE_SCHEMA
}

/// Returns a reference to the lazily initialized `LOG_COMMIT_INFO_SCHEMA` static.
pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef {
&LOG_COMMIT_INFO_SCHEMA
}
Expand Down
6 changes: 3 additions & 3 deletions kernel/src/engine/arrow_expression/evaluate_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,9 @@ pub fn evaluate_expression(
(Struct(fields), Some(DataType::Struct(output_schema))) => {
evaluate_struct_expression(fields, batch, output_schema)
}
(Struct(_), _) => Err(Error::generic(
"Data type is required to evaluate struct expressions",
)),
(Struct(_), dt) => Err(Error::Generic(format!(
"Struct expression expects a DataType::Struct result, but got {dt:?}"
))),
(Transform(transform), Some(DataType::Struct(output_schema))) => {
evaluate_transform_expression(transform, batch, output_schema)
}
Expand Down
34 changes: 28 additions & 6 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,21 +260,37 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
}
}

/// Name of the top-level struct column in the scan row schema that groups per-file constants.
pub(crate) static FILE_CONSTANT_VALUES_NAME: &str = "fileConstantValues";
/// Name of the `baseRowId` field nested under `fileConstantValues`.
pub(crate) static BASE_ROW_ID_NAME: &str = "baseRowId";
/// Name of the `defaultRowCommitVersion` field nested under `fileConstantValues`.
pub(crate) static DEFAULT_ROW_COMMIT_VERSION_NAME: &str = "defaultRowCommitVersion";
/// Name of the `tags` field nested under `fileConstantValues`.
pub(crate) static TAGS_NAME: &str = "tags";

// NB: If you update this schema, ensure you update the comment describing it in the doc comment
// for `scan_row_schema` in scan/mod.rs! You'll also need to update ScanFileVisitor as the
// indexes will be off, and [`get_add_transform_expr`] below to match it.
pub(crate) static SCAN_ROW_SCHEMA: LazyLock<Arc<StructType>> = LazyLock::new(|| {
// Note that fields projected out of a nullable struct must be nullable
let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
let file_constant_values =
StructType::new_unchecked([StructField::nullable("partitionValues", partition_values)]);
// Per-file constants carried alongside each scan row. Every nested field is nullable because
// fields projected out of a nullable struct must themselves be nullable.
let file_constant_values = StructType::new_unchecked([
    StructField::nullable("partitionValues", partition_values),
    StructField::nullable(BASE_ROW_ID_NAME, DataType::LONG),
    StructField::nullable(DEFAULT_ROW_COMMIT_VERSION_NAME, DataType::LONG),
    // Use the shared constant (as the sibling fields do) so the declared name cannot drift from
    // the column lookups that reference `TAGS_NAME`.
    StructField::nullable(
        TAGS_NAME,
        MapType::new(
            DataType::STRING,
            DataType::STRING,
            /*valueContainsNull*/ true,
        ),
    ),
]);
Arc::new(StructType::new_unchecked([
StructField::nullable("path", DataType::STRING),
StructField::nullable("size", DataType::LONG),
StructField::nullable("modificationTime", DataType::LONG),
StructField::nullable("stats", DataType::STRING),
StructField::nullable("deletionVector", DeletionVectorDescriptor::to_schema()),
StructField::nullable("fileConstantValues", file_constant_values),
StructField::nullable(FILE_CONSTANT_VALUES_NAME, file_constant_values),
]))
});

Expand All @@ -290,9 +306,12 @@ fn get_add_transform_expr() -> ExpressionRef {
column_expr_ref!("add.modificationTime"),
column_expr_ref!("add.stats"),
column_expr_ref!("add.deletionVector"),
Arc::new(Expression::Struct(vec![column_expr_ref!(
"add.partitionValues"
)])),
Arc::new(Expression::Struct(vec![
column_expr_ref!("add.partitionValues"),
column_expr_ref!("add.baseRowId"),
column_expr_ref!("add.defaultRowCommitVersion"),
column_expr_ref!("add.tags"),
])),
]))
});
EXPR.clone()
Expand All @@ -311,6 +330,9 @@ pub(crate) fn get_scan_metadata_transform_expr() -> ExpressionRef {
column_expr_ref!("modificationTime"),
column_expr_ref!("stats"),
column_expr_ref!("deletionVector"),
column_expr_ref!("fileConstantValues.tags"),
column_expr_ref!("fileConstantValues.baseRowId"),
column_expr_ref!("fileConstantValues.defaultRowCommitVersion"),
],
))]))
});
Expand Down
14 changes: 13 additions & 1 deletion kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, EmptyColumnResol
use crate::listed_log_files::ListedLogFiles;
use crate::log_replay::{ActionsBatch, HasSelectionVector};
use crate::log_segment::LogSegment;
use crate::scan::log_replay::BASE_ROW_ID_NAME;
use crate::scan::state::{DvInfo, Stats};
use crate::schema::ToSchema as _;
use crate::schema::{
Expand Down Expand Up @@ -498,6 +499,8 @@ impl Scan {
_existing_predicate: Option<PredicateRef>,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<ScanMetadata>>>> {
static RESTORED_ADD_SCHEMA: LazyLock<DataType> = LazyLock::new(|| {
use crate::scan::log_replay::DEFAULT_ROW_COMMIT_VERSION_NAME;

let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
DataType::struct_type_unchecked(vec![StructField::nullable(
"add",
Expand All @@ -508,6 +511,12 @@ impl Scan {
StructField::nullable("modificationTime", DataType::LONG),
StructField::nullable("stats", DataType::STRING),
StructField::nullable("deletionVector", DeletionVectorDescriptor::to_schema()),
StructField::nullable(
"tags",
MapType::new(DataType::STRING, DataType::STRING, true),
),
StructField::nullable(BASE_ROW_ID_NAME, DataType::LONG),
StructField::nullable(DEFAULT_ROW_COMMIT_VERSION_NAME, DataType::LONG),
]),
)])
});
Expand Down Expand Up @@ -737,7 +746,10 @@ impl Scan {
/// cardinality: long,
/// },
/// fileConstantValues: {
/// partitionValues: map<string, string>
/// partitionValues: map<string, string>,
/// baseRowId: long,
/// defaultRowCommitVersion: long,
/// tags: map<string, string>,
/// }
/// }
/// ```
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> {
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
require!(
getters.len() == 10,
getters.len() == 13,
Error::InternalError(format!(
"Wrong number of ScanFileVisitor getters: {}",
getters.len()
Expand Down
131 changes: 123 additions & 8 deletions kernel/src/transaction/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
use crate::SchemaTransform;
use std::borrow::Cow;
use std::collections::HashSet;
use std::iter;
use std::ops::Deref;
use std::sync::{Arc, LazyLock};

use url::Url;

use crate::actions::{
as_log_add_schema, get_log_commit_info_schema, get_log_domain_metadata_schema,
get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction,
get_log_remove_schema, get_log_txn_schema,
};
use crate::actions::{CommitInfo, DomainMetadata, SetTransaction};
use crate::engine_data::FilteredEngineData;
use crate::error::Error;
use crate::expressions::{ArrayData, Transform, UnaryExpressionOp::ToJson};
use crate::path::ParsedLogPath;
use crate::row_tracking::{RowTrackingDomainMetadata, RowTrackingVisitor};
use crate::scan::log_replay::{
BASE_ROW_ID_NAME, DEFAULT_ROW_COMMIT_VERSION_NAME, FILE_CONSTANT_VALUES_NAME, TAGS_NAME,
};
use crate::scan::scan_row_schema;
use crate::schema::{ArrayType, MapType, SchemaRef, StructField, StructType};
use crate::snapshot::SnapshotRef;
use crate::utils::current_time_ms;
Expand All @@ -22,6 +27,7 @@ use crate::{
RowVisitor, Version,
};
use delta_kernel_derive::internal_api;
use url::Url;

/// Type alias for an iterator of [`EngineData`] results.
type EngineDataResultIterator<'a> =
Expand All @@ -40,7 +46,6 @@ pub(crate) static MANDATORY_ADD_FILE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new
]))
});

/// Returns a reference to the mandatory fields in an add action.
///
/// Note this does not include "dataChange", which is a required field
/// but should be set on the transaction level. Getting the full schema
Expand Down Expand Up @@ -121,6 +126,7 @@ pub struct Transaction {
operation: Option<String>,
engine_info: Option<String>,
add_files_metadata: Vec<Box<dyn EngineData>>,
remove_files_metadata: Vec<FilteredEngineData>,
// NB: hashmap would require either duplicating the appid or splitting SetTransaction
// key/payload. HashSet requires Borrow<&str> with matching Eq, Ord, and Hash. Plus,
// HashSet::insert drops the to-be-inserted value without returning the existing one, which
Expand Down Expand Up @@ -167,6 +173,7 @@ impl Transaction {
operation: None,
engine_info: None,
add_files_metadata: vec![],
remove_files_metadata: vec![],
set_transactions: vec![],
commit_timestamp,
domain_metadatas: vec![],
Expand Down Expand Up @@ -231,17 +238,20 @@ impl Transaction {
let domain_metadata_actions =
self.generate_domain_metadata_actions(engine, row_tracking_domain_metadata)?;

// Step 5: Commit the actions as a JSON file to the Delta log
// Step 5: Generate remove actions
let remove_actions = self.generate_remove_actions(engine)?;

// Step 6: Commit the actions as a JSON file to the Delta log
let commit_path =
ParsedLogPath::new_commit(self.read_snapshot.table_root(), commit_version)?;
let actions = iter::once(commit_info_action)
.chain(add_actions)
.chain(set_transaction_actions)
.chain(domain_metadata_actions);

// Convert EngineData to FilteredEngineData with all rows selected
let filtered_actions = actions
.map(|action_result| action_result.map(FilteredEngineData::with_all_rows_selected));
.map(|action_result| action_result.map(FilteredEngineData::with_all_rows_selected))
.chain(remove_actions);

let json_handler = engine.json_handler();
match json_handler.write_json_file(&commit_path.location, Box::new(filtered_actions), false)
Expand Down Expand Up @@ -585,6 +595,112 @@ impl Transaction {
error,
}
}
/// Remove files from the table in this transaction. This API generally enables the engine to
/// delete data (at file-level granularity) from the table. Note that this API can be called
/// multiple times to remove multiple batches.
///
/// The expected schema for `remove_metadata` is given by [`scan_row_schema`]. It is expected
/// that this will be the [`FilteredEngineData`] returned from a scan, with its selection
/// modified for removal: the selected rows in the [`FilteredEngineData`] are the ones to be
/// removed.
///
/// This call only queues the batch on the transaction.
pub fn remove_files(&mut self, remove_metadata: FilteredEngineData) {
self.remove_files_metadata.push(remove_metadata);
}

fn generate_remove_actions<'a>(
&'a self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<FilteredEngineData>> + Send + 'a> {
// Workaround: expression evaluation happens on the whole EngineData instead of accounting for
// filtered rows, which can lead to null values in required fields. This transform therefore
// produces a copy of the schema in which struct fields are nullable.
// TODO: Move this to a common place (dedupe from data_skipping.rs) or remove when evaluations work
// on FilteredEngineData directly.
struct NullableStatsTransform;
impl<'a> SchemaTransform<'a> for NullableStatsTransform {
    /// Runs `transform` on the field's data type and returns the field marked nullable.
    ///
    /// The original field is borrowed when its data type came back unchanged and it is already
    /// nullable; otherwise a new owned field is built. Returns `None` when `transform` does.
    fn transform_struct_field(
        &mut self,
        field: &'a StructField,
    ) -> Option<Cow<'a, StructField>> {
        let transformed_type = self.transform(&field.data_type)?;
        // Nothing to rewrite: the data type is untouched and the field is already nullable.
        if matches!(transformed_type, Cow::Borrowed(_)) && field.is_nullable() {
            return Some(Cow::Borrowed(field));
        }
        Some(Cow::Owned(StructField {
            name: field.name.clone(),
            data_type: transformed_type.into_owned(),
            nullable: true,
            metadata: field.metadata.clone(),
        }))
    }
}

let input_schema = scan_row_schema();
let target_schema = NullableStatsTransform
.transform_struct(get_log_remove_schema())
.ok_or_else(|| Error::generic("Failed to transform remove schema"))?
.into_owned();
let evaluation_handler = engine.evaluation_handler();

Ok(self
.remove_files_metadata
.iter()
.map(move |file_metadata_batch| {
let transform = Expression::transform(
Transform::new_top_level()
.with_inserted_field(
Some("path"),
Expression::literal(self.commit_timestamp).into(),
)
.with_inserted_field(
Some("path"),
Expression::literal(self.data_change).into(),
)
.with_inserted_field(
// extended_file_metadata
Some("path"),
Expression::literal(true).into(),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to figure out if this needs to be adaptive when the fields it is true for are null, or if hard-coded true is sufficient.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like Java kernel also hard-codes this.

)
.with_inserted_field(
Some("path"),
Expression::column([FILE_CONSTANT_VALUES_NAME, "partitionValues"])
.into(),
)
// tags
.with_inserted_field(
Some("stats"),
Expression::column([FILE_CONSTANT_VALUES_NAME, TAGS_NAME]).into(),
)
.with_inserted_field(
Some("deletionVector"),
Expression::column([FILE_CONSTANT_VALUES_NAME, BASE_ROW_ID_NAME])
.into(),
)
.with_inserted_field(
Some("deletionVector"),
Expression::column([
FILE_CONSTANT_VALUES_NAME,
DEFAULT_ROW_COMMIT_VERSION_NAME,
])
.into(),
)
.with_dropped_field(FILE_CONSTANT_VALUES_NAME)
.with_dropped_field("modificationTime"),
);
let expr = Expression::struct_from([transform]);
let file_action_eval = evaluation_handler.new_expression_evaluator(
input_schema.clone(),
Arc::new(expr),
target_schema.clone().into(),
);

let updated_engine_data = file_action_eval.evaluate(file_metadata_batch.data())?;
FilteredEngineData::try_new(
updated_engine_data,
file_metadata_batch.selection_vector().to_vec(),
)
}))
}
}

/// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to
Expand Down Expand Up @@ -737,7 +853,6 @@ mod tests {
// TODO: create a finer-grained unit tests for transactions (issue#1091)

#[test]

fn test_add_files_schema() -> Result<(), Box<dyn std::error::Error>> {
let engine = SyncEngine::new();
let path =
Expand Down
Loading
Loading