Skip to content

Commit 30b7fb6

Browse files
committed
feat: Add PartitioningSerializer for Presto serialization
This commit introduces PartitioningSerializer, which buffers RowVectors across multiple append() calls, partitions rows in-place using PartitionedVector, and on flush() serializes each non-empty partition into a Presto wire-format IOBuf. The serializer has no dependency on velox_exec — it returns raw folly::IOBuf objects, leaving SerializedPage creation to the caller.
1 parent cbb849a commit 30b7fb6

File tree

5 files changed

+484
-0
lines changed

5 files changed

+484
-0
lines changed

velox/serializers/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ velox_link_libraries(
2525
velox_add_library(
2626
velox_presto_serializer
2727
CompactRowSerializer.cpp
28+
PartitioningSerializer.cpp
2829
PrestoSerializer.cpp
2930
UnsafeRowSerializer.cpp
3031
PrestoBatchVectorSerializer.cpp
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#include "velox/serializers/PartitioningSerializer.h"

#include <algorithm>
#include <limits>

#include "velox/common/memory/ByteStream.h"
#include "velox/vector/VectorStream.h"
20+
21+
namespace facebook::velox::serializer::presto {
22+
23+
PartitioningSerializer::PartitioningSerializer(
24+
RowTypePtr inputType,
25+
uint32_t numPartitions,
26+
const SerdeOpts& opts,
27+
memory::MemoryPool* pool)
28+
: type_(std::move(inputType)),
29+
numPartitions_(numPartitions),
30+
opts_(opts),
31+
pool_(pool),
32+
rowsPerPartition_(numPartitions, 0) {
33+
VELOX_CHECK_GT(numPartitions_, 0);
34+
VELOX_CHECK_NOT_NULL(pool_);
35+
}
36+
37+
void PartitioningSerializer::append(
38+
const RowVectorPtr& input,
39+
const std::vector<uint32_t>& partitions) {
40+
VELOX_CHECK_NOT_NULL(input);
41+
VELOX_CHECK_EQ(
42+
input->size(),
43+
partitions.size(),
44+
"partitions.size() must equal input->size()");
45+
46+
if (input->size() == 0) {
47+
return;
48+
}
49+
50+
PartitionBuildContext ctx;
51+
auto pv = PartitionedVector::create(
52+
std::static_pointer_cast<BaseVector>(input),
53+
partitions,
54+
numPartitions_,
55+
ctx,
56+
pool_);
57+
58+
const auto* offsets = pv->rawPartitionOffsets();
59+
vector_size_t prevOffset = 0;
60+
for (uint32_t p = 0; p < numPartitions_; ++p) {
61+
rowsPerPartition_[p] += offsets[p] - prevOffset;
62+
prevOffset = offsets[p];
63+
}
64+
65+
partitionedPages_.push_back(std::move(pv));
66+
bytesBuffered_ += input->retainedSize();
67+
rowsBuffered_ += static_cast<int64_t>(input->size());
68+
}
69+
70+
/// Serializes every buffered row and resets the serializer to its empty state.
///
/// For each partition that has buffered rows, creates a VectorStreamGroup
/// sized for that partition's row count, appends the partition's slice from
/// every buffered page in append() order, and flushes the group into an IOBuf.
///
/// @return Map from partition index to its serialized IOBuf. Partitions with
/// no buffered rows have no entry; the map is empty when nothing is buffered.
std::map<uint32_t, std::unique_ptr<folly::IOBuf>>
PartitioningSerializer::flush() {
  if (partitionedPages_.empty()) {
    return {};
  }

  // Initial size hint for each output stream: an even share of the buffered
  // bytes. The share is clamped in 64-bit arithmetic before narrowing to the
  // int32_t handed to the stream; narrowing first would wrap shares above
  // INT32_MAX to negative or arbitrary values. The hint does not change
  // between partitions, so it is computed once.
  constexpr int64_t kMinInitialSize = 1024;
  constexpr int64_t kMaxInitialSize = std::numeric_limits<int32_t>::max();
  const auto initialSize = static_cast<int32_t>(std::clamp<int64_t>(
      bytesBuffered_ / numPartitions_, kMinInitialSize, kMaxInitialSize));

  std::map<uint32_t, std::unique_ptr<folly::IOBuf>> result;

  for (uint32_t p = 0; p < numPartitions_; ++p) {
    const auto numRows = rowsPerPartition_[p];
    if (numRows == 0) {
      continue;
    }

    auto streamGroup = std::make_unique<VectorStreamGroup>(pool_, &serde_);
    streamGroup->createStreamTree(type_, numRows, &opts_);

    for (auto& pv : partitionedPages_) {
      // Slices that are null, are not RowVectors, or hold no rows are skipped.
      auto slice = std::dynamic_pointer_cast<RowVector>(pv->partitionAt(p));
      if (slice && slice->size() > 0) {
        streamGroup->append(slice);
      }
    }

    IOBufOutputStream out(*pool_, nullptr, initialSize);
    streamGroup->flush(&out);
    // Keys arrive in ascending order, so the end hint is always correct.
    result.emplace_hint(result.end(), p, out.getIOBuf());
  }

  // Drop the buffered pages and zero all counters for the next batch.
  partitionedPages_.clear();
  std::fill(rowsPerPartition_.begin(), rowsPerPartition_.end(), 0);
  bytesBuffered_ = 0;
  rowsBuffered_ = 0;

  return result;
}
108+
109+
} // namespace facebook::velox::serializer::presto
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include <map>
19+
20+
#include <folly/io/IOBuf.h>
21+
22+
#include "velox/serializers/PrestoSerializer.h"
23+
#include "velox/vector/PartitionedVector.h"
24+
25+
namespace facebook::velox::serializer::presto {
26+
27+
using SerdeOpts = PrestoVectorSerde::PrestoOptions;
28+
29+
/// Buffers one or more RowVectors and serializes them into per-partition
30+
/// Presto-format pages. Rows are assigned to partitions via the partitions
31+
/// array passed to append(). The serializer accumulates partitioned data
32+
/// across multiple append() calls and emits all buffered data on flush().
33+
///
34+
/// Thread-safety: not thread-safe.
35+
class PartitioningSerializer {
36+
public:
37+
/// Constructs a serializer for the given input type, number of partitions,
38+
/// and serde options. All serialized output uses the provided memory pool.
39+
PartitioningSerializer(
40+
RowTypePtr inputType,
41+
uint32_t numPartitions,
42+
const SerdeOpts& opts,
43+
memory::MemoryPool* pool);
44+
45+
/// Appends 'input' to the buffer. partitions[i] must be in [0,
46+
/// numPartitions) and gives the destination partition for row i. The size
47+
/// of 'partitions' must equal input->size().
48+
void append(
49+
const RowVectorPtr& input,
50+
const std::vector<uint32_t>& partitions);
51+
52+
/// Serializes all buffered data into Presto wire format. Returns one IOBuf
53+
/// per non-empty partition, keyed by partition index. The caller is
54+
/// responsible for wrapping the IOBuf in an appropriate page type (e.g.
55+
/// PrestoSerializedPage). Clears the internal buffer after flushing.
56+
std::map<uint32_t, std::unique_ptr<folly::IOBuf>> flush();
57+
58+
/// Returns the total in-memory bytes of all buffered RowVectors.
59+
int64_t bytesBuffered() const {
60+
return bytesBuffered_;
61+
}
62+
63+
/// Returns the total number of rows in all buffered RowVectors.
64+
int64_t rowsBuffered() const {
65+
return rowsBuffered_;
66+
}
67+
68+
private:
69+
const RowTypePtr type_;
70+
const uint32_t numPartitions_;
71+
const SerdeOpts opts_;
72+
PrestoVectorSerde serde_;
73+
memory::MemoryPool* const pool_;
74+
75+
std::vector<PartitionedVectorPtr> partitionedPages_;
76+
77+
/// Cumulative number of rows assigned to each partition across all appends.
78+
std::vector<vector_size_t> rowsPerPartition_;
79+
80+
int64_t bytesBuffered_{0};
81+
int64_t rowsBuffered_{0};
82+
};
83+
84+
} // namespace facebook::velox::serializer::presto

velox/serializers/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ target_link_libraries(
3535
add_executable(
3636
velox_serializer_test
3737
CompactRowSerializerTest.cpp
38+
PartitioningSerializerTest.cpp
3839
PrestoOutputStreamListenerTest.cpp
3940
PrestoSerializerTest.cpp
4041
SerializedPageFileTest.cpp

0 commit comments

Comments
 (0)