feat: datafusion table provider next #3849
Draft
## Description
This PR adds a new table provider, in the hopes of applying all the learnings we have accumulated and leveraging modern datafusion APIs. There are several aspects we need to consider.
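As background for the statistics and file-skipping discussion below: min/max-based file skipping boils down to proving, from per-file column statistics, that no row in a file can match the predicate, and counting skips cheaply during planning. A dependency-free sketch (all types here are hypothetical illustrations, not the delta-kernel or datafusion APIs):

```rust
/// Hypothetical per-file statistics for a single (integer) column.
struct FileStats {
    path: String,
    #[allow(dead_code)]
    min: i64, // minimum value of the column in this file
    max: i64, // maximum value of the column in this file
}

/// A predicate of the form `column > value`.
struct GreaterThan(i64);

impl GreaterThan {
    /// A file can be skipped when no row can possibly satisfy the
    /// predicate, i.e. even the file's maximum is not above the bound.
    fn can_skip(&self, stats: &FileStats) -> bool {
        stats.max <= self.0
    }
}

/// Split files into "scan" and "skip" during planning, returning the
/// scanned paths plus a skip count that can be reported as a metric.
fn plan_scan(files: &[FileStats], pred: &GreaterThan) -> (Vec<String>, usize) {
    let mut scanned = Vec::new();
    let mut skipped = 0;
    for f in files {
        if pred.can_skip(f) {
            skipped += 1;
        } else {
            scanned.push(f.path.clone());
        }
    }
    (scanned, skipped)
}

fn main() {
    let files = vec![
        FileStats { path: "part-0.parquet".into(), min: 0, max: 9 },
        FileStats { path: "part-1.parquet".into(), min: 10, max: 19 },
    ];
    // WHERE col > 9: part-0 (max = 9) can never match and is skipped.
    let (scanned, skipped) = plan_scan(&files, &GreaterThan(9));
    println!("scanned={:?} skipped={}", scanned, skipped);
}
```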
## Implementations
Thus far we implement `TableProvider` for `DeltaTable` and a dedicated `DeltaTableProvider`. Specifically, the implementation for `DeltaTable` is problematic, since we do not (or at least may not) know important information (i.e. schema) about the table.

For log replay we implement `ScanFileStream`, which consumes the kernel `ScanMetadata` stream and processes it to collect file-skipping stats and extract datafusion `Statistics` to include in parquet execution planning.

## Statistics & file skipping
Both delta-kernel and datafusion's parquet handling allow optimising queries via predicates. We pass the predicate into the kernel scan to leverage the kernel's file skipping. We also add statistics to the `PartitionedFile`s that get passed into the parquet plan, to allow datafusion to do its thing.

However, we no longer expose statistics on the `TableProvider`, since this would always require a full log replay prior to constructing the `TableProvider`, which we want to move away from. `ListingTable` in datafusion - which is likely most similar to our provider - takes a similar approach.

## Execution metrics
Thus far we collect operation statistics in several ways, including the custom `MetricsObserver` node. While we likely need to retain this functionality, there are several stats we can collect more efficiently. Specifically, we track files skipped and scanned when we do the log replay to plan the scan.

## Future work
### Push deletion vectors into the parquet read

Currently we process deletion vectors after loading the data from the parquet file. This is due to uncertainties in handling row ids and other features that might be affected by skipping individual rows.
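The current post-read approach can be sketched as follows: rows are materialized from the parquet file first, then rows flagged in the deletion vector are filtered out by their physical row index. Pushing the filter into the read would skip deleted rows earlier, but changes which physical indices survive, which is what complicates row-id handling. All types below are illustrative stand-ins, not delta-rs APIs:

```rust
use std::collections::HashSet;

/// Simplified deletion vector: the set of physical row indices to drop.
struct DeletionVector(HashSet<usize>);

impl DeletionVector {
    fn is_deleted(&self, row_index: usize) -> bool {
        self.0.contains(&row_index)
    }
}

/// Apply the deletion vector *after* the parquet read: every row still has
/// its original physical index, so row-id bookkeeping stays straightforward.
fn apply_deletion_vector(rows: Vec<String>, dv: &DeletionVector) -> Vec<String> {
    rows.into_iter()
        .enumerate()
        .filter(|(i, _)| !dv.is_deleted(*i))
        .map(|(_, row)| row)
        .collect()
}

fn main() {
    let rows = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    // Physical row 1 ("b") is marked deleted.
    let dv = DeletionVector([1].into_iter().collect());
    println!("{:?}", apply_deletion_vector(rows, &dv));
}
```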