Skip to content

Commit a1b81ec

Browse files
schenksj and claude
committed
feat: Iceberg parity improvements + code review fixes for Delta native scan
- Add LRU cache to DeltaPlanDataInjector (matching Iceberg pattern)
- Add ImmutableSQLMetric, planning metrics (total_files, dv_files), explicit doExecuteColumnar() override, convertBlock() preserving @transient fields
- Add DPP support via doPrepare() + deferred partition pruning
- Add filesystem scheme validation in scan rule
- Consolidate castPartitionString into DeltaReflection (fixes divergent date/timestamp/decimal handling between planning and execution)
- Fix DV unwrap_or_default → proper error on kernel mismatch
- Fix silent predicate decode failure → log warning
- Fix ColumnMappingFilterRewriter silent fallback → log warning
- Fix fieldIndex/indexWhere inconsistency in DPP partition resolution
- Fix ThreadLocal cleanup in createExec (try-finally)
- Add unsafe safety docs, DV filter loop invariant comment
- Replace flaky Thread.sleep in timestamp test with deterministic mtime
- Split test suite into 3 focused suites + shared CometDeltaTestBase trait
- Fix license header typo, outdated comments, error message clarity

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ec70c59 commit a1b81ec

14 files changed

Lines changed: 1100 additions & 761 deletions

File tree

