[RFC] feat!: kernel based log replay - take 2 #3474
Conversation
ACTION NEEDED: delta-rs follows the Conventional Commits specification for release automation. The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.
Codecov Report
Attention: Patch coverage is

Additional details and impacted files:

```diff
@@            Coverage Diff             @@
##             main    #3474      +/-   ##
==========================================
- Coverage   74.33%   74.13%   -0.21%
==========================================
  Files         150      157       +7
  Lines       45033    45765     +732
  Branches    45033    45765     +732
==========================================
+ Hits        33476    33928     +452
- Misses       9401     9616     +215
- Partials     2156     2221      +65
```

☔ View full report in Codecov by Sentry.
```rust
fn logical_files_stream(&self, predicate: Option<PredicateRef>) -> SendableRBStream {
    let scan = match self
        .inner
        .clone()
        .scan_builder()
        .with_predicate(predicate)
        .build()
    {
        Ok(scan) => scan,
        Err(err) => {
            return Box::pin(futures::stream::once(async {
                Err(DeltaTableError::KernelError(err))
            }))
        }
    };

    // TODO: which capacity to choose?
    let mut builder = RecordBatchReceiverStreamBuilder::new(100);
    let tx = builder.tx();

    let engine = self.engine.clone();
    builder.spawn_blocking(move || {
        let mut scan_iter = scan.scan_metadata_arrow(engine.as_ref())?;
        for res in scan_iter {
            let batch = res?.scan_files;
            if tx.blocking_send(Ok(batch)).is_err() {
                break;
            }
        }
        Ok(())
    });

    builder.build()
}
```
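For context, consuming the returned stream would look roughly like this (a minimal sketch using futures::TryStreamExt; `MySnapshot` and the surrounding function are hypothetical, not part of the PR):

```rust
use futures::TryStreamExt;

// Hypothetical consumer: drain the stream produced by `logical_files_stream`
// into a Vec of RecordBatches, propagating any DeltaTableError.
async fn collect_logical_files(snapshot: &MySnapshot) -> DeltaResult<Vec<RecordBatch>> {
    snapshot
        .logical_files_stream(None)
        .try_collect()
        .await
}
```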
@zachschuermann - any input / opinion on this?
```rust
/// - `Some(expr)`: Apply this expression to transform the data to match [`Scan::schema()`].
/// - `None`: No transformation is needed; the data is already in the correct logical form.
///
/// Note: This vector can be indexed by row number.
```
Is the order of this vec guaranteed?
Yes, it's an invariant that rows in the batch and entries in the vector are aligned.
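For illustration, the invariant means row indices can be used to pair the two (a sketch; `metadata` follows the field names used elsewhere in this diff, `apply_transform` is a hypothetical helper):

```rust
for (i, transform) in metadata.scan_file_transforms.iter().enumerate() {
    // Row `i` of the scan-files batch is described by `transform`
    // (Some(expr) or None) and is kept iff selection_vector[i] is true.
    if metadata.scan_files.selection_vector[i] {
        apply_transform(i, transform); // hypothetical helper
    }
}
```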
```rust
    let scan_file_transforms = metadata
        .scan_file_transforms
        .into_iter()
        .enumerate()
        .filter_map(|(i, v)| metadata.scan_files.selection_vector[i].then_some(v))
        .collect();
    let batch = ArrowEngineData::try_from_engine_data(metadata.scan_files.data)?.into();
    let scan_files = filter_record_batch(
        &batch,
        &BooleanArray::from(metadata.scan_files.selection_vector),
    )?;
    Ok(ScanMetadataArrow {
        scan_files,
        scan_file_transforms,
    })
}
```
This kernel-to-arrow conversion seems to filter down to the active add actions, right?
Exactly.
A small note would help; I had to look up the delta-kernel-rs docs to understand it ^^
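Something along these lines might be enough (a hypothetical comment, not part of the PR):

```rust
// The kernel selection vector marks which rows of the scan-files batch are
// still active after log replay (i.e. add actions without a matching remove),
// so we drop both the deselected rows and their transforms here.
```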
```rust
fn logical_files(&self, predicate: Option<PredicateRef>) -> SendableRBStream {
    if let Some(predicate) = predicate {
        self.snapshot.logical_files(Some(predicate))
    } else {
        let batch = self.files.clone();
        return Box::pin(futures::stream::once(async move { Ok(batch) }));
    }
}
```
I don't quite follow this: why, if you pass a predicate, do you filter via the lazy snapshot?
That's probably because it is not finished :).
What needs to happen here: the snapshot can be created with a predicate. If there is an existing predicate, we need to validate that the new predicate would skip all the files the current predicate skipped. This is non-trivial logic, so for now we'll likely just allow no existing predicate; this check will eventually be integrated in kernel.
So if there is one, we will be able to replay using the existing data, but only if the predicate is valid.
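A rough sketch of that interim rule (hypothetical names; the real subsumption check would eventually live in kernel):

```rust
/// Decide whether cached, predicate-filtered files can be reused for a new
/// predicate. Until kernel can prove that the new predicate skips at least
/// the files the existing predicate skipped, only an unfiltered cache is safe.
fn can_reuse_cached_files(
    existing_predicate: Option<&PredicateRef>,
    _new_predicate: Option<&PredicateRef>,
) -> bool {
    // Interim behaviour: only reuse when no predicate was applied at caching
    // time, i.e. the cached data is the complete file list.
    existing_predicate.is_none()
}
```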
```rust
let scan = snapshot.inner.clone().scan_builder().build()?;
let engine = snapshot.engine_ref().clone();
// TODO: process blocking iterator
let files: Vec<_> = scan
    .scan_metadata_from_arrow(
        engine.as_ref(),
        current,
        Box::new(std::iter::once(self.files.clone())),
        self.predicate.clone(),
    )?
    .map_ok(|s| s.scan_files)
    .try_collect()?;

self.files = concat_batches(&files[0].schema(), &files)?;
```
Does scan_builder re-use the existing state it has?
scan_metadata_from does exactly that. It will treat the existing data as if it were a snapshot as of the current version. Depending on what it finds internally, it will re-use that data. (If there is a new snapshot, it will currently do a new log replay, but this can be improved in the future.)
```rust
/// Size of the file in bytes.
pub fn size(&self) -> i64 {
    self.files
        .column(1)
```
My preference would be to use column_by_name; I find column ordering riskier than incorrect naming :P, and it's also more explicit.
Agreed, locally we are already computing the indices for the published (and validated) schema of the scan row.
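For illustration, the by-name lookup might look like this (a sketch; the "size" column name is assumed from the kernel scan-row schema and would need to be checked against the published schema mentioned above):

```rust
use arrow_array::{Int64Array, RecordBatch};

// Sum the `size` column, resolving it by name instead of by position.
fn total_size(files: &RecordBatch) -> i64 {
    files
        .column_by_name("size")
        .expect("scan row schema should expose a `size` column")
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("`size` should be an Int64 column")
        .iter()
        .flatten()
        .sum()
}
```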
```rust
// TODO: which capacity to choose?
let mut builder = RecordBatchReceiverStreamBuilder::new(100);
let tx = builder.tx();
```
Why do we use these channels and not async iterators?
I guess it's very difficult to define the channel size.
What do we mean by async iterators? Streams?
Would a stream by any other name be as fast? Streams are basically channels, which are basically async iterators. Functionally it is something that has poll_next on it; channels are typically what we call something that allows cross-task/thread streaming, which is usually why these things have to be bounded.
My recommendation would be to define an environment variable and a default, e.g.
```diff
- // TODO: which capacity to choose?
- let mut builder = RecordBatchReceiverStreamBuilder::new(100);
- let tx = builder.tx();
+ let capacity = std::env::var("RECORD_BATCH_STREAM")
+     .ok()
+     .and_then(|v| v.parse::<usize>().ok())
+     .unwrap_or(1024);
+ let mut builder = RecordBatchReceiverStreamBuilder::new(capacity);
+ let tx = builder.tx();
```
```rust
let table_url = if let Some(op_id) = operation_id {
    #[allow(deprecated)]
    log_store.transaction_url(op_id, &log_store.table_root_url())?
} else {
    log_store.table_root_url()
```
We don't really need the transaction URL for reading, to be fair.
```rust
/// Trait for types that stream [RecordBatch]
///
/// See [`SendableRecordBatchStream`] for more details.
pub trait RecordBatchStream: Stream<Item = DeltaResult<RecordBatch>> {
```
We already have a similar trait in arrow-rs; why didn't you use that?
Description
This is a redo of #3137.
Since the first attempt, kernel has evolved quite a bit, as has our codebase. We also learned a lot, particularly around some of the complexities of "nested" async evaluation that come with kernel's engine concepts.
A few things that could guide designs:
- Engine: Anything we can do using kernel's Engine abstractions, we can do without the need to have datafusion enabled, potentially significantly extending what we can offer to arrow-only users.
- async: Kernel exposes blocking iterators that only "appear to be sync". We need to figure out how to best handle these. Kernel may one day also expose APIs or utilities to more seamlessly integrate with Rust async, so we should hide these complexities from end users.

Since this will (and should) have a significant impact on large parts of the codebase, we should not even try to do a once-off switch. Rather, we propose the following migration strategy.
- Introduce a Snapshot abstraction that exposes a minimal API we require in operations planning etc.

This should allow us to incrementally "kernelize" our codebase and give us the opportunity to properly test the new snapshots. Opt-in to using new snapshots should for now be done via either feature flags or (maybe even better) runtime configuration.
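To make that step a bit more concrete, the minimal API could be a trait roughly along these lines (purely illustrative; the logical_files signature mirrors what appears in this diff, everything else is an assumption):

```rust
/// Hypothetical sketch of the minimal snapshot API used during migration.
pub trait Snapshot: Send + Sync {
    /// Table version this snapshot corresponds to.
    fn version(&self) -> u64;

    /// Stream the logical files (active add actions), optionally pruned
    /// by a predicate.
    fn logical_files(&self, predicate: Option<PredicateRef>) -> SendableRBStream;
}
```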
Still figuring out some basics, but putting it up anyway for feedback.
Major breaking changes
A lot of APIs will eventually break; below we list some of the more fundamental changes.
No more tombstones
Currently we expose tombstones as part of our state / snapshots. Since checkpoint writing now uses kernel, we really only need tombstones to plan vacuums, which is a "special", or at least table-format-specific, maintenance operation. As such, we internalise the logic for how a vacuum is planned.