feat: datafusion based kernel engine #3831
Conversation
```rust
let handler: Arc<dyn JsonHandler> = match self.handle.runtime_flavor() {
    RuntimeFlavor::MultiThread => Arc::new(DefaultJsonHandler::new(
        store,
        Arc::new(TokioMultiThreadExecutor::new(self.handle.clone())),
    )),
    RuntimeFlavor::CurrentThread => Arc::new(DefaultJsonHandler::new(
        store,
        Arc::new(TokioBackgroundExecutor::new()),
    )),
    _ => panic!("unsupported runtime flavor"),
};
```
We replicate this logic inside functions to avoid the generic parameter spilling out into our implementations. The generic tends to bubble up quickly, since `TaskExecutor` is not dyn-compatible.
Once we move to DataFusion-specific implementations, these will no longer be generic over the runtime, since both DataFusion and we are tied to Tokio.
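To illustrate the pattern, here is a minimal sketch (not the exact code in this PR) of confining the match to a helper that returns a trait object, so the generic `TaskExecutor` parameter never leaks into callers. The helper name is hypothetical; the handler and executor types are those from the snippet above.

```rust
use std::sync::Arc;

use object_store::ObjectStore;
use tokio::runtime::{Handle, RuntimeFlavor};

// Sketch only: `JsonHandler`, `DefaultJsonHandler`, and the executor types are
// assumed to be the ones from the snippet above (delta_kernel's default engine).
fn json_handler_for(handle: &Handle, store: Arc<dyn ObjectStore>) -> Arc<dyn JsonHandler> {
    // Each arm instantiates `DefaultJsonHandler<E>` with a different executor
    // type `E`, but both coerce to `Arc<dyn JsonHandler>` here, so the generic
    // never escapes this function.
    match handle.runtime_flavor() {
        RuntimeFlavor::MultiThread => Arc::new(DefaultJsonHandler::new(
            store,
            Arc::new(TokioMultiThreadExecutor::new(handle.clone())),
        )),
        RuntimeFlavor::CurrentThread => Arc::new(DefaultJsonHandler::new(
            store,
            Arc::new(TokioBackgroundExecutor::new()),
        )),
        // `RuntimeFlavor` is #[non_exhaustive], so a fallback arm is required.
        _ => panic!("unsupported runtime flavor"),
    }
}
```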
```rust
Ok(Box::new(
    grouped_files
        .into_iter()
        .map(|(url, files)| {
            self.get_or_create_pq(url)?.read_parquet_files(
                &files.to_vec(),
                physical_schema.clone(),
                predicate.clone(),
            )
        })
        // TODO: this should not perform any blocking operations, since the
        // blocking work happens when the iterators are polled and here we are
        // just creating a vec of iterators. Is this correct?
        .try_collect::<_, Vec<_>, _>()?
        .into_iter()
        .flatten(),
))
```
Would love some feedback on this comment and whether I made the right assumptions here. We could also handle this inside the map, but that gets a bit messy...
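For what it's worth, a standalone toy sketch of the same shape (not the engine code) suggests the assumption holds: `try_collect` eagerly builds the outer `Vec` of iterators, so per-group setup errors surface immediately, while the per-item work only runs when the flattened result is polled.

```rust
use itertools::Itertools;

// Toy stand-in for the pattern above: building the per-group iterators is
// eager, but the per-item work (`x * 10` here, a parquet read in the real
// code) is deferred until the flattened iterator is polled.
fn lazy_flatten() -> Result<impl Iterator<Item = u64>, String> {
    let groups = vec![vec![1_u64, 2], vec![3, 4]];
    let iters = groups
        .into_iter()
        .map(|g| Ok::<_, String>(g.into_iter().map(|x| x * 10)))
        .try_collect::<_, Vec<_>, _>()?; // eager: drives the outer map, surfaces errors
    Ok(iters.into_iter().flatten()) // lazy: nothing runs until polled
}
```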
```rust
pub(crate) fn group_by_store<T: IntoIterator<Item = impl AsObjectStoreUrl>>(
    files: T,
) -> std::collections::HashMap<ObjectStoreUrl, Vec<T::Item>> {
    files
        .into_iter()
        .map(|item| (item.as_object_store_url(), item))
        .into_group_map()
}
```
This is the core piece for routing requests to individual sub-instances, each scoped to a single store.
One caveat: grouping may change the order of the inputs, which of course hurts log replay. I will look closer into this. The good news is that the log always resides in a single store; only data files are potentially distributed across stores.
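A small usage sketch with hypothetical data (the key extraction is a stand-in for `as_object_store_url`): `into_group_map` keeps the relative input order within each store's `Vec`, but the interleaving across stores, and the `HashMap` iteration order over stores, are not preserved.

```rust
use std::collections::HashMap;

use itertools::Itertools;

fn main() {
    let files = ["s3://a/f1", "s3://b/f2", "s3://a/f3"];
    let grouped: HashMap<&str, Vec<&str>> = files
        .into_iter()
        .map(|f| (&f[..7], f)) // stand-in for `item.as_object_store_url()`
        .into_group_map();
    // Relative order within one store is preserved...
    assert_eq!(grouped["s3://a/"], ["s3://a/f1", "s3://a/f3"]);
    assert_eq!(grouped["s3://b/"], ["s3://b/f2"]);
    // ...but the original interleaving across stores is lost.
}
```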
Force-pushed ba9df85 to 86e7655
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #3831      +/-   ##
==========================================
- Coverage   73.99%   73.74%   -0.25%
==========================================
  Files         148      151       +3
  Lines       38904    39117     +213
  Branches    38904    39117     +213
==========================================
+ Hits        28788    28848      +60
- Misses       8850     8999     +149
- Partials     1266     1270       +4
```

☔ View full report in Codecov by Sentry.
Signed-off-by: Robert Pack <[email protected]>
Force-pushed 86e7655 to 7eeb6e6
Since this code is a no-op and not currently used, I think it's safe to merge in order to enable more collaboration in this area.
Description
This is the first direct step toward kernelized scans. The core of this will be a new `TableProvider`. This new engine integrates directly with the DataFusion session (more specifically, the `TaskContext`).

I thought about making a more generic engine that we could also use for non-DataFusion code paths. However, we would like to exchange the "inner" implementations to also leverage DataFusion abstractions, at which point these would likely diverge. So I am hoping to integrate this more deeply first, before seeing if and how we can share between implementations.

Why do we need a dedicated engine? URL support and moving away from object store paths; a rough sketch of that idea follows below.

I have this integrated with a table provider locally, but I am trying to keep things reviewable.
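To make the "URL support" point concrete, here is a hypothetical sketch (the type name and structure are assumptions, not this PR's API, and DataFusion paths can vary slightly by version): an engine holding a `TaskContext` can resolve object stores through the session's registry by full URL, instead of addressing files by bare object-store paths.

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::object_store::ObjectStoreRegistry;
use datafusion::execution::TaskContext;
use object_store::ObjectStore;
use url::Url;

// Hypothetical engine type; the real implementation in this PR may differ.
struct DataFusionEngine {
    ctx: Arc<TaskContext>,
}

impl DataFusionEngine {
    /// Resolve the store for a fully qualified file URL via the session's
    /// `ObjectStoreRegistry`, e.g. `s3://bucket/table/part-0.parquet`.
    fn store_for(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
        self.ctx.runtime_env().object_store_registry.get_store(url)
    }
}
```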