native/core/src/delta/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ pub enum DeltaError {
5454

5555
#[error("delta kernel error: {0}")]
5656
Kernel(#[from] delta_kernel::Error),
57+
58+
#[error("{0}")]
59+
Internal(String),
5760
}
5861

5962
pub type DeltaResult<T> = std::result::Result<T, DeltaError>;

native/core/src/delta/jni.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
// Unless required by applicable law or agreed to in writing,
1212
// software distributed under the License is distributed on an
1313
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14-
// KIND, either success or implied. See the License for the
14+
// KIND, either express or implied. See the License for the
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

@@ -105,14 +105,21 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_planDeltaScan(
105105
// BoundReference index-to-name resolution.
106106
let kernel_predicate = _predicate_proto.and_then(|bytes| {
107107
use prost::Message;
108-
datafusion_comet_proto::spark_expression::Expr::decode(bytes.as_slice())
109-
.ok()
110-
.map(|expr| {
108+
match datafusion_comet_proto::spark_expression::Expr::decode(bytes.as_slice()) {
109+
Ok(expr) => Some(
111110
crate::delta::predicate::catalyst_to_kernel_predicate_with_names(
112111
&expr,
113112
&col_names,
114-
)
115-
})
113+
),
114+
),
115+
Err(e) => {
116+
log::warn!(
117+
"Failed to decode predicate for Delta file pruning: {e}; \
118+
scanning all files"
119+
);
120+
None
121+
}
122+
}
116123
});
117124

118125
let plan =
@@ -238,6 +245,9 @@ fn read_string_array(
238245
let mut result = Vec::with_capacity(len);
239246
for i in 0..len {
240247
let obj = arr.get_element(env, i)?;
248+
// SAFETY: get_element returns a valid local JObject reference that we
249+
// immediately convert to JString. The array is String[], so the cast
250+
// is valid. The env lifetime outlives this scope.
241251
let jstr = unsafe { JString::from_raw(env, obj.into_raw()) };
242252
result.push(jstr.try_to_string(env)?);
243253
}
@@ -256,7 +266,9 @@ fn map_get_string(
256266
match jmap.get(env, &key_jobj)? {
257267
None => Ok(None),
258268
Some(value) => {
259-
// Safe: `Map<String, String>::get` only ever returns a String.
269+
// SAFETY: Map<String, String>::get always returns a String. The
270+
// JObject reference is valid because JMap::get returned it from the
271+
// current env frame. We consume the local ref via into_raw().
260272
let jstr = unsafe { JString::from_raw(env, value.into_raw()) };
261273
Ok(Some(jstr.try_to_string(env)?))
262274
}

native/core/src/delta/scan.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,12 @@ pub fn plan_delta_scan_with_predicate(
240240
// processing if driver OOM becomes an issue at extreme scale.
241241
let mut entries: Vec<DeltaFileEntry> = Vec::with_capacity(raw.len());
242242
for r in raw {
243-
// get_row_indexes returns Ok(Some(Vec<u64>)) when a DV is present,
244-
// Ok(None) when has_vector() lied (shouldn't happen), or Err on I/O
245-
// failure. The `?` propagates I/O errors; `unwrap_or_default` handles
246-
// the None case defensively (empty = no deletions, safe fallback).
247243
let deleted_row_indexes = if r.dv_info.has_vector() {
248244
r.dv_info
249245
.get_row_indexes(&engine, &table_root_url)?
250-
.unwrap_or_default()
246+
.ok_or_else(|| DeltaError::Internal(
247+
format!("DV has_vector() true but get_row_indexes() returned None for {}", r.path),
248+
))?
251249
} else {
252250
Vec::new()
253251
};

native/core/src/execution/operators/delta_dv_filter.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,8 @@ impl DeltaDvFilterStream {
247247
// that fall inside [batch_start, batch_end).
248248
let mut mask_buf: Vec<bool> = vec![true; batch_rows as usize];
249249
let mut dropped: usize = 0;
250+
// Loop is safe: next_delete_idx < deleted.len() is checked by the while
251+
// condition, and deleted is sorted ascending by the kernel contract.
250252
while self.next_delete_idx < self.deleted.len() {
251253
let d = self.deleted[self.next_delete_idx];
252254
if d >= batch_end {

native/core/src/execution/planner.rs

Lines changed: 64 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1358,7 +1358,7 @@ impl PhysicalPlanner {
13581358
let common = scan
13591359
.common
13601360
.as_ref()
1361-
.ok_or_else(|| GeneralError("DeltaScan missing common data".into()))?;
1361+
.ok_or_else(|| GeneralError("DeltaScan proto missing 'common' field (Scala serialization error)".into()))?;
13621362

13631363
let required_schema: SchemaRef =
13641364
convert_spark_types_to_arrow_schema(common.required_schema.as_slice());
@@ -1415,7 +1415,19 @@ impl PhysicalPlanner {
14151415
let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = common
14161416
.data_filters
14171417
.iter()
1418-
.map(|expr| self.create_expr(expr, Arc::clone(&required_schema)))
1418+
.map(|expr| {
1419+
let filter =
1420+
self.create_expr(expr, Arc::clone(&required_schema))?;
1421+
if has_column_mapping {
1422+
let mut rewriter = ColumnMappingFilterRewriter {
1423+
logical_to_physical: &logical_to_physical,
1424+
data_schema: &data_schema,
1425+
};
1426+
Ok(filter.rewrite(&mut rewriter).data()?)
1427+
} else {
1428+
Ok(filter)
1429+
}
1430+
})
14191431
.collect();
14201432

14211433
let object_store_options: HashMap<String, String> = common
@@ -1459,39 +1471,21 @@ impl PhysicalPlanner {
14591471
.tasks
14601472
.first()
14611473
.map(|t| t.file_path.clone())
1462-
.ok_or_else(|| GeneralError("DeltaScan has no tasks".into()))?;
1474+
.ok_or_else(|| GeneralError("DeltaScan has no tasks after split-mode injection (check DeltaPlanDataInjector)".into()))?;
14631475
let (object_store_url, _) = prepare_object_store_with_configs(
14641476
self.session_ctx.runtime_env(),
14651477
one_file,
14661478
&object_store_options,
14671479
)?;
14681480

1469-
// When column mapping is active, required_schema also needs physical
1470-
// names so init_datasource_exec's name-matching logic works against the
1471-
// physical data_schema.
1472-
let read_required_schema = if has_column_mapping {
1473-
let new_fields: Vec<_> = required_schema
1474-
.fields()
1475-
.iter()
1476-
.map(|f| {
1477-
if let Some(physical) = logical_to_physical.get(f.name()) {
1478-
Arc::new(Field::new(
1479-
physical,
1480-
f.data_type().clone(),
1481-
f.is_nullable(),
1482-
))
1483-
} else {
1484-
Arc::clone(f)
1485-
}
1486-
})
1487-
.collect();
1488-
Arc::new(Schema::new(new_fields))
1489-
} else {
1490-
Arc::clone(&required_schema)
1491-
};
1492-
1481+
// Keep required_schema in LOGICAL names (Spark's convention).
1482+
// data_schema uses physical names (when column mapping is active).
1483+
// DataFusion's schema adapter bridges the gap: it matches file
1484+
// columns against data_schema by name and produces the
1485+
// required_schema output shape, injecting nulls for missing
1486+
// columns (schema evolution).
14931487
let delta_exec = init_datasource_exec(
1494-
read_required_schema,
1488+
Arc::clone(&required_schema),
14951489
Some(data_schema),
14961490
Some(partition_schema),
14971491
object_store_url,
@@ -1518,33 +1512,9 @@ impl PhysicalPlanner {
15181512
delta_exec
15191513
};
15201514

1521-
// Phase 4: when column mapping is active, the output has PHYSICAL
1522-
// column names. Add a ProjectionExec to rename back to logical.
1523-
let final_exec = if has_column_mapping {
1524-
let physical_to_logical: HashMap<String, String> = logical_to_physical
1525-
.iter()
1526-
.map(|(l, p)| (p.clone(), l.clone()))
1527-
.collect();
1528-
let input_schema = final_exec.schema();
1529-
let rename_exprs: Result<Vec<(Arc<dyn PhysicalExpr>, String)>, ExecutionError> = input_schema
1530-
.fields()
1531-
.iter()
1532-
.enumerate()
1533-
.map(|(idx, f)| {
1534-
let col: Arc<dyn PhysicalExpr> =
1535-
Arc::new(Column::new(f.name(), idx));
1536-
let logical = physical_to_logical
1537-
.get(f.name())
1538-
.cloned()
1539-
.unwrap_or_else(|| f.name().clone());
1540-
Ok((col, logical))
1541-
})
1542-
.collect();
1543-
let rename_exprs = rename_exprs?;
1544-
Arc::new(ProjectionExec::try_new(rename_exprs, final_exec)?) as Arc<dyn ExecutionPlan>
1545-
} else {
1546-
final_exec
1547-
};
1515+
// No rename projection needed: required_schema already uses
1516+
// logical names, and DataFusion's schema adapter handles the
1517+
// physical→logical mapping internally via data_schema.
15481518

15491519
Ok((
15501520
vec![],
@@ -2999,6 +2969,45 @@ fn expr_to_columns(
29992969
Ok((left_field_indices, right_field_indices))
30002970
}
30012971

2972+
/// Rewrites Column references in a PhysicalExpr from logical names/indices
2973+
/// (as in required_schema) to physical names/indices (as in data_schema).
2974+
/// Used by the Delta scan path when column mapping is active so that pushed-down
2975+
/// data filters match the DataSourceExec's base schema (physical column names).
2976+
struct ColumnMappingFilterRewriter<'a> {
2977+
logical_to_physical: &'a HashMap<String, String>,
2978+
data_schema: &'a SchemaRef,
2979+
}
2980+
2981+
impl TreeNodeRewriter for ColumnMappingFilterRewriter<'_> {
2982+
type Node = Arc<dyn PhysicalExpr>;
2983+
2984+
fn f_down(
2985+
&mut self,
2986+
node: Self::Node,
2987+
) -> datafusion::common::Result<Transformed<Self::Node>> {
2988+
if let Some(column) = node.as_any().downcast_ref::<Column>() {
2989+
if let Some(physical_name) = self.logical_to_physical.get(column.name()) {
2990+
if let Some(idx) = self
2991+
.data_schema
2992+
.fields()
2993+
.iter()
2994+
.position(|f| f.name() == physical_name)
2995+
{
2996+
return Ok(Transformed::yes(Arc::new(Column::new(physical_name, idx))));
2997+
}
2998+
log::warn!(
2999+
"Column mapping: physical name '{}' for logical '{}' not found in data_schema; \
3000+
filter may fail at execution time",
3001+
physical_name, column.name()
3002+
);
3003+
}
3004+
Ok(Transformed::no(node))
3005+
} else {
3006+
Ok(Transformed::no(node))
3007+
}
3008+
}
3009+
}
3010+
30023011
/// A physical join filter rewriter which rewrites the column indices in the expression
30033012
/// to use the new column indices. See `rewrite_physical_expr`.
30043013
struct JoinFilterRewriter<'a> {

spark/src/main/scala/org/apache/comet/delta/DeltaReflection.scala

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,48 @@ object DeltaReflection extends Logging {
137137
case _: Exception => None
138138
}
139139
}
140+
141+
/**
142+
* Convert a Delta partition value string to a Catalyst-internal representation. Delta stores
143+
* partition values as strings in add actions; this converts them to the correct type for
144+
* predicate evaluation.
145+
*/
146+
def castPartitionString(str: Option[String], dt: org.apache.spark.sql.types.DataType): Any = {
147+
import org.apache.spark.sql.catalyst.util.DateTimeUtils
148+
import org.apache.spark.sql.types._
149+
import org.apache.spark.unsafe.types.UTF8String
150+
str match {
151+
case None | Some(null) => null
152+
case Some(s) =>
153+
try {
154+
dt match {
155+
case StringType => UTF8String.fromString(s)
156+
case IntegerType => s.toInt
157+
case LongType => s.toLong
158+
case ShortType => s.toShort
159+
case ByteType => s.toByte
160+
case FloatType => s.toFloat
161+
case DoubleType => s.toDouble
162+
case BooleanType => s.toBoolean
163+
case DateType =>
164+
DateTimeUtils
165+
.stringToDate(UTF8String.fromString(s))
166+
.getOrElse(null)
167+
case _: TimestampType =>
168+
DateTimeUtils
169+
.stringToTimestamp(UTF8String.fromString(s), java.time.ZoneOffset.UTC)
170+
.getOrElse(null)
171+
case d: DecimalType =>
172+
val dec =
173+
org.apache.spark.sql.types.Decimal(new java.math.BigDecimal(s))
174+
dec.changePrecision(d.precision, d.scale)
175+
dec
176+
case _ => UTF8String.fromString(s)
177+
}
178+
} catch {
179+
case _: NumberFormatException | _: IllegalArgumentException =>
180+
null
181+
}
182+
}
183+
}
140184
}

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,26 @@ case class CometScanRule(session: SparkSession)
396396
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DELTA_COMPAT, r)) {
397397
return None
398398
}
399+
400+
// Validate filesystem schemes from the scan's input files.
401+
val supportedSchemes =
402+
Set("file", "s3", "s3a", "gs", "gcs", "abfss", "abfs", "wasbs", "wasb", "oss")
403+
val inputFiles = scanExec.relation.location.inputFiles
404+
if (inputFiles.nonEmpty) {
405+
val schemes = inputFiles
406+
.map(f => new java.net.URI(f).getScheme)
407+
.filter(_ != null)
408+
.toSet
409+
val unsupported = schemes -- supportedSchemes
410+
if (unsupported.nonEmpty) {
411+
withInfo(
412+
scanExec,
413+
s"Native Delta scan does not support filesystem schemes: " +
414+
unsupported.mkString(", "))
415+
return None
416+
}
417+
}
418+
399419
Some(CometScanExec(scanExec, session, SCAN_NATIVE_DELTA_COMPAT))
400420
}
401421

0 commit comments

Comments
 (0)