diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index db6fcf398d0d..7d04cbc0b0e0 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -456,6 +456,12 @@ class QueryConfig { static constexpr const char* kHashJoinSpillFileCreateConfig = "hash_join_spill_file_create_config"; + /// Config used to create row number spill files. If non-empty, it overrides + /// 'kSpillFileCreateConfig' for RowNumber operators. The config is free form + /// and is passed to the underlying file system, which defines its format. + static constexpr const char* kRowNumberSpillFileCreateConfig = + "row_number_spill_file_create_config"; + /// Default offset spill start partition bit. /// 'kSpillNumPartitionBits' together to /// calculate the spilling partition number for join spill or aggregation @@ -1269,6 +1275,10 @@ class QueryConfig { return get(kHashJoinSpillFileCreateConfig, ""); } + std::string rowNumberSpillFileCreateConfig() const { + return get(kRowNumberSpillFileCreateConfig, ""); + } + int32_t minSpillableReservationPct() const { constexpr int32_t kDefaultPct = 5; return get(kMinSpillableReservationPct, kDefaultPct); diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 3e78859a2319..43db455228c8 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -124,6 +124,10 @@ bool isAggregationSpillOperator(std::string_view operatorType) { return operatorType == OperatorType::kAggregation || operatorType == OperatorType::kPartialAggregation; } + +bool isRowNumberSpillOperator(std::string_view operatorType) { + return operatorType == OperatorType::kRowNumber; +} } // namespace std::optional DriverCtx::makeSpillConfig( @@ -159,6 +163,11 @@ std::optional DriverCtx::makeSpillConfig( if (!aggregationConfig.empty()) { fileCreateConfig = aggregationConfig; } + } else if (isRowNumberSpillOperator(operatorType)) { + const auto& rowNumberConfig = queryConfig.rowNumberSpillFileCreateConfig(); + if (!rowNumberConfig.empty()) { + fileCreateConfig = rowNumberConfig; + } } return
common::SpillConfig( diff --git a/velox/exec/tests/RowNumberTest.cpp b/velox/exec/tests/RowNumberTest.cpp index 0a58aec3acd1..ba832c12dafb 100644 --- a/velox/exec/tests/RowNumberTest.cpp +++ b/velox/exec/tests/RowNumberTest.cpp @@ -575,4 +575,41 @@ TEST_F(RowNumberTest, spillWithYield) { } } +DEBUG_ONLY_TEST_F(RowNumberTest, rowNumberSpillFileCreateConfig) { + auto vectors = createVectors(8, rowType_, fuzzerOpts_); + createDuckDbTable(vectors); + + auto tempDirectory = TempDirectoryPath::create(); + + std::atomic_bool rowNumberConfigVerified{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::isBlocked", + std::function([&](exec::Operator* op) { + const auto* spillConfig = op->testingSpillConfig(); + if (spillConfig == nullptr) { + return; + } + const auto& opType = op->operatorType(); + if (opType == "RowNumber") { + ASSERT_EQ(spillConfig->fileCreateConfig, "test_row_number_config") + << "Operator: " << opType; + rowNumberConfigVerified = true; + } + })); + + TestScopedSpillInjection scopedSpillInjection(100); + AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(tempDirectory->getPath()) + .config(core::QueryConfig::kSpillEnabled, true) + .config(core::QueryConfig::kRowNumberSpillEnabled, true) + .config(core::QueryConfig::kSpillFileCreateConfig, "test_default_config") + .config( + core::QueryConfig::kRowNumberSpillFileCreateConfig, + "test_row_number_config") + .plan(PlanBuilder().values(vectors).rowNumber({"c0"}).planNode()) + .assertResults("SELECT *, row_number() over (partition by c0) FROM tmp"); + + ASSERT_TRUE(rowNumberConfigVerified.load()); +} + } // namespace facebook::velox::exec::test