Add ability to set statistics in the SQLExecutor #134

Merged
trueleo merged 3 commits into datafusion-contrib:main from nuno-faria:statistics on Jul 9, 2025

Conversation

@nuno-faria
Collaborator

The current SQLExecutor does not allow us to provide statistics, so result sets produced through datafusion-federation always report Rows=Absent, .... This can be a problem when joining large amounts of data in DataFusion, because the planner often selects a suboptimal plan.

This PR adds the partition_statistics function to SQLExecutor, so implementers can provide more information to the planner. It also adds it to VirtualExecutionPlan and SchemaCastScanExec, so the statistics can reach DataFusion.

By default, it simply returns Statistics::new_unknown, so existing implementations of SQLExecutor can continue to work as normal.
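
The default-method approach described above can be sketched roughly as follows. This is a minimal illustration using stand-in types: `Precision`, `Statistics`, the `SqlExecutor` trait, and both executors are simplified, hypothetical models written for this example, not the actual DataFusion or datafusion-federation definitions, and the real method signature may differ.

```rust
// Stand-in types only: simplified models, NOT the real DataFusion /
// datafusion-federation API.

/// How much is known about a value (modeled on an Exact / Inexact / Absent split).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// Minimal statistics record: row count and total size in bytes.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Statistics {
    pub num_rows: Precision,
    pub total_byte_size: Precision,
}

impl Statistics {
    /// Nothing is known about the result set.
    pub fn new_unknown() -> Self {
        Statistics {
            num_rows: Precision::Absent,
            total_byte_size: Precision::Absent,
        }
    }
}

/// Hypothetical executor trait with an opt-in statistics hook.
pub trait SqlExecutor {
    fn name(&self) -> &str;

    /// Default body: report unknown statistics, so implementors written
    /// before the hook existed keep compiling and behaving as before.
    fn partition_statistics(&self, _sql: &str) -> Statistics {
        Statistics::new_unknown()
    }
}

/// An executor that does not override the hook.
pub struct LegacyExecutor;

impl SqlExecutor for LegacyExecutor {
    fn name(&self) -> &str {
        "legacy"
    }
}

/// An executor that reports an estimate (hard-coded here for illustration).
pub struct EstimatingExecutor {
    pub estimated_rows: usize,
}

impl SqlExecutor for EstimatingExecutor {
    fn name(&self) -> &str {
        "estimating"
    }

    fn partition_statistics(&self, _sql: &str) -> Statistics {
        Statistics {
            num_rows: Precision::Inexact(self.estimated_rows),
            total_byte_size: Precision::Absent,
        }
    }
}
```

In this sketch, `LegacyExecutor` inherits the unknown-statistics default, while `EstimatingExecutor` opts in by overriding a single method.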

Comment thread datafusion-federation/src/sql/executor.rs Outdated
@trueleo
Collaborator

trueleo commented Jul 4, 2025

The idea of providing statistics-based optimization for federation nodes seems quite valuable. I believe this could even benefit logical planning down the line similar to how DataFusion optimizes for ParquetSource.

That said, I would like to understand the use case for this. Do you have an example of how a SqlExecutor provides useful statistics during the planning phase? Perhaps this could be a hook into the SqlPlanner, where we might be able to use async context to retrieve column statistics, assuming the remote sources support that. (I don’t have a concrete example for this yet.)

@nuno-faria
Collaborator Author

> That said, I would like to understand the use case for this. Do you have an example of how a SqlExecutor provides useful statistics during the planning phase?

In my case, the main use case is efficient hash join orderings. Consider this simple example:

SELECT *
FROM t1
JOIN t2 ON t1.k = t2.k
WHERE t2.v < 10

  • t1 is a mem table in DataFusion, with 10M rows.
  • t2 is a Postgres table (using datafusion-table-providers).

When executing this query without statistics for t2, this is the physical plan after the join_selection optimization:

OutputRequirementExec
  HashJoinExec: ...
    DataSourceExec: ... <- t1
    SchemaCastScanExec
      VirtualExecutionPlan ... <- t2 where v < 10

We see that the HashJoinExec has t1 on the left and t2 on the right. Since the join always builds the hash table from the table on the left, it results in the following:

...
HashJoinExec:
    mode=Partitioned,
    join_type=Inner,
    on=[(k@0, k@0)], 
    metrics=[
        output_rows=9,
        build_input_batches=1173,
        build_input_rows=10000000, <-- the hash table will be built from the 10M rows of t1
        input_batches=6,
        input_rows=9,
        output_batches=6,
        build_mem_used=293979552,
        build_time=2.2196258s, <-- taking a combined 2.2s to build
        join_time=6.713302ms
    ]
...
time = 0.6246694s

The problem is that t1 will be way larger than the result of t2, which is filtered by t2.v < 10, returning just 9 rows. If we add statistics to the executor, we will have the following plan after the join_selection optimization:

OutputRequirementExec
  ProjectionExec: ...
    HashJoinExec: ...
      SchemaCastScanExec
        VirtualExecutionPlan ... <- t2 where v < 10
      DataSourceExec: ... <- t1

Now t2 is on the left, as the planner identified it to be the smaller one. This results in the following join operation:

...
HashJoinExec:
    mode=CollectLeft,
    join_type=Inner,
    on=[(k@0, k@0)],
    metrics=[
        output_rows=9,
        build_input_batches=1,
        build_input_rows=9, <-- smaller number of rows to build the hash table
        input_batches=1221,
        input_rows=10000000,
        output_batches=1221,
        build_mem_used=8520,
        build_time=592.007µs, <-- resulting in a faster build time
        join_time=267.117401ms
    ] 
...
time = 0.0542689s <-- the overall query time is more than 10x faster
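
The effect of row-count statistics on the build-side choice in the example above can be modeled with a small sketch. This is a deliberately simplified model and not DataFusion's actual join_selection implementation; `RowEstimate`, `BuildSide`, and `choose_build_side` are hypothetical names introduced for illustration. It assumes the left input is the build side by default, and that a swap happens only when both estimates are known and the right input is strictly smaller.

```rust
// Simplified model of build-side selection. NOT DataFusion's real
// join_selection code; all names here are hypothetical.

/// Row-count estimate for one join input.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RowEstimate {
    Known(usize),
    Absent,
}

/// Which of the original inputs the hash table is built from.
#[derive(Debug, PartialEq)]
pub enum BuildSide {
    Left,
    Right,
}

/// The left input is the build side unless both estimates are known
/// and the right input is strictly smaller, in which case the inputs
/// would be swapped so the smaller one is used to build the hash table.
pub fn choose_build_side(left: RowEstimate, right: RowEstimate) -> BuildSide {
    match (left, right) {
        (RowEstimate::Known(l), RowEstimate::Known(r)) if r < l => BuildSide::Right,
        _ => BuildSide::Left,
    }
}
```

With the numbers from the example, `choose_build_side(Known(10_000_000), Absent)` keeps t1 as the build side, while `choose_build_side(Known(10_000_000), Known(9))` selects t2, which corresponds to the swapped plan.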

@trueleo
Collaborator

trueleo commented Jul 6, 2025

@nuno-faria Thanks! This makes more sense now, although I want to point out this part of your example, which confirms what I was initially thinking:

> The problem is that t1 will be way larger than the result of t2, which is filtered by t2.v < 10, returning just 9 rows.

Here, the fact that t2.v < 10 returns 9 rows depends entirely on the table constraints you are working with. Along similar lines, though, one can easily imagine a query with LIMIT, for which the number of output rows is known before execution.

I think this can be added, but I feel it could be implemented a bit differently: VirtualExecutionPlan is extended with a statistics field, and that field is populated by the SqlPlanner during plan_federation by calling a statistics method (which can now be async).

It would look something like this:

#[async_trait]
impl FederationPlanner for SQLFederationPlanner {
    async fn plan_federation(
        &self,
        node: &FederatedPlanNode,
        _session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = Arc::new(node.plan().schema().as_arrow().clone());
        let plan = node.plan().clone();
        let statistics = self.executor.statistics(&plan).await?;
        let input = Arc::new(VirtualExecutionPlan::new(
            plan,
            Arc::clone(&self.executor),
            statistics,
        ));
        let schema_cast_exec = schema_cast::SchemaCastScanExec::new(input, schema);
        Ok(Arc::new(schema_cast_exec))
    }
}
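
To make the other half of this suggestion concrete, the plan node can simply store the statistics it was given and return them on request, so the executor is consulted only once, at plan time. The sketch below is a synchronous, self-contained model under that assumption. `Statistics`, `Executor`, `VirtualPlan`, `CountingExecutor`, and this `plan_federation` are hypothetical stand-ins, not the real datafusion-federation types, and the real `statistics` call would be async.

```rust
use std::cell::Cell;

// Stand-in types only; the real VirtualExecutionPlan and executor
// trait are richer and async. All names here are hypothetical.

#[derive(Debug, Clone, PartialEq)]
pub struct Statistics {
    pub num_rows: Option<usize>,
}

pub trait Executor {
    /// Called at plan time (async in the suggestion; sync here for brevity).
    fn statistics(&self, sql: &str) -> Result<Statistics, String>;
}

/// Plan node that carries statistics computed before it was constructed.
pub struct VirtualPlan {
    sql: String,
    statistics: Statistics,
}

impl VirtualPlan {
    pub fn new(sql: String, statistics: Statistics) -> Self {
        VirtualPlan { sql, statistics }
    }

    /// Returns the stored statistics without contacting the executor again.
    pub fn partition_statistics(&self) -> &Statistics {
        &self.statistics
    }

    pub fn sql(&self) -> &str {
        &self.sql
    }
}

/// Fetch statistics first, then build the plan node with them.
pub fn plan_federation(executor: &dyn Executor, sql: &str) -> Result<VirtualPlan, String> {
    let statistics = executor.statistics(sql)?;
    Ok(VirtualPlan::new(sql.to_string(), statistics))
}

/// Test executor that counts how often it is asked for statistics.
pub struct CountingExecutor {
    pub rows: usize,
    pub calls: Cell<usize>,
}

impl Executor for CountingExecutor {
    fn statistics(&self, _sql: &str) -> Result<Statistics, String> {
        self.calls.set(self.calls.get() + 1);
        Ok(Statistics { num_rows: Some(self.rows) })
    }
}
```

Because the statistics live in the node, repeated calls to `partition_statistics` during optimization read the stored value and do not trigger additional requests to the remote source.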

@hozan23
Collaborator

hozan23 commented Jul 7, 2025

Thanks @trueleo, I agree with your suggestion. Fetching the statistics before constructing the VirtualExecutionPlan makes sense.

@nuno-faria
Collaborator Author

@trueleo @hozan23 I've updated the code to get the statistics at plan time.

@trueleo trueleo merged commit 78a6521 into datafusion-contrib:main Jul 9, 2025
7 checks passed
@github-actions github-actions Bot mentioned this pull request Jul 9, 2025
@nuno-faria nuno-faria deleted the statistics branch July 9, 2025 14:01
3 participants