Skip to content

Commit 5dcd5f6

Browse files
Reorg a bit (#309)
1 parent bc127ad commit 5dcd5f6

3 files changed

Lines changed: 264 additions & 1 deletion

File tree

crates/datafusion-udfs-wasm/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,17 @@ tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread"] }
1717
[features]
1818
serde = ["dep:serde"]
1919

20+
[profile.bench]
21+
debug = true
22+
2023
[[bench]]
2124
harness = false
2225
name = "wasm_startup"
2326

2427
[[bench]]
2528
harness = false
26-
name = "query_performance"
29+
name = "row_query_performance"
30+
31+
[[bench]]
32+
harness = false
33+
name = "arrow_ipc_query_performance"
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
21+
use datafusion::arrow::datatypes::{Field, Schema};
22+
use datafusion::arrow::record_batch::RecordBatch;
23+
use datafusion::arrow::{array::Int64Array, datatypes::DataType};
24+
use datafusion::prelude::*;
25+
use datafusion_udfs_wasm::{try_create_wasm_udf, WasmInputDataType, WasmUdfDetails};
26+
use tokio::runtime::Runtime;
27+
28+
/// Generate test data with specified number of rows
29+
async fn create_test_table(
30+
ctx: &SessionContext,
31+
row_count: usize,
32+
) -> Result<(), datafusion::error::DataFusionError> {
33+
// Create arrays for our data
34+
let mut col1_data = Vec::with_capacity(row_count);
35+
let mut col2_data = Vec::with_capacity(row_count);
36+
37+
for i in 0..row_count {
38+
col1_data.push(i as i64);
39+
col2_data.push((i + 1) as i64);
40+
}
41+
42+
// Create Arrow arrays
43+
let col1_array = Int64Array::from(col1_data);
44+
let col2_array = Int64Array::from(col2_data);
45+
46+
// Define schema
47+
let schema = Arc::new(Schema::new(vec![
48+
Field::new("column1", DataType::Int64, false),
49+
Field::new("column2", DataType::Int64, false),
50+
]));
51+
52+
// Create RecordBatch
53+
let batch = RecordBatch::try_new(
54+
schema.clone(),
55+
vec![Arc::new(col1_array), Arc::new(col2_array)],
56+
)?;
57+
58+
// Register batch as table
59+
ctx.register_batch("test", batch)?;
60+
61+
Ok(())
62+
}
63+
64+
/// Register an Arrow IPC-based WASM UDF
65+
fn register_arrow_ipc_udf(ctx: &SessionContext) -> Result<(), datafusion::error::DataFusionError> {
66+
let wasm_bytes =
67+
std::fs::read("test-wasm/wasm_examples.wasm").expect("Failed to read WASM file");
68+
let input_types = vec![DataType::Int64, DataType::Int64];
69+
let return_type = DataType::Int64;
70+
let udf_details = WasmUdfDetails::new(
71+
"arrow_func".to_string(),
72+
input_types,
73+
return_type,
74+
WasmInputDataType::ArrowIpc,
75+
);
76+
let udf = try_create_wasm_udf(&wasm_bytes, udf_details)?;
77+
ctx.register_udf(udf);
78+
Ok(())
79+
}
80+
81+
/// Execute a query using the Arrow IPC-based WASM UDF
82+
async fn execute_arrow_ipc_udf_query(
83+
ctx: &SessionContext,
84+
) -> Result<Vec<RecordBatch>, datafusion::error::DataFusionError> {
85+
let query = "SELECT *, arrow_func(column1, column2) AS result FROM test";
86+
ctx.sql(query).await?.collect().await
87+
}
88+
89+
/// Benchmark Arrow IPC-based WASM UDF with different dataset sizes
90+
fn bench_arrow_ipc_udf(c: &mut Criterion) {
91+
let mut group = c.benchmark_group("arrow_ipc_udf_query");
92+
93+
for size in [1000, 10000, 100000].iter() {
94+
group.bench_with_input(BenchmarkId::from_parameter(size), size, |b, &size| {
95+
// Create a shared runtime outside the benchmark loop
96+
let rt = Runtime::new().unwrap();
97+
98+
b.iter(|| {
99+
rt.block_on(async {
100+
let ctx = SessionContext::new();
101+
102+
// Create test data
103+
create_test_table(&ctx, size).await.unwrap();
104+
105+
// Register UDF
106+
register_arrow_ipc_udf(&ctx).unwrap();
107+
108+
// Execute query
109+
execute_arrow_ipc_udf_query(&ctx).await.unwrap()
110+
})
111+
});
112+
});
113+
}
114+
115+
group.finish();
116+
}
117+
118+
// Configure Criterion to run our benchmarks with optimized settings
119+
fn bench_config() -> Criterion {
120+
Criterion::default()
121+
}
122+
123+
criterion_group! {
124+
name = benches;
125+
config = bench_config();
126+
targets = bench_arrow_ipc_udf
127+
}
128+
criterion_main!(benches);
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
21+
use datafusion::arrow::datatypes::{Field, Schema};
22+
use datafusion::arrow::record_batch::RecordBatch;
23+
use datafusion::arrow::{array::Int64Array, datatypes::DataType};
24+
use datafusion::prelude::*;
25+
use datafusion_udfs_wasm::{try_create_wasm_udf, WasmInputDataType, WasmUdfDetails};
26+
use tokio::runtime::Runtime;
27+
28+
/// Generate test data with specified number of rows
29+
async fn create_test_table(
30+
ctx: &SessionContext,
31+
row_count: usize,
32+
) -> Result<(), datafusion::error::DataFusionError> {
33+
// Create arrays for our data
34+
let mut col1_data = Vec::with_capacity(row_count);
35+
let mut col2_data = Vec::with_capacity(row_count);
36+
37+
for i in 0..row_count {
38+
col1_data.push(i as i64);
39+
col2_data.push((i + 1) as i64);
40+
}
41+
42+
// Create Arrow arrays
43+
let col1_array = Int64Array::from(col1_data);
44+
let col2_array = Int64Array::from(col2_data);
45+
46+
// Define schema
47+
let schema = Arc::new(Schema::new(vec![
48+
Field::new("column1", DataType::Int64, false),
49+
Field::new("column2", DataType::Int64, false),
50+
]));
51+
52+
// Create RecordBatch
53+
let batch = RecordBatch::try_new(
54+
schema.clone(),
55+
vec![Arc::new(col1_array), Arc::new(col2_array)],
56+
)?;
57+
58+
// Register batch as table
59+
ctx.register_batch("test", batch)?;
60+
61+
Ok(())
62+
}
63+
64+
/// Register a row-based WASM UDF
65+
fn register_row_udf(ctx: &SessionContext) -> Result<(), datafusion::error::DataFusionError> {
66+
let wasm_bytes =
67+
std::fs::read("test-wasm/basic_wasm_example.wasm").expect("Failed to read WASM file");
68+
let input_types = vec![DataType::Int64, DataType::Int64];
69+
let return_type = DataType::Int64;
70+
let udf_details = WasmUdfDetails::new(
71+
"wasm_add".to_string(),
72+
input_types,
73+
return_type,
74+
WasmInputDataType::Row,
75+
);
76+
let udf = try_create_wasm_udf(&wasm_bytes, udf_details)?;
77+
ctx.register_udf(udf);
78+
Ok(())
79+
}
80+
81+
/// Execute a query using the row-based WASM UDF
82+
async fn execute_row_udf_query(
83+
ctx: &SessionContext,
84+
) -> Result<Vec<RecordBatch>, datafusion::error::DataFusionError> {
85+
let query = "SELECT *, wasm_add(column1, column2) AS result FROM test";
86+
ctx.sql(query).await?.collect().await
87+
}
88+
89+
/// Benchmark row-based WASM UDF with different dataset sizes
90+
fn bench_row_udf(c: &mut Criterion) {
91+
let mut group = c.benchmark_group("row_udf_query");
92+
93+
for size in [1000, 10000, 100000].iter() {
94+
group.bench_with_input(BenchmarkId::from_parameter(size), size, |b, &size| {
95+
// Create a shared runtime outside the benchmark loop
96+
let rt = Runtime::new().unwrap();
97+
98+
b.iter(|| {
99+
rt.block_on(async {
100+
let ctx = SessionContext::new();
101+
102+
// Create test data
103+
create_test_table(&ctx, size).await.unwrap();
104+
105+
// Register UDF
106+
register_row_udf(&ctx).unwrap();
107+
108+
// Execute query
109+
execute_row_udf_query(&ctx).await.unwrap()
110+
})
111+
});
112+
});
113+
}
114+
115+
group.finish();
116+
}
117+
118+
// Configure Criterion to run our benchmarks with optimized settings
119+
fn bench_config() -> Criterion {
120+
Criterion::default()
121+
}
122+
123+
criterion_group! {
124+
name = benches;
125+
config = bench_config();
126+
targets = bench_row_udf
127+
}
128+
criterion_main!(benches);

0 commit comments

Comments
 (0)