refactor: Consolidate physical/logical info into StateInfo #1350
Codecov Report
❌ Patch coverage is
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #1350      +/-   ##
==========================================
+ Coverage   84.80%   84.83%   +0.02%
==========================================
  Files         113      113
  Lines       28642    28735      +93
  Branches    28642    28735      +93
==========================================
+ Hits        24289    24376      +87
- Misses       3196     3203       +7
+ Partials     1157     1156       -1
// Extract the physical predicate from StateInfo's PhysicalPredicate enum.
// The DataSkippingFilter and partition_filter components expect the predicate
// in the format Option<(PredicateRef, SchemaRef)>, so we need to convert from
// the enum representation to the tuple format.
let physical_predicate = match &state_info.physical_predicate {
    PhysicalPredicate::Some(predicate, schema) => {
        // Valid predicate that can be used for data skipping and partition filtering
        Some((predicate.clone(), schema.clone()))
    }
    _ => {
        // Either PhysicalPredicate::None (no predicate provided) or
        // PhysicalPredicate::StaticSkipAll (predicate always false).
        // StaticSkipAll is handled at a higher level, so here we treat both as None.
        None
    }
};
Why not put this below in scan_metadata_inner with the other PhysicalPredicate cases?
if let PhysicalPredicate::StaticSkipAll = self.state_info.physical_predicate {
    return Ok(None.into_iter().flatten());
}
So we want to filter out StaticSkipAll cases in scan_metadata_inner, but if we parse the predicate there, we'll have to add an extra param to scan_action_iter. However, that extra param would duplicate data already contained in StateInfo.
After taking a look, I think it's fine as it is, unless we want to add a variable to StateInfo that says "canStaticallySkip" and have the predicate stored as an option there.
Do you have a better idea?
I like how it is rn :) Just add a debug_assert! that ensures we're not reaching here with a StaticSkipAll case
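A minimal sketch of what that debug_assert! could look like. The `PredicateRef` and `SchemaRef` aliases and the `to_filter_input` helper are hypothetical stand-ins so the example compiles on its own; only the `PhysicalPredicate` variant names come from the snippet above.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the kernel's PredicateRef / SchemaRef types.
type PredicateRef = Arc<String>;
type SchemaRef = Arc<Vec<String>>;

#[derive(Debug, Clone)]
enum PhysicalPredicate {
    Some(PredicateRef, SchemaRef),
    StaticSkipAll,
    None,
}

/// Converts the enum into the tuple form that the filters expect.
/// (Hypothetical helper name, used here only for illustration.)
fn to_filter_input(p: &PhysicalPredicate) -> Option<(PredicateRef, SchemaRef)> {
    // The caller is expected to short-circuit StaticSkipAll before getting here;
    // in debug builds this panics if that invariant is ever broken.
    debug_assert!(
        !matches!(p, PhysicalPredicate::StaticSkipAll),
        "StaticSkipAll must be handled before building the filters"
    );
    match p {
        PhysicalPredicate::Some(predicate, schema) => {
            Some((predicate.clone(), schema.clone()))
        }
        // Release builds still fall back to "no predicate" for both cases.
        PhysicalPredicate::StaticSkipAll | PhysicalPredicate::None => None,
    }
}
```

Since debug_assert! is compiled out in release builds, the fallback arm keeps the existing behaviour there while debug and test builds catch a misuse early.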
I'm loving these changes 🔥 Cleans up kernel's concepts significantly

let mut last_physical_field: Option<String> = None;

// Loop over all selected fields and build both the physical schema and transform spec
for (index, logical_field) in logical_schema.fields().enumerate() {
Note to reviewers: Hide Whitespace helps
if let PhysicalPredicate::StaticSkipAll = self.state_info.physical_predicate {
    return Ok(None.into_iter().flatten());
}
let it = scan_action_iter(engine, action_batch_iter, self.state_info.clone());
I thought we discussed pushing this down? Did that not work?
It wasn't successful: the way it's implemented, there wasn't an easy way to just return an empty iterator.
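For context, the early return in the snippet relies on the fact that flattening an `Option` of an iterator gives an iterator of the same concrete type whether the option is `Some` or `None`. A minimal sketch of that idiom, where `scan_rows` and its `skip_all` flag are hypothetical stand-ins for scan_metadata_inner and the StaticSkipAll check:

```rust
/// Hypothetical stand-in for scan_metadata_inner: yields nothing when `skip_all` is set.
fn scan_rows(skip_all: bool, rows: Vec<u32>) -> impl Iterator<Item = u32> {
    if skip_all {
        // `None` is inferred as Option<I>, where I is the iterator type built below;
        // flattening it produces an iterator that yields zero items.
        return None.into_iter().flatten();
    }
    // Stand-in for the real per-batch processing.
    let it = rows.into_iter().map(|r| r * 2);
    // Wrapping in Some gives both branches the same return type.
    Some(it).into_iter().flatten()
}
```

Both branches have to be written in the same function for this to type-check, which is one plausible reason the check is hard to push into a callee that returns a plain iterator.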
};

let transform_spec =
    if !transform_spec.is_empty() || column_mapping_mode != ColumnMappingMode::None {
If we're only generating a transform because of column mapping, that's not really a transform, it's more like a "schema change". I guess we could model that as some kind of Identity expression + a schema, or maybe there's a better way to do it, but regardless, generating a bunch of identity transforms per column is probably silly.
Not to be fixed in this PR, but can we make a follow-up issue?
I made an issue, feel free to expand on it: #1369
What changes are proposed in this pull request?
Issue #1336
As identified in #1330, the codebase had fragmented schema state management: physical/logical schemas, predicates, and transform specs were scattered across multiple structs (Scan, ScanLogReplayProcessor, etc.).
This PR consolidates all schema-related state into StateInfo as the single source of truth.
This change should also somewhat improve performance, as transform specs are now computed once during scan building and not during every scan execution.
This PR only targets scan; CDF is still using the old pattern. We will follow up to eliminate ColumnType/all_fields completely.
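As a rough illustration of the "compute once, share everywhere" idea described above, here is a simplified sketch. All field names, the `to_physical` mapping function, and the `Scan::execute` method are assumptions made for the example; they are not the kernel's actual StateInfo definition.

```rust
use std::sync::Arc;

/// Hypothetical, simplified StateInfo: field names and types are illustrative only.
#[derive(Debug)]
struct StateInfo {
    logical_schema: Vec<String>,
    physical_schema: Vec<String>,
    /// (physical, logical) rename pairs; None when the names already agree.
    transform_spec: Option<Vec<(String, String)>>,
}

impl StateInfo {
    /// Builds all derived state once. `to_physical` stands in for column mapping.
    fn new(logical_schema: Vec<String>, to_physical: impl Fn(&str) -> String) -> Self {
        let physical_schema: Vec<String> = logical_schema
            .iter()
            .map(|field| to_physical(field.as_str()))
            .collect();
        let renames: Vec<(String, String)> = physical_schema
            .iter()
            .cloned()
            .zip(logical_schema.iter().cloned())
            .filter(|(physical, logical)| physical != logical)
            .collect();
        let transform_spec = if renames.is_empty() { None } else { Some(renames) };
        Self { logical_schema, physical_schema, transform_spec }
    }
}

/// The scan holds a shared handle; each execution clones the Arc, not the state.
struct Scan {
    state_info: Arc<StateInfo>,
}

impl Scan {
    fn execute(&self) -> Arc<StateInfo> {
        // Cheap pointer clone: nothing is recomputed per execution.
        Arc::clone(&self.state_info)
    }
}
```

In this sketch every call to `execute` sees the same precomputed schemas and transform spec, which is the property the description attributes to the consolidated StateInfo.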
How was this change tested?
Existing unit tests + added tests for StateInfo