@@ -3636,6 +3636,84 @@ TEST_P(HashJoinTest, semiProjectOverLazyVectors) {
36363636 .run ();
36373637}
36383638
3639+ // Verifies that hash join with lazy probe input produces correct results
3640+ // across multiple join types, with automatic spill injection via
3641+ // HashJoinBuilder.
3642+ TEST_P (MultiThreadedHashJoinTest, hashJoinWithLazyProbeInputAndSpill) {
3643+ VectorFuzzer::Options opts;
3644+ opts.vectorSize = 1'000 ;
3645+ VectorFuzzer fuzzer (opts, pool ());
3646+
3647+ const int32_t numProbeVectors = 3 ;
3648+ std::vector<RowVectorPtr> probeVectors;
3649+ std::vector<RowVectorPtr> probeReference;
3650+ probeVectors.reserve (numProbeVectors);
3651+ probeReference.reserve (numProbeVectors);
3652+ for (int32_t i = 0 ; i < numProbeVectors; ++i) {
3653+ auto nonLazy = fuzzer.fuzzRow (probeType_);
3654+ probeReference.push_back (
3655+ std::dynamic_pointer_cast<RowVector>(
3656+ nonLazy->testingCopyPreserveEncodings ()));
3657+ probeVectors.push_back (
3658+ VectorFuzzer (opts, pool ()).fuzzRowChildrenToLazy (nonLazy));
3659+ }
3660+ createDuckDbTable (" t" , probeReference);
3661+
3662+ const int32_t numBuildVectors = 3 ;
3663+ std::vector<RowVectorPtr> buildVectors;
3664+ buildVectors.reserve (numBuildVectors);
3665+ for (int32_t i = 0 ; i < numBuildVectors; ++i) {
3666+ buildVectors.push_back (fuzzer.fuzzRow (buildType_));
3667+ }
3668+ createDuckDbTable (" u" , buildVectors);
3669+
3670+ struct {
3671+ core::JoinType joinType;
3672+ std::string filter;
3673+ std::vector<std::string> outputColumns;
3674+ std::string referenceQuery;
3675+ } testCases[] = {
3676+ {core::JoinType::kInner ,
3677+ " " ,
3678+ concat (probeType_->names (), buildType_->names ()),
3679+ " SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1" },
3680+ {core::JoinType::kLeft ,
3681+ " " ,
3682+ concat (probeType_->names (), buildType_->names ()),
3683+ " SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t LEFT JOIN u ON t.t_k1 = u.u_k1" },
3684+ {core::JoinType::kFull ,
3685+ " " ,
3686+ concat (probeType_->names (), buildType_->names ()),
3687+ " SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t FULL OUTER JOIN u ON t.t_k1 = u.u_k1" },
3688+ {core::JoinType::kAnti ,
3689+ " t_k2 <> u_k2" ,
3690+ probeType_->names (),
3691+ " SELECT t_k1, t_k2, t_v1 FROM t WHERE NOT EXISTS (SELECT 1 FROM u WHERE t.t_k1 = u.u_k1 AND t.t_k2 <> u.u_k2)" },
3692+ {core::JoinType::kLeftSemiProject ,
3693+ " " ,
3694+ concat (probeType_->names (), {" match" }),
3695+ " SELECT t_k1, t_k2, t_v1, EXISTS (SELECT 1 FROM u WHERE t.t_k1 = u.u_k1) FROM t" },
3696+ };
3697+
3698+ for (const auto & testCase : testCases) {
3699+ SCOPED_TRACE (
3700+ fmt::format (" joinType: {}" , static_cast <int >(testCase.joinType )));
3701+
3702+ HashJoinBuilder (*pool_, duckDbQueryRunner_, driverExecutor_.get ())
3703+ .numDrivers (numDrivers_)
3704+ .parallelizeJoinBuildRows (parallelBuildSideRowsEnabled_)
3705+ .probeKeys ({" t_k1" })
3706+ .probeVectors (std::vector<RowVectorPtr>(probeVectors))
3707+ .buildKeys ({" u_k1" })
3708+ .buildVectors (std::vector<RowVectorPtr>(buildVectors))
3709+ .joinType (testCase.joinType )
3710+ .joinFilter (testCase.filter )
3711+ .joinOutputLayout (std::vector<std::string>(testCase.outputColumns ))
3712+ .referenceQuery (testCase.referenceQuery )
3713+ .run ();
3714+ }
3715+ }
3716+
36393717VELOX_INSTANTIATE_TEST_SUITE_P (
36403718 MultiThreadedHashJoinTest,
36413719 MultiThreadedHashJoinTest,
0 commit comments