Introducing PartitionedVector#1596

Open
yingsu00 wants to merge 1 commit into IBM:oss-main from yingsu00:PartitionedOutput1.0

@yingsu00
Collaborator

This commit is the first PR for the optimized PartitionedOutput. It introduces PartitionedVector, in which the values are partitioned according to a given list of partition IDs. It uses an in-place swapping algorithm and has very high throughput. It can also be used in aggregation, sorting, etc.
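For readers unfamiliar with in-place partitioning, here is a minimal, self-contained sketch of the general technique (count, prefix-sum, then swap each row toward its partition's cursor). It is not the PR's implementation: `partitionInPlaceSketch`, the use of `std::vector` instead of Velox buffers, and the local `vector_size_t` alias are all assumptions made for illustration, and it only covers fixed-width, non-bool values.

```cpp
#include <cstdint>
#include <utility>
#include <vector>

// Assumption: rows are indexed with a signed 32-bit type and partitions with
// uint32_t, mirroring the units discussed in this thread.
using vector_size_t = int32_t;

// Hypothetical helper, not the PR's API. Reorders `values` (and the matching
// `partitionIds`) in place so that rows of partition 0 come first, then
// partition 1, and so on. Every id must be < numPartitions.
// Returns the end offset (one past the last row) of each partition.
template <typename T>
std::vector<vector_size_t> partitionInPlaceSketch(
    std::vector<T>& values,
    std::vector<uint32_t>& partitionIds,
    uint32_t numPartitions) {
  // Count rows per partition, then turn the counts into end offsets.
  std::vector<vector_size_t> endOffsets(numPartitions, 0);
  for (uint32_t id : partitionIds) {
    ++endOffsets[id];
  }
  for (uint32_t p = 1; p < numPartitions; ++p) {
    endOffsets[p] += endOffsets[p - 1];
  }

  // cursor[p] is the next unsettled slot inside partition p's region.
  std::vector<vector_size_t> cursor(numPartitions, 0);
  for (uint32_t p = 1; p < numPartitions; ++p) {
    cursor[p] = endOffsets[p - 1];
  }

  for (uint32_t p = 0; p < numPartitions; ++p) {
    while (cursor[p] < endOffsets[p]) {
      const vector_size_t row = cursor[p];
      const uint32_t target = partitionIds[row];
      if (target == p) {
        // The row already belongs here; settle it and move on.
        ++cursor[p];
      } else {
        // Send the row to its own partition and examine whatever came back.
        const vector_size_t dest = cursor[target]++;
        std::swap(values[row], values[dest]);
        std::swap(partitionIds[row], partitionIds[dest]);
      }
    }
  }
  return endOffsets;
}
```

Each swap settles one row in its final region, so the number of swaps is bounded by the number of rows, and no scratch buffer proportional to the input is needed.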

@yingsu00 yingsu00 requested a review from xin-zhang2 January 14, 2026 08:06
@yingsu00 yingsu00 force-pushed the PartitionedOutput1.0 branch 3 times, most recently from bf73a19 to aded22e Compare February 13, 2026 05:42
@yingsu00 yingsu00 marked this pull request as ready for review February 13, 2026 05:47
@yingsu00 yingsu00 removed the request for review from majetideepak February 13, 2026 05:47
Member

@czentgr czentgr left a comment

Looks pretty good. I suppose we need to see the subsequent changes to move them to the output buffer in the PartitionedOutput operator. Is this code available somewhere too?

BufferPtr beginPartitionOffsets;

/// Optional reusable buffer for in-place row swapping.
BufferPtr swappingBuffer;
Member

This needs initialization. Given it is optional I expect that this member won't always be set and you'd run into compiler errors on newer compilers.

void initializeBeginPartitionOffsets(
BufferPtr& beginPartitionOffsets,
const BufferPtr& endPartitionOffsets,
int32_t numPartitions,
Member

vector_size_t?
In the test I also see uint32_t being used for numPartitions. Isn't that always the same quantity, and so shouldn't it have the same type?

The PartitionedVector uses numPartitions uint32_t as storage but sets it from a const uint32_t. Why not use vector_size_t?

Collaborator Author

> vector_size_t?

Sure.

> The PartitionedVector uses numPartitions uint32_t as storage but sets it from a const uint32_t. Why not use vector_size_t?

In my understanding, this is a historical inconsistency. The PartitionFunction interface defines the partitions in uint32_t

  virtual std::optional<uint32_t> partition(
      const RowVector& input,
      std::vector<uint32_t>& partitions) = 0;

But Velox is trying to use vector_size_t as the size unit and vector index everywhere. vector_size_t is defined as int32_t today, but I've seen places where it overflows and may need to be int64_t instead. I think the right way is to unify the partition representation with the vector index, so that both use vector_size_t. vector_size_t can easily be widened in the future by changing its definition to using vector_size_t = int64_t; . For now, I changed all numPartitions to vector_size_t. Another option is to make them all uint32_t. @czentgr Which one do you think makes more sense?

// TODO: This was copied from dwio::common::BufferUtil.h. However the vector
// module should not depend on dwio. Move this to a common place
template <typename T>
void ensureCapacity(
Member

This should go into the cpp file and be declared for use in the test.
Or as you suggest a new utility?

Collaborator Author

It is a template function and the definition needs to be in the header file. IMHO it's not worthwhile to separate the declaration and definition for this very simple function. My original thought was to extract it from dwio::common and move it to common, but it's better to be done in a separate PR or commit. And to avoid future rebase conflicts I left it in this .h file because this file is new. But I just moved it to VectorUtil.h for now.

BufferPtr swappingBuffer;

/// Optional starting row offset (used when partitioning a subset of rows).
vector_size_t firstRow{0};
Member

I suppose for future use? This and the other member? Currently they are not used at all.

Collaborator Author

Yes, they are for complex types. I can remove them now, but that makes it unjustifiable to have a PartitionBuildContext. Do you prefer removing them for now?

Collaborator Author

Removed

std::memcpy(
&beginPartitionOffsets->asMutable<vector_size_t>()[1],
endPartitionOffsets->as<vector_size_t>(),
sizeof(uint32_t) * (numPartitions - 1));
Member

This should be sizeof(vector_size_t). In the next line it uses sizeof(vector_size_t). As mentioned before I don't know why we switch between the types. It could cause problems.

Collaborator Author

Yes agree. I updated the unit of partition to uint32_t and unit of rows to vector_size_t
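To illustrate the point about element sizes, the sketch below derives begin offsets from end offsets with `memcpy`, sizing the copy by the element type actually stored. `beginOffsetsFromEnd`, the `std::vector` storage, and the local `vector_size_t` alias are assumptions for illustration only. The original `sizeof(uint32_t)` happens to work while `vector_size_t` is 32 bits wide, but would copy too few bytes if the alias were ever widened to `int64_t`.

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Assumption: matches the current 32-bit definition mentioned in this thread.
using vector_size_t = int32_t;

// Hypothetical helper: partition p begins where partition p - 1 ends, and
// partition 0 begins at row 0.
inline std::vector<vector_size_t> beginOffsetsFromEnd(
    const std::vector<vector_size_t>& endOffsets) {
  std::vector<vector_size_t> beginOffsets(endOffsets.size(), 0);
  if (endOffsets.size() > 1) {
    // Size the copy by the stored element type, not by an unrelated type.
    std::memcpy(
        &beginOffsets[1],
        endOffsets.data(),
        sizeof(vector_size_t) * (endOffsets.size() - 1));
  }
  return beginOffsets;
}
```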

@yingsu00 yingsu00 force-pushed the PartitionedOutput1.0 branch 2 times, most recently from 02e249e to 667d8b5 Compare February 23, 2026 07:13
@yingsu00
Collaborator Author

@czentgr @xin-zhang2 Thank you very much for reviewing this PR! I have addressed your comments and made the following improvements:

  1. Added VELOX_CHECKs for constructors and public facing functions
  2. Renamed beginPartitionOffsets to cursorPartitionOffsets
  3. Unified unit of partitions to uint32_t and unit of rows to vector_size_t
  4. Cleaned up the constructors and static create functions. Added a new public-facing create function that does not need to take endPartitionOffsets. The old one was moved to the protected section for future complex types.
  5. Enforced const whenever possible
  6. Fixed a bug in the test where the write to expectedVector could go out of bounds.

Your second review is much appreciated!

velox::memory::MemoryPool* pool);

/// Allow move constructor and move assignment operator.
PartitionedVector(PartitionedVector&& other) = default;
Member

The declarations of the move constructor and move assignment operator can be placed below the deleted copy assignment operator.
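The ordering suggested here (deleted copy operations first, defaulted move operations after) can be sketched with a small stand-in class. `MoveOnlyBuffer` is hypothetical and unrelated to the PR's types; it only shows the declaration layout and how the resulting copy and move properties can be checked at compile time.

```cpp
#include <cstddef>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical stand-in for a class that owns a buffer and must not be copied.
class MoveOnlyBuffer {
 public:
  explicit MoveOnlyBuffer(std::vector<int> data)
      : data_(std::make_unique<std::vector<int>>(std::move(data))) {}

  // Disallow copy constructor and copy assignment operator.
  MoveOnlyBuffer(const MoveOnlyBuffer&) = delete;
  MoveOnlyBuffer& operator=(const MoveOnlyBuffer&) = delete;

  // Allow move constructor and move assignment operator.
  MoveOnlyBuffer(MoveOnlyBuffer&&) = default;
  MoveOnlyBuffer& operator=(MoveOnlyBuffer&&) = default;

  // A moved-from object holds a null pointer and reports size 0.
  std::size_t size() const {
    return data_ ? data_->size() : 0;
  }

 private:
  std::unique_ptr<std::vector<int>> data_;
};

static_assert(
    !std::is_copy_constructible<MoveOnlyBuffer>::value,
    "copying must be rejected at compile time");
static_assert(
    std::is_move_constructible<MoveOnlyBuffer>::value,
    "moving must remain available");
```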

velox::memory::MemoryPool* pool);

PartitionedVector(
VectorPtr vector,
Member

vector can be passed as a const reference.

velox::memory::MemoryPool* pool)
: PartitionedVector(flatVector, numPartitions, partitionOffsets, pool) {}

void partition(
Member

It might make more sense to declare partition in PartitionedVector and override it here.

Collaborator Author

Sure. Previously it was not in the base class because the arguments were not the same for different types of vectors. Since I wrapped them up in PartitionBuildContext in this version, we can make it virtual.

}
}

inline void prefixSum(vector_size_t* offsets, uint32_t numPartitions) {
Member

countPartitionSizes and prefixSum are always called together, so it might be better to combine them into a single function. We could also call ensureCapacity for endPartitionOffsets inside the function to prevent potential out-of-bounds issues.

Collaborator Author

@xin-zhang2 prefixSum is used separately for Dictionary vectors. So I think we can use it this way for now.
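One way to reconcile both positions is to keep `prefixSum` callable on its own and add a thin wrapper that sizes the output before counting. The sketch below assumes this shape. Only the `prefixSum` signature comes from the diff context quoted above; the `countPartitionSizes` signature, the `computeEndOffsets` wrapper, and the `std::vector` storage are hypothetical.

```cpp
#include <algorithm>
#include <cstdint>
#include <numeric>
#include <vector>

// Assumption: matches the current 32-bit definition mentioned in this thread.
using vector_size_t = int32_t;

// Hypothetical signature: writes the number of rows per partition into
// sizes[0..numPartitions). Every id must be < numPartitions.
inline void countPartitionSizes(
    const std::vector<uint32_t>& partitionIds,
    vector_size_t* sizes,
    uint32_t numPartitions) {
  std::fill(sizes, sizes + numPartitions, 0);
  for (uint32_t id : partitionIds) {
    ++sizes[id];
  }
}

// Inclusive prefix sum in place: turns per-partition sizes into end offsets.
inline void prefixSum(vector_size_t* offsets, uint32_t numPartitions) {
  std::partial_sum(offsets, offsets + numPartitions, offsets);
}

// Hypothetical wrapper: allocates the output itself, so callers cannot pass
// an undersized buffer, while prefixSum stays usable separately.
inline std::vector<vector_size_t> computeEndOffsets(
    const std::vector<uint32_t>& partitionIds,
    uint32_t numPartitions) {
  std::vector<vector_size_t> endOffsets(numPartitions);
  countPartitionSizes(partitionIds, endOffsets.data(), numPartitions);
  prefixSum(endOffsets.data(), numPartitions);
  return endOffsets;
}
```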

// partition by repeatedly swapping elements until the current element belongs
// to the current partition
template <typename T>
void partitionFixedWidthValuesInPlace(
Member

We can also include the specialization for bool in this PR.
There's already an implementation in my draft PR for the benchmark.

Collaborator Author

Yeah, we can, but it will make this PR take longer to merge. How about sending a new PR with this one as the first commit, then rebasing after this one is merged?


vector_size_t destinationAddr = destinationOffset >> 3;
int8_t destinationBitInByte = destinationOffset & 7;
vector_size_t fromAddr = offset / kBitsPerByte;
Member

Is there any reason to use kBitsPerByte here? Since it's always 8, we can also consider using bit operations when computing fromAddr and fromBitInByte.
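As a small illustration of the equivalence being discussed, the sketch below computes the byte address and the bit position within that byte with shifts and masks, and uses them to swap two bits in a packed bitmap. The helper names are hypothetical, and the least-significant-bit-first layout within each byte is an assumption. The shift and mask forms match division and modulo by 8 only for non-negative offsets.

```cpp
#include <cstdint>

constexpr int32_t kBitsPerByte = 8;

// For non-negative offsets, offset >> 3 equals offset / kBitsPerByte and
// offset & 7 equals offset % kBitsPerByte.
inline int32_t byteIndex(int32_t bitOffset) {
  return bitOffset >> 3;
}

inline int32_t bitInByte(int32_t bitOffset) {
  return bitOffset & 7;
}

// Assumption: bit 0 of a byte is its least significant bit.
inline bool getBit(const uint8_t* bytes, int32_t bitOffset) {
  return ((bytes[byteIndex(bitOffset)] >> bitInByte(bitOffset)) & 1) != 0;
}

inline void setBit(uint8_t* bytes, int32_t bitOffset, bool value) {
  const uint8_t mask = static_cast<uint8_t>(1u << bitInByte(bitOffset));
  if (value) {
    bytes[byteIndex(bitOffset)] |= mask;
  } else {
    bytes[byteIndex(bitOffset)] &= static_cast<uint8_t>(~mask);
  }
}

// Hypothetical helper: exchanges the bits at positions a and b.
inline void swapBits(uint8_t* bytes, int32_t a, int32_t b) {
  const bool bitA = getBit(bytes, a);
  const bool bitB = getBit(bytes, b);
  setBit(bytes, a, bitB);
  setBit(bytes, b, bitA);
}
```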

case VectorEncoding::Simple::BIASED:
case VectorEncoding::Simple::SEQUENCE:
case VectorEncoding::Simple::MAP:
case VectorEncoding::Simple::LAZY:
Member

CONSTANT can be included in the unsupported encodings.

case VectorEncoding::Simple::MAP:
case VectorEncoding::Simple::LAZY:
VELOX_UNSUPPORTED(
"Unsupported vector encoding for OptimizedPartitionedOutput: {}",
Member

OptimizedPartitionedOutput in this message should be changed to PartitionedVector.

mapSimpleToName(encoding));
default:
VELOX_UNREACHABLE(
"Invalid vector encoding for OptimizedPartitionedOutput: {}",
Member

ditto.

case VectorEncoding::Simple::FLAT: {
// Print the addresses of vector's values and nulls buffers for debugging
auto nulls = vector->rawNulls();
auto values = vector->values()->as<char>();
Member

nulls and values are not used and can be removed.

PartitionBuildContext& ctx) {
auto valuesBuffer = vector_->as<FlatVector<T>>()->values();

Byte* rawNulls = (Byte*)vector_->rawNulls();
Member

It would be better to call mutableRawNulls() as rawNulls is modified in partitionBitsInPlace, and use reinterpret_cast for the cast.

Member

@xin-zhang2 xin-zhang2 left a comment

@yingsu00 Left a few comments. Please take a look. Thanks!

@yingsu00 yingsu00 force-pushed the PartitionedOutput1.0 branch from 667d8b5 to fc7b2d5 Compare February 27, 2026 08:42
@yingsu00
Collaborator Author

@xin-zhang2 Thank you for your thorough review! Comments addressed, please take a look.

@yingsu00 yingsu00 force-pushed the PartitionedOutput1.0 branch from fc7b2d5 to ff120fd Compare February 27, 2026 08:48
Member

@xin-zhang2 xin-zhang2 left a comment

LGTM, Thanks!

This was referenced Mar 6, 2026

5 participants