
feat: support runtime filter in shuffle join #17952

Merged: 24 commits merged into databendlabs:main on Jun 4, 2025

Conversation

@SkyFan2002 (Member) commented on May 17, 2025

I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/

Summary

Overview

This PR extends runtime filter support to shuffle joins. Previously, runtime filters were only available for broadcast joins; this change enables them in distributed joins where data is shuffled across nodes.

Key Changes

  1. Enhanced JoinRuntimeFilter::build_runtime_filter to support runtime filter generation in shuffle join scenarios
  2. Implemented a broadcast_id generation mechanism for the shuffle join build side, leveraging the channel infrastructure introduced in #17872 (feat: support using fragment forest to execute additional broadcast operations)
  3. Added support for merging filter packets from multiple build nodes so that filtering is consistent across the cluster (see the sketch after this list)
  4. Refactored runtime-filter-related code to improve maintainability and extensibility
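
To make change 3 concrete, here is a minimal Rust sketch of merging per-node filter packets. FilterPacket, merge_packets, and the in-list/min-max representation are illustrative assumptions rather than the actual Databend types: each build node only sees its own shard of the build keys, so the in-lists are unioned and the min/max ranges widened before the merged filter is handed to the probe-side scans.

use std::collections::HashSet;

// Hypothetical per-node runtime filter packet: an in-list of build keys
// plus their min/max range (the real Databend types and names differ).
#[derive(Debug, Clone)]
struct FilterPacket {
    inlist: HashSet<i64>,
    min: i64,
    max: i64,
}

// Merge the packets produced by every build node into one global filter.
// In a shuffle join each node only sees its own shard of the build side,
// so the in-lists are unioned and the min/max ranges are widened; only
// the merged result is safe to push down to the probe-side scans.
fn merge_packets(packets: &[FilterPacket]) -> Option<FilterPacket> {
    let mut iter = packets.iter();
    let first = iter.next()?.clone();
    Some(iter.fold(first, |mut acc, p| {
        acc.inlist.extend(p.inlist.iter().copied());
        acc.min = acc.min.min(p.min);
        acc.max = acc.max.max(p.max);
        acc
    }))
}

fn main() {
    // Two build nodes, each holding a disjoint shard of test2.x.
    let node_a = FilterPacket { inlist: [1, 3, 5].into(), min: 1, max: 5 };
    let node_b = FilterPacket { inlist: [2, 4, 900].into(), min: 2, max: 900 };

    let merged = merge_packets(&[node_a, node_b]).expect("at least one packet");
    // The probe side may skip any block whose x range falls outside [1, 900]
    // and filter out rows whose x is not in the merged in-list.
    assert_eq!((merged.min, merged.max), (1, 900));
    assert!(merged.inlist.contains(&900));
    println!("merged filter: {merged:?}");
}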

Example

Prepare table:

create or replace table test1(x int) row_per_block = 1;
create or replace table test2(x int) row_per_block = 1;
insert into test1 select * from numbers(100000);
insert into test2 select * from numbers(1000);
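
With row_per_block = 1, every inserted row becomes its own block, so test1 holds 100000 blocks and test2 holds 1000; block-level pruning by the runtime filter is therefore directly visible in the EXPLAIN output and logs below.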

The EXPLAIN output shows that runtime filters are being applied in this join operation:

root@localhost:8000/tpch> explain select * from test1 join test2 on test1.x = test2.x;

explain
select
  *
from
  test1
  join test2 on test1.x = test2.x

-[ EXPLAIN ]-----------------------------------
Exchange
├── output columns: [test1.x (#0), test2.x (#1)]
├── exchange type: Merge
└── HashJoin
    ├── output columns: [test1.x (#0), test2.x (#1)]
    ├── join type: INNER
    ├── build keys: [test2.x (#1)]
    ├── probe keys: [test1.x (#0)]
    ├── keys is null equal: [false]
    ├── filters: []
    ├── build join filters:
    │   └── filter id:0, build key:test2.x (#1), probe key:test1.x (#0), filter type:inlist,min_max
    ├── estimated rows: 1000.00
    ├── Exchange(Build)
    │   ├── output columns: [test2.x (#1)]
    │   ├── exchange type: Broadcast
    │   └── TableScan
    │       ├── table: default.tpch.test2
    │       ├── output columns: [x (#1)]
    │       ├── read rows: 1000
    │       ├── read size: 35.16 KiB
    │       ├── partitions total: 1000
    │       ├── partitions scanned: 1000
    │       ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1000 to 1000>]
    │       ├── push downs: [filters: [], limit: NONE]
    │       └── estimated rows: 1000.00
    └── TableScan(Probe)
        ├── table: default.tpch.test1
        ├── output columns: [x (#0)]
        ├── read rows: 100000
        ├── read size: 3.43 MiB
        ├── partitions total: 100000
        ├── partitions scanned: 100000
        ├── pruning stats: [segments: <range pruning: 100 to 100>, blocks: <range pruning: 100000 to 100000>]
        ├── push downs: [filters: [], limit: NONE]
        ├── apply join filters: [#0]
        └── estimated rows: 100000.00

37 rows explain in 0.232 sec. Processed 0 rows, 0 B (0 row/s, 0 B/s)
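
In this plan, the HashJoin's build join filters entry shows filter id 0 built from test2.x (#1) with filter types inlist and min_max, and the probe-side TableScan on test1 reports apply join filters: [#0]; that is, the filter constructed on the build side is pushed into the probe-side scan.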

The logs from the three nodes in the cluster show that runtime filters are applied effectively for this query:

81c9d95b-2e8d-45e6-b3fb-25578388ce98 2025-06-02T22:52:27.571807+08:00  INFO databend_common_storages_fuse::operations::read::parquet_data_transform_reader: parquet_data_transform_reader.rs:218 [RUNTIME-FILTER]ReadParquetDataTransform finished, scan_id: 0, blocks_total: 34000, blocks_pruned: 34000

81c9d95b-2e8d-45e6-b3fb-25578388ce98 2025-06-02T22:52:27.538115+08:00  INFO databend_common_storages_fuse::operations::read::parquet_data_transform_reader: parquet_data_transform_reader.rs:218 [RUNTIME-FILTER]ReadParquetDataTransform finished, scan_id: 0, blocks_total: 33000, blocks_pruned: 33000

81c9d95b-2e8d-45e6-b3fb-25578388ce98 2025-06-02T22:52:27.566167+08:00  INFO databend_common_storages_fuse::operations::read::parquet_data_transform_reader: parquet_data_transform_reader.rs:218 [RUNTIME-FILTER]ReadParquetDataTransform finished, scan_id: 0, blocks_total: 33000, blocks_pruned: 32000
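
Summing across the three nodes, 34000 + 33000 + 33000 = 100000 probe-side blocks were considered, and 34000 + 33000 + 32000 = 99000 of them are reported as pruned by the runtime filter, i.e. about 99% of the probe-side blocks were skipped.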

Tests

  • Unit Test
  • Logic Test
  • Benchmark Test
  • No Test - Explain why

Type of change

  • Bug Fix (non-breaking change which fixes an issue)
  • New Feature (non-breaking change which adds functionality)
  • Breaking Change (fix or feature that could cause existing functionality not to work as expected)
  • Documentation Update
  • Refactoring
  • Performance Improvement
  • Other (please describe):

@github-actions bot added the pr-feature label (this PR introduces a new feature to the codebase) on May 17, 2025
@SkyFan2002 added the ci-benchmark label (Benchmark: run all test) on May 19, 2025
Contributor comment:

Docker Image for PR

  • tag: pr-17952-c1292cd-1747623804

note: this image tag is only available for internal use.

@SkyFan2002 added and removed the ci-benchmark label (Benchmark: run all test) on May 27, 2025
Contributor comment:

Docker Image for PR

  • tag: pr-17952-f863686-1748338293

note: this image tag is only available for internal use.

@SkyFan2002 marked this pull request as ready for review on June 2, 2025 at 16:51
@BohuTANG merged commit 05232d7 into databendlabs:main on Jun 4, 2025
87 checks passed