Add ComplexFilterPushdown support #477
base: v1.4-andium
Conversation
Test failure is unrelated, I think:
(force-pushed from d94bdab to 45fb399)
Update: I implemented this using CTEs for the column stats, which changes the generated SQL to the query below. I add the MATERIALIZED hint whenever a CTE is referenced multiple times; although duckdb defaults to materializing, I wasn't sure whether other databases used as the metastore would. The change to CTEs reduced the observed slowdown: it is now ~0.12 seconds instead of ~0.25 seconds, but it is still there. Running the query alone via the CLI takes ~0.037 seconds.
Updated detailed profiling output
WITH col_21_stats AS MATERIALIZED (
SELECT
data_file_id,
max_value,
min_value
FROM "__ducklake_metadata_ducklake"."main".ducklake_file_column_stats
WHERE
column_id = 21 AND table_id = 155
), col_19_stats AS MATERIALIZED (
SELECT
data_file_id,
min_value,
max_value
FROM "__ducklake_metadata_ducklake"."main".ducklake_file_column_stats
WHERE
column_id = 19 AND table_id = 155
)
SELECT
data.path,
data.path_is_relative,
data.file_size_bytes,
data.footer_size,
data.row_id_start,
data.begin_snapshot,
data.partial_file_info,
data.mapping_id,
del.path,
del.path_is_relative,
del.file_size_bytes,
del.footer_size
FROM "__ducklake_metadata_ducklake"."main".ducklake_data_file AS data
LEFT JOIN (
SELECT
*
FROM "__ducklake_metadata_ducklake"."main".ducklake_delete_file
WHERE
table_id = 155
AND 310 >= begin_snapshot
AND (
310 < end_snapshot OR end_snapshot IS NULL
)
) AS del
USING (data_file_id)
WHERE
data.table_id = 155
AND 310 >= data.begin_snapshot
AND (
310 < data.end_snapshot OR data.end_snapshot IS NULL
)
AND (
(
(
data_file_id IN (
SELECT
data_file_id
FROM col_19_stats
WHERE
min_value IS NULL
OR max_value IS NULL
OR max_value > '9400_12699'
AND min_value < '9400_12701'
)
AND data_file_id IN (
SELECT
data_file_id
FROM col_21_stats
WHERE
max_value IS NULL
OR min_value IS NULL
OR 0 BETWEEN TRY_CAST(min_value AS BIGINT) AND TRY_CAST(max_value AS BIGINT)
)
)
)
OR (
(
data_file_id IN (
SELECT
data_file_id
FROM col_19_stats
WHERE
max_value IS NULL
OR min_value IS NULL
OR '9400_12700' BETWEEN min_value AND max_value
)
AND data_file_id IN (
SELECT
data_file_id
FROM col_21_stats
WHERE
max_value IS NULL
OR min_value IS NULL
OR 1 BETWEEN TRY_CAST(min_value AS BIGINT) AND TRY_CAST(max_value AS BIGINT)
)
)
)
OR (
data_file_id IN (
SELECT
data_file_id
FROM col_19_stats
WHERE
max_value IS NULL
OR min_value IS NULL
OR '8900_10000' BETWEEN min_value AND max_value
)
)
)
Hi @J-Meyers, thanks for the PR. I skimmed over the code, and it seems to go in the right direction; I'll have a deeper look later this week, probably tomorrow. The CI failure is definitely unrelated and should now be fixed; could you merge again? Regarding the tests: unfortunately, AI-generated code/tests go against our guidelines, since they add a high burden on the team to review the code and to maintain it in the future. Could you replace the test with isolated portions (having a minimal number of columns and expressions to express the desired test)? Also, break it down into multiple test files to make it more modular. Thanks again!
Happy to write the tests if this seems like something that could get merged. Are there any particular things you want to see tested? The general things I was looking for:
I will wait to push the merge with dev until the tests are written, to avoid running CI twice.
From my point of view, we should initially focus on supporting and testing the cases covered by the filter combiner, since that's also what iceberg and delta support. But I'm not super involved with the Complex/Dynamic-Filter, so I've asked @samansmink to review the PR; maybe he has a better answer to it :-)
Added new smaller tests in separate files. These aren't as extensive as the others, but should cover the major cases. I also improved the logic for when a complex filter is followed by a dynamic filter or by more complex filters, so that it keeps all the initial filters and only adds new ones that are not identical to existing ones (if they're functionally the same but not actually identical, this is not recognized). Looking forward to a review.
Hi @J-Meyers, thanks for the effort in cleaning up the PR, it is indeed much more readable now. I'm not a file-reader filter-pushdown specialist, but I was a bit surprised by the complexity of the code when compared to iceberg, where it seems that the
This is doing more work that the FilterCombiner is not capable of; if you try any other scanner on these tests it will fail. Currently the return type of the filter combiner is a TableFilterSet, which gives one set of restrictions per column, and then all restrictions from all of the columns must be ANDed together. This sort of filtering is appropriate when reading the metadata for the columns is quite expensive and expressing more complex logic is difficult; however, with ducklake we have the nice advantage that all of the metadata is in a database accessible via SQL, so we need not be restricted to expressing a single filter per column and ANDing them all together. The TableFilterSet cannot express

I'll also say that specifically with iceberg a lot of the logic is hidden in other places, for when the filters are actually evaluated rather than when they are pushed down, which in iceberg is just adding them to a TableFilterSet. These other multi-file list scanners like delta and iceberg don't actually support complex pushdown; instead, as their first step they simplify the complex filters into a simple TableFilterSet that can then be evaluated, because evaluating arbitrary expressions is hard unless you already have an engine for it, and there are more significant costs to reading in the data.

Edit: The current test failure seems unrelated; it never runs anything along the multi_file_list pushdown codepaths. And that test passed on a different architecture.
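To make that limitation concrete, here is a minimal, self-contained C++ sketch; SimpleFilterSet, ColumnPredicate, and FileMayMatch are invented names for illustration and are not DuckDB's actual TableFilterSet API. It models the AND-of-per-column-filters shape: a query like WHERE id = 50 AND category = 3 fits, but a cross-column disjunction like WHERE id = 50 OR category = 3 has no valid placement in such a structure.

```cpp
#include <cstdio>
#include <functional>
#include <map>
#include <vector>

// Per-file min/max statistics for one column, as stored in the metadata.
struct ColumnStats {
	long min_value;
	long max_value;
};

// A predicate over the stats of a single column.
using ColumnPredicate = std::function<bool(const ColumnStats &)>;

// Hypothetical stand-in for a per-column filter set: every predicate is keyed
// by one column, and all predicates are implicitly ANDed together.
struct SimpleFilterSet {
	std::map<int, std::vector<ColumnPredicate>> filters;

	bool FileMayMatch(const std::map<int, ColumnStats> &file_stats) const {
		for (auto &entry : filters) {
			auto stats = file_stats.find(entry.first);
			if (stats == file_stats.end()) {
				continue; // no stats for this column -> cannot prune on it
			}
			for (auto &pred : entry.second) {
				if (!pred(stats->second)) {
					return false; // conjunction: any failing predicate prunes the file
				}
			}
		}
		return true;
	}
};

int main() {
	// WHERE id = 50 AND category = 3 fits this model: one zone-map check per column.
	SimpleFilterSet conjunctive;
	conjunctive.filters[0].push_back([](const ColumnStats &s) { return s.min_value <= 50 && 50 <= s.max_value; });
	conjunctive.filters[1].push_back([](const ColumnStats &s) { return s.min_value <= 3 && 3 <= s.max_value; });

	std::map<int, ColumnStats> file_stats = {{0, {0, 100}}, {1, {5, 9}}};
	std::printf("file may match: %d\n", (int)conjunctive.FileMayMatch(file_stats));

	// WHERE id = 50 OR category = 3 does NOT fit: placing both predicates in the
	// set would AND them and wrongly prune files that satisfy only one branch.
	// Keeping the expression tree is what the ComplexFilterPushdown path here
	// translates into SQL against the metadata instead.
	return 0;
}
```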
Hi @J-Meyers, I think things generally look very good! Thanks again for all the effort!
I had some comments and clarification questions (that could potentially become code comments), but otherwise it looks great.
One thing I'm thinking: it's probably worth running the DuckDB tests on this to improve correctness. A nice way of achieving this could be to have a CI run with the target_file_size limited to a very small size.
I think that can be achievable by having an extra run at https://github.com/duckdb/ducklake/blob/main/.github/workflows/ConfigTests.yml with a new config file using sth like CALL my_ducklake.set_option('target_file_size', 'something_sensible'); at https://github.com/duckdb/ducklake/blob/main/test/configs/attach_ducklake.json#L3
Otherwise, I'll leave it to @samansmink to do another review pass!
//! Deferred filter evaluation state
mutable bool filters_evaluated = false;
//! Complex filters stored as vector of expressions for deferred evaluation
mutable vector<unique_ptr<Expression>> pending_complex_filters;
//! Dynamic filters stored as TableFilterSet for deferred evaluation
mutable TableFilterSet pending_dynamic_filters;
//! Column information for deferred filter evaluation
mutable vector<column_t> deferred_column_ids;
//! ClientContext for deferred filter evaluation
mutable ClientContext *deferred_context = nullptr;
Could we wrap this in a DeferredFilters struct/class, with a description also of what a deferred filter is in this context? We could probably separate the files as well.
Also move methods related to them to the class.
I created a struct with a Copy function. However, none of the other functions are currently associated with the class, I was trying to keep most of that within the .cpp file to avoid binary bloat. What functions do you want associated with the class?
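For readers following along, a minimal sketch of what such a struct could look like, assembled from the members quoted above; the name DeferredFilters, the Copy signature, and the include paths are assumptions rather than the PR's actual code (in the PR these are mutable fields of the file-list class):

```cpp
// Includes are approximate; the extension pulls these types in through its own headers.
#include "duckdb/main/client_context.hpp"
#include "duckdb/planner/expression.hpp"
#include "duckdb/planner/table_filter.hpp"

namespace duckdb {

//! A "deferred" filter here is a pushed-down filter whose translation into the
//! metadata query is postponed until all pushdowns (complex and dynamic) have
//! been received, so earlier filters are not lost when later ones arrive.
struct DeferredFilters {
	//! Set once the pending filters have been turned into the metadata query
	bool filters_evaluated = false;
	//! Complex filters kept as expressions for later evaluation
	vector<unique_ptr<Expression>> pending_complex_filters;
	//! Dynamic filters kept as a TableFilterSet for later evaluation
	TableFilterSet pending_dynamic_filters;
	//! Column information for the deferred filters
	vector<column_t> deferred_column_ids;
	//! ClientContext needed when the deferred filters are finally evaluated
	ClientContext *deferred_context = nullptr;

	//! Deep copy: the unique_ptr members make the struct non-copyable by default
	DeferredFilters Copy() const;
};

} // namespace duckdb
```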
I addressed all the core code comments.
For the small files I ended up running with 1KB, which felt really small to me, but maybe even smaller is appropriate.
// Check for duplicates while building the new filter list
vector<unique_ptr<Expression>> new_filters_to_add;

for (auto &filter : filters) {
	bool is_duplicate = false;
	for (auto &existing_filter : pending_complex_filters) {
		if (filter->Equals(*existing_filter)) {
			is_duplicate = true;
			break;
		}
	}
	if (!is_duplicate) {
		new_filters_to_add.push_back(filter->Copy());
	}
}
I'm not sure of the underlying cause, so I don't want to annotate it with something wrong, but for the same query ComplexFilterPushdown is called repeatedly; for example, for the query that is the first test in equality_propagation.test:
query I
SELECT COUNT(*) FROM ducklake.test_table WHERE id = 50 AND id = duplicate_id;
----
1
It is first called with filters:
(id = duplicate_id)
(id = 50)
(duplicate_id = 50)
But then again called with filter:
(id = duplicate_id)
This also happens with relatively simple cases too; for example, for the first case of basic_cross_column_pushdown.test:
query II
EXPLAIN ANALYZE SELECT COUNT(*) FROM ducklake.test_table WHERE id = 50 OR category = 3;
----
analyzed_plan <REGEX>:.*Total Files Read: 2.*
It is called twice with the same filters of:
((id = 50) OR (category = 3))
I wanted to handle whatever they push. In all of the ducklake tests I have not observed multiple calls producing new filters, but when running the join tests in the duckdb repo there do seem to be additional new filters. If they do produce different filters that need to be unified, my interpretation was that they are all additive, since any one individually must be valid, which is why I add any new ones to the same vector of expressions that are all eventually ANDed together, so that we are maximally restrictive in the list that we provide.
Example that produces unique filters (from duckdb test/sql/join/inner/test_using_join.test):
statement ok
CREATE TABLE t1 (a INTEGER, b INTEGER, c INTEGER);
statement ok
INSERT INTO t1 VALUES (1,2,3);
statement ok
CREATE TABLE t2 (a INTEGER, b INTEGER, c INTEGER);
statement ok
INSERT INTO t2 VALUES (1,2,3), (2,2,4), (1,3,4);
query III
SELECT t2.a, t2.b, t2.c FROM t1 JOIN t2 USING(a,b)
----
1 2 3
The first call supplies filter: (a <= 1)
The second call supplies filter: (b <= 2)
This test failed but was unrelated prior to adding the small-file stuff: SELECT (SELECT rowid FROM a LIMIT 1). Running the test locally, I was able to replicate the error only very inconsistently. I had to exclude the read-only tests from duckdb, since we have to modify the database to set the target file size. You can see that all tests passed once those were excluded here: https://github.com/duckdb/ducklake/actions/runs/18204520486/job/51831513956?pr=477
The current failure is a seemingly unrelated error that seems to happen spuriously; see the CI run that adds the != comparison but passes on all subsequent test runs: https://github.com/duckdb/ducklake/actions/runs/18170768231/job/51724562395
Hi @J-Meyers, thanks again for all the adjustments! I had a quick internal chat about this PR, and indeed, as I had pointed out before, the work on generating the filters should be handled by the filter combiner rather than in DuckLake, since these filters can also be pushed down for Iceberg and Delta. The idea would be to move that part of the code to DuckDB. We could perhaps make the filter combiner configurable in terms of which filters it should push down depending on the scanner, or scanners could also ignore filters they can't push down, although I believe the former is cleaner. In DuckLake, we should keep the filter-to-query transformation and the small-file CI. @Mytherin, maybe you'd like to expand on this?
What should the output type be? Right now filters can only reference a single column, and a TableFilterSet can only express a series of restrictions on various single columns, all ANDed together, so returning an Expression seems like it would make sense. But transforming from an expression to filters is a lot of the work of the FilterCombiner, and that would leave that transformation to the downstream users somehow. The FilterCombiner itself can really only represent equivalence sets at a single level right now, so how would it work? Should I still recursively create FilterCombiners?
Really cool work @J-Meyers!
I took a look here and my general impression is that this is a high-quality PR adding a desired feature. I have a bit of a worry though, and that is that this PR is quite complex. It's quite dense to review, and making changes to or debugging this code in the future would require significant effort from our side. This fact, combined with what @pdet stated about wanting to support this generically in DuckDB to also serve the delta and iceberg extensions, makes me slightly hesitant here.
My recommendation would be to either:
- split this up into more easily reviewable chunks, to ensure this is more manageable to maintain and change once the duckdb team implements more of this generically on the duckdb side
- immediately switch to implementing the complex parts of this PR on the DuckDB side, potentially with some guidance from our end
This adds support for relatively complex filters being pushed down via ComplexFilterPushdown.
Follow-up to #471
Motivation:
As mentioned in my previous PR, even relatively simple queries currently can't be pushed down without this.
Where the manifesto says
But it doesn't currently actually apply in a variety of cases, to the point of (I think) worse performance than hive-structured files in some cases, because base duckdb will push down complex filters with hive.
With optimizations like this I don't think any fair/realistic benchmarks can be made, since it's just 1/however big an N you want.
Design:
The general design is: for each expression, try to pass it through the FilterCombiner, since that can do a lot of the work; then, if there are leftovers, recurse into the ORs and ANDs, and if the combiner was able to extract useful groups of functions, parse them as we did before. Propagate when pieces are unsatisfiable, but in general keep as many restrictions as possible. Since we're interacting via SQL, we can use that to handle whatever arbitrary expression trees we want, rather than being restricted to single-column filters as in TableFilterSet.
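As a rough, self-contained illustration of that recursion (Expr, ExprKind, and ToStatsPredicate are invented for this sketch; the PR operates on DuckDB Expression trees and the FilterCombiner instead), the core idea is: leaves become min/max checks against the file column stats, an AND can simply drop children it cannot translate, while an OR with any untranslatable child cannot prune at all.

```cpp
#include <cstdio>
#include <memory>
#include <string>
#include <vector>

enum class ExprKind { AND, OR, LEAF };

struct Expr {
	ExprKind kind = ExprKind::LEAF;
	std::string leaf_sql; // pre-translated single-column stats predicate (LEAF only)
	std::vector<std::unique_ptr<Expr>> children;
};

// Translate an expression tree into a SQL predicate over per-file column stats,
// or return "" when no pruning condition can be derived for this subtree.
std::string ToStatsPredicate(const Expr &e) {
	if (e.kind == ExprKind::LEAF) {
		// In the actual PR this is roughly where the FilterCombiner-style handling
		// of simple comparisons produces a check against ducklake_file_column_stats.
		return e.leaf_sql;
	}
	std::vector<std::string> parts;
	for (auto &child : e.children) {
		auto sql = ToStatsPredicate(*child);
		if (sql.empty()) {
			if (e.kind == ExprKind::OR) {
				return ""; // one unrestricted OR branch means the whole OR cannot prune
			}
			continue; // under AND we can drop the branch and keep the remaining restrictions
		}
		parts.push_back(sql);
	}
	if (parts.empty()) {
		return "";
	}
	std::string sep = e.kind == ExprKind::AND ? " AND " : " OR ";
	std::string result = "(" + parts[0];
	for (size_t i = 1; i < parts.size(); i++) {
		result += sep + parts[i];
	}
	return result + ")";
}

int main() {
	// Mirrors "id = 50 OR category = 3": each leaf is a stats check for one branch.
	// The id_stats / category_stats CTE names are hypothetical, echoing the
	// col_N_stats CTEs in the generated SQL above.
	auto tree = std::make_unique<Expr>();
	tree->kind = ExprKind::OR;
	auto id_leaf = std::make_unique<Expr>();
	id_leaf->leaf_sql = "data_file_id IN (SELECT data_file_id FROM id_stats WHERE min_value <= 50 AND 50 <= max_value)";
	auto cat_leaf = std::make_unique<Expr>();
	cat_leaf->leaf_sql = "data_file_id IN (SELECT data_file_id FROM category_stats WHERE min_value <= 3 AND 3 <= max_value)";
	tree->children.push_back(std::move(id_leaf));
	tree->children.push_back(std::move(cat_leaf));
	std::printf("%s\n", ToStatsPredicate(*tree).c_str());
	return 0;
}
```

Unsatisfiable pieces (the propagation mentioned above) would behave analogously to a constant FALSE, dropping the branch under an OR and pruning everything under an AND; that and the NULL-stats handling visible in the generated SQL are omitted here for brevity.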
The guiding principle is to have the catalog do as much work as possible given the expected balance between the size of ducklake (massive) and the size of data for common queries (small enough for a single machine).
This will likely add some tiny overhead on complicated queries whose filters don't actually eliminate any files, particularly when the data in ducklake is of comparable size to the metadata (which seems incredibly rare).
Annoyingly, the actual creation of the queries has to be delayed a bit, because we may later receive a dynamic pushdown; we want that to still keep all our old, more complex filters and not add new filters unless otherwise necessary.
Performance:
When running the generated query via the CLI on realistic datasets, just running the query takes < 0.05 seconds. However, when logging the time within DuckLakeTransaction::Query while running the relevant user query, the generated query takes ~0.25 seconds. I'm not sure why there is this disconnect, but it is pretty significant. These numbers are from running with duckdb as the metadata database; the performance may be different on other metadata stores, but it should still be faster than reading hundreds of parquet files.
Since we reference the same column for stats multiple times, it's possible there are slight performance improvements to be had by creating a materialized CTE that gets the relevant stats for each column, but I'm not sure how significant it would be vs. the added complexity. duckdb/duckdb#19080 should begin the process of doing it automatically, at least within duckdb.
Flame graph running in debug
Flame graph running in release
Detailed profiling output when running within ducklake (weird disconnect between the Total Time and the time of any actual step)
Json Output (does not show the extra delay)
Future work:
- likely there is further room for simplification between branches in the expression tree that the FilterCombiner does not currently handle
- use EXPLAIN ANALYZE to analyze the time spent doing these queries; instead I was just logging them and running them separately
Example:
Incoming Query:
Previously this query that I actually ran (with the bounding comparisons replaced with an equality) queried all 65740 files, whereas now it queries just the 5 relevant ones.
Generated (after formatting with sqlglot):
Disclaimer: I used AI to write the tests provided here and then manually modified them, due to the necessity to cover a lot of cases, and annotated them with where our performance could be improved a bit.