Skip to content

perf(join): vectorize high-fanout inner join build takes#7143

Open
RitwijParmar wants to merge 2 commits into
Eventual-Inc:mainfrom
RitwijParmar:codex/daft-fanout-aware-join-probe
Open

perf(join): vectorize high-fanout inner join build takes#7143
RitwijParmar wants to merge 2 commits into
Eventual-Inc:mainfrom
RitwijParmar:codex/daft-fanout-aware-join-probe

Conversation

@RitwijParmar

Copy link
Copy Markdown
Contributor

Summary

  • collect inner-join probe/build matches once and route high-fanout cases through build-side take batches instead of per-row GrowableRecordBatch::extend
  • preserve output order by vectorizing only consecutive runs from the same build-side record batch, with conservative fanout/run-length thresholds and a growable fallback
  • add focused high-build-side-fanout correctness coverage and a matching microbenchmark case

Fixes #7076

Tests

  • cargo fmt --check
  • cargo check -p daft-local-execution
  • git diff --check
  • Not run locally: targeted Python pytest, because this machine had only ~317 MiB free after the Rust check; the PR adds focused Python coverage for CI.

@RitwijParmar RitwijParmar requested a review from a team as a code owner June 18, 2026 22:55
@github-actions github-actions Bot added the perf label Jun 18, 2026
@greptile-apps

greptile-apps Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR optimises the inner-join probe path for high-fanout workloads by first collecting all (probe_row_idx, build_rb_idx, build_row_idx) match tuples into a single Vec, then selecting between a new vectorised take-batch path or the existing GrowableRecordBatch path based on configurable fanout and run-length thresholds.

  • The vectorised path groups consecutive matches from the same build-side record batch into runs, issues one batched take per run, and concats the results — amortising per-row dispatch overhead for high-fanout joins.
  • Correctness coverage is added via a 64-key × 32-fanout Python test (1 and 4 partitions) and a matching microbenchmark.

Confidence Score: 4/5

Safe to merge; the change is a pure performance optimisation on the hash-join probe path with a correct growable fallback, and the new Python test exercises the targeted scenario.

The vectorised path produces correct output and the output-order guarantee holds because both the build-side concat and the probe-side index extraction walk matches in the same order. Two minor gaps exist: count_build_table_runs is traversed twice on the hot path (once in the threshold check, once for capacity), and the fanout guard divides by total probe rows rather than matched probe rows, which can silently skip the optimisation in sparse-match workloads where it would actually help.

src/daft-local-execution/src/join/inner_join.rs — specifically the threshold logic in should_use_vectorized_take and the capacity pre-computation in build_side_with_vectorized_take.

Important Files Changed

Filename Overview
src/daft-local-execution/src/join/inner_join.rs Core hash-join probe path: collects all matches into a Vec then routes to vectorized-take or growable path based on fanout/run-length heuristics. Correctness looks sound; two minor efficiency gaps noted (double run-count scan, fanout threshold against total vs. matched probe rows).
tests/dataframe/test_joins.py Adds a focused correctness test for the high-build-side-fanout case (64 keys × 32 fanout), parameterized over 1 and 4 partitions; uses the existing with_default_morsel_size fixture correctly.
tests/microbenchmarks/test_join.py Adds a microbenchmark for the high-fanout inner join (1 000 keys × 32 fanout, 1 and 10 partitions) with a basic result-count assertion; follows the existing benchmark style.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[probe_inner: iterate input tables] --> B[For each input_table: collect all matches into Vec BuildMatch]
    B --> C{should_use_vectorized_take?}
    C -->|matches >= 1024 AND fanout >= 4x probe_rows AND avg run len >= 8| D[build_side_with_vectorized_take]
    C -->|otherwise| E[build_side_with_growable]
    D --> F[Group consecutive same-build-table matches into runs]
    F --> G[For each run: table.take row_idxs]
    G --> H[RecordBatch::concat taken_tables]
    E --> I[GrowableRecordBatch::extend per match]
    I --> J[GrowableRecordBatch::build]
    H --> K[Extract probe indices from matches]
    J --> K
    K --> L[input_table.take probe_indices]
    L --> M[Assemble final joined RecordBatch]
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
flowchart TD
    A[probe_inner: iterate input tables] --> B[For each input_table: collect all matches into Vec BuildMatch]
    B --> C{should_use_vectorized_take?}
    C -->|matches >= 1024 AND fanout >= 4x probe_rows AND avg run len >= 8| D[build_side_with_vectorized_take]
    C -->|otherwise| E[build_side_with_growable]
    D --> F[Group consecutive same-build-table matches into runs]
    F --> G[For each run: table.take row_idxs]
    G --> H[RecordBatch::concat taken_tables]
    E --> I[GrowableRecordBatch::extend per match]
    I --> J[GrowableRecordBatch::build]
    H --> K[Extract probe indices from matches]
    J --> K
    K --> L[input_table.take probe_indices]
    L --> M[Assemble final joined RecordBatch]
Loading

Reviews (1): Last reviewed commit: "perf(join): vectorize high-fanout inner ..." | Re-trigger Greptile

return build_side_with_growable(build_side_tables, matches);
}

let mut taken_tables = Vec::with_capacity(count_build_table_runs(matches));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Redundant O(n) traversal of matches

count_build_table_runs is called here to size taken_tables, but the same traversal was already performed inside should_use_vectorized_take when deciding to take this path. For large match sets (≥1024 entries, which is when vectorized take is even attempted), this doubles the scan cost just to compute capacity. Passing the pre-computed run count through from the dispatch site would eliminate the redundancy without changing behaviour.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Comment on lines +104 to +115
fn should_use_vectorized_take(matches: &[BuildMatch], probe_rows: usize) -> bool {
if matches.len() < MIN_MATCHES_FOR_VECTORIZED_TAKE || probe_rows == 0 {
return false;
}

if matches.len() < probe_rows.saturating_mul(MIN_FANOUT_FOR_VECTORIZED_TAKE) {
return false;
}

let run_count = count_build_table_runs(matches);
matches.len() >= run_count.saturating_mul(MIN_AVG_RUN_LEN_FOR_VECTORIZED_TAKE)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Fanout threshold uses total probe rows, not matched probe rows

probe_rows is input_table.len(), the total number of probe-side rows in the current micro-batch, including rows that produced zero matches. In sparse-match scenarios (e.g. 1 000 probe rows, 10 of which each match 200 build rows), matches.len() is 2 000 but probe_rows * MIN_FANOUT_FOR_VECTORIZED_TAKE is 4 000, so the fanout guard short-circuits and the growable path is always taken — even though average run length easily clears MIN_AVG_RUN_LEN_FOR_VECTORIZED_TAKE. This won't produce wrong output, but the optimization won't fire in some high-fanout, low-selectivity workloads where it would help most.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

perf: hash join probe loop has iterator overhead and per-row Growable extends

1 participant