
Conversation

@oerling (Contributor) commented Dec 29, 2025

Differential Revision: D89875337

@meta-cla bot added the CLA Signed label on Dec 29, 2025
meta-codesync bot commented Dec 29, 2025

@oerling has exported this pull request. If you are a Meta employee, you can view the originating Diff in D89875337.

Orri Erling and others added 9 commits January 14, 2026 14:48
Summary:
This diff adds support for exposing partition-level statistics from the Prism connector, enabling the optimizer to make better decisions for partitioned tables. The implementation focuses on the LocalHive connector and introduces a new `getPartitionStatistics()` API that returns detailed statistics for specific partitions and columns.

The key motivation is to provide per-partition statistics (row counts, file counts, column statistics including min/max, null percentages, distinct counts, etc.) that can be used for query optimization, particularly for pruning partitions and estimating cardinalities. Previously, only table-level statistics were available.

Main changes:
1. Added `PartitionStatistics` struct to ConnectorMetadata.h that holds per-partition statistics including numRows, numFiles, column names, and per-column statistics
2. Added `getPartitionStatistics()` virtual method to ConnectorSplitManager interface that takes a span of partition handles and column names and returns statistics for each partition (see the API sketch at the end of this summary)
3. Enhanced `PartitionHandle` base class with `partition()` method to return partition path string in Hive format (e.g., "ds=2023-01-01/product=p1")
4. Implemented `HivePartitionHandle::makePartitionString()` to create Hive-formatted partition strings from partition key maps
5. Implemented partition filtering in `LocalHiveSplitManager::listPartitions()` to support partition pruning based on filters in the table handle - extracts filters that apply to partition columns and tests partition values against them
6. Implemented `LocalHiveSplitManager::getPartitionStatistics()` to return pre-computed statistics from LocalHivePartitionHandle objects
7. Added helper functions for converting partition values (stored as strings) to typed values and testing them against filters (testPartitionValue, makePartitionVector)
8. Enhanced `LocalHiveTableLayout::gatherFileStatistics()` to collect both data column and partition column statistics by:
   - Separating data and partition columns
   - Reading data column statistics from data files using existing file readers
   - Computing partition column statistics from partition value lists
   - Merging statistics and storing them per partition
9. Added `ColumnStatistics::toString()` method for debugging and logging
10. Changed mutex type from `std::mutex` to `std::recursive_mutex` in LocalHiveConnectorMetadata to support nested locking scenarios
11. Added `discretePredicateColumns()` override in LocalHiveTableLayout to expose partition columns for partition pruning

The implementation reads statistics from data files (Parquet, ORC, etc.) for data columns and computes statistics for partition columns by examining the partition values. All statistics are pre-computed during table metadata initialization and cached in LocalHivePartitionHandle objects, making the getPartitionStatistics() call very fast.
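
To make the new surface concrete, here is a minimal sketch of the shapes described in items 1-3. The stub types and exact parameter types are assumptions; the real declarations live in ConnectorMetadata.h and ConnectorSplitManager.h:

```cpp
#include <cstdint>
#include <memory>
#include <span>
#include <string>
#include <vector>

// Stubs for illustration; the real types live in the connector headers.
struct ColumnStatistics { /* min/max, null %, distinct count, ... */ };

struct PartitionHandle {
  // Hive-format partition path, e.g. "ds=2023-01-01/product=p1" (item 3).
  virtual std::string partition() const = 0;
  virtual ~PartitionHandle() = default;
};

// Per-partition statistics, as described in item 1.
struct PartitionStatistics {
  int64_t numRows{0};
  int64_t numFiles{0};
  std::vector<std::string> columnNames;           // parallel to...
  std::vector<ColumnStatistics> columnStatistics; // ...these entries
};

// Item 2: one result per handle, restricted to 'columnNames'.
class ConnectorSplitManager {
 public:
  virtual std::vector<PartitionStatistics> getPartitionStatistics(
      std::span<const std::shared_ptr<const PartitionHandle>> partitions,
      const std::vector<std::string>& columnNames) = 0;
  virtual ~ConnectorSplitManager() = default;
};
```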

Differential Revision: D88694200
Summary:
This diff introduces a comprehensive filter selectivity estimation framework for the Axiom optimizer. Filter selectivity estimation is crucial for query optimization as it enables accurate cardinality estimation after filters are applied, which directly impacts join ordering, physical operator selection, and overall query plan quality.

The implementation provides sophisticated selectivity estimation for various predicate types including comparisons, ranges, IN clauses, AND/OR combinations, and null checks. The framework leverages column statistics (min/max values, cardinalities, null fractions) to compute mathematically sound selectivity estimates with proper NULL semantics handling.

Main changes:

1. **New Filters.cpp/Filters.h (1,400+ lines)**: Core selectivity estimation framework with the following key functions:
   - `exprSelectivity()`: Entry point that dispatches to appropriate selectivity estimator based on expression type
   - `conjunctsSelectivity()`: Combines selectivity of AND predicates using probability multiplication with proper NULL handling
   - `combineConjuncts()`: Computes P(TRUE) and P(NULL) for conjunctions using the formula P(TRUE) = ∏ trueFraction, P(NULL) = ∏(trueFraction + nullFraction) - P(TRUE)
   - `combineDisjuncts()`: Computes selectivity for OR predicates using P(TRUE) = 1 - ∏(1 - trueFraction) (both combination rules are sketched after this list)
   - `comparisonSelectivity()`: Estimates selectivity for equality and inequality comparisons between columns
   - `columnComparisonSelectivity()`: Template-based implementation for type-specific comparison selectivity using range overlap analysis
   - `rangeSelectivity()`: Estimates selectivity for range predicates (>, <, >=, <=) by computing intersection of constraint ranges
   - `inSelectivity()`: Handles IN clause selectivity by intersecting IN lists and counting matching values
   - Support for constraint propagation through `updateConstraints` parameter

2. **Comparison selectivity algorithm**: For comparing two columns a and b with ranges [al, ah] and [bl, bh] and cardinalities ac and bc:
   - For equality (a = b): Assumes uniform distribution within ranges. Computes overlap range, estimates distinct values in overlap for each column, takes minimum as matching values, divides by (ac × bc)
   - For inequality (a < b): Computes fraction of a's range below b using continuous approximation and integral over overlap region
   - Handles NULL semantics: comparisons with NULL yield NULL, all probabilities scaled by (1 - P(either null))
   - Returns `Selectivity` struct with both `trueFraction` and `nullFraction`

3. **Range selectivity algorithm**: For predicates like `col > 100 AND col < 200`:
   - Groups multiple comparisons on same column to analyze as unified range constraint
   - Tracks lower and upper bounds, IN lists, and equality constraints
   - Computes intersection of constraint range with column's min/max range
   - For IN clauses, prunes IN list to intersection and uses list size / cardinality
   - For numeric types: selectivity = (effectiveMax - effectiveMin) / (colMax - colMin) (a worked example appears at the end of this summary)
   - For VARCHAR: uses the first character's ASCII value for range approximation
   - For other types: conservative 0.1 default selectivity

4. **Constraint propagation**: When `updateConstraints=true`, equality predicates update a constraint map:
   - For `a = b`, creates intersection constraint with min(distinct_in_overlap), stores for both column IDs
   - Subsequent predicates on same columns use updated constraints for more accurate estimates
   - Enables correlation-aware selectivity estimation across multiple filters

5. **StatisticsBuilder.cpp fixes**: Critical bug fix for min/max variant creation:
   - IntegerStatisticsBuilder uses int64_t internally but must create Variants with correct TypeKind
   - Added switch statement to cast int64_t to proper type (TINYINT→int8_t, SMALLINT→int16_t, INTEGER→int32_t, BIGINT→int64_t, HUGEINT→int128_t)
   - Similar fix for DoubleStatisticsBuilder: REAL columns need float Variants, not double
   - Without this fix, comparison selectivity would fail due to type mismatches between Variants

6. **Integration changes**:
   - Moved `flattenAll()` from anonymous namespace to public in DerivedTable.cpp (needed by Filters.cpp to flatten AND/OR expressions)
   - Added helper functions in PlanUtils.h for working with expression trees
   - Updated Schema.cpp/Schema.h to support constraint tracking
   - Modified ToGraph.cpp to use filter selectivity for cardinality estimation
   - Updated RelationOp to propagate constraints through plan nodes

7. **FiltersTest.cpp (900+ lines)**: Comprehensive test suite covering:
   - Selectivity validation (trueFraction and nullFraction in valid ranges)
   - Conjunction and disjunction combining with various NULL fractions
   - Comparison selectivity for all operators (=, <, <=, >, >=) with different range configurations
   - Range selectivity with multiple predicates on same column
   - IN clause selectivity with list pruning and intersection
   - Constraint propagation through equality predicates
   - Edge cases: empty ranges, no overlap, full overlap, NULL handling
   - Type-specific tests for INTEGER, BIGINT, REAL, DOUBLE, VARCHAR, TINYINT, SMALLINT

8. **Mathematical soundness**:
   - All selectivity calculations preserve proper three-valued logic (TRUE/FALSE/NULL)
   - `validateSelectivity()` enforces: 0 ≤ trueFraction ≤ 1, 0 ≤ nullFraction ≤ 1, trueFraction + nullFraction ≤ 1
   - NULL semantics: NULL AND TRUE = NULL, NULL OR FALSE = NULL, comparisons with NULL = NULL
   - Smooth interpolation near minimum selectivity bounds to avoid optimizer instability
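
As a sketch of the conjunction/disjunction rules from item 1, operating on raw fractions (the real functions in Filters.cpp take expressions and plan state, so this is illustrative only):

```cpp
#include <vector>

struct Selectivity {
  double trueFraction{1.0};
  double nullFraction{0.0};
};

// P(TRUE) = prod(trueFraction_i);
// P(NULL) = prod(trueFraction_i + nullFraction_i) - P(TRUE).
Selectivity combineConjuncts(const std::vector<Selectivity>& conjuncts) {
  double pTrue = 1.0;
  double pTrueOrNull = 1.0;
  for (const auto& c : conjuncts) {
    pTrue *= c.trueFraction;
    pTrueOrNull *= c.trueFraction + c.nullFraction;
  }
  return {pTrue, pTrueOrNull - pTrue};
}

// P(TRUE) = 1 - prod(1 - trueFraction_i). An OR is NULL exactly when no
// disjunct is TRUE and not all are FALSE (independence assumed throughout).
Selectivity combineDisjuncts(const std::vector<Selectivity>& disjuncts) {
  double pAllNotTrue = 1.0;
  double pAllFalse = 1.0;
  for (const auto& d : disjuncts) {
    pAllNotTrue *= 1.0 - d.trueFraction;
    pAllFalse *= 1.0 - d.trueFraction - d.nullFraction;
  }
  return {1.0 - pAllNotTrue, pAllNotTrue - pAllFalse};
}
```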

The motivation for this work is to enable cost-based optimization decisions that depend on accurate cardinality estimates after filter application. This is particularly critical for:
- Join ordering: choosing to filter high-selectivity predicates early
- Hash join vs. merge join selection based on filtered cardinality
- Partition count selection for hash aggregations
- Memory budget allocation for operators

The implementation is designed to be extensible for future enhancements like:
- Histogram-based selectivity estimation
- Multi-column correlation analysis
- Machine learning-based selectivity models
- Query feedback loop for selectivity correction
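
A small worked instance of the numeric range rule from item 3, with made-up statistics:

```cpp
// Column 'col' has min=0, max=1000 in its statistics.
// Predicate: col > 100 AND col < 200.
double colMin = 0, colMax = 1000;   // from column statistics
double effMin = 100, effMax = 200;  // intersection of predicate bounds
double selectivity = (effMax - effMin) / (colMax - colMin); // 100/1000 = 0.1
```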

Differential Revision: D88164653
Summary:
This diff implements end-to-end constraint propagation through the Axiom query plan construction process. Building on the filter selectivity estimation framework (D88164653), this change tracks refined Value constraints (min/max ranges, cardinalities, null fractions) from leaf scans through joins, filters, projects, and aggregations, enabling more accurate cardinality estimates at every level of the plan.

The core motivation is to improve cost-based optimization decisions by maintaining up-to-date Value metadata as constraints are derived from predicates and joins. For example, after a filter `WHERE age > 18 AND age < 65`, the optimizer now knows the age column has min=18, max=65, and reduced cardinality, which influences downstream join cost estimates. Similarly, join equalities like `customer.id = order.customer_id` update both columns' constraints to reflect the intersection of their ranges and cardinalities.

This constraint tracking is essential for:
- Accurate join cardinality estimation (especially for multi-way joins)
- Proper null handling in outer joins (tracking which columns become nullable)
- Aggregate cardinality estimation (result size of GROUP BY)
- Project cardinality propagation (computing distinct counts through expressions)
- Filter selectivity estimation (more accurate with refined input constraints)

Main changes:

1. **PlanState constraint tracking** (Plan.h, Plan.cpp):
   - Added `ConstraintMap constraints` field to PlanState to track Value constraints by expression ID during plan construction
   - Added `exprConstraint()` function that derives constraints for expressions based on their type:
     - Literals: use the literal's min/max (already set in expr->value())
     - Columns: look up in state.constraints, fall back to expr->value()
     - Field access: propagate cardinality from base expression
     - Aggregates: set cardinality from state.cost.cardinality (group count)
     - Function calls: use functionConstraint metadata if available, otherwise max of argument cardinalities
   - Modified Plan constructor to populate Plan::constraints from state.constraints using QueryGraphContext::registerAny() for lifetime management
   - Modified NextJoin to preserve constraints across join candidates
   - Added PlanStateSaver to save/restore constraints along with cost and placed sets

2. **Join constraint propagation** (RelationOp.cpp):
   - Added `addJoinConstraint()` function to update constraints for join key pairs:
     - For inner joins: both sides become non-nullable (nullFraction=0), ranges intersect via columnComparisonSelectivity
     - For outer joins: optional side becomes nullable, nullFraction set to (1 - innerFanout)
     - Non-key columns from optional side get nullFraction = (1 - innerFanout)
   - Added `addJoinConstraints()` to apply constraint updates for all key pairs
   - Modified Join constructor to:
     - Accept new `innerFanout` parameter (fanout if this were an inner join, used for null fraction calculation)
     - Call addJoinConstraints() with join type information
     - Handle filter expressions using conjunctsSelectivity() to get filter selectivity
     - For semi-joins and anti-joins, multiply fanout by filter selectivity
     - For semi-project (mark joins), update the mark column's trueFraction
     - Propagate non-key nullable constraints for outer joins

3. **Filter constraint integration** (Filters.cpp, Filters.h):
   - Modified `value()` function to first check state.constraints before falling back to expr->value()
   - Added `addConstraint()` helper that enforces type-based cardinality limits:
     - BOOLEAN: max 2 distinct values
     - TINYINT: max 256 distinct values
     - SMALLINT: max 65,536 distinct values (a sketch of this cap appears after this list)
   - Modified `conjunctsSelectivity()` to call `exprConstraint()` for each conjunct before computing selectivity, ensuring constraints are computed and cached
   - Changed `exprSelectivity()` and `conjunctsSelectivity()` signatures to take non-const `PlanState&` to allow updating state.constraints
   - Added `constraintsString()` debugging helper to format ConstraintMap as readable string

4. **Project constraint propagation** (RelationOp.cpp):
   - Modified Project constructor to accept PlanState parameter
   - Added loop to derive and store constraints for each output column by calling exprConstraint() on projection expressions
   - This ensures that projected expressions (e.g., `col + 1`, `upper(name)`) have accurate cardinality estimates

5. **Aggregation constraint propagation** (RelationOp.cpp):
   - Modified Aggregation constructor to accept PlanState parameter
   - Modified `setCostWithGroups()` to accept PlanState
   - Grouping keys get cardinality from the aggregation's result cardinality (number of groups)
   - Aggregate functions get cardinality from exprConstraint() evaluation

6. **VeloxHistory integration** (VeloxHistory.cpp, VeloxHistory.h):
   - Added `setBaseTableValues()` function to update BaseTable column Values from ConstraintMap
   - Modified `findLeafSelectivity()` to always:
     1. Call conjunctsSelectivity() with updateConstraints=true to get constraints
     2. Update BaseTable column Values using setBaseTableValues()
     3. Optionally sample if sampling is enabled
   - This ensures filter-derived constraints (min/max, cardinality, null fractions) are applied to base table columns before planning

7. **Optimization.cpp updates**:
   - Updated all RelationOp construction sites to pass PlanState:
     - Project: added state parameter to constructor calls
     - Aggregation: added state parameter, including planSingleAggregation()
     - Join: added innerFanout parameter and state
     - Filter: already had state
   - Modified PrecomputeProjection::maybeProject() to accept PlanState parameter
   - Updated makeDistinct() to accept PlanState
   - Updated Join::makeCrossJoin() to accept PlanState
   - Ensured PlanStateSaver preserves constraints when exploring alternative join orders

8. **QueryGraphContext lifetime management** (QueryGraphContext.h):
   - Added `registerAny()` template function to take ownership of arbitrary objects (stored as shared_ptr<void>); a sketch appears near the end of this summary
   - Added `ownedObjects_` set and `mutex_` for thread-safe lifetime management
   - Used to manage ConstraintMap pointers in Plan objects, ensuring they remain valid throughout optimization

9. **Schema.h/Schema.cpp updates**:
   - Changed Value::cardinality from const to non-const to allow constraint updates
   - Added Value assignment operator that validates type equality before assigning
   - Added Value::toString() for debugging

10. **ToGraph.cpp updates**:
    - Updated all plan construction to pass PlanState to constructors
    - Modified constant deduplication to set min/max for literals to the literal value

11. **FunctionRegistry.h extension**:
    - Added optional `functionConstraint` callback to FunctionMetadata
    - Allows functions to provide custom constraint derivation logic
    - Returns `std::optional<Value>` with refined constraints for the function result

12. **Comprehensive test coverage** (ConstraintsTest.cpp, 347 lines):
    - `scanEquality`: Tests join equality constraints (n_nationkey = r_regionkey), verifies min/max ranges intersect and cardinality is limited to intersection size
    - `aggregateConstraint`: Tests grouping key cardinality propagates to all output columns
    - `projectConstraint`: Tests projected expression cardinality (col+1, col+col) inherits from source columns
    - `outer`: Tests outer join null fraction propagation to optional side (left join, right side becomes nullable with nullFraction ≈ 0.8)
    - `bitwiseAnd`: Tests custom functionConstraint for bitwise_and, verifies min=0 and max=min(arg1.max, arg2.max)
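
A minimal sketch of the type-based cardinality cap from item 3 (the function name is hypothetical; the real addConstraint() also merges min/max and null fraction into the Value):

```cpp
#include <algorithm>
#include "velox/type/Type.h"

namespace velox = facebook::velox;

// Clamp an estimated distinct-value count to what the type can represent.
float capDistinctsForType(velox::TypeKind kind, float estimated) {
  switch (kind) {
    case velox::TypeKind::BOOLEAN:
      return std::min(estimated, 2.0f);
    case velox::TypeKind::TINYINT:
      return std::min(estimated, 256.0f);
    case velox::TypeKind::SMALLINT:
      return std::min(estimated, 65536.0f);
    default:
      return estimated; // wider types: keep the estimate as-is
  }
}
```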

The implementation maintains the invariant that state.constraints contains the most up-to-date Value metadata for all expressions in the current plan state. When exploring alternative join orders or plan structures, PlanStateSaver ensures constraints are properly saved and restored. This enables accurate "what-if" analysis during optimization without polluting the global expression Value metadata.

The constraint propagation integrates seamlessly with the filter selectivity estimation from D88164653:
- Filters call conjunctsSelectivity() which updates state.constraints
- Joins use columnComparisonSelectivity() which updates constraints for join keys
- All downstream operators see refined constraints via value(state, expr)
- Cost estimation at each operator uses the most accurate available cardinality
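
Item 8's lifetime helper might look roughly like this (a sketch; the actual signature in QueryGraphContext.h may differ):

```cpp
#include <memory>
#include <mutex>
#include <set>

class QueryGraphContext {
 public:
  // Take shared ownership of an arbitrary object and hand back a raw
  // pointer that stays valid for the lifetime of the context. Used to keep
  // ConstraintMap pointers in Plan objects alive throughout optimization.
  template <typename T>
  T* registerAny(std::shared_ptr<T> object) {
    std::lock_guard<std::mutex> l(mutex_);
    T* raw = object.get();
    ownedObjects_.insert(std::static_pointer_cast<void>(std::move(object)));
    return raw;
  }

 private:
  std::mutex mutex_;
  std::set<std::shared_ptr<void>> ownedObjects_;
};
```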

Future enhancements enabled by this infrastructure:
- Constraint-based partition pruning (skip partitions outside min/max range)
- Dynamic filter pushdown (propagate join-derived constraints to scans)
- Constraint-based empty result detection (min > max indicates zero rows)
- Correlation detection (tracking when columns have matching values)

Differential Revision: D89130357
… down filters

Summary:
This diff restructures the optimizer's statistics gathering phase to fetch table statistics and join samples in parallel after the query graph is fully constructed and filter pushdown is complete. Previously, statistics were fetched sequentially during query graph construction, causing significant latency for queries with many tables, especially when going to remote metadata servers. This also removes duplicate work in cases where a table acquires filters over the course of query graph generation.

The core motivation is twofold:
1. **Performance**: Sequential statistics gathering created a bottleneck for queries with many tables. A query joining 20 tables would wait for 20 sequential stat fetches, even though most could run in parallel. This change enables concurrent statistics fetching across all tables.
2. **Accuracy**: Fetching statistics before filter pushdown meant filters weren't available to reduce the data scanned. By moving statistics collection after filter pushdown, partition pruning and filter selectivity estimation can reduce the amount of data sampled, leading to faster and more accurate statistics.

For example, with a query like:
```sql
SELECT * FROM orders o, customer c, lineitem l
WHERE o.orderkey = l.orderkey AND o.custkey = c.custkey
  AND o.orderdate >= '2024-01-01'
  AND c.nationkey = 5
```

**Before**: Fetched full table stats for orders, customer, lineitem sequentially (no filters applied), then pushed down filters, then planned joins.

**After**: Build query graph → push down filters (orderdate >= '2024-01-01', nationkey = 5) → fetch stats for filtered tables in parallel → plan joins.

This enables partition pruning for the orderdate filter before sampling, and parallel stat fetches for all three tables.

Main changes:

1. **Deferred statistics fetching** (Optimization.cpp, DerivedTable.cpp):
   - Added `statsFetched_` flag to Optimization to track whether initial statistics have been gathered
   - Added `getInitialStats()` method to fetch statistics for all tables after query graph construction
   - Modified `DerivedTable::planJoins()` to skip planning if statistics haven't been fetched yet
   - Moved `guessFanout()` calls from individual join construction to centralized `getInitialStats()`
   - Statistics are now fetched in this order: build query graph → distribute conjuncts (filter pushdown) → add implied joins → link tables to joins → **fetch all statistics in parallel** → plan joins

2. **Parallel statistics infrastructure** (VeloxHistory.cpp, VeloxHistory.h, Cost.h):
   - Added `guessFanouts()` method to History interface that takes a span of JoinEdges and processes them in parallel
   - Implementation strategy:
     - First pass: check cache for already-known fanouts
     - Second pass: identify unique (left table, right table) pairs that need sampling
     - Third pass: launch parallel sampling tasks using folly::futures::collectAll
     - Final pass: update all JoinEdges with sampled fanouts
   - Added `combineStatistics()` method to sample a subset of partitions and extrapolate table-level statistics
   - Changed History mutex from `std::mutex` to `std::recursive_mutex` to support nested locking
   - Modified `setLeafSelectivity()` to return void instead of bool (consistent async interface)

3. **Parallel table statistics collection** (Optimization.cpp):
   - Added `collectTablesRecursive()` helper to traverse DerivedTable tree and collect BaseTables and DerivedTables in dependency order
   - Added `getInitialStats()` implementation:
     - Collects all BaseTables from the query graph
     - Launches parallel `setLeafSelectivity()` calls using folly::futures::collectAll (see the sketch after this list)
     - After base table stats complete, processes DerivedTables (union, etc.) in dependency order
     - Finally, calls `guessFanouts()` to sample all join fanouts in parallel
   - Uses velox::AsyncSource for async execution of stat fetching tasks

4. **Thread-safe sampling context** (LocalHiveConnectorMetadata.cpp, JoinSample.cpp):
   - Added `SampleContext` struct to encapsulate executor, memory pool, and QueryCtx for sampling operations
   - Added `createSampleContext()` function to create isolated sampling contexts with their own thread pools and memory pools
   - Modified partition sampling and join sampling to create dedicated contexts instead of sharing connector-level contexts
   - This eliminates race conditions when multiple sampling operations run concurrently
   - Proper RAII cleanup: the SampleContext destructor joins the executor, frees the QueryCtx, then frees the memory pool, in that order (see the sketch at the end of this summary)
   - Each sampling operation now runs in its own isolated context with configurable thread count

5. **Join sampling refactoring** (JoinSample.cpp):
   - Completely rewrote `prepareSampleRunner()` to build sampling plans using PlanBuilder (logical plan) instead of RelationOp (physical plan)
   - New approach:
     - Uses logical_plan::PlanBuilder to construct: TableScan → Project(hash(keys)) → Filter(sample) → Project(hash)
     - Translates to Velox plan using Optimization::toVeloxPlan
     - Creates dedicated SampleContext with configurable thread pool size
     - Returns SampleRunner with context ownership
   - Added `collectColumnNames()` helper to recursively extract column names from join key expressions
   - Renamed `sampleJoin()` to `sampleJoinByPartitions()` and added new `sampleJoin()` wrapper
   - Improved hash mixing: handles arbitrary expressions in join keys, not just simple columns
   - Better resource management: sampling contexts are properly cleaned up after each join sample

6. **Optimizer options for sampling** (OptimizerOptions.h, SqlQueryRunner.cpp, Console.cpp):
   - Added `sampleJoins` flag to OptimizerOptions (default: false)
   - Added `sampleFilters` flag to OptimizerOptions (default: false)
   - Added command-line flags `--sample_joins` and `--sample_filters` to Console
   - RunOptions in SqlQueryRunner now propagates these flags to OptimizerOptions
   - This allows disabling sampling for testing or when historical statistics are preferred

7. **Static plan generation** (Optimization.cpp):
   - Added static `toVeloxPlan()` method that takes LogicalPlanNode, pool, options, runner options, and optional history
   - Used by join sampling to translate sampling plans without requiring full Optimization object
   - Enables standalone plan translation for utilities and testing

8. **Partition statistics interface** (ConnectorSplitManager.h, HiveConnectorMetadata.cpp):
   - Added `toString()` method to PartitionHandle base class (returns "PartitionHandle")
   - Implemented `HivePartitionHandle::toString()` to return formatted partition info with bucket number
   - Improved formatting for partition paths in debugging output
   - Added `hiveConfig()` getter to LocalHiveConnectorMetadata for accessing connector configuration

9. **DerivedTable constraint propagation** (DerivedTable.cpp):
   - Modified `planJoins()` to copy constraints from best Plan to DerivedTable output columns after planning
   - Uses `Column::setValue()` to update column Values with refined constraints
   - Enables constraint propagation across DerivedTable boundaries (important for CTEs and subqueries)

10. **Union statistics aggregation** (Optimization.cpp):
    - Added `makeUnionDistributionAndStats()` helper to combine statistics from union children
    - Sums cardinalities from all union branches
    - Aggregates column statistics (min, max, cardinality) across branches
    - Properly handles set operations (UNION, INTERSECT, EXCEPT)

11. **Error handling and robustness**:
    - Added VELOX_UNREACHABLE() after switch statement in explain handling
    - Added `(void)passingRows` to silence unused variable warning in sampling code
    - Improved move semantics for SampleContext (move constructor and assignment operator)
    - Better resource cleanup ordering in SampleContext destructor

12. **Testing and debugging**:
    - Modified FiltersTest to work with deferred statistics
    - Updated StatsAndHistory tests to handle parallel statistics fetching
    - Removed unused query config from HiveQueriesTestBase
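
The fan-out/fan-in pattern from items 2 and 3, sketched with folly futures (the helper and pool size are assumptions; the actual code uses velox::AsyncSource and the optimizer's executor):

```cpp
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/futures/Future.h>
#include <functional>
#include <vector>

// Run one stats-fetch task per table concurrently and wait for all.
void runInParallel(const std::vector<std::function<void()>>& tasks) {
  folly::CPUThreadPoolExecutor executor(8); // pool size is an assumption
  std::vector<folly::Future<folly::Unit>> futures;
  futures.reserve(tasks.size());
  for (const auto& task : tasks) {
    futures.push_back(folly::via(&executor, task));
  }
  // Each Try<> carries a per-task exception, so one failure does not
  // silently cancel the rest.
  folly::collectAll(futures).get();
}
```

In getInitialStats(), each task would wrap one setLeafSelectivity() call, and a final pass would hand all JoinEdges to guessFanouts().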

The parallelization strategy uses velox::AsyncSource to launch all statistics gathering operations concurrently and wait for all to complete before proceeding with join planning. The level of parallelism is controlled by:
- Velox executor thread pool size for join sampling
- Individual SampleContext thread pools for partition sampling
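
A sketch of the SampleContext teardown ordering from item 4 (member names are assumptions):

```cpp
#include <folly/executors/CPUThreadPoolExecutor.h>
#include "velox/common/memory/Memory.h"
#include "velox/core/QueryCtx.h"

namespace velox = facebook::velox;

struct SampleContext {
  std::shared_ptr<folly::CPUThreadPoolExecutor> executor;
  std::shared_ptr<velox::core::QueryCtx> queryCtx;
  std::shared_ptr<velox::memory::MemoryPool> pool;

  ~SampleContext() {
    if (executor) {
      executor->join(); // 1. drain in-flight sampling tasks first
    }
    queryCtx.reset();   // 2. drop the QueryCtx that references the pool
    pool.reset();       // 3. only now is the memory pool safe to free
  }
};
```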

Differential Revision: D88916977
Differential Revision: D89559598
Differential Revision: D89635456
Summary:
This diff implements merge join as a new physical join algorithm in the Axiom optimizer. Merge join is a highly efficient join method for pre-sorted inputs, avoiding the hash table construction overhead of hash joins. When both join inputs are sorted on the join keys (or can be cheaply sorted), merge join streams through both inputs in lockstep, matching rows with equal keys. Applicability is limited to tables that are already copartitioned and whose join keys contain all the bucketing keys. For Hive-partitioned tables, both sides must consist of a single Hive partition, or all Hive partitioning columns must appear in the join keys on both sides.

Main changes:

1. **Added joinByMerge() method** (Optimization.cpp, ~300 lines):
   - Core merge join candidate generation logic
   - Validates merge join preconditions:
     - Checks all join keys are columns (not expressions) - merge join requires direct column comparisons
     - Verifies left input has both partitioning and ordering (from distribution)
     - Confirms ordering is ascending (merge join requires monotonic order)
     - Validates all partition columns are in join keys (ensures copartitioning)
   - Identifies merge columns from left input's orderKeys that match join keys
   - Plans right side with matching partition and ordering distributions
   - Adds shuffle and sort operators to right side if needed
   - Computes join using merge method
   - Returns NextJoin candidate for cost comparison

2. **Merge join precondition checking**:
   - **Column-only keys**: Rejects if any join key is an expression (e.g., `CAST(orderkey AS BIGINT)`)
   - **Partitioning requirement**: Left input must be partitioned (not gathered)
   - **Ordering requirement**: Left input must have orderKeys specified
   - **Ascending order**: Only kAscNullsFirst and kAscNullsLast supported
   - **Partition subset**: All partition columns must appear in join keys
   - **Matching merge columns**: At least one orderKey must match a join key

3. **Right side preparation algorithm**:
   - Constructs Distribution for right side matching left's partition and ordering
   - For each left partition column, finds corresponding right join key
   - For each left merge column (from orderKeys), finds corresponding right join key
   - Creates `forRight` distribution with:
     - Same DistributionType as left (enables copartitioning)
     - rightPartition: right keys corresponding to left partition columns
     - rightOrderKeys: right keys corresponding to left merge columns
     - rightOrderTypes: ascending order (kAscNullsLast) to match left
   - Calls `makePlan()` with forRight distribution
   - If `needsShuffle` is true, adds Repartition operator
   - Checks if right input needs sorting (orderKeys don't match expected)
   - Adds OrderBy operator if needed, after shuffle or directly

4. **Merge join cost model** (RelationOp.cpp):
   - Added `setMergeJoinCost()` method to Join class
   - Cost formula: `3 * kKeyCompareCost * numKeys * min(1, fanout) + rightSideBytes + kHashExtractColumnCost * numRightSideColumns` (restated as code after the algorithm outline below)
   - Rationale:
     - Key comparisons: Merge join compares keys 3 times on average per match (binary search in merge)
     - Scales with number of keys and fanout (more comparisons for multiple matches)
     - Data copying: Transfers right side bytes to output
     - Column extraction: Extracts columns from right side vectors
   - Significantly cheaper than hash join for large inputs (no hash table construction)
   - Cost difference grows with build side size (hash table cost is O(n log n), merge is O(n))

5. **Integration with join planning** (Optimization.cpp):
   - Modified `makeJoins()` to call `joinByMerge()` after `joinByIndex()`
   - Added `testingUseMergeJoin` option for testing:
     - `std::nullopt` (default): Normal cost-based selection among all join types
     - `true`: Prefer merge join - return immediately if joinByMerge produces a candidate
     - `false`: Disable merge join - skip calling joinByMerge entirely
   - If testing mode is off, merge join competes with hash join based on cost
   - If testing mode is on and merge join produced a candidate, skip hash join consideration

6. **Schema changes for lookup keys** (Schema.h, Schema.cpp):
   - Added `lookupColumns` field to ColumnGroup
   - Distinguished from `orderKeys` in Distribution:
     - `lookupColumns`: Columns used for index lookups (prefix of sort order)
     - `orderKeys`: Full sort order (may include additional sorting columns)
   - The key point is that sortedness does not in and of itself make a table lookup-compatible.
   - Modified `addIndex()` to accept both `columns` and `lookupColumns`
   - Updated `indexLookupCardinality()` to use lookupColumns for cardinality estimation
   - Enables accurate modeling of sorted table access patterns
   - Example: Table sorted on (orderkey, linenum) can be efficiently joined on just orderkey

7. **Velox plan translation** (ToVelox.cpp, ToVelox.h):
   - Added `makeMergeJoin()` method to create MergeJoinNode
   - Checks `join.method == JoinMethod::kMerge` to dispatch to merge join creation
   - Creates `velox::core::MergeJoinNode` with:
     - Join type (INNER, LEFT, RIGHT, FULL, SEMI, ANTI)
     - Left and right keys as field references
     - Filter expression (for non-equi join conditions)
     - Left and right child plan nodes
     - Output type from join columns
   - Registers prediction and history for cost feedback
   - MergeJoinNode relies on Velox runtime's merge join operator

8. **Bucketed sorted table creation** (ParquetTpchTest.cpp):
   - Added `makeBucketedSortedTables()` utility method
   - Creates `orders_bs`, `lineitem_bs`, `partsupp_bs`, `part_bs` tables
   - Uses 32 buckets (more buckets than `_b` versions for finer parallelism)
   - Specifies `sorted_by` property in addition to `bucketed_by`
   - Example: `orders_bs` is bucketed on `o_orderkey` and sorted on `o_orderkey` within each bucket
   - Parquet files maintain sort order within partitions
   - Used for testing merge join on realistic data

9. **Plan matcher support** (PlanMatcher.cpp, PlanMatcherGenerator.cpp):
   - Added `mergeJoin()` method to PlanMatcherBuilder
   - Signature: `mergeJoin(matcher, joinType)` similar to `hashJoin()`
   - Enables test assertions like:
     ```cpp
     auto matcher = PlanMatcherBuilder()
       .tableScan("orders_bs")
       .mergeJoin(rightMatcher, JoinType::kInner)
       .build();
     ```
   - Added merge join code generation in PlanMatcherGenerator
   - Generates proper `.mergeJoin()` calls when plan contains MergeJoinNode

10. **Testing infrastructure** (OptimizerOptions.h):
    - Added `testingUseMergeJoin` optional flag
    - Three modes for comprehensive testing:
      - `nullopt`: Production mode - cost-based selection
      - `true`: Force merge join - tests merge join implementation in isolation
      - `false`: Disable merge join - tests that hash join fallback works
    - Enables differential testing: run same query with and without merge join

The merge join selection algorithm in joinByMerge():

```
1. Check preconditions:
   - All join keys are columns
   - Left input partitioned and ordered
   - Order is ascending
   - Partition columns ⊆ join keys

2. Extract merge columns:
   - For each left orderKey that matches a join key
   - Build leftMergeColumns vector

3. Plan right side:
   - Construct matching Distribution (partition + order)
   - Call makePlan() to get right input plan
   - Check if shuffle/sort needed via needsShuffle flag

4. Add shuffle/sort if needed:
   - If needsShuffle:
     - Add Repartition on rightPartition
     - Add OrderBy on rightOrderKeys
   - Else if ordering doesn't match:
     - Add OrderBy on rightOrderKeys

5. Create Join operator:
   - method = JoinMethod::kMerge
   - Compute cost using setMergeJoinCost()
   - Return as NextJoin candidate
```
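
The cost formula from item 4, restated as code (the constants are placeholders standing in for the optimizer's cost constants):

```cpp
#include <algorithm>
#include <cstdint>

// Placeholder values; the real kKeyCompareCost / kHashExtractColumnCost
// are defined by the optimizer's cost model.
constexpr float kKeyCompareCost = 1.0f;
constexpr float kHashExtractColumnCost = 1.0f;

// Mirrors setMergeJoinCost() as described in item 4.
float mergeJoinUnitCost(
    int32_t numKeys,
    float fanout,
    float rightSideBytes,
    int32_t numRightSideColumns) {
  return 3.0f * kKeyCompareCost * numKeys * std::min(1.0f, fanout) +
      rightSideBytes + kHashExtractColumnCost * numRightSideColumns;
}
```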

Differential Revision: D89875337