feat: Add row tracking support #1375
Conversation
Codecov Report

@@            Coverage Diff             @@
##              main    #1375      +/-   ##
==========================================
+ Coverage    84.61%   84.78%   +0.17%
==========================================
  Files          117      118       +1
  Lines        29936    30286     +350
  Branches     29936    30286     +350
==========================================
+ Hits         25330    25678     +348
  Misses        3382     3382
- Partials      1224     1226       +2
kernel/src/scan/mod.rs
Outdated
.metadata()
.configuration()
.get("delta.rowTracking.materializedRowIdColumnName")
.ok_or(Error::generic("No delta.rowTracking.materializedRowIdColumnName key found in metadata configuration"))?;
Are we not supporting generated row ids on purpose? From the protocol:
The delta.rowTracking.materializedRowIdColumnName key in the configuration of the table's metaData action. This column may contain null values, meaning that the corresponding row has no materialized Row ID.
see my comment below, we handle the null case
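For context, a minimal sketch (plain Rust, not the kernel's actual expression machinery) of the fallback rule the protocol describes: prefer the materialized row id when present, otherwise derive the id from the file's base row id plus the row index.

/// Illustrative only: `materialized_row_id` is the value read from the materialized
/// Row ID column (None means no materialized Row ID), `base_row_id` comes from the
/// add file, and `row_index` is the row's position within the file.
fn resolve_row_id(materialized_row_id: Option<i64>, base_row_id: i64, row_index: i64) -> i64 {
    materialized_row_id.unwrap_or(base_row_id + row_index)
}

fn main() {
    assert_eq!(resolve_row_id(None, 100, 2), 102); // fresh/generated row id
    assert_eq!(resolve_row_id(Some(7), 100, 2), 7); // materialized row id wins
}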
Broken, checkpoint before re-arch
log_replay::SCAN_ROW_SCHEMA.clone()
}

/// All the state needed to process a scan.
all moved into state_info.rs
looking good, small nits
pub(crate) struct ScanTransformFieldClassifier;
impl TransformFieldClassifier for ScanTransformFieldClassifier {
// Empty classifier, always returns None
impl TransformFieldClassifier for () {
We probably want to update the TransformFieldClassifier description to describe its optional nature now.
Not quite sure where you mean to update? I think the docs already say it's optional?
/// Trait for classifying fields during StateInfo construction.
/// Allows different scan types (regular, CDF) to customize field handling.
pub(crate) trait TransformFieldClassifier {
^ Update it to clarify that general field handling happens in StateInfo?
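One possible wording for that doc update (a sketch only, building on the doc comment quoted above; the exact phrasing is up to the author):

/// Trait for classifying fields during StateInfo construction.
/// Allows different scan types (regular, CDF) to customize field handling.
/// Implementing this is optional: general field handling happens in StateInfo
/// itself, and a classifier only needs to return Some(...) for fields it wants
/// to treat specially, falling back to the default behavior otherwise.
pub(crate) trait TransformFieldClassifier {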
predicate: Option<PredicateRef>,
classifier: C,
) -> DeltaResult<Self> {
let partition_columns = table_configuration.metadata().partition_columns();
Is it necessary to pull these out into individual variables?
Yeah, I removed partition_columns since we only use that once. But we use column_mapping_mode twice, and not pulling it out is a net increase in lines of code since the full path is longer to type.
Actually with a refactor I put it back in because it ends up being fewer overall lines
kernel/src/scan/state_info.rs
Outdated
if table_configuration.table_properties().enable_row_tracking != Some(true)
{
    return Err(Error::unsupported(
        "Row ids are not enabled on this table",
    ));
}
Should we do this check on line 59 above where we run through the metadata columns and set
if let Some(MetadataColumnSpec::RowIndex) = metadata_column.get_metadata_column_spec() ?
Also to clarify, can we have row index without row tracking support?
Yeah, we can have row-index without row-tracking. Row index is just a parquet reader feature that returns the index of the row in the file.
kernel/src/scan/state_info.rs
Outdated
| .get("delta.rowTracking.materializedRowIdColumnName") | ||
| .ok_or(Error::generic("No delta.rowTracking.materializedRowIdColumnName key found in metadata configuration"))?; | ||
|
|
||
| // we can `take` as we should only have one RowId col |
Should we validate in the loop in line 58?
Validate which? The config key? We need to get it out here no matter what, so it seems fine to validate here; otherwise we're looking it up twice (or storing it in a variable, I guess).
Yep, I think we should have a function validate_metadata_columns that does the following:
- Ensures materializedRowIdColumnName is present and extracts it.
- Ensures that row tracking is enabled if metadata columns are present.
- Ensures partition columns don't conflict with metadata columns.
EDIT: the goal is to keep this inner loop as simple as possible.
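A rough sketch of what such a helper could look like (the signature, types, and error handling here are hypothetical stand-ins, not the kernel's real StateInfo types), just to capture the idea of pulling the row-tracking validation and config lookup out of the per-field loop:

/// Hypothetical helper; names and types are illustrative only.
struct RowTrackingInfo {
    materialized_row_id_col: String,
}

fn validate_metadata_columns(
    row_id_requested: bool,
    row_tracking_enabled: bool,
    partition_columns: &[String],
    metadata_column_names: &[String],
    configuration: &std::collections::HashMap<String, String>,
) -> Result<Option<RowTrackingInfo>, String> {
    // Metadata columns must not collide with partition columns.
    for name in metadata_column_names {
        if partition_columns.contains(name) {
            return Err(format!(
                "Metadata column names must not match partition columns: {name}"
            ));
        }
    }
    // Nothing else to check or extract if row ids weren't requested.
    if !row_id_requested {
        return Ok(None);
    }
    // Row ids require row tracking to be enabled on the table.
    if !row_tracking_enabled {
        return Err("Row ids are not enabled on this table".to_string());
    }
    // Extract the materialized row id column name up front so the loop doesn't have to.
    let materialized_row_id_col = configuration
        .get("delta.rowTracking.materializedRowIdColumnName")
        .ok_or_else(|| {
            "No delta.rowTracking.materializedRowIdColumnName key found in metadata configuration"
                .to_string()
        })?
        .clone();
    Ok(Some(RowTrackingInfo {
        materialized_row_id_col,
    }))
}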
Ah, also: extract any state that's needed during the loop in that same helper.
Mmm, I see what you're saying. But as we add more metadata cols that need to extract more information and use it (like row_id_col), the return from that function is going to be a mess of optionals, which I don't love. That's why I chose to keep the validation and state extraction here.
flushing comments.
kernel/examples/common/src/lib.rs
Outdated
)))
});
Schema::try_from_results(selected_fields).map(Arc::new)
let schema = Schema::try_from_results(selected_fields);
This all seems like it could be simplified like so:
// Use table schema by default
let mut table_schema = snapshot.schema();
// Project columns
if let Some(columns) = args.columns.as_ref() {
    let cols: Vec<&str> = columns.split(",").map(str::trim).collect();
    table_schema = table_schema.project_as_struct(&cols);
}
// Add row index column
if args.with_row_index {
    table_schema.add_metadata_column("_metadata.row_index", MetadataColumnSpec::RowIndex)
}
// Add row id column
if args.with_row_id {
    table_schema.add_metadata_column("_metadata.row_id", MetadataColumnSpec::RowId)
}
Yeah good call. I should have just given up on keeping the schema Optional :)
let file_constant_values = StructType::new_unchecked([
    StructField::nullable("partitionValues", partition_values),
    StructField::nullable("baseRowId", DataType::LONG),
]);
Note: my async prototype would also let us avoid a lot of this static schema munging since the schema can be inferred from a plan.
transform_spec,
partition_values,
physical_schema,
None, /* base_row_id */
Can you add a TODO issue to get the base_row_id for CDF?
@DrakeLin this also indicates that we may want a is_cdf_supported(TableFeature::RowTracking)
Does CDF support row tracking? what are the semantics in that case?
ah you're right:
https://docs.databricks.com/aws/en/delta/row-tracking#limitations
I think this is a bit odd tho. It's like not being able to access your primary key during a CDC. In any case, let's block now. Good callout 👍
Can you add a note to get_cdf_transform_expr that makes it clear that we do not support reading row tracking info during CDF and that this is a known delta limitation? I want to avoid losing this context :)
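A possible shape for that note (the wording is only a suggestion):

// Near get_cdf_transform_expr:
//
// NOTE: We do not support reading row tracking info (row ids) during CDF; this is a
// known Delta limitation (see the row-tracking limitations doc linked above), so
// base_row_id is always passed as None here.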
kernel/src/transforms.rs
Outdated
assert!(get_transform_expr(
    &transform_spec,
    metadata_values,
    &physical_schema,
    None, /* base_row_id */
)
.is_err());
pls use assert_result_error_with_message so we ensure we get the expected error
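For example, mirroring how the helper is used in the suggestions below (the expected error text here is a placeholder):

let res = get_transform_expr(
    &transform_spec,
    metadata_values,
    &physical_schema,
    None, /* base_row_id */
);
assert_result_error_with_message(res, "<expected error text>");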
kernel/src/scan/state_info.rs
Outdated
if logical_field.is_metadata_column() {
    return Err(Error::Schema(format!(
        "Metadata column names must not match partition columns: {}",
        logical_field.name()
    )));
}
// push the transform for this partition column
This can be done as a check above, in the for metadata_column in logical_schema.metadata_columns() loop.
kernel/src/scan/state_info.rs
Outdated
if table_configuration.table_properties().enable_row_tracking != Some(true)
{
    return Err(Error::unsupported(
        "Row ids are not enabled on this table",
    ));
}
This check can be done above. We do the following:
for metadata_column in logical_schema.metadata_columns() {
    if let Some(MetadataColumnSpec::RowIndex) = metadata_column.get_metadata_column_spec() {
        selected_row_index_col_name = Some(metadata_column.name().to_string());
    }
    metadata_field_names.insert(metadata_column.name());
}
Let's factor that out and do all our row tracking checks (including partition columns).
I want to make this inner loop very simple and clear.
kernel/src/scan/state_info.rs
Outdated
| .get("delta.rowTracking.materializedRowIdColumnName") | ||
| .ok_or(Error::generic("No delta.rowTracking.materializedRowIdColumnName key found in metadata configuration"))?; | ||
|
|
||
| // we can `take` as we should only have one RowId col |
Yep, I think we should have a function validate_metadata_columns that does the following:
- Ensures materializedRowIdColumnName is present and extracts it.
- Ensures that row tracking is enabled if metadata columns are present.
- Ensures partition columns don't conflict with metadata columns.
EDIT: the goal is to keep this inner loop as simple as possible.
lgtm
Force-pushed from a1d2a98 to 502a967.
Looks good, just various cleanups and small comments.
While the unit tests are good, I would advocate that we don't publish any kernel releases until we add some integration tests that validate row_index/row_id.
kernel/tests/read.rs
Outdated
let scan = snapshot.scan_builder().with_schema(schema).build();
match scan {
    Err(e) => {
        let error_msg = e.to_string();
        assert!(
            error_msg.contains(error_text),
            "Expected {error_msg} to contain {error_text}"
        );
    }
    Ok(_) => {
        panic!(
            "Expected error for {} metadata column, but scan succeeded",
            error_text
        );
    }
}
Nit: I think there may be an unwrap_error
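A minimal sketch of that nit, assuming it refers to std's Result::unwrap_err (or expect_err):

let err = snapshot
    .scan_builder()
    .with_schema(schema)
    .build()
    .unwrap_err();
let error_msg = err.to_string();
assert!(
    error_msg.contains(error_text),
    "Expected {error_msg} to contain {error_text}"
);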
nice, that's useful
kernel/src/scan/state_info.rs
Outdated
match get_state_info(
    schema.clone(),
    vec!["part_col".to_string()],
    None,
    HashMap::new(),
    vec![("part_col", MetadataColumnSpec::RowId)],
) {
    Ok(_) => {
        panic!("Should not have succeeded generating state info with invalid config")
    }
    Err(e) => {
        assert_eq!(e.to_string(),
            "Schema error: Metadata column names must not match partition columns: part_col")
    }
}
Suggested change:
let res = get_state_info(
    schema.clone(),
    vec!["part_col".to_string()],
    None,
    HashMap::new(),
    vec![("part_col", MetadataColumnSpec::RowId)],
);
assert_result_error_with_message(
    res,
    "Schema error: Metadata column names must not match partition columns: part_col"
);
kernel/src/scan/state_info.rs
Outdated
match get_state_info(
    schema.clone(),
    vec![],
    None,
    get_string_map(&[("delta.columnMapping.mode", "name")]),
    vec![("other", MetadataColumnSpec::RowIndex)],
) {
    Ok(_) => {
        panic!("Should not have succeeded generating state info with invalid config")
    }
    Err(e) => {
        assert_eq!(e.to_string(),
            "Schema error: Metadata column names must not match physical columns, but logical column 'id' has physical name 'other'");
    }
}
}
Suggested change:
let res = get_state_info(
    schema.clone(),
    vec![],
    None,
    get_string_map(&[("delta.columnMapping.mode", "name")]),
    vec![("other", MetadataColumnSpec::RowIndex)],
);
assert_result_error_with_message(
    res,
    "Schema error: Metadata column names must not match physical columns, but logical column 'id' has physical name 'other'"
);
}
kernel/src/scan/state_info.rs
Outdated
match get_state_info(schema.clone(), vec![], None, metadata_config, metadata_cols) {
    Ok(_) => {
        panic!("Should not have succeeded generating state info with invalid config")
    }
    Err(e) => {
        assert_eq!(
            e.to_string(),
            expected_error,
        )
    }
}
}
Suggested change:
let res = get_state_info(schema.clone(), vec![], None, metadata_config, metadata_cols);
assert_result_error_with_message(res, expected_error);
}
kernel/src/scan/state_info.rs
Outdated
/// What are the names of the requested metadata fields
metadata_field_names: HashSet<&'a String>,
/// The name of the column that's selecting row indexes if that's been requested or None if they
/// are not requested . We remember this if it's been requested explicitly. this is so we can
Suggested change:
/// are not requested. We remember this if it's been requested explicitly. This is so we can
transform_spec,
partition_values,
physical_schema,
None, /* base_row_id */
Can you add a note to get_cdf_transform_expr that makes it clear that we do not support reading row tracking info during CDF and that this is a known delta limitation? I want to avoid losing this context :)
Thanks, added #1417 to track.
What changes are proposed in this pull request?
StaticReplace as a transform: the original intention was that it would be used for row ids, but we can't know the expression statically because the base row id changes for each file.
How was this change tested?
To reviewers: I can add a new table with rowIds enabled to do an e2e test, not sure if we want to keep bloating the tables we check into the repo though.