Merged

36 commits
7aa1d7f
working! just need to add back a test
nicklan Oct 3, 2025
d263f2a
Merge branch 'main' into row-tracking-take-1
nicklan Oct 6, 2025
397af57
cleanup, add back tests
nicklan Oct 7, 2025
aa68b80
handle selecting row index and row id
nicklan Oct 7, 2025
a0f11a6
initial tests
nicklan Oct 7, 2025
4d27d6d
fix clippy
nicklan Oct 7, 2025
e1931dd
Merge branch 'main' into row-tracking-take-1
nicklan Oct 7, 2025
723ab1a
finish up state info tests
nicklan Oct 7, 2025
bffa4e8
clippy
nicklan Oct 7, 2025
515ee7c
add one transform test
nicklan Oct 7, 2025
be7d60d
Add log_replay transform test
nicklan Oct 8, 2025
975d2b6
Merge branch 'main' into row-tracking-take-1
nicklan Oct 8, 2025
6e96dc2
fmt
nicklan Oct 8, 2025
e46cb15
Merge branch 'main' into row-tracking-take-1
nicklan Oct 10, 2025
b91e50d
move StateInfo into its own module
nicklan Oct 10, 2025
798abbb
working, needs simplification
nicklan Oct 10, 2025
2f1cba7
cleanup
nicklan Oct 10, 2025
693ab5e
cleanup
nicklan Oct 10, 2025
8ef9487
add some more tests
nicklan Oct 10, 2025
07b84af
Merge branch 'main' into row-tracking-take-1
nicklan Oct 10, 2025
d707a5f
unneeded mod path
nicklan Oct 11, 2025
fa9c6b1
address comment
nicklan Oct 11, 2025
0d57b88
remove unneeded
nicklan Oct 11, 2025
6100e6f
more coverage
nicklan Oct 11, 2025
cae7eaf
address comment
nicklan Oct 17, 2025
3a43190
Merge branch 'main' into row-tracking-take-1
nicklan Oct 17, 2025
1e36f00
simplify schema logic
nicklan Oct 17, 2025
2822bb8
better name
nicklan Oct 17, 2025
a4cbedc
move partition col check out of loop
nicklan Oct 17, 2025
8f25b5e
consolidate assertions on the row id transform
nicklan Oct 17, 2025
194cb98
comments
nicklan Oct 17, 2025
8f0ff20
factor out `validate_metadata_columns`
nicklan Oct 17, 2025
e451940
Merge branch 'main' into row-tracking-take-1
nicklan Oct 21, 2025
502a967
update comment
nicklan Oct 22, 2025
6c744bc
address final comments
nicklan Oct 22, 2025
dafe973
Merge branch 'main' into row-tracking-take-1
nicklan Oct 22, 2025
55 changes: 49 additions & 6 deletions kernel/examples/common/src/lib.rs
@@ -7,7 +7,7 @@ use delta_kernel::{
arrow::array::RecordBatch,
engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine},
scan::Scan,
schema::Schema,
schema::{MetadataColumnSpec, Schema},
DeltaResult, SnapshotRef,
};

@@ -61,9 +61,18 @@ pub struct ScanArgs {
#[arg(long)]
pub schema_only: bool,

/// Comma separated list of columns to select
#[arg(long, value_delimiter=',', num_args(0..))]
pub columns: Option<Vec<String>>,
/// Comma-separated list of columns to select. Must be passed as a single string;
/// leading and trailing spaces around each column name are trimmed
#[arg(long)]
pub columns: Option<String>,

/// Include a _metadata.row_index field
#[arg(long)]
pub with_row_index: bool,

/// Include a _metadata.row_id field if row-tracking is enabled
#[arg(long)]
pub with_row_id: bool,
}

pub trait ParseWithExamples<T> {
@@ -182,9 +191,10 @@ pub fn get_scan(snapshot: SnapshotRef, args: &ScanArgs) -> DeltaResult<Option<Scan>>

let read_schema_opt = args
.columns
.clone()
.as_ref()
.map(|cols| -> DeltaResult<_> {
let table_schema = snapshot.schema();
let cols: Vec<&str> = cols.split(",").map(str::trim).collect();
let selected_fields = cols.iter().map(|col| {
table_schema
.field(col)
@@ -193,9 +203,42 @@ pub fn get_scan(snapshot: SnapshotRef, args: &ScanArgs) -> DeltaResult<Option<Scan>>
"Table has no such column: {col}"
)))
});
Schema::try_from_results(selected_fields).map(Arc::new)
let schema = Schema::try_from_results(selected_fields);
Collaborator:
This all seems like it could be simplified like so:

// Use table schema by default
let mut schema = snapshot.schema();

// Project columns
if let Some(columns) = args.columns.as_ref() {
    let cols: Vec<&str> = columns.split(",").map(str::trim).collect();
    schema = schema.project_as_struct(&cols)?;
}

// Add row index column
if args.with_row_index {
    schema = schema.add_metadata_column("_metadata.row_index", MetadataColumnSpec::RowIndex)?;
}

// Add row id column
if args.with_row_id {
    schema = schema.add_metadata_column("_metadata.row_id", MetadataColumnSpec::RowId)?;
}

Collaborator Author:

Yeah good call. I should have just given up on keeping the schema Optional :)

let schema = if args.with_row_index {
schema.and_then(|schema| {
schema.add_metadata_column("row_index", MetadataColumnSpec::RowIndex)
})
} else {
schema
};
schema.map(Arc::new)
})
.transpose()?;

let read_schema_opt = read_schema_opt.or_else(|| {
(args.with_row_index || args.with_row_id).then(|| {
let schema = snapshot.schema();
let schema = if args.with_row_index {
Arc::new(
schema
.add_metadata_column("_metadata.row_index", MetadataColumnSpec::RowIndex)
.unwrap(),
)
} else {
schema
};
if args.with_row_id {
Arc::new(
schema
.add_metadata_column("_metadata.row_id", MetadataColumnSpec::RowId)
.unwrap(),
)
} else {
schema
}
})
});

Ok(Some(
snapshot
.scan_builder()
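For illustration, a hypothetical invocation of an example binary built on these arguments (the binary name and table path are assumptions; the flag names come from the diff above). Note that --columns takes one comma-separated string rather than repeated flags:

    read-table --columns "id, value" --with-row-index --with-row-id ./path/to/table
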
7 changes: 7 additions & 0 deletions kernel/src/engine/arrow_utils.rs
@@ -595,6 +595,13 @@ fn get_indices(
Arc::new(field.try_into_arrow()?),
));
}
Some(MetadataColumnSpec::RowId) => {
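// Row ids are computed as coalesce(materialized_row_id, base_row_id + row_index),
// so a requested row-id column is satisfied by materializing row indexes here.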
debug!("Inserting a row index column for row ids: {}", field.name());
reorder_indices.push(ReorderIndex::row_index(
requested_position,
Arc::new(field.try_into_arrow()?),
));
}
Some(metadata_spec) => {
return Err(Error::Generic(format!(
"Metadata column {metadata_spec:?} is not supported by the default parquet reader"
34 changes: 7 additions & 27 deletions kernel/src/scan/field_classifiers.rs
@@ -16,32 +16,19 @@ pub(crate) trait TransformFieldClassifier {
&self,
field: &StructField,
field_index: usize,
partition_columns: &[String],
last_physical_field: &Option<String>,
) -> Option<FieldTransformSpec>;
}

/// Regular scan field classifier for standard Delta table scans.
/// Handles partition columns as metadata-derived fields.
pub(crate) struct ScanTransformFieldClassifier;
impl TransformFieldClassifier for ScanTransformFieldClassifier {
// Empty classifier, always returns None
impl TransformFieldClassifier for () {
Collaborator:
We probably want to update the TransformFieldClassifier description to describe its optional nature now.

Collaborator Author:
Not quite sure where you mean to update? I think the docs already say it's optional?

Collaborator:
/// Trait for classifying fields during StateInfo construction.
/// Allows different scan types (regular, CDF) to customize field handling.
pub(crate) trait TransformFieldClassifier {

^ Update it to clarify that general field handling happens in StateInfo?

fn classify_field(
&self,
field: &StructField,
field_index: usize,
partition_columns: &[String],
last_physical_field: &Option<String>,
_: &StructField,
_: usize,
_: &Option<String>,
) -> Option<FieldTransformSpec> {
if partition_columns.contains(field.name()) {
// Partition column: needs transform to inject metadata
Some(FieldTransformSpec::MetadataDerivedColumn {
field_index,
insert_after: last_physical_field.clone(),
})
} else {
// Regular physical field - no transform needed
None
}
None
}
}
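
One possible wording for the doc update suggested in the thread above (hypothetical, not part of the PR):

/// Trait for classifying fields during StateInfo construction. General field
/// handling (partition columns and regular physical fields) happens in
/// StateInfo itself; a classifier only handles scan-type-specific fields
/// (e.g. CDF columns). The unit type `()` is the empty classifier and
/// returns `None` for every field.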

@@ -53,7 +40,6 @@ impl TransformFieldClassifier for CdfTransformFieldClassifier {
&self,
field: &StructField,
field_index: usize,
partition_columns: &[String],
last_physical_field: &Option<String>,
) -> Option<FieldTransformSpec> {
match field.name().as_str() {
@@ -70,13 +56,7 @@ impl TransformFieldClassifier for CdfTransformFieldClassifier {
insert_after: last_physical_field.clone(),
})
}
// Defer to default classifier for partition columns and physical fields
_ => ScanTransformFieldClassifier.classify_field(
field,
field_index,
partition_columns,
last_physical_field,
),
_ => None,
}
}
}
124 changes: 100 additions & 24 deletions kernel/src/scan/log_replay.rs
@@ -3,7 +3,8 @@ use std::collections::{HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use super::data_skipping::DataSkippingFilter;
use super::{PhysicalPredicate, ScanMetadata, StateInfo};
use super::state_info::StateInfo;
use super::{PhysicalPredicate, ScanMetadata};
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::get_log_add_schema;
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
@@ -105,8 +106,9 @@ impl AddRemoveDedupVisitor<'_> {
const ADD_PATH_INDEX: usize = 0; // Position of "add.path" in getters
const ADD_PARTITION_VALUES_INDEX: usize = 1; // Position of "add.partitionValues" in getters
const ADD_DV_START_INDEX: usize = 2; // Start position of add deletion vector columns
const REMOVE_PATH_INDEX: usize = 5; // Position of "remove.path" in getters
const REMOVE_DV_START_INDEX: usize = 6; // Start position of remove deletion vector columns
const BASE_ROW_ID_INDEX: usize = 5; // Position of add.baseRowId in getters
const REMOVE_PATH_INDEX: usize = 6; // Position of "remove.path" in getters
const REMOVE_DV_START_INDEX: usize = 7; // Start position of remove deletion vector columns

fn new(
seen: &mut HashSet<FileActionKey>,
@@ -195,10 +197,19 @@ impl AddRemoveDedupVisitor<'_> {
if self.deduplicator.check_and_record_seen(file_key) || !is_add {
return Ok(false);
}
let base_row_id: Option<i64> =
getters[Self::BASE_ROW_ID_INDEX].get_opt(i, "add.baseRowId")?;
let transform = self
.transform_spec
.as_ref()
.map(|transform| get_transform_expr(transform, partition_values, &self.physical_schema))
.map(|transform| {
get_transform_expr(
transform,
partition_values,
&self.physical_schema,
base_row_id,
)
})
.transpose()?;
if transform.is_some() {
// fill in any needed `None`s for previous rows
@@ -215,13 +226,15 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
const STRING: DataType = DataType::STRING;
const INTEGER: DataType = DataType::INTEGER;
const LONG: DataType = DataType::LONG;
let ss_map: DataType = MapType::new(STRING, STRING, true).into();
let types_and_names = vec![
(STRING, column_name!("add.path")),
(ss_map, column_name!("add.partitionValues")),
(STRING, column_name!("add.deletionVector.storageType")),
(STRING, column_name!("add.deletionVector.pathOrInlineDv")),
(INTEGER, column_name!("add.deletionVector.offset")),
(LONG, column_name!("add.baseRowId")),
(STRING, column_name!("remove.path")),
(STRING, column_name!("remove.deletionVector.storageType")),
(STRING, column_name!("remove.deletionVector.pathOrInlineDv")),
@@ -236,13 +249,13 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
} else {
// All checkpoint actions are already reconciled and Remove actions in checkpoint files
// only serve as tombstones for vacuum jobs. So we only need to examine the adds here.
(&names[..5], &types[..5])
(&names[..6], &types[..6])
}
}

fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
let is_log_batch = self.deduplicator.is_log_batch();
let expected_getters = if is_log_batch { 9 } else { 5 };
let expected_getters = if is_log_batch { 10 } else { 6 };
require!(
getters.len() == expected_getters,
Error::InternalError(format!(
@@ -266,8 +279,10 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
pub(crate) static SCAN_ROW_SCHEMA: LazyLock<Arc<StructType>> = LazyLock::new(|| {
// Note that fields projected out of a nullable struct must be nullable
let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
let file_constant_values =
StructType::new_unchecked([StructField::nullable("partitionValues", partition_values)]);
let file_constant_values = StructType::new_unchecked([
StructField::nullable("partitionValues", partition_values),
StructField::nullable("baseRowId", DataType::LONG),
]);
Comment on lines +282 to +285
Collaborator:
Note: my async prototype would also let us avoid a lot of this static schema munging, since the schema can be inferred from a plan.

Arc::new(StructType::new_unchecked([
StructField::nullable("path", DataType::STRING),
StructField::nullable("size", DataType::LONG),
@@ -290,9 +305,10 @@ fn get_add_transform_expr() -> ExpressionRef {
column_expr_ref!("add.modificationTime"),
column_expr_ref!("add.stats"),
column_expr_ref!("add.deletionVector"),
Arc::new(Expression::Struct(vec![column_expr_ref!(
"add.partitionValues"
)])),
Arc::new(Expression::Struct(vec![
column_expr_ref!("add.partitionValues"),
column_expr_ref!("add.baseRowId"),
])),
]))
});
EXPR.clone()
@@ -311,6 +327,7 @@ pub(crate) fn get_scan_metadata_transform_expr() -> ExpressionRef {
column_expr_ref!("modificationTime"),
column_expr_ref!("stats"),
column_expr_ref!("deletionVector"),
column_expr_ref!("fileConstantValues.baseRowId"),
],
))]))
});
@@ -380,12 +397,14 @@ mod tests {
use crate::expressions::Scalar;
use crate::log_replay::ActionsBatch;
use crate::scan::state::{DvInfo, Stats};
use crate::scan::state_info::tests::{get_simple_state_info, get_state_info};
use crate::scan::state_info::StateInfo;
use crate::scan::test_utils::{
add_batch_simple, add_batch_with_partition_col, add_batch_with_remove,
run_with_validate_callback,
add_batch_for_row_id, add_batch_simple, add_batch_with_partition_col,
add_batch_with_remove, run_with_validate_callback,
};
use crate::scan::{PhysicalPredicate, StateInfo};
use crate::table_features::ColumnMappingMode;
use crate::scan::PhysicalPredicate;
use crate::schema::MetadataColumnSpec;
use crate::Expression as Expr;
use crate::{
engine::sync::SyncEngine,
@@ -473,15 +492,8 @@ mod tests {
StructField::new("value", DataType::INTEGER, true),
StructField::new("date", DataType::DATE, true),
]));
let partition_cols = ["date".to_string()];
let state_info = StateInfo::try_new(
schema.clone(),
&partition_cols,
ColumnMappingMode::None,
None,
crate::scan::field_classifiers::ScanTransformFieldClassifier,
)
.unwrap();
let partition_cols = vec!["date".to_string()];
let state_info = get_simple_state_info(schema, partition_cols).unwrap();
let batch = vec![add_batch_with_partition_col()];
let iter = scan_action_iter(
&SyncEngine::new(),
Expand Down Expand Up @@ -525,4 +537,68 @@ mod tests {
validate_transform(transforms[3].as_ref(), 17510);
}
}

#[test]
fn test_row_id_transform() {
let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new(
"value",
DataType::INTEGER,
true,
)]));
let state_info = get_state_info(
schema.clone(),
vec![],
None,
[
("delta.enableRowTracking", "true"),
(
"delta.rowTracking.materializedRowIdColumnName",
"row_id_col",
),
]
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
vec![("row_id", MetadataColumnSpec::RowId)],
)
.unwrap();
let batch = vec![add_batch_for_row_id(get_log_schema().clone())];
let iter = scan_action_iter(
&SyncEngine::new(),
batch
.into_iter()
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
Arc::new(state_info),
);

for res in iter {
let scan_metadata = res.unwrap();
let transforms = scan_metadata.scan_file_transforms;
assert_eq!(transforms.len(), 1, "Should have 1 transform");
if let Some(Expr::Transform(transform_expr)) = transforms[0].as_ref().map(Arc::as_ref) {
assert!(transform_expr.input_path.is_none());
let row_id_transform = transform_expr
.field_transforms
.get("row_id_col")
.expect("Should have row_id_col transform");
assert!(row_id_transform.is_replace);
assert_eq!(row_id_transform.exprs.len(), 1);
let expr = &row_id_transform.exprs[0];
let expected_expr = Arc::new(Expr::variadic(
crate::expressions::VariadicExpressionOp::Coalesce,
vec![
Expr::column(["row_id_col"]),
Expr::binary(
crate::expressions::BinaryExpressionOp::Plus,
Expr::literal(42i64),
Expr::column(["row_indexes_for_row_id_0"]),
),
],
));
assert_eq!(expr, &expected_expr);
} else {
panic!("Should have been a transform expression");
}
}
}
}
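
For reference, the transform asserted in test_row_id_transform coalesces the materialized row-id column with base_row_id plus the per-file row index. A minimal sketch of building that expression with the crate-internal API used in the test (column names like "row_id_col" and "row_indexes_for_row_id_0" are the test's illustrative names):

use crate::expressions::{BinaryExpressionOp, VariadicExpressionOp};
use crate::Expression as Expr;

/// coalesce(row_id_col, base_row_id + row_index)
fn row_id_expr(base_row_id: i64) -> Expr {
    Expr::variadic(
        VariadicExpressionOp::Coalesce,
        vec![
            // materialized row id, when present
            Expr::column(["row_id_col"]),
            // otherwise: the file's base row id plus the parquet row index
            Expr::binary(
                BinaryExpressionOp::Plus,
                Expr::literal(base_row_id),
                Expr::column(["row_indexes_for_row_id_0"]),
            ),
        ],
    )
}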