Skip to content

Commit a3f8d10

Browse files
committed
fix
1 parent 749d52e commit a3f8d10

4 files changed

Lines changed: 45 additions & 103 deletions

File tree

docs/source/contributor-guide/iceberg_partition_optimization.md

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ This document explains how the Iceberg native scan optimization ensures that **e
2626
## The Problem: Broadcasting Waste
2727

2828
### Old Approach (Before Optimization)
29+
2930
In a traditional distributed query execution:
3031

3132
1. **Driver serializes ALL partition tasks** into a protobuf message
@@ -35,12 +36,14 @@ In a traditional distributed query execution:
3536
5. **Result: 99% waste for large N**
3637

3738
### Example
39+
3840
- Table with **1000 partitions**
3941
- Each partition has **100KB of task data** (file paths, partition values, schemas, etc.)
4042
- Total task data: **100MB**
4143
- **Problem**: EVERY executor receives all 100MB, but only uses ~100KB
4244

4345
For a cluster with 100 executors:
46+
4447
- **Total network transfer**: 100 executors × 100MB = **10GB**
4548
- **Useful data**: 100 executors × 100KB = **10MB**
4649
- **Waste**: 99% of transferred data is discarded!
@@ -82,6 +85,7 @@ scan.wrapped.inputRDD match {
8285
```
8386

8487
**What happens here:**
88+
8589
1. During query planning on the **driver**, the code iterates through each Spark partition
8690
2. For each partition `i`, it extracts **only the FileScanTasks that belong to that partition**
8791
3. These tasks are serialized to protobuf bytes: `IcebergFilePartition``Array[Byte]`
@@ -120,6 +124,7 @@ class IcebergScanRDD(
120124
```
121125

122126
**What happens here:**
127+
123128
1. **Custom Partition class**: `IcebergScanPartition` carries its own `taskBytes: Array[Byte]`
124129
2. **getPartitions()**: Creates N partition objects, each with only its own task data
125130
3. **Spark's RDD serialization**: When Spark schedules tasks, it serializes the `Partition` object and sends it to the executor
@@ -130,6 +135,7 @@ class IcebergScanRDD(
130135
#### Why This Works: Spark's Task Serialization
131136

132137
Spark's task scheduling works as follows:
138+
133139
1. **Driver** calls `getPartitions()` → creates array of Partition objects
134140
2. **Scheduler** assigns tasks to executors: "Executor A: compute partition 5", "Executor B: compute partition 8", etc.
135141
3. **Task serialization**: When sending the task to an executor, Spark serializes:
@@ -176,6 +182,7 @@ if (useJniTaskRetrieval) {
176182
**What happens here:**
177183

178184
#### On the Executor (JVM side):
185+
179186
1. **Receive**: Executor receives `IcebergScanPartition(5, taskBytes)` from Spark
180187
2. **Thread-local storage**: Task bytes stored in `ThreadLocal[Array[Byte]]` via `Native.setIcebergPartitionTasks(taskBytes)`
181188
3. **Create iterator**: Native execution plan is initialized
@@ -203,6 +210,7 @@ object Native {
203210
```
204211

205212
**Why Thread-local?**
213+
206214
- Multiple tasks may run concurrently on the same executor JVM
207215
- Each task runs in its own thread
208216
- Thread-local storage ensures each task only accesses **its own partition data**
@@ -234,6 +242,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_getIcebergPartitionTa
234242
```
235243

236244
**What happens here:**
245+
237246
1. Native Iceberg planner calls `getIcebergPartitionTasks()` via JNI
238247
2. This calls back to `Native.getIcebergPartitionTasksInternal()` on JVM side
239248
3. Retrieves the `Array[Byte]` from thread-local storage
@@ -277,12 +286,14 @@ override def convertBlock(): CometNativeExec = {
277286
```
278287

279288
The `serializedPlanOpt` contains the **operator DAG structure**:
289+
280290
- Scan → Filter → Project, etc.
281291
- Schema definitions
282292
- Filter predicates
283293
- Projection columns
284294

285295
But it does **NOT** contain partition-specific FileScanTasks because:
296+
286297
1. It's created **once** on the driver
287298
2. It's **shared** by all executors
288299
3. It's the same for partition 0, partition 5, partition 1000, etc.
@@ -360,9 +371,9 @@ You might ask: **"Why not include partition-specific data in the protobuf?"**
360371

361372
If we embedded partition-specific data in protobuf, we'd need:
362373

363-
| Approach | Implications |
364-
|----------|--------------|
365-
| **Current: JNI Callback** | ✓ One shared plan protobuf<br>✓ Leverages existing Comet architecture<br>✓ Partition data via RDD (our optimization)<br>⚠ Extra JNI roundtrip (minimal overhead) |
374+
| Approach | Implications |
375+
| ---------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
376+
| **Current: JNI Callback** | ✓ One shared plan protobuf<br>✓ Leverages existing Comet architecture<br>✓ Partition data via RDD (our optimization)<br>⚠ Extra JNI roundtrip (minimal overhead) |
366377
| **Alternative: Embed in Protobuf** | ✗ Would need N different protobuf plans (one per partition)<br>✗ Each executor receives different protobuf<br>✗ Breaks Comet's shared plan model<br>✗ Major architectural restructuring required |
367378

368379
### The JNI Callback as a Bridge
@@ -414,6 +425,7 @@ The JNI callback overhead is **minimal** compared to the optimization benefits:
414425
- **Memory savings**: 100-200× reduction in executor memory (ongoing)
415426

416427
For a table with 10,000 partitions:
428+
417429
- JNI overhead: 10,000 partitions × 10μs = **0.1 seconds total**
418430
- Network savings: 100GB → 500MB = **99.5 GB saved**
419431
- Memory savings: 100GB → 500MB executor memory = **199.5 GB saved**
@@ -499,14 +511,17 @@ Without the JNI callback, we would have to fundamentally restructure how Comet s
499511
### Network Transfer Savings
500512

501513
**Before optimization:**
514+
502515
- Total data per executor = N × avg_task_size
503516
- Total cluster network = num_executors × N × avg_task_size
504517

505518
**After optimization:**
519+
506520
- Total data per executor = avg_tasks_per_executor × avg_task_size
507521
- Total cluster network = num_executors × avg_tasks_per_executor × avg_task_size
508522

509523
**Savings ratio:**
524+
510525
```
511526
savings = 1 - (avg_tasks_per_executor / N)
512527
```
@@ -516,18 +531,21 @@ For evenly distributed data: `avg_tasks_per_executor ≈ N / num_executors`
516531
### Example: Large Table Scan
517532

518533
**Scenario:**
534+
519535
- Table with 10,000 partitions
520536
- 200 executors
521537
- 50KB average task data per partition
522538
- Total task metadata: 10,000 × 50KB = **500MB**
523539

524540
**Before optimization:**
541+
525542
- Each executor receives: **500MB** (all partition data)
526543
- Total network transfer: 200 × 500MB = **100GB**
527544
- Each executor uses: ~50 partitions × 50KB = **2.5MB** (0.5%)
528545
- Wasted transfer: **99.5%**
529546

530547
**After optimization:**
548+
531549
- Each executor receives: ~50 × 50KB = **2.5MB** (only its partitions)
532550
- Total network transfer: 200 × 2.5MB = **500MB**
533551
- Each executor uses: **2.5MB** (100%)
@@ -536,11 +554,13 @@ For evenly distributed data: `avg_tasks_per_executor ≈ N / num_executors`
536554
### Memory Pressure Reduction
537555

538556
**Before:**
557+
539558
- Driver memory: 500MB (serialize all tasks)
540559
- Executor memory: 500MB × 200 = **100GB** across cluster
541560
- GC pressure: High (500MB objects per executor)
542561

543562
**After:**
563+
544564
- Driver memory: 500MB (same, but partitioned)
545565
- Executor memory: 2.5MB × 200 = **500MB** across cluster
546566
- GC pressure: Low (2.5MB objects per executor)
@@ -559,6 +579,7 @@ Spark's broadcast variables would still send all data to all executors. The opti
559579
**Problem**: Need to pass partition-specific data from JVM to native code during execution.
560580

561581
**Options considered:**
582+
562583
1. **Pass as function parameter**: Would require modifying the entire call chain
563584
2. **Global state**: Unsafe with concurrent tasks
564585
3. **Thread-local**: ✓ Safe, simple, minimal API changes
@@ -572,12 +593,14 @@ Spark's broadcast variables would still send all data to all executors. The opti
572593
### 4. What About Protobuf Deduplication?
573594

574595
The code still uses deduplication pools (CometIcebergNativeScan.scala:696-705) to reduce redundancy **within each partition's task data**:
596+
575597
- Schema pool
576598
- Partition spec pool
577599
- Delete files pool
578600
- etc.
579601

580602
This is **orthogonal** to the partition distribution optimization. Both work together:
603+
581604
- **Deduplication**: Reduces task data size within each partition
582605
- **Partition-specific distribution**: Ensures executors only receive their partition data
583606

@@ -586,6 +609,7 @@ This is **orthogonal** to the partition distribution optimization. Both work tog
586609
## Code Flow Summary
587610

588611
### Query Planning (Driver)
612+
589613
1. `CometScanRule` → creates `CometBatchScanExec` with Iceberg metadata
590614
2. `CometIcebergNativeScan.convert()` → serializes plan to protobuf
591615
- Extracts FileScanTasks per partition
@@ -600,6 +624,7 @@ This is **orthogonal** to the partition distribution optimization. Both work tog
600624
- Passes `partitionTasks` map to RDD constructor
601625

602626
### Task Execution (Executors)
627+
603628
1. Spark schedules task for partition `i` on executor
604629
2. Spark serializes and sends `IcebergScanPartition(i, taskBytes_i)` to executor
605630
3. `IcebergScanRDD.compute()` called with partition object
@@ -620,6 +645,7 @@ This is **orthogonal** to the partition distribution optimization. Both work tog
620645
To verify the optimization is working:
621646

622647
1. **Check logs for partition data distribution:**
648+
623649
```
624650
INFO CometIcebergNativeScan: Cached N partitions (avg X bytes/partition)
625651
```

native/core/src/jvm_bridge/mod.rs

Lines changed: 12 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -258,71 +258,18 @@ impl JVMClasses<'_> {
258258
class_get_name_method,
259259
throwable_get_message_method,
260260
throwable_get_cause_method,
261-
comet_metric_node: {
262-
eprintln!(">> Initializing CometMetricNode...");
263-
match CometMetricNode::new(env) {
264-
Ok(node) => {
265-
eprintln!(" OK: CometMetricNode initialized");
266-
node
267-
}
268-
Err(e) => {
269-
eprintln!(" ERROR: CometMetricNode failed: {:?}", e);
270-
panic!("CometMetricNode initialization failed: {:?}", e);
271-
}
272-
}
273-
},
274-
comet_exec: {
275-
eprintln!(">> Initializing CometExec...");
276-
match CometExec::new(env) {
277-
Ok(exec) => {
278-
eprintln!(" OK: CometExec initialized");
279-
exec
280-
}
281-
Err(e) => {
282-
eprintln!(" ERROR: CometExec failed: {:?}", e);
283-
panic!("CometExec initialization failed: {:?}", e);
284-
}
285-
}
286-
},
287-
comet_batch_iterator: {
288-
eprintln!(">> Initializing CometBatchIterator...");
289-
match CometBatchIterator::new(env) {
290-
Ok(iter) => {
291-
eprintln!(" OK: CometBatchIterator initialized");
292-
iter
293-
}
294-
Err(e) => {
295-
eprintln!(" ERROR: CometBatchIterator failed: {:?}", e);
296-
panic!("CometBatchIterator initialization failed: {:?}", e);
297-
}
298-
}
299-
},
300-
comet_task_memory_manager: {
301-
eprintln!(">> Initializing CometTaskMemoryManager...");
302-
match CometTaskMemoryManager::new(env) {
303-
Ok(mgr) => {
304-
eprintln!(" OK: CometTaskMemoryManager initialized");
305-
mgr
306-
}
307-
Err(e) => {
308-
eprintln!(" ERROR: CometTaskMemoryManager failed: {:?}", e);
309-
panic!("CometTaskMemoryManager initialization failed: {:?}", e);
310-
}
311-
}
312-
},
313-
native: match Native::new(env) {
314-
Ok(native) => {
315-
eprintln!("✓ Successfully initialized Native JNI class");
316-
native
317-
}
318-
Err(e) => {
319-
eprintln!("✗ PANIC: Failed to initialize Native JNI class: {:?}", e);
320-
eprintln!(" Class name: org/apache/comet/NativeJNIBridge");
321-
eprintln!(" Method: getIcebergPartitionTasksInternal");
322-
eprintln!(" Signature: ()[B");
323-
panic!("Native JNI initialization failed: {:?}", e);
324-
}
325-
},
261+
comet_metric_node: CometMetricNode::new(env)
262+
.unwrap_or_else(|e| panic!("CometMetricNode initialization failed: {:?}", e)),
263+
comet_exec: CometExec::new(env)
264+
.unwrap_or_else(|e| panic!("CometExec initialization failed: {:?}", e)),
265+
comet_batch_iterator: CometBatchIterator::new(env).unwrap_or_else(|e| {
266+
panic!("CometBatchIterator initialization failed: {:?}", e)
267+
}),
268+
comet_task_memory_manager: CometTaskMemoryManager::new(env).unwrap_or_else(|e| {
269+
panic!("CometTaskMemoryManager initialization failed: {:?}", e)
270+
}),
271+
native: Native::new(env)
272+
.unwrap_or_else(|e| panic!("Native JNI initialization failed: {:?}", e)),
326273
}
327274
});
328275
}

native/core/src/jvm_bridge/native.rs

Lines changed: 3 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,57 +18,26 @@
1818
use jni::{
1919
errors::Result as JniResult,
2020
objects::{JClass, JStaticMethodID},
21-
signature::ReturnType,
2221
JNIEnv,
2322
};
2423

2524
/// A struct that holds all the JNI methods and fields for JVM Native object.
2625
pub struct Native<'a> {
2726
pub class: JClass<'a>,
2827
pub method_get_iceberg_partition_tasks_internal: JStaticMethodID,
29-
pub method_get_iceberg_partition_tasks_internal_ret: ReturnType,
3028
}
3129

3230
impl<'a> Native<'a> {
3331
pub const JVM_CLASS: &'static str = "org/apache/comet/NativeJNIBridge";
3432

3533
pub fn new(env: &mut JNIEnv<'a>) -> JniResult<Native<'a>> {
36-
eprintln!("→ Initializing Native JNI class...");
37-
eprintln!(" Looking up class: {}", Self::JVM_CLASS);
34+
let class = env.find_class(Self::JVM_CLASS)?;
3835

39-
let class = match env.find_class(Self::JVM_CLASS) {
40-
Ok(c) => {
41-
eprintln!(" ✓ Found class: {}", Self::JVM_CLASS);
42-
c
43-
}
44-
Err(e) => {
45-
eprintln!(" ✗ Failed to find class: {}", Self::JVM_CLASS);
46-
eprintln!(" Error: {:?}", e);
47-
return Err(e);
48-
}
49-
};
36+
let method =
37+
env.get_static_method_id(Self::JVM_CLASS, "getIcebergPartitionTasksInternal", "()[B")?;
5038

51-
eprintln!(" Looking up method: getIcebergPartitionTasksInternal with signature ()[B");
52-
let method = match env.get_static_method_id(
53-
Self::JVM_CLASS,
54-
"getIcebergPartitionTasksInternal",
55-
"()[B",
56-
) {
57-
Ok(m) => {
58-
eprintln!(" ✓ Found method: getIcebergPartitionTasksInternal");
59-
m
60-
}
61-
Err(e) => {
62-
eprintln!(" ✗ Failed to find method: getIcebergPartitionTasksInternal");
63-
eprintln!(" Error: {:?}", e);
64-
return Err(e);
65-
}
66-
};
67-
68-
eprintln!("✓ Native JNI class initialized successfully");
6939
Ok(Native {
7040
method_get_iceberg_partition_tasks_internal: method,
71-
method_get_iceberg_partition_tasks_internal_ret: ReturnType::Array,
7241
class,
7342
})
7443
}

spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ class IcebergReadFromS3Suite extends CometS3TestBase {
184184
id,
185185
CONCAT('data_', CAST(id AS STRING)) as data,
186186
(id % 100) as partition_id
187-
FROM range(500000)
187+
FROM range(5000000)
188188
""")
189189

190190
checkIcebergNativeScan(

0 commit comments

Comments
 (0)