-
Notifications
You must be signed in to change notification settings - Fork 80
[MapFromEntries] Native JNI implementation with Spark null-struct semantics #4475
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
b70f95f
528f336
383a15a
f404062
846f2f5
bf974cc
4a46b36
0db1485
b9eae67
e1edb87
be86e4d
c1ead42
b2e69df
6633fb2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| /* | ||
| * Copyright (c) 2025, NVIDIA CORPORATION. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| #include "cudf_jni_apis.hpp" | ||
| #include "jni_utils.hpp" | ||
| #include "map_utils.hpp" | ||
|
|
||
| extern "C" { | ||
|
|
||
| // JNI entry point for the Java native method MapUtils.mapFromEntries(long, boolean). | ||
| // input_handle is interpreted as the address of a cudf::column_view; the column returned by | ||
| // spark_rapids_jni::map_from_entries is handed back to the caller as a jlong handle. | ||
| // NOTE(review): JNI_NULL_CHECK / JNI_TRY / JNI_CATCH come from the included JNI helper headers; | ||
| // presumably they raise a Java exception and return the trailing 0 on failure - confirm there. | ||
| JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_MapUtils_mapFromEntries( | ||
|   JNIEnv* env, jclass, jlong input_handle, jboolean throw_on_null_key) | ||
| { | ||
|   JNI_NULL_CHECK(env, input_handle, "input column is null", 0); | ||
|   JNI_TRY | ||
|   { | ||
|     cudf::jni::auto_set_device(env); | ||
|     auto const* entries = reinterpret_cast<cudf::column_view const*>(input_handle); | ||
|     bool const fail_on_null_key = static_cast<bool>(throw_on_null_key); | ||
|     return cudf::jni::release_as_jlong( | ||
|       spark_rapids_jni::map_from_entries(*entries, fail_on_null_key)); | ||
|   } | ||
|   JNI_CATCH(env, 0); | ||
| } | ||
|
|
||
| } // extern "C" | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,152 @@ | ||||||||||||||||||||||||||||||
| /* | ||||||||||||||||||||||||||||||
| * Copyright (c) 2025, NVIDIA CORPORATION. | ||||||||||||||||||||||||||||||
|
thirtiseven marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||||||||||||||||
| * you may not use this file except in compliance with the License. | ||||||||||||||||||||||||||||||
| * You may obtain a copy of the License at | ||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||||||||||
| * See the License for the specific language governing permissions and | ||||||||||||||||||||||||||||||
| * limitations under the License. | ||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| #include "map_utils.hpp" | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| #include <cudf/column/column.hpp> | ||||||||||||||||||||||||||||||
| #include <cudf/column/column_factories.hpp> | ||||||||||||||||||||||||||||||
| #include <cudf/copying.hpp> | ||||||||||||||||||||||||||||||
| #include <cudf/lists/contains.hpp> | ||||||||||||||||||||||||||||||
| #include <cudf/lists/lists_column_view.hpp> | ||||||||||||||||||||||||||||||
| #include <cudf/reduction.hpp> | ||||||||||||||||||||||||||||||
| #include <cudf/scalar/scalar.hpp> | ||||||||||||||||||||||||||||||
| #include <cudf/transform.hpp> | ||||||||||||||||||||||||||||||
| #include <cudf/unary.hpp> | ||||||||||||||||||||||||||||||
| #include <cudf/utilities/default_stream.hpp> | ||||||||||||||||||||||||||||||
| #include <cudf/utilities/error.hpp> | ||||||||||||||||||||||||||||||
| #include <cudf/utilities/memory_resource.hpp> | ||||||||||||||||||||||||||||||
| #include <cudf/utilities/span.hpp> | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| namespace spark_rapids_jni { | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| std::unique_ptr<cudf::column> map_from_entries(cudf::column_view const& input, | ||||||||||||||||||||||||||||||
| bool throw_on_null_key, | ||||||||||||||||||||||||||||||
| rmm::cuda_stream_view stream, | ||||||||||||||||||||||||||||||
| rmm::device_async_resource_ref mr) | ||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||
| CUDF_EXPECTS(input.type().id() == cudf::type_id::LIST, | ||||||||||||||||||||||||||||||
| "map_from_entries: input must be a LIST column"); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if (input.size() == 0) { return cudf::make_empty_column(input.type()); } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| auto const lists_cv = cudf::lists_column_view(input); | ||||||||||||||||||||||||||||||
| auto const structs = lists_cv.child(); | ||||||||||||||||||||||||||||||
| CUDF_EXPECTS(structs.type().id() == cudf::type_id::STRUCT, | ||||||||||||||||||||||||||||||
| "map_from_entries: list child must be a STRUCT column"); | ||||||||||||||||||||||||||||||
| CUDF_EXPECTS(structs.num_children() >= 1, | ||||||||||||||||||||||||||||||
| "map_from_entries: struct must have at least one child column (KEY)"); | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Step 1: Per-row flag — does row i contain any null struct entry? | ||||||||||||||||||||||||||||||
| // contains_nulls returns BOOL8, size = input.size(). | ||||||||||||||||||||||||||||||
| // A null outer row itself yields null in has_null_entry; copy_if_else handles that correctly. | ||||||||||||||||||||||||||||||
| auto has_null_entry = cudf::lists::contains_nulls(lists_cv, stream, mr); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Fast path: no null struct entries anywhere — simple global null-key check. | ||||||||||||||||||||||||||||||
| auto any_null_entry_scalar = cudf::reduce( | ||||||||||||||||||||||||||||||
| *has_null_entry, | ||||||||||||||||||||||||||||||
| *cudf::make_any_aggregation<cudf::reduce_aggregation>(), | ||||||||||||||||||||||||||||||
| cudf::data_type{cudf::type_id::BOOL8}, | ||||||||||||||||||||||||||||||
| stream, | ||||||||||||||||||||||||||||||
| mr); | ||||||||||||||||||||||||||||||
| bool const any_null_entry = | ||||||||||||||||||||||||||||||
| any_null_entry_scalar->is_valid(stream) && | ||||||||||||||||||||||||||||||
| static_cast<cudf::numeric_scalar<bool>*>(any_null_entry_scalar.get())->value(stream); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if (!any_null_entry) { | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
| // All struct entries are valid. Any null key in the flat key column is a real null key. | ||||||||||||||||||||||||||||||
| auto const keys = structs.child(0); | ||||||||||||||||||||||||||||||
| if (throw_on_null_key && keys.null_count(stream) > 0) { | ||||||||||||||||||||||||||||||
| throw cudf::logic_error("Cannot use null as map key."); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| return std::make_unique<cudf::column>(input, stream, mr); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Slow path: at least one row contains a null struct entry. | ||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||
| // CPU semantics: if a row's array has any null struct entry the entire output row is null, | ||||||||||||||||||||||||||||||
| // regardless of whether another entry in that row also has a null key. We must therefore | ||||||||||||||||||||||||||||||
| // throw "Cannot use null as map key" only for rows that satisfy BOTH: | ||||||||||||||||||||||||||||||
| // (a) the row has NO null struct entry (has_null_entry = false), AND | ||||||||||||||||||||||||||||||
| // (b) at least one entry's key is null inside a valid (non-null) struct. | ||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||
| // Per-entry boolean: null_key_in_valid[j] = key_is_null[j] AND struct_is_valid[j] | ||||||||||||||||||||||||||||||
| auto const keys = structs.child(0); | ||||||||||||||||||||||||||||||
| auto key_is_null = cudf::is_null(keys, stream, mr); // flat BOOL8 | ||||||||||||||||||||||||||||||
| auto struct_is_null = cudf::is_null(structs, stream, mr); // flat BOOL8 | ||||||||||||||||||||||||||||||
| auto struct_is_valid = cudf::unary_operation( | ||||||||||||||||||||||||||||||
| *struct_is_null, cudf::unary_operator::NOT, stream, mr); // flat BOOL8 | ||||||||||||||||||||||||||||||
| auto null_key_in_valid = cudf::binary_operation( | ||||||||||||||||||||||||||||||
| *key_is_null, | ||||||||||||||||||||||||||||||
| *struct_is_valid, | ||||||||||||||||||||||||||||||
| cudf::binary_operator::BITWISE_AND, | ||||||||||||||||||||||||||||||
| cudf::data_type{cudf::type_id::BOOL8}, | ||||||||||||||||||||||||||||||
| stream, | ||||||||||||||||||||||||||||||
| mr); | ||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
Same applies to the Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed — changed to |
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Reduce per-list: does this row contain any entry where null_key_in_valid = true? | ||||||||||||||||||||||||||||||
| // segmented_reduce(max) over the flat boolean values using the list offsets as boundaries. | ||||||||||||||||||||||||||||||
| // is_null() always returns a fully-valid boolean column, so null_key_in_valid has no nulls; | ||||||||||||||||||||||||||||||
| // EXCLUDE null_policy only affects empty-list rows (which yield a null result, safely | ||||||||||||||||||||||||||||||
| // treated as false in the AND below). | ||||||||||||||||||||||||||||||
| auto const offsets_col = lists_cv.offsets(); | ||||||||||||||||||||||||||||||
| auto const offsets_span = cudf::device_span<cudf::size_type const>( | ||||||||||||||||||||||||||||||
| offsets_col.data<cudf::size_type>(), offsets_col.size()); | ||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Addressed — the code now uses
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Addressed — |
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| auto row_has_null_key = cudf::segmented_reduce( | ||||||||||||||||||||||||||||||
| *null_key_in_valid, | ||||||||||||||||||||||||||||||
| offsets_span, | ||||||||||||||||||||||||||||||
| *cudf::make_max_aggregation<cudf::segmented_reduce_aggregation>(), | ||||||||||||||||||||||||||||||
| cudf::data_type{cudf::type_id::BOOL8}, | ||||||||||||||||||||||||||||||
| cudf::null_policy::EXCLUDE, | ||||||||||||||||||||||||||||||
| stream, | ||||||||||||||||||||||||||||||
| mr); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Throw only when: row has no null struct entry AND row has a null key in a valid struct. | ||||||||||||||||||||||||||||||
| // For rows with null struct entries (has_null_entry = true), the whole output row is masked | ||||||||||||||||||||||||||||||
| // to null below, so their null keys are irrelevant — no exception should be thrown for them. | ||||||||||||||||||||||||||||||
| if (throw_on_null_key) { | ||||||||||||||||||||||||||||||
| auto no_null_entry = cudf::unary_operation( | ||||||||||||||||||||||||||||||
| *has_null_entry, cudf::unary_operator::NOT, stream, mr); | ||||||||||||||||||||||||||||||
| // NULL AND anything = NULL; reduce(any) skips nulls, so null rows are safely ignored. | ||||||||||||||||||||||||||||||
| auto should_throw = cudf::binary_operation( | ||||||||||||||||||||||||||||||
| *no_null_entry, | ||||||||||||||||||||||||||||||
| *row_has_null_key, | ||||||||||||||||||||||||||||||
| cudf::binary_operator::BITWISE_AND, | ||||||||||||||||||||||||||||||
| cudf::data_type{cudf::type_id::BOOL8}, | ||||||||||||||||||||||||||||||
| stream, | ||||||||||||||||||||||||||||||
| mr); | ||||||||||||||||||||||||||||||
| auto any_throw_scalar = cudf::reduce( | ||||||||||||||||||||||||||||||
| *should_throw, | ||||||||||||||||||||||||||||||
| *cudf::make_any_aggregation<cudf::reduce_aggregation>(), | ||||||||||||||||||||||||||||||
| cudf::data_type{cudf::type_id::BOOL8}, | ||||||||||||||||||||||||||||||
| stream, | ||||||||||||||||||||||||||||||
| mr); | ||||||||||||||||||||||||||||||
| bool const any_throw = | ||||||||||||||||||||||||||||||
| any_throw_scalar->is_valid(stream) && | ||||||||||||||||||||||||||||||
| static_cast<cudf::numeric_scalar<bool>*>(any_throw_scalar.get())->value(stream); | ||||||||||||||||||||||||||||||
| if (any_throw) { throw cudf::logic_error("Cannot use null as map key."); } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Null-mask rows that contain null struct entries. | ||||||||||||||||||||||||||||||
| // copy_if_else(lhs=null_scalar, rhs=input, mask=has_null_entry): | ||||||||||||||||||||||||||||||
| // mask[i] = true → null_scalar (row had a null struct entry → output null row) | ||||||||||||||||||||||||||||||
| // mask[i] = false → input[i] (row was fine → keep original) | ||||||||||||||||||||||||||||||
| // mask[i] = null → input[i] (outer null row stays null via input[i]) | ||||||||||||||||||||||||||||||
| auto null_scalar = cudf::make_default_constructed_scalar(input.type(), stream, mr); | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
| auto null_scalar = cudf::make_default_constructed_scalar(input.type(), stream, mr); | |
| auto null_scalar = cudf::make_default_constructed_scalar(input.type(), stream, mr); | |
| null_scalar->set_valid_async(false, stream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code path no longer exists. The current slow-path implementation does not use make_default_constructed_scalar or copy_if_else at all — it builds the null mask directly via bools_to_mask + bitmask_and + purge_nonempty_nulls.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| /* | ||
| * Copyright (c) 2025, NVIDIA CORPORATION. | ||
|
thirtiseven marked this conversation as resolved.
Outdated
|
||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| #pragma once | ||
|
|
||
| #include <cudf/column/column.hpp> | ||
| #include <cudf/column/column_view.hpp> | ||
| #include <cudf/utilities/default_stream.hpp> | ||
| #include <cudf/utilities/memory_resource.hpp> | ||
|
|
||
| #include <rmm/cuda_stream_view.hpp> | ||
|
|
||
| namespace spark_rapids_jni { | ||
|
|
||
| /** | ||
| * @brief Converts a LIST(STRUCT(KEY, VALUE)) column to a map column following Spark semantics. | ||
| * | ||
| * The key is the first child of the struct, so the struct must have at least one child column. | ||
| * | ||
| * Spark semantics for map_from_entries: | ||
| * - If a row's array contains any null struct entry (the whole struct is null), the output row | ||
| * is null — even if another entry in that same row has a null key. | ||
| * - If a row's array contains no null struct entry but does contain a null key inside a valid | ||
| * struct, behavior depends on throw_on_null_key: | ||
| * - true → throws a logic_error ("Cannot use null as map key.") | ||
| * - false → returns the row as-is (caller is responsible for deduplication policy) | ||
| * | ||
| * This function only handles null-struct masking and null-key validation. | ||
| * Duplicate-key deduplication is left to the caller. | ||
| * | ||
| * @param input Input LIST(STRUCT(KEY, VALUE)) column. | ||
| * @param throw_on_null_key When true, throw if any valid-struct entry has a null key. | ||
| * @param stream CUDA stream used for device memory operations and kernel launches. | ||
| * Defaults to cudf::get_default_stream(). | ||
| * @param mr Device memory resource used to allocate the returned column's memory. | ||
| * Defaults to cudf::get_current_device_resource_ref(). | ||
| * @return A new column equal to @p input except that rows containing null struct entries are | ||
| * replaced with a null outer row. Null keys inside valid structs are left in place when | ||
| * @p throw_on_null_key is false. | ||
| * @throws cudf::logic_error if the input is not a LIST(STRUCT(KEY,...)) column. | ||
| * @throws cudf::logic_error if @p throw_on_null_key is true and any row (with no null struct | ||
| * entries) contains a null key inside a valid struct. | ||
| */ | ||
| std::unique_ptr<cudf::column> map_from_entries( | ||
| cudf::column_view const& input, | ||
| bool throw_on_null_key, | ||
| rmm::cuda_stream_view stream = cudf::get_default_stream(), | ||
| rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); | ||
|
|
||
| } // namespace spark_rapids_jni | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| /* | ||
| * Copyright (c) 2025, NVIDIA CORPORATION. | ||
|
thirtiseven marked this conversation as resolved.
Outdated
|
||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.nvidia.spark.rapids.jni; | ||
|
|
||
| import ai.rapids.cudf.ColumnVector; | ||
| import ai.rapids.cudf.ColumnView; | ||
| import ai.rapids.cudf.NativeDepsLoader; | ||
|
|
||
| /** | ||
| * Utility APIs for map column operations that require Spark-specific semantics | ||
| * not available in the standard cuDF Java bindings. | ||
| */ | ||
| public class MapUtils { | ||
| static { | ||
| NativeDepsLoader.loadNativeDeps(); | ||
| } | ||
|
|
||
| /** | ||
| * Converts a LIST(STRUCT(KEY, VALUE)) column to a map column following Spark semantics. | ||
| * | ||
| * <p>Spark semantics for {@code map_from_entries}: | ||
| * <ul> | ||
| * <li>If a row's array contains any null struct entry (the whole struct is null), the output | ||
| * row is null — even if another entry in that same row has a null key inside a valid | ||
| * struct.</li> | ||
| * <li>If a row's array contains no null struct entry but a valid struct's key is null, | ||
| * behavior depends on {@code throwOnNullKey}: | ||
| * <ul> | ||
| * <li>{@code true} — throws a {@link RuntimeException}.</li> | ||
| * <li>{@code false} — returns the row as-is (caller handles dedup policy).</li> | ||
| * </ul> | ||
| * </li> | ||
| * </ul> | ||
| * | ||
| * <p>Duplicate-key deduplication is intentionally left to the caller so that the EXCEPTION | ||
| * and LAST_WIN policies can be applied after this function returns. | ||
| * | ||
| * @param input Input LIST(STRUCT(KEY, VALUE)) column. | ||
| * @param throwOnNullKey When {@code true}, throw if any valid-struct entry has a null key. | ||
| * @return A new column equal to {@code input} except that rows containing null struct entries | ||
| * are replaced with a null outer row. | ||
| * @throws RuntimeException if {@code throwOnNullKey} is true and any row (with no null struct | ||
| * entry) contains a null key inside a valid struct. | ||
| */ | ||
| public static ColumnVector mapFromEntries(ColumnView input, boolean throwOnNullKey) { | ||
| return new ColumnVector(mapFromEntries(input.getNativeView(), throwOnNullKey)); | ||
| } | ||
|
|
||
| private static native long mapFromEntries(long inputHandle, boolean throwOnNullKey); | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.