feat: Refactor NLJ into an extensible framework for specialized joins #21983

Open
2010YOUY01 wants to merge 1 commit into apache:main from 2010YOUY01:join-accelerator-refactor

@2010YOUY01
Contributor

Which issue does this PR close?

Rationale for this change

See issue for details.

The high-level idea is to make it easier to add specialized joins that can be accelerated with runtime indexes or dynamic filters.

With this refactor, future accelerated joins only need to implement the JoinAccelerator trait to define how to build and probe a runtime join index for target join conditions. The join state machine, outer join state tracking, and residual join predicate handling can then be handled reliably by common driver code in NLJ.
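
The split described above can be sketched with a toy model. Everything below is hypothetical and is not the PR's API: `ToyAccelerator`, `NestedLoop`, `HashIndex`, and `drive_join` are illustrative names, and rows are reduced to plain `i64` keys. The point is only that the specialized join supplies "build" and "probe for candidates", while one shared driver applies the residual predicate and tracks unmatched build rows. The `HashIndex` here is only valid for join conditions that imply key equality.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an accelerator trait: how to buffer/index the
/// build side and how to propose candidate build rows for a probe key.
trait ToyAccelerator {
    fn add_build_row(&mut self, key: i64);
    /// Indices of build rows that may match `probe_key`.
    fn candidates(&self, probe_key: i64) -> Vec<usize>;
    fn build_keys(&self) -> &[i64];
}

/// Fallback with no index: every build row is a candidate (nested loop).
#[derive(Default)]
struct NestedLoop {
    keys: Vec<i64>,
}

impl ToyAccelerator for NestedLoop {
    fn add_build_row(&mut self, key: i64) {
        self.keys.push(key);
    }
    fn candidates(&self, _probe_key: i64) -> Vec<usize> {
        (0..self.keys.len()).collect()
    }
    fn build_keys(&self) -> &[i64] {
        &self.keys
    }
}

/// Specialized join for equality conditions: a runtime hash index.
#[derive(Default)]
struct HashIndex {
    keys: Vec<i64>,
    index: HashMap<i64, Vec<usize>>,
}

impl ToyAccelerator for HashIndex {
    fn add_build_row(&mut self, key: i64) {
        self.index.entry(key).or_default().push(self.keys.len());
        self.keys.push(key);
    }
    fn candidates(&self, probe_key: i64) -> Vec<usize> {
        self.index.get(&probe_key).cloned().unwrap_or_default()
    }
    fn build_keys(&self) -> &[i64] {
        &self.keys
    }
}

/// Shared driver: builds, probes, applies the residual predicate, and tracks
/// which build rows never matched (what an outer join would need).
/// Returns (matched (build_idx, probe_idx) pairs, unmatched build indices).
fn drive_join<A: ToyAccelerator>(
    acc: &mut A,
    build: &[i64],
    probe: &[i64],
    residual: impl Fn(i64, i64) -> bool,
) -> (Vec<(usize, usize)>, Vec<usize>) {
    for &k in build {
        acc.add_build_row(k);
    }
    let keys = acc.build_keys();
    let mut matched = vec![false; keys.len()];
    let mut pairs = Vec::new();
    for (p_idx, &p) in probe.iter().enumerate() {
        for b_idx in acc.candidates(p) {
            if residual(keys[b_idx], p) {
                matched[b_idx] = true;
                pairs.push((b_idx, p_idx));
            }
        }
    }
    let unmatched = matched
        .iter()
        .enumerate()
        .filter(|(_, m)| !**m)
        .map(|(i, _)| i)
        .collect();
    (pairs, unmatched)
}
```

In this sketch, calling `drive_join` with either `NestedLoop` or `HashIndex` and the predicate `|b, p| b == p` yields the same pairs; only the number of candidates inspected differs.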

What changes are included in this PR?

  • Introduce the JoinAccelerator API for runtime indexes; the dynamic filter part is planned for a follow-up PR
  • Refactor NLJ to use JoinAccelerator

Are these changes tested?

Existing tests

Are there any user-facing changes?

No

github-actions bot added the core (Core DataFusion crate) and physical-plan (Changes to the physical-plan crate) labels on May 2, 2026

/// Add one build-side input batch to the buffer, and optionally build a
/// runtime index.
fn add_build_batch(&mut self, batch: RecordBatch) -> Result<()>;
Contributor Author

To extend this to dynamic filters: a common pattern is to calculate bounds from the build side and then build an expression that filters the probe side.
The bound calculation fits naturally inside this API, so the follow-up work only needs to add an API for returning the dynamic filter.
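
A minimal sketch of that pattern, assuming an equality join on a single `i64` key. `BuildSide` and `dynamic_filter` are hypothetical names chosen for illustration (the follow-up API does not exist yet), and batches are plain slices rather than `RecordBatch`:

```rust
/// Hypothetical build-side state: buffers keys and tracks running min/max.
#[derive(Default)]
struct BuildSide {
    keys: Vec<i64>,
    bounds: Option<(i64, i64)>,
}

impl BuildSide {
    /// Plays the role of `add_build_batch`: buffer the batch and update the
    /// bounds in the same pass.
    fn add_build_batch(&mut self, batch: &[i64]) {
        for &k in batch {
            self.bounds = Some(match self.bounds {
                None => (k, k),
                Some((lo, hi)) => (lo.min(k), hi.max(k)),
            });
        }
        self.keys.extend_from_slice(batch);
    }

    /// Hypothetical follow-up API: a predicate that prunes probe rows whose
    /// key cannot equal any build key.
    fn dynamic_filter(&self) -> impl Fn(i64) -> bool {
        let bounds = self.bounds;
        move |probe_key| match bounds {
            Some((lo, hi)) => lo <= probe_key && probe_key <= hi,
            // Empty build side: no probe key can have an equal build key.
            None => false,
        }
    }
}
```

The filter is conservative: a probe key inside the bounds may still have no match, so the join itself must still run on the surviving rows.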

/// The default implementation is the nested-loop fallback: it returns the
/// probe batch unchanged and emits one buffered build-side row at a time.
/// The caller joins that row against the current probe batch.
fn init_prober(
Contributor Author

This function creates an iterator over the join candidates for ALL_BUILD_BATCHES x CURRENT_PROBE_BATCH.

A simpler alternative for common indexed joins would be ALL_BUILD_BATCHES x CURRENT_PROBE_ROW. However, I think processing one probe batch per step is more flexible: for example, the probe side may need some batched pre-processing. So I prefer this design for now.
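
To illustrate why a batch-level entry point leaves room for batched pre-processing, here is a toy sketch. It is not the PR's `init_prober` signature: `SortedIndex` is a hypothetical sorted runtime index over `i64` keys, and the pre-processing step (computing the probe batch's key range once and narrowing the index to it) is just one assumed example of work that a per-row API could not amortize.

```rust
/// Hypothetical runtime index: build-side keys sorted, with original row ids.
struct SortedIndex {
    entries: Vec<(i64, usize)>,
}

impl SortedIndex {
    fn new(build_keys: &[i64]) -> Self {
        let mut entries: Vec<(i64, usize)> = build_keys
            .iter()
            .copied()
            .enumerate()
            .map(|(i, k)| (k, i))
            .collect();
        entries.sort();
        Self { entries }
    }

    /// Batch-level entry point: called once per probe batch, yields
    /// (build_index, probe_index) candidates for an equality condition.
    fn init_prober<'a>(
        &'a self,
        probe_batch: &'a [i64],
    ) -> impl Iterator<Item = (usize, usize)> + 'a {
        // Batched pre-processing: find the batch's key range once and narrow
        // the index to the entries that can possibly match.
        let lo = probe_batch.iter().copied().min();
        let hi = probe_batch.iter().copied().max();
        let window: &[(i64, usize)] = match (lo, hi) {
            (Some(lo), Some(hi)) => {
                let start = self.entries.partition_point(|&(k, _)| k < lo);
                let end = self.entries.partition_point(|&(k, _)| k <= hi);
                &self.entries[start..end]
            }
            _ => &[],
        };
        // Per-row probing then only searches the narrowed window.
        probe_batch.iter().enumerate().flat_map(move |(p_idx, &p)| {
            let start = window.partition_point(|&(k, _)| k < p);
            window[start..]
                .iter()
                .take_while(move |&&(k, _)| k == p)
                .map(move |&(_, b_idx)| (b_idx, p_idx))
        })
    }
}
```

With a per-row API, the range narrowing would have to be skipped or redone for every probe row; with a per-batch API it is paid once per batch.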

/// This materializes the candidate pairs, evaluates any remaining join
/// filter, updates match bitmaps for joins that need unmatched rows, and
/// produces the final joined batch.
fn process_probe_candidates(
Contributor Author

This is a major place that will need to be updated when the first indexed join implementation is added.

Currently, the BuildRow variant is a specialized representation that is the most efficient for the NLJ Cartesian-join fallback. An indexed join needs a more general representation, such as (build_index, probe_index).
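
The two representations can be contrasted in a small sketch. The `Candidates` enum, its `Pairs` variant, and this simplified `process_probe_candidates` are assumptions made for illustration and do not reproduce the PR's types; rows are `i64` values and the match bitmap is a `bool` slice. For brevity the sketch expands `BuildRow` into explicit pairs, which a real Cartesian fallback would avoid.

```rust
/// Hypothetical candidate representation.
enum Candidates {
    /// Cartesian fallback: one build row against every row of the probe batch.
    BuildRow(usize),
    /// General form for indexed joins: explicit (build_index, probe_index) pairs.
    Pairs(Vec<(usize, usize)>),
}

/// Materializes candidate pairs, applies the remaining join filter, updates
/// the build-side match bitmap, and returns the joined (build, probe) rows.
fn process_probe_candidates(
    cands: Candidates,
    build: &[i64],
    probe: &[i64],
    filter: impl Fn(i64, i64) -> bool,
    build_matched: &mut [bool],
) -> Vec<(i64, i64)> {
    let pairs: Vec<(usize, usize)> = match cands {
        // Expanded only to keep the sketch short; see the note above.
        Candidates::BuildRow(b) => (0..probe.len()).map(|p| (b, p)).collect(),
        Candidates::Pairs(v) => v,
    };
    let mut out = Vec::new();
    for (b, p) in pairs {
        if filter(build[b], probe[p]) {
            build_matched[b] = true;
            out.push((build[b], probe[p]));
        }
    }
    out
}
```

Both variants flow through the same filter and bitmap logic, so adding the general form should not require changes to the outer join bookkeeping in this model.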

@2010YOUY01
Contributor Author

NLJ microbenchmark, run with ./bench.sh run nlj in benchmarks/.

The differences should be local noise: the execution flow is almost unchanged.

┏━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃       main ┃ join-accelerator ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1  │   86.79 ms │         88.24 ms │     no change │
│ QQuery 2  │  106.46 ms │        107.17 ms │     no change │
│ QQuery 3  │  158.76 ms │        156.66 ms │     no change │
│ QQuery 4  │  321.95 ms │        338.41 ms │  1.05x slower │
│ QQuery 5  │  242.35 ms │        235.31 ms │     no change │
│ QQuery 6  │ 1684.31 ms │       1698.49 ms │     no change │
│ QQuery 7  │  238.59 ms │        235.03 ms │     no change │
│ QQuery 8  │ 1688.57 ms │       1710.65 ms │     no change │
│ QQuery 9  │  277.63 ms │        272.64 ms │     no change │
│ QQuery 10 │  481.27 ms │        486.05 ms │     no change │
│ QQuery 11 │  217.54 ms │        213.55 ms │     no change │
│ QQuery 12 │  220.03 ms │        216.65 ms │     no change │
│ QQuery 13 │   77.50 ms │         76.21 ms │     no change │
│ QQuery 14 │   76.57 ms │         75.62 ms │     no change │
│ QQuery 15 │   79.10 ms │         77.63 ms │     no change │
│ QQuery 16 │   79.50 ms │         77.09 ms │     no change │
│ QQuery 17 │   81.12 ms │         75.51 ms │ +1.07x faster │
└───────────┴────────────┴──────────────────┴───────────────┘
