dynamic filter refactor #15685


Draft · wants to merge 2 commits into main

Conversation

jayzhan211 (Contributor)

Which issue does this PR close?

  • Closes #.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

github-actions bot added the physical-expr (Changes to the physical-expr crates) label Apr 11, 2025
adriangb (Contributor) left a comment:

I think for any alternative proposal to be viable it needs to work with the current tests without calling new methods, or making copies differently, etc. The test was crafted that way because it reflects what actually happens during execution based on #15301

```rust
let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1));
let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap();
insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2));
```
adriangb (Contributor):

What is expected to call with_schema? This seems like a new method on DynamicFilterPhysicalExpr. We need reassign_predicate_columns to work; that's what gets called from within ParquetSource, etc.

jayzhan211 (Contributor, Author):

When you need reassign_predicate_columns, it basically re-projects columns based on the provided schema.

adriangb (Contributor):

Right but the constraint is that we can't modify what reassign_predicate_columns and similar do internally, otherwise that's a ton of API churn. The existing design works within the confines of the existing APIs.

jayzhan211 (Contributor, Author):

> We need reassign_predicate_columns to work, that's what gets called from within ParquetSource, etc

We can also change reassign_predicate_columns inside Parquet if that brings us to a better state. From my view, reassign_predicate_columns is the root cause of why you end up with a with_new_children that doesn't actually update the "children".

jayzhan211 (Contributor, Author):

> otherwise that's a ton of API churn.

We only use it in DatafusionArrowPredicate; are there any other places we need to change?

The main point is that with_new_children on the main branch isn't doing the right thing: it should update the source filter, but it only updates the remapped filter schema. I think the filter schema is a "parameter" for remapping column indexes; it doesn't need to be part of the filter expression at all.

jayzhan211 (Contributor, Author):

If you really want to keep reassign_predicate_columns for whatever reason, you should pass the inner of DynamicFilterPhysicalExpr instead, so you are only modifying the inner and not the whole DynamicFilterPhysicalExpr. The difference is that with_new_children is then not called at the DynamicFilterPhysicalExpr level.

Comment on lines -162 to -163

```diff
-        let inner =
-            Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;
```
adriangb (Contributor):

This means that evaluate no longer uses a version with remapped children, right?

jayzhan211 (Contributor, Author):

Now you evaluate based on the snapshot. The snapshot is the remapped filter built with your filter schema.

adriangb (Contributor):

I feel like we're going in circles... this is the thing that was expected to be used to produce the snapshots... now we're using snapshots to produce it?

jayzhan211 (Contributor, Author):

I think the difference is that:

Your version

  1. we have source filter A
  2. create dynamic filters a and b with different filter schemas
  3. create snapshots a and b from them
  4. evaluate batches with the snapshots
  5. update source filter A to B
  6. dynamic filters a and b remap based on source filter B when you call evaluate

My version (sketched in code after this list)

  1. we have source filter A
  2. create snapshots based on source filter A + filter schemas A and B
  3. evaluate batches with the snapshots
  4. update source filter A to B
  5. create snapshots based on source filter B + filter schemas A and B
  6. evaluate batches with the snapshots

jayzhan211 (Contributor, Author):

A snapshot is essentially the dynamic filter materialized for a given schema.

Basically, what you have is just the filter expression. You provide the schema to remap the column indexes, and you get yet another filter expression.

Comment on lines 48 to 40

```diff
-    inner: Arc<RwLock<PhysicalExprRef>>,
+    inner: PhysicalExprRef,
```
adriangb (Contributor):

@jayzhan211 how can this have multiple readers and a writer updating with some sort of write lock?

jayzhan211 (Contributor, Author):

I think we don't need it. Given a source filter, you create a snapshot with the schema, then you evaluate based on the remapped filter. When you need a new source filter, instead of updating it, just create a new one.

adriangb (Contributor):

But how do you pipe the new filter down into other operators?

The whole point is that you can create a filter at planning time, bind it to a ParquetSource and a SortExec (for example) and then the SortExec can dynamically update it at runtime.

jayzhan211 (Contributor, Author):

> The whole point is that you can create a filter at planning time, bind it to a ParquetSource and a SortExec (for example) and then the SortExec can dynamically update it at runtime.

Instead of sending the filter down, my change sends the filter schema down. It is used to create another filter (a snapshot) in SortExec dynamically at runtime.

Comment on lines +134 to +135

```rust
pub fn update(&mut self, filter: PhysicalExprRef) {
    self.inner = filter;
```
adriangb (Contributor):

How will writers have mutable access to this if they have to package it up in an Arc?

jayzhan211 (Contributor, Author) commented Apr 12, 2025

Following up on #15568 (comment): why the change is equivalent to yours in the high-level idea.

> 1. DynamicFilterPhysicalExpr gets initialized at planning time with a known set of children but a placeholder expression (lit(true))

The same.

> 2. with_new_children is called making a new DynamicFilterPhysicalExpr but with the children replaced (let's ignore how that happens internally for now)

We need to replace children, and we can achieve this and get the result via snapshot.

> 3. update is called on the original reference with an expression that references the original children. This is propagated to all references, including those with new children, because of the Arc<RwLock<...>>.

We can keep the Arc<RwLock<T>> and update the inner, or even create another new source filter.

> 4. evaluate is called on one of the references that previously had with_new_children called on it. Since update was called, which swapped out inner, the children of this new inner need to be remapped to the children that we currently expose externally.

We can call evaluate on the snapshot because the snapshot is already remapped. Your version needs to remap on each evaluate call, but in my version the snapshot does it once, and we evaluate on it without remapping.

The improvements of this change:

  1. We have a correct with_new_children because we now update the source filter.
  2. DynamicFilterPhysicalExpr is basically a filter expression: Arc<dyn PhysicalExpr>. We have a simpler interface with the same capability.

Concerns

  1. reassign_predicate_columns is replaced by snapshot, but you think we can't make this kind of change because of API churn. I think this is not an issue because it is only used in DatafusionArrowPredicate.
  2. Do we need a lock for the source filter? I think we could just create a new DynamicFilterPhysicalExpr instead, but maybe there are reasons we can't; we can discuss this further. (A compressed sketch of the overall shape follows this list.)

adriangb (Contributor):

Hey @jayzhan211 thank you for putting the work into trying to clarify this.

At this point I think it would be best to wait for #15566 or a PR that replaces it to be merged so that we can work against an actual use case / implementation of these dynamic filters. Otherwise I think it's a bit hard to communicate in such abstract terms. Once we're looking at a concrete use case it will be easier to make a PR to replace this implementation.

Would it be okay with you to wait until that happens to continue this discussion?

Sorry if merging a PR with a bad implementation becomes problematic... luckily it's problematic for us, not end users, since this is all private implementations.
