fix: avoid overflow for large table state #3801
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report

```
@@            Coverage Diff             @@
##             main    #3801      +/-   ##
==========================================
+ Coverage   74.31%   74.37%   +0.06%
==========================================
  Files         145      145
  Lines       39441    39482      +41
  Branches    39441    39482      +41
==========================================
+ Hits        29309    29365      +56
+ Misses       8729     8719      -10
+ Partials     1403     1398       -5
```
I can confirm that this does not exhaust memory on said table.

@rtyler - the PR grew a bit in size, but hopefully for good cause. We should now see that we are no longer using as much memory, since we are no longer tracking the serialised stats as part of the file data. Would you mind confirming? 😄
```rust
let mut pruned_batches = Vec::new();
let mut mask_offset = 0;

for batch in &self.snapshot.files {
    let batch_size = batch.num_rows();
    let batch_mask = &mask[mask_offset..mask_offset + batch_size];
    let batch_mask_array = BooleanArray::from(batch_mask.to_vec());
    let pruned_batch = filter_record_batch(batch, &batch_mask_array)?;
    if pruned_batch.num_rows() > 0 {
        pruned_batches.push(pruned_batch);
    }
    mask_offset += batch_size;
}

LogDataHandler::new(&pruned_batches, es.table_configuration()).statistics()
```
This was not really nice before and unfortunately got a bit less nice.
Longer term, I think we may have to decide whether we need additional skipping from DataFusion, or rely on the file skipping in delta-kernel to be selective (we have no reason to believe it would not be :)).
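The invariant the pruning loop above relies on is that the flat mask lines up with the concatenated rows of all batches. A minimal sketch of the same slicing scheme, using plain vectors in place of `RecordBatch`es (names and types here are illustrative, not from the PR):

```rust
// Apply one flat boolean mask across a sequence of "batches", slicing the
// mask by each batch's length and keeping only non-empty results.
fn prune(batches: &[Vec<i64>], mask: &[bool]) -> Vec<Vec<i64>> {
    // The mask must cover exactly the total number of rows.
    assert_eq!(mask.len(), batches.iter().map(|b| b.len()).sum::<usize>());
    let mut out = Vec::new();
    let mut offset = 0;
    for batch in batches {
        let m = &mask[offset..offset + batch.len()];
        let kept: Vec<i64> = batch
            .iter()
            .zip(m)
            .filter(|(_, keep)| **keep)
            .map(|(v, _)| *v)
            .collect();
        if !kept.is_empty() {
            out.push(kept);
        }
        offset += batch.len();
    }
    out
}

fn main() {
    let batches = vec![vec![1, 2, 3], vec![4, 5]];
    let mask = vec![true, false, true, true, false];
    assert_eq!(prune(&batches, &mask), vec![vec![1, 3], vec![4]]);
}
```

Keeping the batches separate (rather than concatenating them first) is what avoids building one oversized array.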
```diff
  let stats_schema = self.stats_schema()?;
  let stats_schema: ArrowSchema = stats_schema.as_ref().try_into_arrow()?;
- fields.push(Arc::new(Field::new(
+ fields[stats_idx] = Arc::new(Field::new(
```
We are now replacing the existing stats field with the parsed stats, rather than appending the parsed ones as an additional field.
While not entirely clean yet, we aim to isolate processing of the data we get from kernel's log replay in this module. Essentially we need to revert what we do when receiving data before we feed it back into a scan / replay.
```rust
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    let this = self.project();
    match this.stream.poll_next(cx) {
        Poll::Ready(Some(Ok(batch))) => match parse_stats_column(&this.snapshot, &batch) {
            Ok(batch) => Poll::Ready(Some(Ok(batch))),
            Err(err) => Poll::Ready(Some(Err(err))),
        },
        other => other,
    }
}
```
It seems work has started to support async/streams directly from kernel. As such, we start to move some processing onto streams rather than doing it in iterator world.
This should also align well when we work on the DataFusion integrations, since there we find the same model.
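The `poll_next` adapter above is the stream-world version of mapping a fallible transformation over each item while passing errors through unchanged. In iterator world the same shape looks like this (the `parse` closure here is a stand-in for `parse_stats_column`, not code from the PR):

```rust
// Map a fallible parse over a sequence of fallible items: Ok values get
// transformed (possibly into an Err), existing Errs are propagated as-is.
fn main() {
    let raw: Vec<Result<&str, String>> =
        vec![Ok("42"), Ok("7"), Err("io error".to_string())];
    let parsed: Vec<Result<i64, String>> = raw
        .into_iter()
        .map(|item| item.and_then(|s| s.parse::<i64>().map_err(|e| e.to_string())))
        .collect();
    assert_eq!(parsed[0], Ok(42));
    assert_eq!(parsed[1], Ok(7));
    assert!(parsed[2].is_err());
}
```

The stream version only adds the `Poll` wrapping; the error-propagation logic is identical.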
[citation needed] 😆
There's been a lot of talk that I have heard, but I haven't seen any concrete changes. Do you have some to link?
```rust
pub fn stats(&self) -> Option<String> {
    let stats = self.stats_parsed()?.slice(self.index, 1);
    let value = to_json(&stats)
        .ok()
        .map(|arr| arr.as_string::<i32>().value(0).to_string());
    value.and_then(|v| (!v.is_empty()).then_some(v))
}
```
We now need to serialise individual fields to get the JSON stats (for add actions), since we are no longer carrying the serialised stats column.
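To illustrate what "serialising individual fields" means here: the parsed stats columns have to be assembled back into the JSON string an Add action carries. A hand-rolled sketch (field names `numRecords`/`minValues`/`maxValues` follow the Delta protocol; the values and the single-column shape are made up for illustration):

```rust
// Re-serialise per-field stats values into one JSON stats string.
fn main() {
    let num_records = 1024i64; // illustrative values, not real table stats
    let min_v = 3i64;
    let max_v = 99i64;
    let stats = format!(
        "{{\"numRecords\":{num_records},\"minValues\":{{\"v\":{min_v}}},\"maxValues\":{{\"v\":{max_v}}}}}"
    );
    assert_eq!(
        stats,
        "{\"numRecords\":1024,\"minValues\":{\"v\":3},\"maxValues\":{\"v\":99}}"
    );
}
```

In the actual PR this is done via Arrow's JSON conversion over a one-row slice, as shown in the `stats()` snippet above.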
```diff
  ///
  /// A stream of [`LogicalFileView`] objects.
- pub fn files(
+ pub fn file_views(
```
Renamed this for consistency, since it is returning file views after all.
```rust
    .map(|file| evaluator.evaluate_arrow(file.clone()))
    .collect::<Result<Vec<_>, _>>()?;

let result = concat_batches(results[0].schema_ref(), &results)?;
```
We still concatenate the add actions table. In a follow-up we should also move this to a stream and expose that via record batch readers in Python.
Opened #3811 to track this.
Signed-off-by: Robert Pack <[email protected]>
```toml
itertools = "0.14"
parking_lot = "0.12"
percent-encoding = "2"
pin-project-lite = "^0.2.7"
```
I'm not sure why this dependency crept back in, I'll just have to remove it again 😆
Description
When loading the active add files into memory, we concatenate the batches read from the log. For very large logs, we may exceed the admissible size of an individual Arrow array; this is particularly likely with large stats fields.
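The limit in question comes from Arrow's default `Utf8` layout, which indexes its value buffer with `i32` offsets, capping a single string column at `i32::MAX` (~2 GiB) of total bytes. Some back-of-the-envelope arithmetic, assuming an illustrative ~1 MiB of serialized stats per file:

```rust
// Estimate how many files fit before a concatenated Utf8 stats column
// overflows its i32 offsets. The per-file size is an assumed figure.
fn main() {
    let max_bytes = i32::MAX as u64; // Utf8 offset limit: 2_147_483_647 bytes
    let stats_bytes_per_file: u64 = 1 << 20; // assumed 1 MiB of stats JSON per file
    let files_until_overflow = max_bytes / stats_bytes_per_file;
    assert_eq!(files_until_overflow, 2047);
    println!("overflow after ~{files_until_overflow} files");
}
```

So a table with only a couple thousand large add actions can already hit the cap once everything is concatenated into one batch, which is why keeping the batches separate matters.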
@rtyler - mind checking out if this fixes the issue we see on large tables? And, do we have an issue for this?
closes: #3767