Skip to content

Commit 2a266f6

Browse files
authored
perf: use bulk-NULL semantics in split and substring, skip Vec allocation in split (#4403)
1 parent 7cd0aaf commit 2a266f6

2 files changed

Lines changed: 144 additions & 126 deletions

File tree

native/spark-expr/src/string_funcs/split.rs

Lines changed: 80 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use arrow::array::{Array, ArrayRef, GenericStringArray, ListArray};
18+
use arrow::array::{
19+
Array, ArrayBuilder, ArrayRef, GenericListArray, GenericStringArray, GenericStringBuilder,
20+
ListArray, OffsetSizeTrait,
21+
};
22+
use arrow::buffer::OffsetBuffer;
1923
use arrow::datatypes::{DataType, Field};
2024
use datafusion::common::{
2125
cast::as_generic_string_array, exec_err, DataFusionError, Result as DataFusionResult,
@@ -115,89 +119,99 @@ fn split_array(
115119
DataFusionError::Execution(format!("Invalid regex pattern '{}': {}", pattern, e))
116120
})?;
117121

118-
let string_array = match string_array.data_type() {
119-
DataType::Utf8 => as_generic_string_array::<i32>(string_array)?,
120-
DataType::LargeUtf8 => {
121-
// Convert LargeUtf8 to Utf8 for processing
122-
let large_array = as_generic_string_array::<i64>(string_array)?;
123-
return split_large_string_array(large_array, &regex, limit);
122+
match string_array.data_type() {
123+
DataType::Utf8 => {
124+
split_generic::<i32>(as_generic_string_array::<i32>(string_array)?, &regex, limit)
124125
}
125-
_ => {
126-
return exec_err!(
127-
"split expects Utf8 or LargeUtf8 string array, got {:?}",
128-
string_array.data_type()
129-
);
126+
DataType::LargeUtf8 => {
127+
split_generic::<i64>(as_generic_string_array::<i64>(string_array)?, &regex, limit)
130128
}
131-
};
129+
_ => exec_err!(
130+
"split expects Utf8 or LargeUtf8 string array, got {:?}",
131+
string_array.data_type()
132+
),
133+
}
134+
}
132135

133-
// Build the result ListArray
134-
let mut offsets: Vec<i32> = Vec::with_capacity(string_array.len() + 1);
135-
let mut values: Vec<String> = Vec::new();
136-
let mut null_buffer_builder = arrow::array::BooleanBufferBuilder::new(string_array.len());
137-
offsets.push(0);
138-
139-
for i in 0..string_array.len() {
140-
if string_array.is_null(i) {
141-
// NULL input produces NULL in result (Spark behavior)
142-
offsets.push(offsets[i]);
143-
null_buffer_builder.append(false); // false = NULL
144-
} else {
145-
let string_val = string_array.value(i);
146-
let parts = split_with_regex(string_val, &regex, limit);
147-
values.extend(parts);
148-
offsets.push(values.len() as i32);
149-
null_buffer_builder.append(true); // true = valid
136+
fn split_generic<O: OffsetSizeTrait>(
137+
string_array: &GenericStringArray<O>,
138+
regex: &Regex,
139+
limit: i32,
140+
) -> DataFusionResult<ColumnarValue> {
141+
let len = string_array.len();
142+
let mut offsets: Vec<O> = Vec::with_capacity(len + 1);
143+
let mut values_builder = GenericStringBuilder::<O>::new();
144+
offsets.push(O::usize_as(0));
145+
146+
// Bulk-NULL: output null mask equals input's, so reuse it instead of
147+
// tracking per-row in a NullBufferBuilder. Null rows contribute no parts
148+
// (offset does not advance) and the cloned NullBuffer marks them.
149+
for i in 0..len {
150+
if !string_array.is_null(i) {
151+
let s = string_array.value(i);
152+
push_split_parts(s, regex, limit, &mut values_builder);
150153
}
154+
offsets.push(O::usize_as(values_builder.len()));
151155
}
152156

153-
let values_array = Arc::new(GenericStringArray::<i32>::from(values)) as ArrayRef;
154-
let field = Arc::new(Field::new("item", DataType::Utf8, false));
155-
let nulls = arrow::buffer::NullBuffer::new(null_buffer_builder.finish());
156-
let list_array = ListArray::new(
157+
let values_array = Arc::new(values_builder.finish()) as ArrayRef;
158+
let item_type = if O::IS_LARGE {
159+
DataType::LargeUtf8
160+
} else {
161+
DataType::Utf8
162+
};
163+
let field = Arc::new(Field::new("item", item_type, false));
164+
let list_array = GenericListArray::<O>::new(
157165
field,
158-
arrow::buffer::OffsetBuffer::new(offsets.into()),
166+
OffsetBuffer::new(offsets.into()),
159167
values_array,
160-
Some(nulls),
168+
string_array.nulls().cloned(),
161169
);
162170

163171
Ok(ColumnarValue::Array(Arc::new(list_array)))
164172
}
165173

166-
fn split_large_string_array(
167-
string_array: &GenericStringArray<i64>,
174+
/// Push the splits of `string` into `builder`. Avoids materializing an
175+
/// intermediate `Vec<String>` — appends each `&str` slice from the regex
176+
/// iterator directly (the builder copies into its own buffer).
177+
fn push_split_parts<O: OffsetSizeTrait>(
178+
string: &str,
168179
regex: &Regex,
169180
limit: i32,
170-
) -> DataFusionResult<ColumnarValue> {
171-
let mut offsets: Vec<i32> = Vec::with_capacity(string_array.len() + 1);
172-
let mut values: Vec<String> = Vec::new();
173-
let mut null_buffer_builder = arrow::array::BooleanBufferBuilder::new(string_array.len());
174-
offsets.push(0);
175-
176-
for i in 0..string_array.len() {
177-
if string_array.is_null(i) {
178-
// NULL input produces NULL in result (Spark behavior)
179-
offsets.push(offsets[i]);
180-
null_buffer_builder.append(false); // false = NULL
181+
builder: &mut GenericStringBuilder<O>,
182+
) {
183+
if limit == 0 {
184+
// limit = 0: split all, drop trailing empties. Need to know the end
185+
// before pushing, so collect borrowed slices first (no string copies).
186+
let mut parts: Vec<&str> = regex.split(string).collect();
187+
while parts.last().is_some_and(|s| s.is_empty()) {
188+
parts.pop();
189+
}
190+
if parts.is_empty() {
191+
builder.append_value("");
181192
} else {
182-
let string_val = string_array.value(i);
183-
let parts = split_with_regex(string_val, regex, limit);
184-
values.extend(parts);
185-
offsets.push(values.len() as i32);
186-
null_buffer_builder.append(true); // true = valid
193+
for p in parts {
194+
builder.append_value(p);
195+
}
196+
}
197+
} else if limit > 0 {
198+
// limit > 0: at most limit-1 splits.
199+
let mut last_end = 0;
200+
let cap = (limit - 1) as usize;
201+
for (count, mat) in regex.find_iter(string).enumerate() {
202+
if count >= cap {
203+
break;
204+
}
205+
builder.append_value(&string[last_end..mat.start()]);
206+
last_end = mat.end();
207+
}
208+
builder.append_value(&string[last_end..]);
209+
} else {
210+
// limit < 0: split all, keep trailing empties.
211+
for p in regex.split(string) {
212+
builder.append_value(p);
187213
}
188214
}
189-
190-
let values_array = Arc::new(GenericStringArray::<i32>::from(values)) as ArrayRef;
191-
let field = Arc::new(Field::new("item", DataType::Utf8, false));
192-
let nulls = arrow::buffer::NullBuffer::new(null_buffer_builder.finish());
193-
let list_array = ListArray::new(
194-
field,
195-
arrow::buffer::OffsetBuffer::new(offsets.into()),
196-
values_array,
197-
Some(nulls),
198-
);
199-
200-
Ok(ColumnarValue::Array(Arc::new(list_array)))
201215
}
202216

203217
fn split_string(string: &str, pattern: &str, limit: i32) -> DataFusionResult<Vec<String>> {

native/spark-expr/src/string_funcs/substring.rs

Lines changed: 64 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
#![allow(deprecated)]
1919

2020
use crate::kernels::strings::substring;
21-
use arrow::array::{as_dictionary_array, as_largestring_array, as_string_array, Array, ArrayRef};
21+
use arrow::array::{
22+
as_dictionary_array, as_largestring_array, as_string_array, Array, ArrayRef, GenericStringArray,
23+
};
2224
use arrow::datatypes::{DataType, Int32Type, Schema};
2325
use arrow::record_batch::RecordBatch;
2426
use datafusion::logical_expr::ColumnarValue;
@@ -128,67 +130,69 @@ fn spark_substring_negative_start(
128130
start: i64,
129131
len: u64,
130132
) -> datafusion::common::Result<ArrayRef> {
131-
use arrow::array::{
132-
BinaryArray, DictionaryArray, GenericBinaryBuilder, GenericStringBuilder, LargeBinaryArray,
133-
};
134-
135-
match array.data_type() {
136-
DataType::Utf8 => {
137-
let str_array = as_string_array(array);
138-
let mut builder = GenericStringBuilder::<i32>::new();
139-
for i in 0..str_array.len() {
140-
if str_array.is_null(i) {
141-
builder.append_null();
142-
} else {
143-
builder.append_value(spark_substr_negative(str_array.value(i), start, len));
144-
}
145-
}
146-
Ok(Arc::new(builder.finish()) as ArrayRef)
147-
}
148-
DataType::LargeUtf8 => {
149-
let str_array = as_largestring_array(array);
150-
let mut builder = GenericStringBuilder::<i64>::new();
151-
for i in 0..str_array.len() {
152-
if str_array.is_null(i) {
153-
builder.append_null();
154-
} else {
155-
builder.append_value(spark_substr_negative(str_array.value(i), start, len));
156-
}
157-
}
158-
Ok(Arc::new(builder.finish()) as ArrayRef)
133+
use arrow::array::{DictionaryArray, GenericBinaryArray, OffsetSizeTrait};
134+
135+
fn substr_str<O: OffsetSizeTrait>(
136+
str_array: &GenericStringArray<O>,
137+
start: i64,
138+
len: u64,
139+
) -> ArrayRef {
140+
use arrow::array::GenericStringBuilder;
141+
let mut builder = GenericStringBuilder::<O>::with_capacity(str_array.len(), 0);
142+
for i in 0..str_array.len() {
143+
// Always append; nulls are reattached in bulk below. This avoids
144+
// per-row NullBufferBuilder maintenance.
145+
let s = if str_array.is_null(i) {
146+
""
147+
} else {
148+
spark_substr_negative(str_array.value(i), start, len)
149+
};
150+
builder.append_value(s);
159151
}
160-
DataType::Binary => {
161-
let bin_array = array.as_any().downcast_ref::<BinaryArray>().unwrap();
162-
let mut builder = GenericBinaryBuilder::<i32>::new();
163-
for i in 0..bin_array.len() {
164-
if bin_array.is_null(i) {
165-
builder.append_null();
166-
} else {
167-
builder.append_value(spark_binary_substr_negative(
168-
bin_array.value(i),
169-
start,
170-
len,
171-
));
172-
}
173-
}
174-
Ok(Arc::new(builder.finish()) as ArrayRef)
175-
}
176-
DataType::LargeBinary => {
177-
let bin_array = array.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
178-
let mut builder = GenericBinaryBuilder::<i64>::new();
179-
for i in 0..bin_array.len() {
180-
if bin_array.is_null(i) {
181-
builder.append_null();
182-
} else {
183-
builder.append_value(spark_binary_substr_negative(
184-
bin_array.value(i),
185-
start,
186-
len,
187-
));
188-
}
189-
}
190-
Ok(Arc::new(builder.finish()) as ArrayRef)
152+
let (offsets, values, _) = builder.finish().into_parts();
153+
Arc::new(GenericStringArray::<O>::new(
154+
offsets,
155+
values,
156+
str_array.nulls().cloned(),
157+
))
158+
}
159+
160+
fn substr_bin<O: OffsetSizeTrait>(
161+
bin_array: &GenericBinaryArray<O>,
162+
start: i64,
163+
len: u64,
164+
) -> ArrayRef {
165+
use arrow::array::GenericBinaryBuilder;
166+
let mut builder = GenericBinaryBuilder::<O>::with_capacity(bin_array.len(), 0);
167+
for i in 0..bin_array.len() {
168+
let b: &[u8] = if bin_array.is_null(i) {
169+
&[]
170+
} else {
171+
spark_binary_substr_negative(bin_array.value(i), start, len)
172+
};
173+
builder.append_value(b);
191174
}
175+
let (offsets, values, _) = builder.finish().into_parts();
176+
Arc::new(GenericBinaryArray::<O>::new(
177+
offsets,
178+
values,
179+
bin_array.nulls().cloned(),
180+
))
181+
}
182+
183+
match array.data_type() {
184+
DataType::Utf8 => Ok(substr_str::<i32>(as_string_array(array), start, len)),
185+
DataType::LargeUtf8 => Ok(substr_str::<i64>(as_largestring_array(array), start, len)),
186+
DataType::Binary => Ok(substr_bin::<i32>(
187+
array.as_any().downcast_ref().unwrap(),
188+
start,
189+
len,
190+
)),
191+
DataType::LargeBinary => Ok(substr_bin::<i64>(
192+
array.as_any().downcast_ref().unwrap(),
193+
start,
194+
len,
195+
)),
192196
DataType::Dictionary(_, _) => {
193197
let dict = as_dictionary_array::<Int32Type>(array);
194198
let values = spark_substring_negative_start(dict.values(), start, len)?;

0 commit comments

Comments
 (0)