Support copartitioned execution with hive bucketing #714
Open. oerling wants to merge 6 commits into facebookincubator:main from oerling:export-D89496393.
Conversation
oerling added a commit to oerling/verax that referenced this pull request on Dec 19, 2025:
…r#714) Summary: Pull Request resolved: facebookincubator#714 Differential Revision: D89496393
Force-pushed from 35467c8 to fa60c6c.
Summary:
This diff adds support for exposing partition-level statistics from the Prism connector, enabling the optimizer to make better decisions for partitioned tables. The implementation focuses on the LocalHive connector and introduces a new `getPartitionStatistics()` API that returns detailed statistics for specific partitions and columns.
The key motivation is to provide per-partition statistics (row counts, file counts, column statistics including min/max, null percentages, distinct counts, etc.) that can be used for query optimization, particularly for pruning partitions and estimating cardinalities. Previously, only table-level statistics were available.
Main changes:
1. Added a `PartitionStatistics` struct to ConnectorMetadata.h that holds per-partition statistics including numRows, numFiles, column names, and per-column statistics
2. Added a `getPartitionStatistics()` virtual method to the ConnectorSplitManager interface that takes a span of partition handles and column names and returns statistics for each partition
3. Enhanced the `PartitionHandle` base class with a `partition()` method that returns the partition path string in Hive format (e.g., "ds=2023-01-01/product=p1")
4. Implemented `HivePartitionHandle::makePartitionString()` to create Hive-formatted partition strings from partition key maps (a minimal sketch follows this summary)
5. Implemented partition filtering in `LocalHiveSplitManager::listPartitions()` to support partition pruning based on filters in the table handle: extracts the filters that apply to partition columns and tests partition values against them
6. Implemented `LocalHiveSplitManager::getPartitionStatistics()` to return pre-computed statistics from LocalHivePartitionHandle objects
7. Added helper functions for converting partition values (stored as strings) to typed values and testing them against filters (testPartitionValue, makePartitionVector)
8. Enhanced `LocalHiveTableLayout::gatherFileStatistics()` to collect both data column and partition column statistics by:
   - Separating data and partition columns
   - Reading data column statistics from data files using existing file readers
   - Computing partition column statistics from partition value lists
   - Merging statistics and storing them per partition
9. Added a `ColumnStatistics::toString()` method for debugging and logging
10. Changed the mutex type from `std::mutex` to `std::recursive_mutex` in LocalHiveConnectorMetadata to support nested locking scenarios
11. Added a `discretePredicateColumns()` override in LocalHiveTableLayout to expose partition columns for partition pruning
The implementation reads statistics from data files (Parquet, ORC, etc.) for data columns and computes statistics for partition columns by examining the partition values. All statistics are pre-computed during table metadata initialization and cached in LocalHivePartitionHandle objects, making the `getPartitionStatistics()` call very fast.
Differential Revision: D88694200
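As an illustration of item 4 above, here is a minimal sketch of Hive-style partition string construction. The function name matches the summary, but the signature and the use of ordered key/value pairs are assumptions, not the actual connector code.
```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch: builds a Hive-style partition path such as
// "ds=2023-01-01/product=p1" from ordered partition key/value pairs.
// The real HivePartitionHandle::makePartitionString() may differ.
std::string makePartitionString(
    const std::vector<std::pair<std::string, std::string>>& keys) {
  std::string path;
  for (const auto& [name, value] : keys) {
    if (!path.empty()) {
      path += "/";  // Separate key=value segments with '/'.
    }
    path += name + "=" + value;
  }
  return path;
}
```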
Summary:
This diff introduces a comprehensive filter selectivity estimation framework for the Axiom optimizer. Filter selectivity estimation is crucial for query optimization because it enables accurate cardinality estimation after filters are applied, which directly impacts join ordering, physical operator selection, and overall query plan quality.
The implementation provides sophisticated selectivity estimation for various predicate types including comparisons, ranges, IN clauses, AND/OR combinations, and null checks. The framework leverages column statistics (min/max values, cardinalities, null fractions) to compute mathematically sound selectivity estimates with proper NULL semantics handling.
Main changes:
1. **New Filters.cpp/Filters.h (1,400+ lines)**: Core selectivity estimation framework with the following key functions (a minimal sketch of the conjunct/disjunct formulas follows this summary):
   - `exprSelectivity()`: Entry point that dispatches to the appropriate selectivity estimator based on expression type
   - `conjunctsSelectivity()`: Combines the selectivity of AND predicates using probability multiplication with proper NULL handling
   - `combineConjuncts()`: Computes P(TRUE) and P(NULL) for conjunctions using P(TRUE) = ∏ trueFraction and P(NULL) = ∏ (trueFraction + nullFraction) − P(TRUE)
   - `combineDisjuncts()`: Computes selectivity for OR predicates using P(TRUE) = 1 − ∏ (1 − trueFraction)
   - `comparisonSelectivity()`: Estimates selectivity for equality and inequality comparisons between columns
   - `columnComparisonSelectivity()`: Template-based implementation of type-specific comparison selectivity using range overlap analysis
   - `rangeSelectivity()`: Estimates selectivity for range predicates (>, <, >=, <=) by computing the intersection of constraint ranges
   - `inSelectivity()`: Handles IN clause selectivity by intersecting IN lists and counting matching values
   - Support for constraint propagation through the `updateConstraints` parameter
2. **Comparison selectivity algorithm**: For two columns a and b with ranges [al, ah] and [bl, bh] and cardinalities ac and bc:
   - For equality (a = b): Assumes uniform distribution within ranges. Computes the overlap range, estimates the distinct values in the overlap for each column, takes the minimum as the number of matching values, and divides by (ac × bc)
   - For inequality (a < b): Computes the fraction of a's range below b using a continuous approximation and an integral over the overlap region
   - Handles NULL semantics: comparisons with NULL yield NULL, and all probabilities are scaled by (1 − P(either null))
   - Returns a `Selectivity` struct with both `trueFraction` and `nullFraction`
3. **Range selectivity algorithm**: For predicates like `col > 100 AND col < 200`:
   - Groups multiple comparisons on the same column and analyzes them as a unified range constraint
   - Tracks lower and upper bounds, IN lists, and equality constraints
   - Computes the intersection of the constraint range with the column's min/max range
   - For IN clauses, prunes the IN list to the intersection and uses list size / cardinality
   - For numeric types: selectivity = (effectiveMax − effectiveMin) / (colMax − colMin)
   - For VARCHAR: uses the first character's ASCII value as a range approximation
   - For other types: a conservative default selectivity of 0.1
4. **Constraint propagation**: When `updateConstraints=true`, equality predicates update a constraint map:
   - For `a = b`, creates an intersection constraint with min(distinct_in_overlap) and stores it for both column IDs
   - Subsequent predicates on the same columns use the updated constraints for more accurate estimates
   - Enables correlation-aware selectivity estimation across multiple filters
5. **StatisticsBuilder.cpp fixes**: Critical bug fix for min/max variant creation:
   - IntegerStatisticsBuilder uses int64_t internally but must create Variants with the correct TypeKind
   - Added a switch statement to cast int64_t to the proper type (TINYINT→int8_t, SMALLINT→int16_t, INTEGER→int32_t, BIGINT→int64_t, HUGEINT→int128_t)
   - Similar fix for DoubleStatisticsBuilder: REAL columns need float Variants, not double
   - Without this fix, comparison selectivity would fail due to type mismatches between Variants
6. **Integration changes**:
   - Moved `flattenAll()` from an anonymous namespace to public in DerivedTable.cpp (needed by Filters.cpp to flatten AND/OR expressions)
   - Added helper functions in PlanUtils.h for working with expression trees
   - Updated Schema.cpp/Schema.h to support constraint tracking
   - Modified ToGraph.cpp to use filter selectivity for cardinality estimation
   - Updated RelationOp to propagate constraints through plan nodes
7. **FiltersTest.cpp (900+ lines)**: Comprehensive test suite covering:
   - Selectivity validation (trueFraction and nullFraction in valid ranges)
   - Conjunction and disjunction combining with various NULL fractions
   - Comparison selectivity for all operators (=, <, <=, >, >=) with different range configurations
   - Range selectivity with multiple predicates on the same column
   - IN clause selectivity with list pruning and intersection
   - Constraint propagation through equality predicates
   - Edge cases: empty ranges, no overlap, full overlap, NULL handling
   - Type-specific tests for INTEGER, BIGINT, REAL, DOUBLE, VARCHAR, TINYINT, SMALLINT
8. **Mathematical soundness**:
   - All selectivity calculations preserve proper three-valued logic (TRUE/FALSE/NULL)
   - `validateSelectivity()` enforces: 0 ≤ trueFraction ≤ 1, 0 ≤ nullFraction ≤ 1, trueFraction + nullFraction ≤ 1
   - NULL semantics: NULL AND TRUE = NULL, NULL OR FALSE = NULL, comparisons with NULL = NULL
   - Smooth interpolation near minimum selectivity bounds to avoid optimizer instability
The motivation for this work is to enable cost-based optimization decisions that depend on accurate cardinality estimates after filter application. This is particularly critical for:
- Join ordering: choosing to apply highly selective predicates early
- Hash join vs. merge join selection based on filtered cardinality
- Partition count selection for hash aggregations
- Memory budget allocation for operators
The implementation is designed to be extensible for future enhancements such as:
- Histogram-based selectivity estimation
- Multi-column correlation analysis
- Machine learning-based selectivity models
- A query feedback loop for selectivity correction
Differential Revision: D88164653
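To make the conjunct/disjunct formulas from item 1 concrete, here is a minimal self-contained sketch. `Selectivity` mirrors the struct named in the summary; the function bodies are illustrative, under the independence assumption the formulas imply, and are not the actual Filters.cpp code.
```cpp
#include <vector>

// Stand-in for the Selectivity struct described above.
struct Selectivity {
  double trueFraction;  // P(predicate is TRUE)
  double nullFraction;  // P(predicate is NULL)
};

// AND: P(TRUE) = prod(trueFraction);
// P(NULL) = prod(trueFraction + nullFraction) - P(TRUE).
Selectivity combineConjuncts(const std::vector<Selectivity>& terms) {
  double pTrue = 1.0;
  double pTrueOrNull = 1.0;
  for (const auto& t : terms) {
    pTrue *= t.trueFraction;
    pTrueOrNull *= t.trueFraction + t.nullFraction;
  }
  return {pTrue, pTrueOrNull - pTrue};
}

// OR: P(TRUE) = 1 - prod(1 - trueFraction). A disjunction is NULL when
// no term is TRUE and at least one term is NULL.
Selectivity combineDisjuncts(const std::vector<Selectivity>& terms) {
  double pNotTrue = 1.0;  // P(no term is TRUE)
  double pAllFalse = 1.0; // P(every term is FALSE)
  for (const auto& t : terms) {
    pNotTrue *= 1.0 - t.trueFraction;
    pAllFalse *= 1.0 - t.trueFraction - t.nullFraction;
  }
  return {1.0 - pNotTrue, pNotTrue - pAllFalse};
}
```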
Summary:
This diff implements end-to-end constraint propagation through the Axiom query plan construction process. Building on the filter selectivity estimation framework (D88164653), this change tracks refined Value constraints (min/max ranges, cardinalities, null fractions) from leaf scans through joins, filters, projects, and aggregations, enabling more accurate cardinality estimates at every level of the plan.
The core motivation is to improve cost-based optimization decisions by maintaining up-to-date Value metadata as constraints are derived from predicates and joins. For example, after a filter `WHERE age > 18 AND age < 65`, the optimizer now knows the age column has min=18, max=65, and reduced cardinality, which influences downstream join cost estimates. Similarly, join equalities like `customer.id = order.customer_id` update both columns' constraints to reflect the intersection of their ranges and cardinalities.
This constraint tracking is essential for:
- Accurate join cardinality estimation (especially for multi-way joins)
- Proper null handling in outer joins (tracking which columns become nullable)
- Aggregate cardinality estimation (result size of GROUP BY)
- Project cardinality propagation (computing distinct counts through expressions)
- Filter selectivity estimation (more accurate with refined input constraints)
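As a toy illustration of the age example above, the following sketch refines a column's metadata for a range predicate under a uniform-distribution assumption. `Value` here is a stand-in for Axiom's column metadata; the field names and the function are illustrative only.
```cpp
#include <algorithm>
#include <cstdint>

// Stand-in for Axiom's Value metadata (range, distinct-value estimate,
// null fraction); field names are assumptions.
struct Value {
  int64_t min;
  int64_t max;
  double cardinality;   // estimated distinct values
  double nullFraction;  // fraction of NULL rows
};

// Refine a column's Value for a predicate like: col > lo AND col < hi.
Value applyRangeFilter(const Value& v, int64_t lo, int64_t hi) {
  Value out = v;
  out.min = std::max(v.min, lo);
  out.max = std::min(v.max, hi);
  // Uniformity assumption: distinct count shrinks with the range.
  double keep = v.max > v.min
      ? static_cast<double>(out.max - out.min) / (v.max - v.min)
      : 1.0;
  out.cardinality = v.cardinality * keep;
  out.nullFraction = 0.0;  // NULLs fail the comparison and are dropped.
  return out;
}

// Example from the summary: age in [0, 120] with
// WHERE age > 18 AND age < 65 yields min=18, max=65 and roughly
// (65 - 18) / 120 of the original distinct count.
```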
Main changes:
1. **PlanState constraint tracking** (Plan.h, Plan.cpp):
- Added `ConstraintMap constraints` field to PlanState to track Value constraints by expression ID during plan construction
- Added `exprConstraint()` function that derives constraints for expressions based on their type:
- Literals: use the literal's min/max (already set in expr->value())
- Columns: look up in state.constraints, fall back to expr->value()
- Field access: propagate cardinality from base expression
- Aggregates: set cardinality from state.cost.cardinality (group count)
- Function calls: use functionConstraint metadata if available, otherwise max of argument cardinalities
- Modified Plan constructor to populate Plan::constraints from state.constraints using QueryGraphContext::registerAny() for lifetime management
- Modified NextJoin to preserve constraints across join candidates
- Added PlanStateSaver to save/restore constraints along with cost and placed sets
2. **Join constraint propagation** (RelationOp.cpp):
- Added `addJoinConstraint()` function to update constraints for join key pairs:
- For inner joins: both sides become non-nullable (nullFraction=0), ranges intersect via columnComparisonSelectivity
- For outer joins: optional side becomes nullable, nullFraction set to (1 - innerFanout)
- Non-key columns from optional side get nullFraction = (1 - innerFanout)
- Added `addJoinConstraints()` to apply constraint updates for all key pairs
- Modified Join constructor to:
- Accept new `innerFanout` parameter (fanout if this were an inner join, used for null fraction calculation)
- Call addJoinConstraints() with join type information
- Handle filter expressions using conjunctsSelectivity() to get filter selectivity
- For semi-joins and anti-joins, multiply fanout by filter selectivity
- For semi-project (mark joins), update the mark column's trueFraction
- Propagate non-key nullable constraints for outer joins
3. **Filter constraint integration** (Filters.cpp, Filters.h):
- Modified `value()` function to first check state.constraints before falling back to expr->value()
- Added `addConstraint()` helper that enforces type-based cardinality limits:
- BOOLEAN: max 2 distinct values
- TINYINT: max 256 distinct values
- SMALLINT: max 65,536 distinct values
- Modified `conjunctsSelectivity()` to call `exprConstraint()` for each conjunct before computing selectivity, ensuring constraints are computed and cached
- Changed `exprSelectivity()` and `conjunctsSelectivity()` signatures to take non-const `PlanState&` to allow updating state.constraints
- Added `constraintsString()` debugging helper to format ConstraintMap as readable string
4. **Project constraint propagation** (RelationOp.cpp):
- Modified Project constructor to accept PlanState parameter
- Added loop to derive and store constraints for each output column by calling exprConstraint() on projection expressions
- This ensures that projected expressions (e.g., `col + 1`, `upper(name)`) have accurate cardinality estimates
5. **Aggregation constraint propagation** (RelationOp.cpp):
- Modified Aggregation constructor to accept PlanState parameter
- Modified `setCostWithGroups()` to accept PlanState
- Grouping keys get cardinality from the aggregation's result cardinality (number of groups)
- Aggregate functions get cardinality from exprConstraint() evaluation
6. **VeloxHistory integration** (VeloxHistory.cpp, VeloxHistory.h):
- Added `setBaseTableValues()` function to update BaseTable column Values from ConstraintMap
- Modified `findLeafSelectivity()` to always:
1. Call conjunctsSelectivity() with updateConstraints=true to get constraints
2. Update BaseTable column Values using setBaseTableValues()
3. Optionally sample if sampling is enabled
- This ensures filter-derived constraints (min/max, cardinality, null fractions) are applied to base table columns before planning
7. **Optimization.cpp updates**:
- Updated all RelationOp construction sites to pass PlanState:
- Project: added state parameter to constructor calls
- Aggregation: added state parameter, including planSingleAggregation()
- Join: added innerFanout parameter and state
- Filter: already had state
- Modified PrecomputeProjection::maybeProject() to accept PlanState parameter
- Updated makeDistinct() to accept PlanState
- Updated Join::makeCrossJoin() to accept PlanState
- Ensured PlanStateSaver preserves constraints when exploring alternative join orders
8. **QueryGraphContext lifetime management** (QueryGraphContext.h):
- Added `registerAny()` template function to take ownership of arbitrary objects (stored as shared_ptr<void>)
- Added `ownedObjects_` set and `mutex_` for thread-safe lifetime management
- Used to manage ConstraintMap pointers in Plan objects, ensuring they remain valid throughout optimization (a minimal sketch of this pattern follows the change list)
9. **Schema.h/Schema.cpp updates**:
- Changed Value::cardinality from const to non-const to allow constraint updates
- Added Value assignment operator that validates type equality before assigning
- Added Value::toString() for debugging
10. **ToGraph.cpp updates**:
- Updated all plan construction to pass PlanState to constructors
- Modified constant deduplication to set min/max for literals to the literal value
11. **FunctionRegistry.h extension**:
- Added optional `functionConstraint` callback to FunctionMetadata
- Allows functions to provide custom constraint derivation logic
- Returns `std::optional<Value>` with refined constraints for the function result
12. **Comprehensive test coverage** (ConstraintsTest.cpp, 347 lines):
- `scanEquality`: Tests join equality constraints (n_nationkey = r_regionkey), verifies min/max ranges intersect and cardinality is limited to intersection size
- `aggregateConstraint`: Tests grouping key cardinality propagates to all output columns
- `projectConstraint`: Tests projected expression cardinality (col+1, col+col) inherits from source columns
- `outer`: Tests outer join null fraction propagation to optional side (left join, right side becomes nullable with nullFraction ≈ 0.8)
- `bitwiseAnd`: Tests custom functionConstraint for bitwise_and, verifies min=0 and max=min(arg1.max, arg2.max)
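Item 8's `registerAny()` ownership pattern, as a minimal self-contained sketch. The class name and members beyond those quoted in the summary are assumptions, not the actual QueryGraphContext code.
```cpp
#include <memory>
#include <mutex>
#include <unordered_set>

// Hypothetical sketch: the context takes shared ownership of arbitrary
// objects via shared_ptr<void>, keeping them alive for its lifetime.
class QueryGraphContextSketch {
 public:
  template <typename T>
  T* registerAny(std::shared_ptr<T> object) {
    std::lock_guard<std::mutex> l(mutex_);
    T* raw = object.get();
    // shared_ptr<void> type-erases T; the stored deleter still destroys a T.
    ownedObjects_.insert(std::static_pointer_cast<void>(std::move(object)));
    return raw;
  }

 private:
  std::mutex mutex_;
  std::unordered_set<std::shared_ptr<void>> ownedObjects_;
};
```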
The implementation maintains the invariant that state.constraints contains the most up-to-date Value metadata for all expressions in the current plan state. When exploring alternative join orders or plan structures, PlanStateSaver ensures constraints are properly saved and restored. This enables accurate "what-if" analysis during optimization without polluting the global expression Value metadata.
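A minimal sketch of the save/restore behavior just described. The real PlanStateSaver also saves cost and placed sets; the type definitions below are placeholders.
```cpp
#include <cstdint>
#include <unordered_map>
#include <utility>

struct Value {};  // placeholder for per-expression constraint metadata
using ConstraintMap = std::unordered_map<int32_t, Value>;

struct PlanState {
  ConstraintMap constraints;  // expression id -> refined Value
};

// RAII guard: snapshot constraints on entry, restore on scope exit so
// "what-if" exploration of a join order leaves no trace behind.
class PlanStateSaverSketch {
 public:
  explicit PlanStateSaverSketch(PlanState& state)
      : state_(state), saved_(state.constraints) {}

  ~PlanStateSaverSketch() {
    state_.constraints = std::move(saved_);
  }

 private:
  PlanState& state_;
  ConstraintMap saved_;
};
```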
The constraint propagation integrates seamlessly with the filter selectivity estimation from D88164653:
- Filters call conjunctsSelectivity() which updates state.constraints
- Joins use columnComparisonSelectivity() which updates constraints for join keys
- All downstream operators see refined constraints via value(state, expr)
- Cost estimation at each operator uses the most accurate available cardinality
Future enhancements enabled by this infrastructure:
- Constraint-based partition pruning (skip partitions outside min/max range)
- Dynamic filter pushdown (propagate join-derived constraints to scans)
- Constraint-based empty result detection (min > max indicates zero rows)
- Correlation detection (tracking when columns have matching values)
Differential Revision: D89130357
… down filters
Summary:
This diff restructures the optimizer's statistics gathering phase to fetch table statistics and join samples in parallel after the query graph is fully constructed and filter pushdown is complete. Previously, statistics were fetched sequentially during query graph construction, causing significant latency for queries with many tables, especially when going to remote metadata servers. This also removes duplicate work in cases where a table acquires filters throughout query graph generation.
The core motivation is twofold:
1. **Performance**: Sequential statistics gathering created a bottleneck for queries with many tables. A query joining 20 tables would wait for 20 sequential stat fetches, even though most could run in parallel. This change enables concurrent statistics fetching across all tables.
2. **Accuracy**: Fetching statistics before filter pushdown meant filters weren't available to reduce the data scanned. By moving statistics collection after filter pushdown, partition pruning and filter selectivity estimation can reduce the amount of data sampled, leading to faster and more accurate statistics.
For example, with a query like:
```sql
SELECT * FROM orders o, customer c, lineitem l
WHERE o.orderkey = l.orderkey AND o.custkey = c.custkey
AND o.orderdate >= '2024-01-01'
AND c.nationkey = 5
```
**Before**: Fetched full table stats for orders, customer, lineitem sequentially (no filters applied), then pushed down filters, then planned joins.
**After**: Build query graph → push down filters (orderdate >= '2024-01-01', nationkey = 5) → fetch stats for filtered tables in parallel → plan joins.
This enables partition pruning for the orderdate filter before sampling, and parallel stat fetches for all three tables.
Main changes:
1. **Deferred statistics fetching** (Optimization.cpp, DerivedTable.cpp):
- Added `statsFetched_` flag to Optimization to track whether initial statistics have been gathered
- Added `getInitialStats()` method to fetch statistics for all tables after query graph construction
- Modified `DerivedTable::planJoins()` to skip planning if statistics haven't been fetched yet
- Moved `guessFanout()` calls from individual join construction to centralized `getInitialStats()`
- Statistics are now fetched in this order: build query graph → distribute conjuncts (filter pushdown) → add implied joins → link tables to joins → **fetch all statistics in parallel** → plan joins
2. **Parallel statistics infrastructure** (VeloxHistory.cpp, VeloxHistory.h, Cost.h):
- Added `guessFanouts()` method to History interface that takes a span of JoinEdges and processes them in parallel
- Implementation strategy:
- First pass: check cache for already-known fanouts
- Second pass: identify unique (left table, right table) pairs that need sampling
- Third pass: launch parallel sampling tasks using folly::futures::collectAll
- Final pass: update all JoinEdges with sampled fanouts
- Added `combineStatistics()` method to sample a subset of partitions and extrapolate table-level statistics
- Changed History mutex from `std::mutex` to `std::recursive_mutex` to support nested locking
- Modified `setLeafSelectivity()` to return void instead of bool (consistent async interface)
3. **Parallel table statistics collection** (Optimization.cpp):
- Added `collectTablesRecursive()` helper to traverse DerivedTable tree and collect BaseTables and DerivedTables in dependency order
- Added `getInitialStats()` implementation:
- Collects all BaseTables from the query graph
- Launches parallel `setLeafSelectivity()` calls using folly::futures::collectAll
- After base table stats complete, processes DerivedTables (union, etc.) in dependency order
- Finally, calls `guessFanouts()` to sample all join fanouts in parallel
- Uses velox::AsyncSource for async execution of stat fetching tasks (a simplified sketch of the fan-out/join pattern follows the change list)
4. **Thread-safe sampling context** (LocalHiveConnectorMetadata.cpp, JoinSample.cpp):
- Added `SampleContext` struct to encapsulate executor, memory pool, and QueryCtx for sampling operations
- Added `createSampleContext()` function to create isolated sampling contexts with their own thread pools and memory pools
- Modified partition sampling and join sampling to create dedicated contexts instead of sharing connector-level contexts
- This eliminates race conditions when multiple sampling operations run concurrently
- Proper RAII cleanup: SampleContext destructor joins executor, frees QueryCtx, then frees memory pool in correct order
- Each sampling operation now runs in its own isolated context with configurable thread count
5. **Join sampling refactoring** (JoinSample.cpp):
- Completely rewrote `prepareSampleRunner()` to build sampling plans using PlanBuilder (logical plan) instead of RelationOp (physical plan)
- New approach:
- Uses logical_plan::PlanBuilder to construct: TableScan → Project(hash(keys)) → Filter(sample) → Project(hash)
- Translates to Velox plan using Optimization::toVeloxPlan
- Creates dedicated SampleContext with configurable thread pool size
- Returns SampleRunner with context ownership
- Added `collectColumnNames()` helper to recursively extract column names from join key expressions
- Renamed `sampleJoin()` to `sampleJoinByPartitions()` and added new `sampleJoin()` wrapper
- Improved hash mixing: handles arbitrary expressions in join keys, not just simple columns
- Better resource management: sampling contexts are properly cleaned up after each join sample
6. **Optimizer options for sampling** (OptimizerOptions.h, SqlQueryRunner.cpp, Console.cpp):
- Added `sampleJoins` flag to OptimizerOptions (default: false)
- Added `sampleFilters` flag to OptimizerOptions (default: false)
- Added command-line flags `--sample_joins` and `--sample_filters` to Console
- RunOptions in SqlQueryRunner now propagates these flags to OptimizerOptions
- This allows disabling sampling for testing or when historical statistics are preferred
7. **Static plan generation** (Optimization.cpp):
- Added static `toVeloxPlan()` method that takes LogicalPlanNode, pool, options, runner options, and optional history
- Used by join sampling to translate sampling plans without requiring full Optimization object
- Enables standalone plan translation for utilities and testing
8. **Partition statistics interface** (ConnectorSplitManager.h, HiveConnectorMetadata.cpp):
- Added `toString()` method to PartitionHandle base class (returns "PartitionHandle")
- Implemented `HivePartitionHandle::toString()` to return formatted partition info with bucket number
- Improved formatting for partition paths in debugging output
- Added `hiveConfig()` getter to LocalHiveConnectorMetadata for accessing connector configuration
9. **DerivedTable constraint propagation** (DerivedTable.cpp):
- Modified `planJoins()` to copy constraints from best Plan to DerivedTable output columns after planning
- Uses `Column::setValue()` to update column Values with refined constraints
- Enables constraint propagation across DerivedTable boundaries (important for CTEs and subqueries)
10. **Union statistics aggregation** (Optimization.cpp):
- Added `makeUnionDistributionAndStats()` helper to combine statistics from union children
- Sums cardinalities from all union branches
- Aggregates column statistics (min, max, cardinality) across branches
- Properly handles set operations (UNION, INTERSECT, EXCEPT)
11. **Error handling and robustness**:
- Added VELOX_UNREACHABLE() after switch statement in explain handling
- Added `(void)passingRows` to silence unused variable warning in sampling code
- Improved move semantics for SampleContext (move constructor and assignment operator)
- Better resource cleanup ordering in SampleContext destructor
12. **Testing and debugging**:
- Modified FiltersTest to work with deferred statistics
- Updated StatsAndHistory tests to handle parallel statistics fetching
- Removed unused query config from HiveQueriesTestBase
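A simplified sketch of the fan-out/join pattern from items 2 and 3, using std::async as a stand-in for the folly futures and velox::AsyncSource machinery the diff actually uses. `BaseTable` and the fetch function body are placeholders.
```cpp
#include <future>
#include <vector>

struct BaseTable {};  // placeholder for a leaf table in the query graph

// Placeholder for the per-table stats fetch (sampling, metadata calls).
void setLeafSelectivity(BaseTable& /*table*/) {}

// Fetch statistics for every base table concurrently, then block until
// all fetches finish before join planning starts.
void getInitialStatsSketch(std::vector<BaseTable*>& tables) {
  std::vector<std::future<void>> pending;
  pending.reserve(tables.size());
  for (auto* table : tables) {
    // Launch one stats fetch per table; each may hit remote metadata.
    pending.push_back(std::async(
        std::launch::async, [table] { setLeafSelectivity(*table); }));
  }
  for (auto& f : pending) {
    f.get();  // rethrows if the corresponding fetch failed
  }
}
```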
The parallelization strategy uses velox::AsyncSource to launch all statistics gathering operations concurrently and wait for all to complete before proceeding with join planning. The level of parallelism is controlled by:
- Velox executor thread pool size for join sampling
- Individual SampleContext thread pools for partition sampling
Differential Revision: D88916977
Differential Revision: D89559598
Summary:
This diff implements copartitioned execution for bucketed Hive tables, enabling the optimizer to avoid unnecessary shuffle operations when joining or aggregating tables that are already partitioned compatibly. When tables are bucketed on the same key (or compatible keys), the optimizer can now execute joins and aggregations without repartitioning the data, significantly reducing query execution time and network overhead.
The core motivation is performance optimization for data warehouse workloads where tables are pre-bucketed on join keys. For example:
- `orders` bucketed on `o_orderkey` (8 buckets)
- `lineitem` bucketed on `l_orderkey` (8 buckets)
- Query: `SELECT ... FROM orders JOIN lineitem ON o_orderkey = l_orderkey`
**Before this change**: The optimizer would shuffle both tables to align partitions, even though they're already copartitioned.
**After this change**: The optimizer recognizes the compatible bucketing and executes the join without shuffling, processing bucket 0 from both tables together, bucket 1 together, etc.
This optimization is particularly valuable for:
- Multi-way joins on bucketed tables (e.g., TPC-H queries with 5+ table joins)
- Aggregations after joins (can skip repartitioning if already bucketed on group keys)
- Write operations to bucketed tables (can write directly to target buckets)
- Scenarios with different bucket counts where one divides the other evenly
Main changes:
1. **PartitionType copartitioning support** (HiveConnectorMetadata.h):
- Added `numPartitions()` getter to HivePartitionType to expose bucket count
- The `copartition()` method was already present but now properly used by optimizer
- HivePartitionType::copartition() returns a non-null PartitionType pointer if the two partition types are compatible (same bucket count and compatible bucket key types)
2. **DistributionType compatibility checking** (Schema.h, Schema.cpp):
- Replaced equality operator with `isCopartitionCompatible()` method
- Returns true if both DistributionTypes have PartitionTypes that can copartition
- Handles null PartitionType case (both null = compatible)
- Enables comparing distributions with different PartitionType instances that are semantically compatible
3. **Distribution matching with broadcast handling** (Schema.cpp):
- Fixed `isSamePartition()` logic to properly handle broadcast distributions
- Both sides broadcast = colocated (no shuffle needed)
- One side broadcast, other side partitioned = not colocated (shuffle needed)
- Previously incorrectly treated one-sided broadcast as colocated
4. **Aggregation repartitioning optimization** (Optimization.cpp):
- Inverted shuffle decision logic: instead of "shuffle if any group key is missing from partition", now "skip shuffle if all partition columns are present in group keys"
- Algorithm:
- If input has no partition columns, must shuffle
- If input has partition columns, check if every partition column equals some grouping key
- If yes, input is already partitioned correctly for the aggregation
- Example: input partitioned on [a, b], grouping on [a, b, c] → no shuffle needed
- Example: input partitioned on [a, b], grouping on [a, c] → shuffle needed (b is missing)
5. **Join copartitioning detection** (Optimization.cpp):
- Added `joinCopartition()` helper to extract join keys corresponding to partition columns
- Takes partition column indices and join side, returns corresponding join key expressions
- Returns empty vector if any join key is not a simple column reference (conservatively requires shuffle)
- Used by both `joinByHash()` and `joinRightByHash()` to determine if join can skip shuffle
6. **Hash join optimization** (Optimization.cpp):
- Modified `joinByHash()` to detect when build side has compatible partitioning
- Calls `joinKeyPartition()` to find which join keys align with build's partition columns
- If partition keys found, calls `joinCopartition()` to get the actual join key expressions
- Sets `copartition` variable to guide repartitioning decisions
- If copartitioned, skips shuffle of build side
- Probe side still shuffled if needed to align with build
7. **Right join optimization** (Optimization.cpp):
- Modified `joinRightByHash()` to detect when probe side has compatible partitioning
- Similar logic to `joinByHash()` but for right joins where probe and build are swapped
- Creates `forProbe` distribution that preserves copartitioning if detected
- Changed memo key lookup to use `forProbe` distribution instead of empty partition list
- If copartitioned, only shuffles probe side if `needsShuffle` flag is set by memo lookup
- Otherwise, both sides shuffled to align
8. **Partition count compatibility** (Optimization.cpp):
- Added check that partition counts match between layout and plan
- If `plan->distribution().partition.size() != keyValues.size()`, must shuffle
- Prevents incorrect copartitioning when number of partition columns differs
- Example: table has 2 partition columns, join has 1 key → shuffle required
9. **PlanStateSaver exprToColumn preservation** (Plan.h):
- Added `exprToColumn_` field to PlanStateSaver
- Saves and restores `state.exprToColumn` map along with other state
- Critical for PrecomputeProjection to work correctly when exploring alternative join orders
- Without this, projected expressions could be lost when backtracking
10. **Type parsing improvements** (LocalHiveConnectorMetadata.cpp):
- Enhanced `parseSchema()` to support both Presto type format and Hive type format
- Tries Presto parser first (uses `Type::toString()` format like `array(bigint)`)
- Falls back to Hive parser for backward compatibility (uses format like `array<bigint>`)
- Provides detailed error message showing both parser failures if both fail
- Changed table schema serialization to use `Type::toString()` instead of Hive serializer
- Ensures round-trip consistency: serialize with `toString()`, parse with Presto parser
- Maintains backward compatibility with existing Hive-formatted schemas
11. **LocalRunner bucket-aware scheduling** (LocalRunner.cpp, LocalRunner.h):
- Added bucket-aware task assignment for copartitioned execution
- Algorithm:
- Examines table scan nodes to detect bucket count
- If all scans have same bucket count, assigns tasks to respect bucketing
- Task N processes bucket (N % bucketCount) from all tables
- Example: 8 buckets, 16 tasks → tasks 0 and 8 both process bucket 0
- Added `numBuckets()` helper to extract bucket count from HivePartitionType
- Added logic to assign bucket numbers to table scan splits
- Ensures copartitioned tables are read from matching buckets in each task
- Handles write operations: assigns bucket numbers to TableWrite operators (the task-to-bucket mapping is sketched after this change list)
12. **Comprehensive test suite** (CopartitionTest.cpp, 245 lines):
- `join`: Tests copartitioned join between `lineitem_b` and `orders_b` (both 8 buckets on orderkey)
- Verifies no shuffle operators in plan.
- Validates result correctness (150,000 rows)
- `group`: Tests multi-way join with aggregations on bucketed tables
- Joins two aggregation results, both bucketed on `l_partkey`
- Verifies copartitioned execution avoids shuffle
- Tests complex query with filters and subqueries
- `writeWider`: Tests bucket count compatibility when counts differ
- Creates `part_b2` with 16 buckets (2× the 8 buckets in `part_b`)
- Joins `part_b` (8 buckets) with `part_b2` (16 buckets)
- Verifies optimizer handles division: 16 = 2 × 8, tasks can process matching buckets
- Tests CTAS (CREATE TABLE AS SELECT) with bucketing specification
13. **Bucketed table generation** (ParquetTpchTest.cpp, ParquetTpchTest.h):
- Added `makeBucketedTables()` static method to create bucketed versions of TPC-H tables
- Creates `orders_b`, `lineitem_b`, `partsupp_b`, `part_b` with 8 buckets each
- Handles date column conversion: uses `DATE` type and `CAST` in SELECT
- Uses LocalHiveConnectorMetadata to execute CTAS statements
- Writes data in Parquet format with proper bucketing metadata
- Invoked in `CopartitionTest::SetUpTestCase()` to prepare test data
14. **Write operation integration** (WriteTest.cpp):
- Tests that bucketed writes work when joining the written table to itself on all columns.
15. **Build system updates** (BUCK files, CMakeLists.txt):
- Added dependency on `velox/functions/prestosql/types/parser` for Presto type parsing
- Added CopartitionTest to test suite
- Added parquet_tpch dependency to various test targets
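The task-to-bucket mapping from item 11, as a small sketch. The function name is illustrative, not the actual LocalRunner code.
```cpp
#include <cstdint>
#include <vector>

// Task N processes bucket N % bucketCount, so every copartitioned table
// is read from the same bucket within a given task.
std::vector<int32_t> bucketForTask(int32_t numTasks, int32_t numBuckets) {
  std::vector<int32_t> assignment(numTasks);
  for (int32_t task = 0; task < numTasks; ++task) {
    assignment[task] = task % numBuckets;
  }
  return assignment;
}

// Example from the summary: 8 buckets, 16 tasks -> tasks 0 and 8 both
// process bucket 0, tasks 1 and 9 both process bucket 1, and so on.
```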
The optimization works by:
1. During distribution planning, check if input distribution's PartitionType copartitions with target PartitionType
2. If yes, mark distribution types as compatible via `isCopartitionCompatible()` (the bucket-count rule is sketched after this list)
3. Skip inserting Repartition operators when distributions are compatible
4. In LocalRunner, assign task IDs that respect bucketing (task N reads bucket N % bucketCount)
5. Execute join/aggregation without network shuffle
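A sketch of the bucket-count rule implied by step 2 and the writeWider test: equal counts are copartition-compatible, and differing counts can still be compatible when one evenly divides the other. This condenses only the count check; the real `isCopartitionCompatible()` also requires compatible bucket key types.
```cpp
#include <cstdint>

// Illustrative count-only check; the actual code also compares key types.
bool bucketCountsCompatible(int32_t left, int32_t right) {
  if (left <= 0 || right <= 0) {
    return false;  // an unbucketed side cannot copartition
  }
  return left % right == 0 || right % left == 0;  // covers left == right
}
```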
Differential Revision: D89496393
Force-pushed from fa60c6c to b4f0410.