feat(PartitionedOutput): Add dictionary encoding support#2119
Open
xin-zhang2 wants to merge 26 commits into
Open
feat(PartitionedOutput): Add dictionary encoding support#2119xin-zhang2 wants to merge 26 commits into
xin-zhang2 wants to merge 26 commits into
Conversation
This commit introduces `PartitionedVector` - a low-level execution abstraction that provides an in-place, partition-aware layout of a vector based on per-row partition IDs. 1. **In-place rearrangement**: Rearrange vector data in memory without creating multiple copies 2. **Buffer reuse**: Allow reuse of temporary buffers across multiple partitioning operations 3. **Minimal abstraction**: Similar to `DecodedVector`, focus on efficient execution rather than operator semantics 4. **Thread-unsafe by design**: Optimized for single-threaded execution contexts For more information please see IBM#1703 Alchemy-item: (ID = 1150) Introducing PartitionedVector commit 1/1 - 960f41b
Signed-off-by: Xin Zhang <xin-zhang2@ibm.com> Alchemy-item: (ID = 1167) Add PartitionedRowVector commit 1/1 - f2af427
PartitionedFlatVector::partition() and PartitionedRowVector::partition() called mutableRawNulls() unconditionally. mutableRawNulls() allocates a null buffer if one does not exist, causing mayHaveNulls() to return true for every vector after partitioning, even when the original had no nulls. Fix both sites to check rawNulls() first and only call mutableRawNulls() when a null buffer already exists. Add noNullBufferAllocatedForNullFreeFlat and noNullBufferAllocatedForNullFreeRow tests to PartitionedVectorTest to cover this case. # Conflicts: # velox/vector/PartitionedVector.cpp
This commit introduces PrestoIterativePartitioningSerializer, which buffers RowVectors across multiple append() calls, partitions rows in-place using PartitionedVector, and on flush() serializes each non-empty partition into a Presto wire-format IOBuf. The serializer has no dependency on velox_exec: it returns raw folly::IOBuf objects, leaving SerializedPage creation to the caller.
This commit introduces OptimizedPartitionedOutput, a PartitionedOutput operator backed by PrestoIterativePartitioningSerializer. Enabled via query config key "optimized_repartitioning" (default off). LocalPlanner selects it over the standard PartitionedOutput when the flag is set. TODO: replicateNullsAndAny is not yet supported and raises a user error.
…geBenchmark - Added normal vs optimized PartitionedOutput comparison by running each exchange case twice with kOptimizedPartitionedOutputEnabled=false/true. - Added per-mode benchmark names: - exchange<Case>_normalPartitionedOutput - exchange<Case>_optimizedPartitionedOutput in ExchangeBenchmark.cpp. - Refactored result printing into shared helpers and fixed output consistency in ExchangeBenchmark.cpp.
…mark Split the local partition exchange benchmark out of ExchangeBenchmark into its own executable and CMake target, while keeping the local benchmark logic and statistics reporting available in a dedicated binary.
…tioningSerializer
…fferManager listeners Pass an OutputBufferManager-backed listener factory into PrestoIterativePartitioningSerializer so the optimized path uses the same listener source as normal PartitionedOutput. Create per-partition listeners during flush, set the checksum bit only when a listener is present, and compute the page checksum only for PrestoOutputStreamListener instances. Also add tests that verify checksum headers are written and that the serialized pages round-trip through the standard deserializer.
…rting - add explicit simple-schema benchmark cases by type and column count - register normal and optimized runs as separate named benchmark cases - make `dictPct` apply per generated vector and recurse into nested types - generate benchmark input vectors directly with optional nulls - replace ad hoc flat input generation with explicit input specs - return `ExchangeRunStats` from benchmark runs and centralize query config - group printed results by dataset with normal vs. optimized stats
The new OptimizedVectorHasher is up to 2-3x faster than VectorHasher.
BatchMaker always allocates a null buffer even when no rows are null. This commit removes it so benchmarks measure the non-nullable path faithfully. Plus some minor format cleanups.
Currently Velox never passes CMAKE_BUILD_TYPE into Folly's own configure
step, while cmake_install only forwards arbitrary caller flags, so Folly
was not built in release mode when Velox is built in release mode. This
commit adds -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" to FOLLY_FLAGS, so
a release Velox dependency setup now builds release Folly on macOS and
Linux.
- Remove benchmarks with vector size 1000000 and only keep vector size 10000. - Add tinyint and smallint benchmarks.
Introduce OptimizedHashPartitionFunction as a faster drop-in replacement for HashPartitionFunction, gated behind a new query config flag optimized_hash_partition_function_enabled (default false). partition() is improved from 50% to over 200x. Add HashPartitionFunctionBase as a common base exposing numPartitions(), and createHashPartitionFunction() factories that select the implementation based on the flag. Thread QueryConfig* through PartitionFunctionSpec::create() and update callsites (LocalPartition, PartitionedOutput, MarkDistinct, RowNumber, Window, SubPartitionedSortWindowBuild, HiveConnector) to construct partition functions via the factory. Register CMake targets for the new test and benchmark binaries.
Reimplement PrestoIterativePartitioningSerializer::flushRowColumn around bulk bit operations and fix two coupled null-handling defects in the simple-column path. The three changes are interdependent: correct null serialization requires all of them, so they land together. flushRowColumn: - AND the incoming parentNulls into this level's own nulls in place with bits::andBits, reusing the vector's own buffer (or sharing the parent's when there are no own nulls) instead of allocating a per-batch buffer. - Count parent-live and live rows per partition with bits::countBits to drive child rowCounts/parentNullCounts and the footer numRows. - Compact the live bitmap across batch boundaries with bits::extractBits via a branchless compactBits/appendLowBits helper; offsets become a prefix sum over the compacted bitmap, with a sequential fast path for null-free partitions. flushFlatValues: stop calling mutableRawNulls() (which materialized an all-not-null buffer and masked the no-null fast path, then dereferenced a null parentNulls in andBits), and iterate each partition's own [lastOffset, offset) range. Leaf flushers now take parentNulls directly; flushSingleConstantVector also takes the per-partition parent-null counts. flushNulls: replace the stub that always wrote hasNulls=0 with a real implementation that compacts per-batch validity over parentNulls into a per-partition bitmap sized to context.rowCounts, handling FLAT and CONSTANT (including null constants) for both top-level and nested columns. Removes the now-unused flushSimpleVectorNulls/flushConstantVectorNulls. Add multiLevelNullsMultiAppendMultiPartition covering nulls at multiple nested ROW levels across several appends and partitions. All 59 tests in PrestoIterativePartitioningSerializerTest pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
cff9eff to
6944d36
Compare
6944d36 to
974bb09
Compare
974bb09 to
ee25fa7
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.