Skip to content

Commit 35467c8

Browse files
oerling authored and meta-codesync[bot] committed
Support copartitioned execution with hive bucketing
Differential Revision: D89496393
1 parent b2b6cf2 commit 35467c8

File tree

4 files changed

+335
-7
lines changed

4 files changed

+335
-7
lines changed

axiom/connectors/hive/HiveConnectorMetadata.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ class HivePartitionType : public connector::PartitionType {
7676

7777
std::string toString() const override;
7878

79+
int32_t numPartitions() const {
80+
return numPartitions_;
81+
}
82+
7983
private:
8084
const int32_t numPartitions_;
8185
const std::vector<velox::TypePtr> partitionKeyTypes_;

axiom/optimizer/tests/WriteTest.cpp

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include "velox/core/QueryConfig.h"
2222
#include "velox/dwio/parquet/RegisterParquetWriter.h"
2323

24+
#include <folly/String.h>
25+
2426
namespace facebook::axiom::optimizer {
2527
namespace {
2628

@@ -169,6 +171,61 @@ class WriteTest : public test::HiveQueriesTestBase {
169171
return data;
170172
}
171173

174+
void runBucketedJoinQuery(
175+
const std::string& tableName,
176+
const std::vector<std::string>& joinColumns,
177+
int64_t expectedCount) {
178+
SCOPED_TRACE(fmt::format(
179+
"Bucketed join on table {} with columns [{}]",
180+
tableName,
181+
folly::join(", ", joinColumns)));
182+
183+
// Build the join condition: t1.col1 = t2.col1 AND t1.col2 = t2.col2 ...
184+
std::vector<std::string> joinConditions;
185+
for (const auto& col : joinColumns) {
186+
joinConditions.push_back(fmt::format("t1.{} = t2.{}", col, col));
187+
}
188+
std::string joinCondition = folly::join(" AND ", joinConditions);
189+
190+
// Build the SQL query
191+
std::string sql = fmt::format(
192+
"SELECT count(*) FROM {} t1, {} t2 WHERE {}",
193+
tableName,
194+
tableName,
195+
joinCondition);
196+
197+
::axiom::sql::presto::PrestoParser parser(
198+
exec::test::kHiveConnectorId, std::nullopt, pool());
199+
200+
auto statement = parser.parse(sql);
201+
auto logicalPlan = statement->as<::axiom::sql::presto::SelectStatement>()->plan();
202+
203+
connector::SchemaResolver schemaResolver;
204+
205+
runner::MultiFragmentPlan::Options options{
206+
.numWorkers = 4,
207+
.numDrivers = 4,
208+
};
209+
210+
auto plan = planVelox(logicalPlan, schemaResolver, options);
211+
auto result = runFragmentedPlan(plan);
212+
213+
ASSERT_EQ(1, result.results.size());
214+
ASSERT_EQ(1, result.results[0]->size());
215+
216+
const auto& child = result.results[0]->childAt(0);
217+
ASSERT_TRUE(child);
218+
ASSERT_EQ(1, child->size());
219+
220+
const auto value = child->variantAt(0);
221+
ASSERT_TRUE(!value.isNull());
222+
223+
ASSERT_EQ(expectedCount, value.value<int64_t>())
224+
<< "Expected count " << expectedCount << " for bucketed join on table "
225+
<< tableName << " with join columns [" << folly::join(", ", joinColumns)
226+
<< "], but got " << value.value<int64_t>();
227+
}
228+
172229
private:
173230
velox::Variant evaluateConstantExpr(const lp::Expr& expr);
174231
};
@@ -490,6 +547,9 @@ TEST_F(WriteTest, createTableAsSelectBucketedSql) {
490547

491548
verifyPartitionedLayout(getLayout("test"), "key", 8);
492549

550+
// Test bucketed self-join on single bucket column
551+
runBucketedJoinQuery("test", {"key"}, 600'572);
552+
493553
// Copy bucketed table with a larger bucket_count. Expect no shuffle.
494554
runCtas(
495555
"CREATE TABLE test3 WITH (bucket_count = 16, bucketed_by = ARRAY['key']) AS "
@@ -499,6 +559,9 @@ TEST_F(WriteTest, createTableAsSelectBucketedSql) {
499559

500560
verifyPartitionedLayout(getLayout("test3"), "key", 16);
501561

562+
// Test bucketed self-join on table with larger bucket count
563+
runBucketedJoinQuery("test3", {"key"}, 600'572);
564+
502565
// Copy bucketed table with same bucket_count. Expect no shuffle.
503566
runCtas(
504567
"CREATE TABLE test2 WITH (bucket_count = 8, bucketed_by = ARRAY['key']) AS "
@@ -508,6 +571,9 @@ TEST_F(WriteTest, createTableAsSelectBucketedSql) {
508571

509572
verifyPartitionedLayout(getLayout("test2"), "key", 8);
510573

574+
// Test bucketed self-join on copied table
575+
runBucketedJoinQuery("test2", {"key"}, 600'572);
576+
511577
// Copy bucketed table with a larger bucket_count. Expect no shuffle.
512578
runCtas(
513579
"CREATE TABLE test3 WITH (bucket_count = 16, bucketed_by = ARRAY['key']) AS "
@@ -525,6 +591,9 @@ TEST_F(WriteTest, createTableAsSelectBucketedSql) {
525591
verifyPartitionedWrite);
526592

527593
verifyPartitionedLayout(getLayout("test4"), "key", 2);
594+
595+
// Test bucketed self-join on table with smaller bucket count
596+
runBucketedJoinQuery("test4", {"key"}, 600'572);
528597
}
529598

530599
// Single-node execution.
@@ -593,6 +662,9 @@ TEST_F(WriteTest, createTableAsSelectBucketedSql) {
593662
verifyPartitionedWrite);
594663

595664
verifyPartitionedLayout(getLayout("test"), "n_nationkey", 16, "n_name");
665+
666+
// Test bucketed self-join on single bucket column
667+
runBucketedJoinQuery("test", {"n_nationkey"}, 25);
596668
}
597669
}
598670

0 commit comments

Comments
 (0)