Skip to content

Commit 749d52e

Browse files
committed
perf: [iceberg] per partition file scan task serialization
1 parent adc5013 commit 749d52e

13 files changed

Lines changed: 1244 additions & 17 deletions

File tree

docs/source/contributor-guide/iceberg_partition_optimization.md

Lines changed: 660 additions & 0 deletions
Large diffs are not rendered by default.

docs/source/contributor-guide/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ Arrow FFI <ffi>
2929
JVM Shuffle <jvm_shuffle>
3030
Native Shuffle <native_shuffle>
3131
Parquet Scans <parquet_scans>
32+
Iceberg Partition Optimization <iceberg_partition_optimization>
3233
Development Guide <development>
3334
Debugging Guide <debugging>
3435
Benchmarking Guide <benchmarking>

native/core/src/execution/jni_api.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -946,3 +946,39 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_columnarToRowClose(
946946
Ok(())
947947
})
948948
}
949+
950+
#[no_mangle]
951+
/// Get Iceberg partition tasks for the current partition via JNI callback.
952+
///
953+
/// Called by native planner to retrieve partition-specific FileScanTask bytes from
954+
/// the thread-local storage on the JVM side. This enables on-demand task retrieval
955+
/// instead of broadcasting all partition tasks to all executors.
956+
///
957+
/// # Safety
958+
/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
959+
pub unsafe extern "system" fn Java_org_apache_comet_Native_getIcebergPartitionTasks<'local>(
960+
mut e: JNIEnv<'local>,
961+
_class: JClass,
962+
) -> jni::sys::jobject {
963+
use jni::signature::ReturnType;
964+
965+
let jvm_classes = JVMClasses::get();
966+
967+
// Call Native.getIcebergPartitionTasksInternal() static method
968+
let result = e.call_static_method_unchecked(
969+
&jvm_classes.native.class,
970+
jvm_classes
971+
.native
972+
.method_get_iceberg_partition_tasks_internal,
973+
ReturnType::Array,
974+
&[],
975+
);
976+
977+
match result {
978+
Ok(value) => match value.l() {
979+
Ok(obj) => obj.as_raw(),
980+
Err(_) => std::ptr::null_mut(),
981+
},
982+
Err(_) => std::ptr::null_mut(),
983+
}
984+
}

