Skip to content

Commit f045b35

Browse files
tanjialiangfacebook-github-bot
authored andcommitted
feat: Add SpillPartitionFunction (facebookincubator#13079)
Summary: Add spill partition function later used for mixed mode hash join spilling Pull Request resolved: facebookincubator#13079 Reviewed By: xiaoxmeng Differential Revision: D73339693 fbshipit-source-id: 857f4e9dbb4f11b46e3154131d9dc5c62611963d
1 parent de1b15e commit f045b35

File tree

3 files changed

+175
-1
lines changed

3 files changed

+175
-1
lines changed

velox/exec/Spill.cpp

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,41 @@ SpillPartitionId SpillPartitionIdLookup::partition(uint64_t hash) const {
523523
return lookup_[bits::extractBits(hash, partitionBitsMask_)];
524524
}
525525

526+
SpillPartitionFunction::SpillPartitionFunction(
527+
const SpillPartitionIdLookup& lookup,
528+
const RowTypePtr& inputType,
529+
const std::vector<column_index_t>& keyChannels)
530+
: lookup_(lookup) {
531+
VELOX_CHECK(!keyChannels.empty(), "Key channels must not be empty.");
532+
hashers_.reserve(keyChannels.size());
533+
for (const auto channel : keyChannels) {
534+
VELOX_CHECK_NE(channel, kConstantChannel);
535+
hashers_.emplace_back(
536+
VectorHasher::create(inputType->childAt(channel), channel));
537+
}
538+
}
539+
540+
void SpillPartitionFunction::partition(
541+
const RowVector& input,
542+
std::vector<SpillPartitionId>& partitionIds) {
543+
const auto size = input.size();
544+
rows_.resize(size);
545+
rows_.setAll();
546+
547+
hashes_.resize(size);
548+
for (auto i = 0; i < hashers_.size(); ++i) {
549+
auto& hasher = hashers_[i];
550+
hashers_[i]->decode(*input.childAt(hasher->channel()), rows_);
551+
hashers_[i]->hash(rows_, i > 0, hashes_);
552+
}
553+
554+
partitionIds.resize(size);
555+
556+
for (auto i = 0; i < size; ++i) {
557+
partitionIds[i] = lookup_.partition(hashes_[i]);
558+
}
559+
}
560+
526561
uint8_t partitionBitOffset(
527562
const SpillPartitionId& id,
528563
uint8_t startPartitionBitOffset,

velox/exec/Spill.h

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,14 @@
2727
#include "velox/exec/SpillFile.h"
2828
#include "velox/exec/TreeOfLosers.h"
2929
#include "velox/exec/UnorderedStreamReader.h"
30+
#include "velox/exec/VectorHasher.h"
3031
#include "velox/vector/ComplexVector.h"
3132
#include "velox/vector/DecodedVector.h"
3233
#include "velox/vector/VectorStream.h"
3334

3435
namespace facebook::velox::exec {
36+
class VectorHasher;
37+
3538
/// A source of sorted spilled RowVectors coming either from a file or memory.
3639
class SpillMergeStream : public MergeStream {
3740
public:
@@ -212,7 +215,7 @@ class SpillPartitionId {
212215
/// 'SpillPartitionId'.
213216
static constexpr uint32_t kMaxPartitionBits{3};
214217

215-
/// Constructs an default invalid id.
218+
/// Constructs a default invalid id.
216219
SpillPartitionId() = default;
217220

218221
/// Constructs a root spill level id.
@@ -323,6 +326,30 @@ class SpillPartitionIdLookup {
323326
std::vector<SpillPartitionId> lookup_;
324327
};
325328

329+
/// Vectorized partitioning function for spill. The partitioning takes advantage
330+
/// of SpillPartitionIdLookup and performs a fast partitioning for the input
331+
/// vector.
332+
class SpillPartitionFunction {
333+
public:
334+
SpillPartitionFunction(
335+
const SpillPartitionIdLookup& lookup,
336+
const RowTypePtr& inputType,
337+
const std::vector<column_index_t>& keyChannels);
338+
339+
void partition(
340+
const RowVector& input,
341+
std::vector<SpillPartitionId>& partitionIds);
342+
343+
private:
344+
const SpillPartitionIdLookup lookup_;
345+
346+
std::vector<std::unique_ptr<VectorHasher>> hashers_;
347+
348+
// Reusable resources for hashing.
349+
SelectivityVector rows_;
350+
raw_vector<uint64_t> hashes_;
351+
};
352+
326353
/// Contains a spill partition data which includes the partition id and
327354
/// corresponding spill files.
328355
class SpillPartition {

velox/exec/tests/SpillTest.cpp

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -895,6 +895,118 @@ TEST_P(SpillTest, spillPartitionIdLookup) {
895895
}
896896
}
897897

898+
TEST_P(SpillTest, spillPartitionFunctionBasic) {
899+
std::vector<RowVectorPtr> inputVectors;
900+
const auto rowType =
901+
ROW({"key1", "key2", "key3", "value"},
902+
{BIGINT(), VARCHAR(), VARCHAR(), VARCHAR()});
903+
{
904+
// Simple input vector for basic testing.
905+
const uint64_t numRows = 100;
906+
std::vector<VectorPtr> columns;
907+
columns.push_back(
908+
makeFlatVector<int64_t>(numRows, [](auto row) { return row; }));
909+
columns.push_back(makeFlatVector<std::string>(
910+
numRows, [](auto row) { return fmt::format("key_{}", row); }));
911+
columns.push_back(makeFlatVector<std::string>(
912+
numRows, [](auto row) { return fmt::format("key_{}_{}", row, row); }));
913+
columns.push_back(makeFlatVector<std::string>(
914+
numRows, [](auto row) { return fmt::format("val_{}", row); }));
915+
inputVectors.push_back(makeRowVector(columns));
916+
}
917+
918+
// Additional 2 fuzzed vector for wider range testing.
919+
{
920+
VectorFuzzer::Options options;
921+
options.vectorSize = 100;
922+
options.nullRatio = 0.3;
923+
options.allowDictionaryVector = true;
924+
inputVectors.push_back(VectorFuzzer(options, pool()).fuzzRow(rowType));
925+
}
926+
927+
{
928+
VectorFuzzer::Options options;
929+
options.vectorSize = 1000;
930+
options.nullRatio = 0.2;
931+
options.stringVariableLength = true;
932+
options.allowDictionaryVector = true;
933+
inputVectors.push_back(VectorFuzzer(options, pool()).fuzzRow(rowType));
934+
}
935+
936+
struct TestData {
937+
std::string name;
938+
SpillPartitionIdSet partitionIds;
939+
uint32_t startPartitionBit;
940+
uint32_t numPartitionBits;
941+
std::vector<column_index_t> keyChannels;
942+
size_t numRows;
943+
};
944+
945+
std::vector<TestData> testCases = {
946+
// Basic test with multiple partitions
947+
{"basic",
948+
{genPartitionId({0}),
949+
genPartitionId({1}),
950+
genPartitionId({2, 1}),
951+
genPartitionId({3, 2})},
952+
0,
953+
2,
954+
{0, 1},
955+
100},
956+
957+
// Test with empty key channels
958+
{"emptyKeys", {genPartitionId({0}), genPartitionId({1})}, 0, 2, {}, 50},
959+
960+
// Test with nulls
961+
{"withNulls",
962+
{genPartitionId({0}), genPartitionId({1}), genPartitionId({2})},
963+
0,
964+
2,
965+
{0, 1},
966+
80},
967+
968+
// Test with higher start partition bits
969+
{"highStartBits",
970+
{genPartitionId({0}),
971+
genPartitionId({1}),
972+
genPartitionId({2}),
973+
genPartitionId({3})},
974+
8,
975+
2,
976+
{0, 1, 2},
977+
120}};
978+
979+
for (const auto& data : testCases) {
980+
for (auto i = 0; i < inputVectors.size(); ++i) {
981+
SCOPED_TRACE(fmt::format("Test case: {}, Input vector {}", data.name, i));
982+
983+
SpillPartitionIdLookup lookup(
984+
data.partitionIds, data.startPartitionBit, data.numPartitionBits);
985+
if (data.keyChannels.empty()) {
986+
VELOX_ASSERT_THROW(
987+
SpillPartitionFunction(lookup, rowType, data.keyChannels),
988+
"Key channels must not be empty");
989+
continue;
990+
}
991+
SpillPartitionFunction partitionFunction(
992+
lookup, rowType, data.keyChannels);
993+
994+
auto& input = inputVectors[i];
995+
std::vector<SpillPartitionId> resultPartitionIds;
996+
partitionFunction.partition(*input, resultPartitionIds);
997+
998+
ASSERT_EQ(resultPartitionIds.size(), input->size());
999+
for (const auto& id : resultPartitionIds) {
1000+
ASSERT_TRUE((!id.valid()) || data.partitionIds.contains(id));
1001+
}
1002+
1003+
std::unordered_set<SpillPartitionId> uniquePartitions(
1004+
resultPartitionIds.begin(), resultPartitionIds.end());
1005+
ASSERT_GT(uniquePartitions.size(), 1);
1006+
}
1007+
}
1008+
}
1009+
8981010
TEST_P(SpillTest, spillPartitionSet) {
8991011
for (int iter = 0; iter < 3; ++iter) {
9001012
folly::Random::DefaultGenerator rng;

0 commit comments

Comments
 (0)