-
Couldn't load subscription status.
- Fork 118
feat: Add row tracking support #1375
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit
Hold shift + click to select a range
7aa1d7f
working! just need to add back a test
nicklan d263f2a
Merge branch 'main' into row-tracking-take-1
nicklan 397af57
cleanup, add back tests
nicklan aa68b80
handle selecting row index and row id
nicklan a0f11a6
initial tests
nicklan 4d27d6d
fix clippy
nicklan e1931dd
Merge branch 'main' into row-tracking-take-1
nicklan 723ab1a
finish up state info tests
nicklan bffa4e8
clippy
nicklan 515ee7c
add one transform test
nicklan be7d60d
Add log_replay transform test
nicklan 975d2b6
Merge branch 'main' into row-tracking-take-1
nicklan 6e96dc2
fmt
nicklan e46cb15
Merge branch 'main' into row-tracking-take-1
nicklan b91e50d
move StateInfo into its own module
nicklan 798abbb
working, needs simplification
nicklan 2f1cba7
cleanup
nicklan 693ab5e
cleanup
nicklan 8ef9487
add some more tests
nicklan 07b84af
Merge branch 'main' into row-tracking-take-1
nicklan d707a5f
unneeded mod path
nicklan fa9c6b1
address comment
nicklan 0d57b88
remove unneeded
nicklan 6100e6f
more coverage
nicklan cae7eaf
address comment
nicklan 3a43190
Merge branch 'main' into row-tracking-take-1
nicklan 1e36f00
simplify schema logic
nicklan 2822bb8
better name
nicklan a4cbedc
move partition col check out of loop
nicklan 8f25b5e
consolidate assertions on the row id transform
nicklan 194cb98
comments
nicklan 8f0ff20
factor out `validate_metadata_columns`
nicklan e451940
Merge branch 'main' into row-tracking-take-1
nicklan 502a967
update comment
nicklan 6c744bc
address final comments
nicklan dafe973
Merge branch 'main' into row-tracking-take-1
nicklan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,7 +3,8 @@ use std::collections::{HashMap, HashSet}; | |
| use std::sync::{Arc, LazyLock}; | ||
|
|
||
| use super::data_skipping::DataSkippingFilter; | ||
| use super::{PhysicalPredicate, ScanMetadata, StateInfo}; | ||
| use super::state_info::StateInfo; | ||
| use super::{PhysicalPredicate, ScanMetadata}; | ||
| use crate::actions::deletion_vector::DeletionVectorDescriptor; | ||
| use crate::actions::get_log_add_schema; | ||
| use crate::engine_data::{GetData, RowVisitor, TypedGetData as _}; | ||
|
|
@@ -105,8 +106,9 @@ impl AddRemoveDedupVisitor<'_> { | |
| const ADD_PATH_INDEX: usize = 0; // Position of "add.path" in getters | ||
| const ADD_PARTITION_VALUES_INDEX: usize = 1; // Position of "add.partitionValues" in getters | ||
| const ADD_DV_START_INDEX: usize = 2; // Start position of add deletion vector columns | ||
| const REMOVE_PATH_INDEX: usize = 5; // Position of "remove.path" in getters | ||
| const REMOVE_DV_START_INDEX: usize = 6; // Start position of remove deletion vector columns | ||
| const BASE_ROW_ID_INDEX: usize = 5; // Position of add.baseRowId in getters | ||
| const REMOVE_PATH_INDEX: usize = 6; // Position of "remove.path" in getters | ||
| const REMOVE_DV_START_INDEX: usize = 7; // Start position of remove deletion vector columns | ||
|
|
||
| fn new( | ||
| seen: &mut HashSet<FileActionKey>, | ||
|
|
@@ -195,10 +197,19 @@ impl AddRemoveDedupVisitor<'_> { | |
| if self.deduplicator.check_and_record_seen(file_key) || !is_add { | ||
| return Ok(false); | ||
| } | ||
| let base_row_id: Option<i64> = | ||
| getters[Self::BASE_ROW_ID_INDEX].get_opt(i, "add.baseRowId")?; | ||
| let transform = self | ||
| .transform_spec | ||
| .as_ref() | ||
| .map(|transform| get_transform_expr(transform, partition_values, &self.physical_schema)) | ||
| .map(|transform| { | ||
| get_transform_expr( | ||
| transform, | ||
| partition_values, | ||
| &self.physical_schema, | ||
| base_row_id, | ||
| ) | ||
| }) | ||
| .transpose()?; | ||
| if transform.is_some() { | ||
| // fill in any needed `None`s for previous rows | ||
|
|
@@ -215,13 +226,15 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> { | |
| static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| { | ||
| const STRING: DataType = DataType::STRING; | ||
| const INTEGER: DataType = DataType::INTEGER; | ||
| const LONG: DataType = DataType::LONG; | ||
| let ss_map: DataType = MapType::new(STRING, STRING, true).into(); | ||
| let types_and_names = vec![ | ||
| (STRING, column_name!("add.path")), | ||
| (ss_map, column_name!("add.partitionValues")), | ||
| (STRING, column_name!("add.deletionVector.storageType")), | ||
| (STRING, column_name!("add.deletionVector.pathOrInlineDv")), | ||
| (INTEGER, column_name!("add.deletionVector.offset")), | ||
| (LONG, column_name!("add.baseRowId")), | ||
| (STRING, column_name!("remove.path")), | ||
| (STRING, column_name!("remove.deletionVector.storageType")), | ||
| (STRING, column_name!("remove.deletionVector.pathOrInlineDv")), | ||
|
|
@@ -236,13 +249,13 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> { | |
| } else { | ||
| // All checkpoint actions are already reconciled and Remove actions in checkpoint files | ||
| // only serve as tombstones for vacuum jobs. So we only need to examine the adds here. | ||
| (&names[..5], &types[..5]) | ||
| (&names[..6], &types[..6]) | ||
| } | ||
| } | ||
|
|
||
| fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { | ||
| let is_log_batch = self.deduplicator.is_log_batch(); | ||
| let expected_getters = if is_log_batch { 9 } else { 5 }; | ||
| let expected_getters = if is_log_batch { 10 } else { 6 }; | ||
| require!( | ||
| getters.len() == expected_getters, | ||
| Error::InternalError(format!( | ||
|
|
@@ -266,8 +279,10 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> { | |
| pub(crate) static SCAN_ROW_SCHEMA: LazyLock<Arc<StructType>> = LazyLock::new(|| { | ||
| // Note that fields projected out of a nullable struct must be nullable | ||
| let partition_values = MapType::new(DataType::STRING, DataType::STRING, true); | ||
| let file_constant_values = | ||
| StructType::new_unchecked([StructField::nullable("partitionValues", partition_values)]); | ||
| let file_constant_values = StructType::new_unchecked([ | ||
| StructField::nullable("partitionValues", partition_values), | ||
| StructField::nullable("baseRowId", DataType::LONG), | ||
| ]); | ||
|
Comment on lines
+282
to
+285
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Note: my async prototype would also let us avoid a lot of this static schema munging, since the schema can be inferred from a plan. |
||
| Arc::new(StructType::new_unchecked([ | ||
| StructField::nullable("path", DataType::STRING), | ||
| StructField::nullable("size", DataType::LONG), | ||
|
|
@@ -290,9 +305,10 @@ fn get_add_transform_expr() -> ExpressionRef { | |
| column_expr_ref!("add.modificationTime"), | ||
| column_expr_ref!("add.stats"), | ||
| column_expr_ref!("add.deletionVector"), | ||
| Arc::new(Expression::Struct(vec![column_expr_ref!( | ||
| "add.partitionValues" | ||
| )])), | ||
| Arc::new(Expression::Struct(vec![ | ||
| column_expr_ref!("add.partitionValues"), | ||
| column_expr_ref!("add.baseRowId"), | ||
| ])), | ||
| ])) | ||
| }); | ||
| EXPR.clone() | ||
|
|
@@ -311,6 +327,7 @@ pub(crate) fn get_scan_metadata_transform_expr() -> ExpressionRef { | |
| column_expr_ref!("modificationTime"), | ||
| column_expr_ref!("stats"), | ||
| column_expr_ref!("deletionVector"), | ||
| column_expr_ref!("fileConstantValues.baseRowId"), | ||
| ], | ||
| ))])) | ||
| }); | ||
|
|
@@ -377,15 +394,19 @@ mod tests { | |
| use std::{collections::HashMap, sync::Arc}; | ||
|
|
||
| use crate::actions::get_log_schema; | ||
| use crate::expressions::Scalar; | ||
| use crate::expressions::{BinaryExpressionOp, Scalar, VariadicExpressionOp}; | ||
| use crate::log_replay::ActionsBatch; | ||
| use crate::scan::state::{DvInfo, Stats}; | ||
| use crate::scan::state_info::tests::{ | ||
| assert_transform_spec, get_simple_state_info, get_state_info, | ||
| }; | ||
| use crate::scan::state_info::StateInfo; | ||
| use crate::scan::test_utils::{ | ||
| add_batch_simple, add_batch_with_partition_col, add_batch_with_remove, | ||
| run_with_validate_callback, | ||
| add_batch_for_row_id, add_batch_simple, add_batch_with_partition_col, | ||
| add_batch_with_remove, run_with_validate_callback, | ||
| }; | ||
| use crate::scan::{PhysicalPredicate, StateInfo}; | ||
| use crate::table_features::ColumnMappingMode; | ||
| use crate::scan::PhysicalPredicate; | ||
| use crate::schema::MetadataColumnSpec; | ||
| use crate::Expression as Expr; | ||
| use crate::{ | ||
| engine::sync::SyncEngine, | ||
|
|
@@ -473,15 +494,8 @@ mod tests { | |
| StructField::new("value", DataType::INTEGER, true), | ||
| StructField::new("date", DataType::DATE, true), | ||
| ])); | ||
| let partition_cols = ["date".to_string()]; | ||
| let state_info = StateInfo::try_new( | ||
| schema.clone(), | ||
| &partition_cols, | ||
| ColumnMappingMode::None, | ||
| None, | ||
| crate::scan::field_classifiers::ScanTransformFieldClassifier, | ||
| ) | ||
| .unwrap(); | ||
| let partition_cols = vec!["date".to_string()]; | ||
| let state_info = get_simple_state_info(schema, partition_cols).unwrap(); | ||
| let batch = vec![add_batch_with_partition_col()]; | ||
| let iter = scan_action_iter( | ||
| &SyncEngine::new(), | ||
|
|
@@ -525,4 +539,77 @@ mod tests { | |
| validate_transform(transforms[3].as_ref(), 17510); | ||
| } | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_row_id_transform() { | ||
| let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new( | ||
| "value", | ||
| DataType::INTEGER, | ||
| true, | ||
| )])); | ||
| let state_info = get_state_info( | ||
| schema.clone(), | ||
| vec![], | ||
| None, | ||
| [ | ||
| ("delta.enableRowTracking", "true"), | ||
| ( | ||
| "delta.rowTracking.materializedRowIdColumnName", | ||
| "row_id_col", | ||
| ), | ||
| ] | ||
| .iter() | ||
| .map(|(k, v)| (k.to_string(), v.to_string())) | ||
| .collect(), | ||
| vec![("row_id", MetadataColumnSpec::RowId)], | ||
| ) | ||
| .unwrap(); | ||
OussamaSaoudi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| let transform_spec = state_info.transform_spec.as_ref().unwrap(); | ||
| assert_transform_spec( | ||
| transform_spec, | ||
| false, | ||
| "row_id_col", | ||
| "row_indexes_for_row_id_0", | ||
| ); | ||
|
|
||
| let batch = vec![add_batch_for_row_id(get_log_schema().clone())]; | ||
| let iter = scan_action_iter( | ||
| &SyncEngine::new(), | ||
| batch | ||
| .into_iter() | ||
| .map(|batch| Ok(ActionsBatch::new(batch as _, true))), | ||
| Arc::new(state_info), | ||
| ); | ||
|
|
||
| for res in iter { | ||
| let scan_metadata = res.unwrap(); | ||
| let transforms = scan_metadata.scan_file_transforms; | ||
| assert_eq!(transforms.len(), 1, "Should have 1 transform"); | ||
| if let Some(Expr::Transform(transform_expr)) = transforms[0].as_ref().map(Arc::as_ref) { | ||
| assert!(transform_expr.input_path.is_none()); | ||
| let row_id_transform = transform_expr | ||
| .field_transforms | ||
| .get("row_id_col") | ||
| .expect("Should have row_id_col transform"); | ||
| assert!(row_id_transform.is_replace); | ||
| assert_eq!(row_id_transform.exprs.len(), 1); | ||
| let expr = &row_id_transform.exprs[0]; | ||
| let expeceted_expr = Arc::new(Expr::variadic( | ||
| VariadicExpressionOp::Coalesce, | ||
| vec![ | ||
| Expr::column(["row_id_col"]), | ||
| Expr::binary( | ||
| BinaryExpressionOp::Plus, | ||
| Expr::literal(42i64), | ||
| Expr::column(["row_indexes_for_row_id_0"]), | ||
| ), | ||
| ], | ||
| )); | ||
| assert_eq!(expr, &expeceted_expr); | ||
| } else { | ||
| panic!("Should have been a transform expression"); | ||
| } | ||
| } | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably want to update the TransformFieldClassifier description to describe its optional nature now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not quite sure where you mean to update? I think the docs already say it's optional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
^ Update it to clarify that general field handling happens in StateInfo?