native/core/src/execution/planner.rs

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,14 +1144,69 @@ impl PhysicalPlanner {
11441144
let metadata_location = scan.metadata_location.clone();
11451145

11461146
debug_assert!(
1147-
!scan.file_partitions.is_empty(),
1148-
"IcebergScan must have at least one file partition. This indicates a bug in Scala serialization."
1147+
scan.use_jni_task_retrieval || !scan.file_partitions.is_empty(),
1148+
"IcebergScan must have use_jni_task_retrieval=true OR non-empty file_partitions. This indicates a bug in Scala serialization."
11491149
);
11501150

1151-
let tasks = parse_file_scan_tasks(
1152-
scan,
1153-
&scan.file_partitions[self.partition as usize].file_scan_tasks,
1154-
)?;
1151+
// Get file scan tasks either from JNI callback or from protobuf
1152+
let file_scan_tasks = if scan.use_jni_task_retrieval {
1153+
// Call JNI to get partition-specific tasks from thread-local storage
1154+
use crate::jvm_bridge::JVMClasses;
1155+
use datafusion_comet_proto::spark_operator::IcebergFilePartition;
1156+
use jni::objects::JByteArray;
1157+
use jni::signature::ReturnType;
1158+
use prost::Message;
1159+
1160+
let mut env = JVMClasses::get_env()?;
1161+
let jvm_classes = JVMClasses::get();
1162+
1163+
// Call Native.getIcebergPartitionTasksInternal() directly
1164+
let result = unsafe {
1165+
env.call_static_method_unchecked(
1166+
&jvm_classes.native.class,
1167+
jvm_classes
1168+
.native
1169+
.method_get_iceberg_partition_tasks_internal,
1170+
ReturnType::Array,
1171+
&[],
1172+
)
1173+
};
1174+
1175+
let result = result.map_err(|e| {
1176+
ExecutionError::GeneralError(format!("JNI call failed: {}", e))
1177+
})?;
1178+
1179+
// Extract byte array from result
1180+
let task_bytes_obj = result.l().map_err(|e| {
1181+
ExecutionError::GeneralError(format!("Failed to extract JObject: {}", e))
1182+
})?;
1183+
let task_bytes_array: JByteArray = task_bytes_obj.into();
1184+
1185+
if task_bytes_array.is_null() {
1186+
return Err(ExecutionError::GeneralError(format!(
1187+
"No partition tasks found for partition {} (JNI returned null). \
1188+
This may indicate that partition tasks were not set in thread-local storage.",
1189+
self.partition
1190+
)));
1191+
}
1192+
1193+
// Convert JByteArray to Vec<u8>
1194+
let task_bytes = env.convert_byte_array(&task_bytes_array).map_err(|e| {
1195+
ExecutionError::GeneralError(format!("Failed to convert byte array: {}", e))
1196+
})?;
1197+
1198+
// Parse protobuf bytes into IcebergFilePartition
1199+
let partition = IcebergFilePartition::decode(&task_bytes[..])?;
1200+
1201+
partition.file_scan_tasks
1202+
} else {
1203+
// Use tasks from protobuf (backward compatibility)
1204+
scan.file_partitions[self.partition as usize]
1205+
.file_scan_tasks
1206+
.clone()
1207+
};
1208+
1209+
let tasks = parse_file_scan_tasks(scan, &file_scan_tasks)?;
11551210
let file_task_groups = vec![tasks];
11561211

11571212
let iceberg_scan = IcebergScanExec::new(

native/core/src/jvm_bridge/mod.rs

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,13 @@ pub use comet_exec::*;
174174
mod batch_iterator;
175175
mod comet_metric_node;
176176
mod comet_task_memory_manager;
177+
mod native;
177178

178179
use crate::{errors::CometError, JAVA_VM};
179180
use batch_iterator::CometBatchIterator;
180181
pub use comet_metric_node::*;
181182
pub use comet_task_memory_manager::*;
183+
pub use native::*;
182184

183185
/// The JVM classes that are used in the JNI calls.
184186
#[allow(dead_code)] // we need to keep references to Java items to prevent GC
@@ -207,6 +209,8 @@ pub struct JVMClasses<'a> {
207209
/// The CometTaskMemoryManager used for interacting with JVM side to
208210
/// acquire & release native memory.
209211
pub comet_task_memory_manager: CometTaskMemoryManager<'a>,
212+
/// The Native object. Used for Iceberg partition task retrieval.
213+
pub native: Native<'a>,
210214
}
211215

212216
unsafe impl Send for JVMClasses<'_> {}
@@ -254,10 +258,71 @@ impl JVMClasses<'_> {
254258
class_get_name_method,
255259
throwable_get_message_method,
256260
throwable_get_cause_method,
257-
comet_metric_node: CometMetricNode::new(env).unwrap(),
258-
comet_exec: CometExec::new(env).unwrap(),
259-
comet_batch_iterator: CometBatchIterator::new(env).unwrap(),
260-
comet_task_memory_manager: CometTaskMemoryManager::new(env).unwrap(),
261+
comet_metric_node: {
262+
eprintln!(">> Initializing CometMetricNode...");
263+
match CometMetricNode::new(env) {
264+
Ok(node) => {
265+
eprintln!(" OK: CometMetricNode initialized");
266+
node
267+
}
268+
Err(e) => {
269+
eprintln!(" ERROR: CometMetricNode failed: {:?}", e);
270+
panic!("CometMetricNode initialization failed: {:?}", e);
271+
}
272+
}
273+
},
274+
comet_exec: {
275+
eprintln!(">> Initializing CometExec...");
276+
match CometExec::new(env) {
277+
Ok(exec) => {
278+
eprintln!(" OK: CometExec initialized");
279+
exec
280+
}
281+
Err(e) => {
282+
eprintln!(" ERROR: CometExec failed: {:?}", e);
283+
panic!("CometExec initialization failed: {:?}", e);
284+
}
285+
}
286+
},
287+
comet_batch_iterator: {
288+
eprintln!(">> Initializing CometBatchIterator...");
289+
match CometBatchIterator::new(env) {
290+
Ok(iter) => {
291+
eprintln!(" OK: CometBatchIterator initialized");
292+
iter
293+
}
294+
Err(e) => {
295+
eprintln!(" ERROR: CometBatchIterator failed: {:?}", e);
296+
panic!("CometBatchIterator initialization failed: {:?}", e);
297+
}
298+
}
299+
},
300+
comet_task_memory_manager: {
301+
eprintln!(">> Initializing CometTaskMemoryManager...");
302+
match CometTaskMemoryManager::new(env) {
303+
Ok(mgr) => {
304+
eprintln!(" OK: CometTaskMemoryManager initialized");
305+
mgr
306+
}
307+
Err(e) => {
308+
eprintln!(" ERROR: CometTaskMemoryManager failed: {:?}", e);
309+
panic!("CometTaskMemoryManager initialization failed: {:?}", e);
310+
}
311+
}
312+
},
313+
native: match Native::new(env) {
314+
Ok(native) => {
315+
eprintln!("✓ Successfully initialized Native JNI class");
316+
native
317+
}
318+
Err(e) => {
319+
eprintln!("✗ PANIC: Failed to initialize Native JNI class: {:?}", e);
320+
eprintln!(" Class name: org/apache/comet/NativeJNIBridge");
321+
eprintln!(" Method: getIcebergPartitionTasksInternal");
322+
eprintln!(" Signature: ()[B");
323+
panic!("Native JNI initialization failed: {:?}", e);
324+
}
325+
},
261326
}
262327
});
263328
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use jni::{
19+
errors::Result as JniResult,
20+
objects::{JClass, JStaticMethodID},
21+
signature::ReturnType,
22+
JNIEnv,
23+
};
24+
25+
/// A struct that holds all the JNI methods and fields for JVM Native object.
26+
pub struct Native<'a> {
27+
pub class: JClass<'a>,
28+
pub method_get_iceberg_partition_tasks_internal: JStaticMethodID,
29+
pub method_get_iceberg_partition_tasks_internal_ret: ReturnType,
30+
}
31+
32+
impl<'a> Native<'a> {
33+
pub const JVM_CLASS: &'static str = "org/apache/comet/NativeJNIBridge";
34+
35+
pub fn new(env: &mut JNIEnv<'a>) -> JniResult<Native<'a>> {
36+
eprintln!("→ Initializing Native JNI class...");
37+
eprintln!(" Looking up class: {}", Self::JVM_CLASS);
38+
39+
let class = match env.find_class(Self::JVM_CLASS) {
40+
Ok(c) => {
41+
eprintln!(" ✓ Found class: {}", Self::JVM_CLASS);
42+
c
43+
}
44+
Err(e) => {
45+
eprintln!(" ✗ Failed to find class: {}", Self::JVM_CLASS);
46+
eprintln!(" Error: {:?}", e);
47+
return Err(e);
48+
}
49+
};
50+
51+
eprintln!(" Looking up method: getIcebergPartitionTasksInternal with signature ()[B");
52+
let method = match env.get_static_method_id(
53+
Self::JVM_CLASS,
54+
"getIcebergPartitionTasksInternal",
55+
"()[B",
56+
) {
57+
Ok(m) => {
58+
eprintln!(" ✓ Found method: getIcebergPartitionTasksInternal");
59+
m
60+
}
61+
Err(e) => {
62+
eprintln!(" ✗ Failed to find method: getIcebergPartitionTasksInternal");
63+
eprintln!(" Error: {:?}", e);
64+
return Err(e);
65+
}
66+
};
67+
68+
eprintln!("✓ Native JNI class initialized successfully");
69+
Ok(Native {
70+
method_get_iceberg_partition_tasks_internal: method,
71+
method_get_iceberg_partition_tasks_internal_ret: ReturnType::Array,
72+
class,
73+
})
74+
}
75+
}

native/proto/src/proto/operator.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ message IcebergScan {
164164
map<string, string> catalog_properties = 2;
165165

166166
// Pre-planned file scan tasks grouped by Spark partition
167+
// DEPRECATED when use_jni_task_retrieval=true: Tasks retrieved via JNI callback instead
167168
repeated IcebergFilePartition file_partitions = 3;
168169

169170
// Table metadata file path for FileIO initialization
@@ -178,6 +179,10 @@ message IcebergScan {
178179
repeated PartitionData partition_data_pool = 10;
179180
repeated DeleteFileList delete_files_pool = 11;
180181
repeated spark.spark_expression.Expr residual_pool = 12;
182+
183+
// When true, file_partitions is empty and tasks are retrieved via JNI callback
184+
// This optimizes network/memory usage by avoiding broadcast of all partition tasks
185+
bool use_jni_task_retrieval = 13;
181186
}
182187

183188
// Helper message for deduplicating field ID lists
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet;
21+
22+
/**
23+
* JNI bridge for accessing Scala object methods as static methods.
24+
*
25+
* <p>This class provides static methods that can be called from native code via JNI, delegating to
26+
* the Scala Native object singleton.
27+
*/
28+
public class NativeJNIBridge {
29+
30+
/**
31+
* Gets Iceberg partition tasks for the current thread (JNI-accessible static method).
32+
*
33+
* <p>This method is called by native Rust code via JNI to retrieve partition-specific tasks
34+
* during Iceberg scan execution.
35+
*
36+
* @return Serialized protobuf bytes containing IcebergFileScanTask messages, or null if not set
37+
*/
38+
public static byte[] getIcebergPartitionTasksInternal() {
39+
return Native$.MODULE$.getIcebergPartitionTasksInternal();
40+
}
41+
}

0 commit comments

Comments
 (0)