10 changes: 0 additions & 10 deletions velox/core/QueryConfig.h
@@ -566,12 +566,6 @@ class QueryConfig {
   static constexpr const char* kSparkJsonIgnoreNullFields =
       "spark.json_ignore_null_fields";

-  /// If true, collect_list aggregate function will ignore nulls in the input.
-  /// Defaults to true to match Spark's default behavior. Set to false to
-  /// include nulls (RESPECT NULLS). Introduced in Spark 4.2 (SPARK-55256).
-  static constexpr const char* kSparkCollectListIgnoreNulls =
-      "spark.collect_list.ignore_nulls";
-
   /// The number of local parallel table writer operators per task.
   static constexpr const char* kTaskWriterCount = "task_writer_count";

@@ -1394,10 +1388,6 @@ class QueryConfig {
     return get<bool>(kSparkJsonIgnoreNullFields, true);
   }

-  bool sparkCollectListIgnoreNulls() const {
-    return get<bool>(kSparkCollectListIgnoreNulls, true);
-  }
-
   bool exprTrackCpuUsage() const {
     return get<bool>(kExprTrackCpuUsage, false);
   }
14 changes: 7 additions & 7 deletions velox/docs/functions/spark/aggregate.rst
@@ -54,15 +54,15 @@ General Aggregate Functions

``hash`` cannot be null.

-.. spark:function:: collect_list(x) -> array<[same as x]>
+.. spark:function:: collect_list(x [, ignoreNulls]) -> array<[same as x]>

-    Returns an array created from the input ``x`` elements. By default,
-    ignores null inputs and returns an empty array when all inputs are null.
+    Returns an array created from the input ``x`` elements.
+    When ``ignoreNulls`` is ``true`` (default), null inputs are excluded and
+    an empty array is returned when all inputs are null.

-    When the configuration property ``spark.collect_list.ignore_nulls`` is set
-    to ``false``, null values are included in the output array (RESPECT NULLS
-    behavior). In this mode, an all-null input produces an array of nulls
-    instead of an empty array.
+    When ``ignoreNulls`` is ``false`` (RESPECT NULLS), null values are included
+    in the output array. In this mode, an all-null input produces an array of
+    nulls instead of an empty array.

.. spark:function:: collect_set(x [, ignoreNulls]) -> array<[same as x]>

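The documented null-handling semantics can be sketched in a small standalone model (illustrative only; plain C++, not Velox's implementation, and none of its types):

```cpp
#include <cassert>
#include <optional>
#include <vector>

// Minimal model of collect_list's null handling. With ignoreNulls = true
// (the default), null inputs are dropped, so an all-null input yields an
// empty array. With ignoreNulls = false (RESPECT NULLS), nulls are kept,
// so an all-null input yields an array of nulls.
std::vector<std::optional<int>> collectList(
    const std::vector<std::optional<int>>& input,
    bool ignoreNulls = true) {
  std::vector<std::optional<int>> out;
  for (const auto& v : input) {
    if (v.has_value() || !ignoreNulls) {
      out.push_back(v);
    }
  }
  return out;
}
```

Under this model, an input of {1, null, 2} collects to {1, 2} by default and to {1, null, 2} with ignoreNulls = false, matching the two paragraphs above.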
6 changes: 4 additions & 2 deletions velox/exec/SimpleAggregateAdapter.h
@@ -762,10 +762,12 @@ class SimpleAggregateAdapter : public Aggregate {
}
}

+ protected:
+  std::unique_ptr<FUNC> fn_;
+
  private:
   std::vector<DecodedVector> inputDecoded_;
   DecodedVector intermediateDecoded_;
-
-  std::unique_ptr<FUNC> fn_;

Reviewer (Collaborator): The member order should be protected and then private.

Reviewer (Collaborator): Making fn_ protected exposes the raw unique_ptr to all SimpleAggregateAdapter subclasses across the codebase. Currently only CollectListAdapter needs it. Would a protected accessor like `FUNC& fn() { return *fn_; }` be a tighter API contract? That way subclasses can access the function object without being able to reset/move the unique_ptr itself. Not a blocker; just a suggestion for encapsulation.
};

} // namespace facebook::velox::exec
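The accessor pattern the reviewer suggests can be sketched standalone (hypothetical names, not Velox code): the subclass gets a reference to the function object, while the owning unique_ptr stays private:

```cpp
#include <memory>

// Sketch of the reviewer's suggested encapsulation (hypothetical names).
// The owning unique_ptr stays private; subclasses reach the object only
// through a protected reference accessor, so they cannot reset or move it.
struct Fn {
  bool flag{true};
};

class Adapter {
 public:
  Adapter() : fn_(std::make_unique<Fn>()) {}
  virtual ~Adapter() = default;
  bool flag() const { return fn_->flag; }

 protected:
  Fn& fn() { return *fn_; }  // grants access, but not ownership control

 private:
  std::unique_ptr<Fn> fn_;
};

class DerivedAdapter : public Adapter {
 public:
  void setFlag(bool v) { fn().flag = v; }  // fn_.reset() would not compile here
};
```

This keeps the invariant "fn_ is always non-null after construction" enforceable in one class instead of across every subclass.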
37 changes: 26 additions & 11 deletions velox/functions/sparksql/aggregates/CollectListAggregate.cpp
@@ -18,6 +18,7 @@

 #include "velox/exec/SimpleAggregateAdapter.h"
 #include "velox/functions/lib/aggregates/ValueList.h"
+#include "velox/vector/ConstantVector.h"

using namespace facebook::velox::aggregate;
using namespace facebook::velox::exec;
@@ -44,14 +45,6 @@
// aggregation uses the accumulator path, which correctly respects the config.
Reviewer (Collaborator): This comment references "config" twice, but ignoreNulls_ is no longer read from QueryConfig; it now comes from the constant boolean argument via setConstantInputs(). Please update the wording, e.g.:

    // NOTE: toIntermediate() was intentionally removed because it is static and
    // cannot access the runtime ignoreNulls_ flag. Without it, partial
    // aggregation uses the accumulator path, which correctly respects the flag.
bool ignoreNulls_{true};

-  void initialize(
-      core::AggregationNode::Step /*step*/,
-      const std::vector<TypePtr>& /*argTypes*/,
-      const TypePtr& /*resultType*/,
-      const core::QueryConfig& config) {
-    ignoreNulls_ = config.sparkCollectListIgnoreNulls();
-  }

struct AccumulatorType {
ValueList elements_;

@@ -114,16 +107,40 @@
};
};

+// Adapter that overrides setConstantInputs to read the ignoreNulls flag.
+class CollectListAdapter : public SimpleAggregateAdapter<CollectListAggregate> {
+ public:
+  using SimpleAggregateAdapter<CollectListAggregate>::SimpleAggregateAdapter;
+
+  void setConstantInputs(
+      const std::vector<VectorPtr>& constantInputs) override {
+    if (constantInputs.size() >= 2 && constantInputs[1] != nullptr &&
+        !constantInputs[1]->isNullAt(0)) {
+      fn_->ignoreNulls_ =
+          constantInputs[1]->as<ConstantVector<bool>>()->valueAt(0);
+    }
+  }
+};

[CI: misc-include-cleaner, line 116: no header providing "facebook::velox::VectorPtr" is directly included; no header providing "std::vector" is directly included]

AggregateRegistrationResult registerCollectList(

[CI: misc-include-cleaner, line 125: no header providing "facebook::velox::exec::AggregateRegistrationResult" is directly included]
const std::string& name,
bool withCompanionFunctions,
bool overwrite) {
std::vector<std::shared_ptr<exec::AggregateFunctionSignature>> signatures{
// collect_list(E) -> array(E): default ignoreNulls=true.
exec::AggregateFunctionSignatureBuilder()
.typeVariable("E")
.returnType("array(E)")
.intermediateType("array(E)")
.argumentType("E")
.build(),
// collect_list(E, ignoreNulls) -> array(E): explicit flag.
exec::AggregateFunctionSignatureBuilder()
.typeVariable("E")
.returnType("array(E)")
.intermediateType("array(E)")
.argumentType("E")
.constantArgumentType("boolean")
.build()};
return exec::registerAggregateFunction(
name,
@@ -133,9 +150,7 @@
const std::vector<TypePtr>& argTypes,
const TypePtr& resultType,
const core::QueryConfig& config) -> std::unique_ptr<exec::Aggregate> {
-        VELOX_CHECK_EQ(
-            argTypes.size(), 1, "{} takes at most one argument", name);
-        return std::make_unique<SimpleAggregateAdapter<CollectListAggregate>>(
+        return std::make_unique<CollectListAdapter>(

[CI: misc-include-cleaner, line 153: no header providing "std::make_unique" is directly included]
step, argTypes, resultType, &config);
},
withCompanionFunctions,
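The defaulting behavior of setConstantInputs above can be modeled in isolation (a sketch under assumed semantics, not Velox code): the flag stays at Spark's default of true unless a non-null constant second argument is present:

```cpp
#include <optional>
#include <vector>

// Standalone model of the flag resolution in setConstantInputs: default to
// true (IGNORE NULLS), and override only when a non-null constant boolean
// is supplied as the second argument.
bool resolveIgnoreNulls(const std::vector<std::optional<bool>>& constantArgs) {
  bool ignoreNulls = true;  // Spark's default behavior.
  if (constantArgs.size() >= 2 && constantArgs[1].has_value()) {
    ignoreNulls = *constantArgs[1];
  }
  return ignoreNulls;
}
```

Note that a null constant (e.g. `collect_list(x, CAST(NULL AS BOOLEAN))`) falls back to the default rather than overriding it, mirroring the `isNullAt(0)` guard in the adapter.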
@@ -124,24 +124,28 @@ TEST_F(CollectListAggregateTest, allNullsInput) {
{});
}

-std::unordered_map<std::string, std::string> makeConfig(bool ignoreNulls) {
-  return {{"spark.collect_list.ignore_nulls", ignoreNulls ? "true" : "false"}};
+TEST_F(CollectListAggregateTest, explicitIgnoreNullsTrue) {
+  // 2-arg form with ignoreNulls=true should behave same as 1-arg.
+  auto input = makeRowVector({makeNullableFlatVector<int32_t>(
+      {1, 2, std::nullopt, 4, std::nullopt, 6})});
+  auto expected =
+      makeRowVector({makeArrayVectorFromJson<int32_t>({"[1, 2, 4, 6]"})});
+  testAggregations(
+      {input},
+      {},
+      {"spark_collect_list(c0, true)"},
+      {"array_sort(a0)"},
+      {expected});
}

TEST_F(CollectListAggregateTest, respectNulls) {
-  // When ignoreNulls is false (RESPECT NULLS), nulls should be included.
+  // 2-arg form with ignoreNulls=false (RESPECT NULLS).
auto input = makeRowVector({makeNullableFlatVector<int32_t>(
{1, 2, std::nullopt, 4, std::nullopt, 6})});
auto expected = makeRowVector({makeNullableArrayVector<int32_t>(
std::vector<std::vector<std::optional<int32_t>>>{
{1, 2, std::nullopt, 4, std::nullopt, 6}})});
-  std::vector<RowVectorPtr> expectedResult{expected};
-  testAggregations(
-      {input},
-      {},
-      {"spark_collect_list(c0)"},
-      expectedResult,
-      makeConfig(false));
+  testAggregations({input}, {}, {"spark_collect_list(c0, false)"}, {expected});
Reviewer (Collaborator): Consider adding a test that verifies the constant boolean false (RESPECT NULLS) works correctly through partial → intermediate → final aggregation stages. testAggregations() does cover multiple modes internally, but an explicit streaming/split test would increase confidence that setConstantInputs() propagates correctly across stages.

}

TEST_F(CollectListAggregateTest, respectNullsGroupBy) {
Expand All @@ -153,30 +157,20 @@ TEST_F(CollectListAggregateTest, respectNullsGroupBy) {
makeNullableArrayVector<int64_t>(
std::vector<std::vector<std::optional<int64_t>>>{
{std::nullopt, 1}, {2, std::nullopt, 3}})});
-  std::vector<RowVectorPtr> expectedResult{expected};
   testAggregations(
       {data},
       {"c0"},
-      {"spark_collect_list(c1)"},
+      {"spark_collect_list(c1, false)"},
       {"c0", "a0"},
-      expectedResult,
-      makeConfig(false));
+      {expected});
}

TEST_F(CollectListAggregateTest, respectNullsAllNulls) {
// When all inputs are null and ignoreNulls is false, output should be an
// array of nulls (not an empty array).
auto input = makeRowVector({makeAllNullFlatVector<int32_t>(3)});
auto expected = makeRowVector({makeNullableArrayVector<int32_t>(
std::vector<std::vector<std::optional<int32_t>>>{
{std::nullopt, std::nullopt, std::nullopt}})});
-  std::vector<RowVectorPtr> expectedResult{expected};
-  testAggregations(
-      {input},
-      {},
-      {"spark_collect_list(c0)"},
-      expectedResult,
-      makeConfig(false));
+  testAggregations({input}, {}, {"spark_collect_list(c0, false)"}, {expected});
}
} // namespace
} // namespace facebook::velox::functions::aggregate::sparksql::test
@@ -87,6 +87,8 @@ int main(int argc, char** argv) {
// Velox registers a 2-arg collect_set(T, boolean) signature that Spark
// doesn't support. The fuzzer may pick this signature and fail.
"collect_set",
+      // Same as collect_set — 2-arg signature not supported by Spark.
+      "collect_list",

Reviewer (Collaborator): Nit: "2-arg signature not supported by Spark" is slightly misleading; Spark 4.0+ does support RESPECT NULLS / IGNORE NULLS for collect_list (SPARK-55256). The real reason for skipping is that the fuzzer can't generate the constant boolean argument. Consider:

    // Fuzzer may pick the 2-arg (T, boolean) signature which requires
    // a constant boolean that the fuzzer cannot generate.
    "collect_list",

Same applies to the collect_set comment above.
"first_ignore_null",
"last_ignore_null",
"regr_replacement",
Expand Down