refactor: use EagerSnapshot in datafusion module #3796
38d7807 to
928471e
Compare
Codecov Report
@@            Coverage Diff             @@
##             main    #3796      +/-   ##
==========================================
- Coverage   76.09%   76.06%   -0.03%
==========================================
  Files         145      145
  Lines       45200    45273      +73
  Branches    45200    45273      +73
==========================================
+ Hits        34397    34439      +42
- Misses       9107     9144      +37
+ Partials     1696     1690       -6
#[deprecated(since = "0.30.0", note = "Use `files` with kernel predicate instead.")]
pub fn file_views_by_partitions(
    &self,
    log_store: &dyn LogStore,
    filters: &[PartitionFilter],
) -> BoxStream<'_, DeltaResult<LogicalFileView>> {
    if filters.is_empty() {
        return self.files(log_store, None);
    }
    let predicate = match to_kernel_predicate(filters, self.snapshot.schema()) {
        Ok(predicate) => Arc::new(predicate),
        Err(err) => return Box::pin(futures::stream::once(async { Err(err) })),
    };
    self.files(log_store, Some(predicate))
}
this is a migration helper. We should move the conversion of PartitionFilter to predicate either to the python crate, or even better translate DNF directly to kernel predicates. Getting rid of this completely would have required a much larger change.
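For readers following along, here is a minimal std-only sketch of the migration-helper pattern described in this comment: a deprecated entry point converts the old filter type into a predicate and delegates to the new API, surfacing a conversion failure as a single-item error result. All types and names here (`Snapshot`, `Predicate`, `FileView`, `files_by_partitions`, `to_predicate`) are hypothetical stand-ins, and a boxed iterator takes the place of the async stream used in the real code.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for illustration only; these are not the delta-rs types.
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionFilter {
    pub column: String,
    pub value: String,
}

#[derive(Debug, PartialEq)]
pub enum Predicate {
    Eq(String, String),
    And(Vec<Predicate>),
}

#[derive(Debug, Clone, PartialEq)]
pub struct FileView {
    pub path: String,
    // A single (column, value) partition pair keeps the sketch small.
    pub partition: (String, String),
}

pub struct Snapshot {
    pub columns: Vec<String>,
    pub files: Vec<FileView>,
}

type FileIter<'a> = Box<dyn Iterator<Item = Result<FileView, String>> + 'a>;

// Convert legacy filters into a predicate, failing on unknown columns.
fn to_predicate(filters: &[PartitionFilter], columns: &[String]) -> Result<Predicate, String> {
    let mut parts = Vec::with_capacity(filters.len());
    for f in filters {
        if !columns.contains(&f.column) {
            return Err(format!("unknown column: {}", f.column));
        }
        parts.push(Predicate::Eq(f.column.clone(), f.value.clone()));
    }
    Ok(Predicate::And(parts))
}

fn matches(pred: &Predicate, file: &FileView) -> bool {
    match pred {
        Predicate::Eq(c, v) => file.partition.0 == *c && file.partition.1 == *v,
        Predicate::And(ps) => ps.iter().all(|p| matches(p, file)),
    }
}

impl Snapshot {
    // The new API: an optional predicate selects which files are returned.
    pub fn files(&self, predicate: Option<Arc<Predicate>>) -> FileIter<'_> {
        Box::new(
            self.files
                .iter()
                .filter(move |f| predicate.as_ref().map_or(true, |p| matches(p, f)))
                .cloned()
                .map(Ok::<FileView, String>),
        )
    }

    // The migration helper: keeps the old signature alive and delegates to `files`.
    #[deprecated(note = "Use `files` with a predicate instead.")]
    pub fn files_by_partitions(&self, filters: &[PartitionFilter]) -> FileIter<'_> {
        if filters.is_empty() {
            return self.files(None);
        }
        let predicate = match to_predicate(filters, &self.columns) {
            Ok(p) => Arc::new(p),
            // A conversion error becomes a one-element iterator carrying the error.
            Err(e) => return Box::new(std::iter::once(Err::<FileView, String>(e))),
        };
        self.files(Some(predicate))
    }
}
```

Keeping the conversion inside one deprecated wrapper means existing callers keep compiling while the new predicate-based path is the only one that does real work.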
  let context = SessionContext::new();
  let df_schema = logical_schema.clone().to_dfschema()?;

  let logical_filter = self
      .filter
      .clone()
-     .map(|expr| simplify_expr(&context, &df_schema, expr));
+     .map(|expr| simplify_expr(self.session, &df_schema, expr));
This is a drive-by fix and a good example of what @rtyler raised w.r.t. LogStores. We were creating a new session while also tracking a session on the operation, i.e. using inconsistent datafusion sessions in the same operation.
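To illustrate why the inconsistency matters, here is a small std-only sketch. `Session`, `simplify`, and `Operation` are hypothetical stand-ins (not DataFusion types), and the `case_sensitive` option is an invented example of session state that changes the result of expression handling.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a query session carrying configuration.
pub struct Session {
    pub options: HashMap<String, String>,
}

impl Session {
    pub fn new() -> Self {
        Session { options: HashMap::new() }
    }

    pub fn with_option(mut self, key: &str, value: &str) -> Self {
        self.options.insert(key.to_string(), value.to_string());
        self
    }
}

// Hypothetical "simplification" whose output depends on session configuration.
fn simplify(session: &Session, expr: &str) -> String {
    match session.options.get("case_sensitive").map(String::as_str) {
        Some("false") => expr.to_lowercase(),
        _ => expr.to_string(),
    }
}

pub struct Operation<'a> {
    pub session: &'a Session,
    pub filter: Option<String>,
}

impl<'a> Operation<'a> {
    // Before: a fresh session silently ignores whatever the caller configured.
    pub fn filter_with_fresh_session(&self) -> Option<String> {
        let fresh = Session::new();
        self.filter.as_deref().map(|e| simplify(&fresh, e))
    }

    // After: the session tracked on the operation is the one that gets used.
    pub fn filter_with_own_session(&self) -> Option<String> {
        self.filter.as_deref().map(|e| simplify(self.session, e))
    }
}
```

With a session configured as `case_sensitive = false`, only the second method honours that setting, which is the kind of silent divergence the fix avoids.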
Signed-off-by: Robert Pack <[email protected]>
928471e to
1fa9932
Compare
Signed-off-by: Robert Pack <[email protected]>
674ea63 to
c0b8935
Compare
  let provider = DeltaTableProvider::try_new(
-     table.snapshot()?.clone(),
+     table.snapshot()?.snapshot().clone(),
😆
    futures::stream::iter(iter).boxed()
}

#[deprecated(since = "0.30.0", note = "Use `files` with kernel predicate instead.")]
😆 what version number is this?
was hoping the next one we release 😆
}

#[deprecated(
    since = "0.1.0",
😆
well ... i feel like we knew this was coming early on.
crates/core/src/operations/cdc.rs
Outdated
  table.load().await.expect("Failed to reload table");
- let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table");
+ let result =
+     should_write_cdc(&table.snapshot().unwrap().snapshot).expect("Failed to use table");
As came up when we were hacking yesterday, this kind of reaching into the `snapshot` field of the `EagerSnapshot` is something we should probably be removing.
Is it possible to put a `#[cfg(test)]` around it to keep it isolated to tests?
I was able to get rid of some more call sites of this, but not all. It's `pub(crate)` for now and should go away as we consolidate.
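As a rough sketch of the two options discussed in this thread, the snippet below shows a crate-visible field next to a test-only accessor. The types are simplified, hypothetical stand-ins rather than the real `EagerSnapshot`, and `inner_for_tests` and `describe` are invented names.

```rust
pub mod table {
    // Hypothetical inner snapshot holding table metadata.
    pub struct Snapshot {
        version: u64,
    }

    impl Snapshot {
        pub fn version(&self) -> u64 {
            self.version
        }
    }

    pub struct EagerSnapshot {
        // Option 1: visible inside the crate only, so downstream users
        // cannot reach into the field while internal call sites migrate.
        pub(crate) snapshot: Snapshot,
        files: Vec<String>,
    }

    impl EagerSnapshot {
        pub fn new(version: u64, files: Vec<String>) -> Self {
            EagerSnapshot { snapshot: Snapshot { version }, files }
        }

        // Public callers go through accessors instead of the field.
        pub fn version(&self) -> u64 {
            self.snapshot.version()
        }

        pub fn num_files(&self) -> usize {
            self.files.len()
        }

        // Option 2: an accessor that only exists when compiling tests.
        #[cfg(test)]
        pub(crate) fn inner_for_tests(&self) -> &Snapshot {
            &self.snapshot
        }
    }
}

// Code outside the module relies only on the public accessors.
pub fn describe(snapshot: &table::EagerSnapshot) -> String {
    format!("version {} with {} files", snapshot.version(), snapshot.num_files())
}
```

The `pub(crate)` route keeps non-test call sites compiling during the migration, while the `#[cfg(test)]` route would force every production call site to move to accessors first.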
Co-authored-by: Ion Koutsouris <[email protected]> Signed-off-by: Robert Pack <[email protected]>
Signed-off-by: Robert Pack <[email protected]>
0cc7d26 to
b18ebf6
Compare
@rtyler @ion-elgreco - addressed your feedback and would appreciate some new stamps :).
☑️
Description
While migrating to kernel log replay we took on a lot of tech debt that we now need to clean up :).
One reason for the bloat is the similar nature of `DeltaTableState`, `EagerSnapshot`, and `Snapshot`. In this PR we reduce the API surface that uses `DeltaTableState` in favour of using `EagerSnapshot`. While we still require some pathfinding, the most likely path to consolidation is using `EagerSnapshot` and getting rid of the others.
Almost all operations are migrated, except `Vacuum`, which would have required too many changes to business logic and will be migrated later.
Related Issue(s)
related #3733