Skip to content

Commit d00d9d2

Browse files
committed
stash.
1 parent d469f4d commit d00d9d2

2 files changed

Lines changed: 1 addition & 207 deletions

File tree

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 1 addition & 196 deletions
Original file line numberDiff line numberDiff line change
@@ -257,300 +257,135 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
257257
case _
258258
if scanExec.scan.getClass.getName ==
259259
"org.apache.iceberg.spark.source.SparkBatchQueryScan" =>
260-
// scalastyle:off println
261-
println(s"=== CometScanRule: Detected Iceberg SparkBatchQueryScan ===")
262-
// scalastyle:on println
263-
264260
val fallbackReasons = new ListBuffer[String]()
265261

266262
// Native Iceberg scan requires both configs to be enabled
267263
if (!COMET_ICEBERG_NATIVE_ENABLED.get()) {
268264
fallbackReasons += "Native Iceberg scan disabled because " +
269265
s"${COMET_ICEBERG_NATIVE_ENABLED.key} is not enabled"
270-
// scalastyle:off println
271-
println(s"=== Fallback: COMET_ICEBERG_NATIVE_ENABLED not enabled ===")
272-
// scalastyle:on println
273266
return withInfos(scanExec, fallbackReasons.toSet)
274267
}
275268

276269
if (!COMET_EXEC_ENABLED.get()) {
277270
fallbackReasons += "Native Iceberg scan disabled because " +
278271
s"${COMET_EXEC_ENABLED.key} is not enabled"
279-
// scalastyle:off println
280-
println(s"=== Fallback: COMET_EXEC_ENABLED not enabled ===")
281-
// scalastyle:on println
282272
return withInfos(scanExec, fallbackReasons.toSet)
283273
}
284274

285-
// scalastyle:off println
286-
println(s"=== CometScanRule: Both configs enabled, checking schema ===")
287-
// scalastyle:on println
288-
289275
val typeChecker = CometScanTypeChecker(SCAN_NATIVE_DATAFUSION)
290276
val schemaSupported =
291277
typeChecker.isSchemaSupported(scanExec.scan.readSchema(), fallbackReasons)
292278

293279
if (!schemaSupported) {
294280
fallbackReasons += "Comet extension is not enabled for " +
295281
s"${scanExec.scan.getClass.getSimpleName}: Schema not supported"
296-
// scalastyle:off println
297-
println(s"=== Fallback: Schema not supported ===")
298-
// scalastyle:on println
299282
}
300283

301-
// scalastyle:off println
302-
println(s"=== CometScanRule: Schema check passed, extracting metadata ===")
303-
// scalastyle:on println
304-
305284
// Extract all Iceberg metadata once using reflection.
306285
// If any required reflection fails, this returns None, and we fall back to Spark.
307286
// First get metadataLocation and catalogProperties which are needed by the factory.
308287
val tableOpt = IcebergReflection.getTable(scanExec.scan)
309-
tableOpt.foreach { table =>
310-
logInfo(s"Iceberg table class: ${table.getClass.getName}")
311-
}
312-
313-
// scalastyle:off println
314-
if (tableOpt.isEmpty) {
315-
println(s"=== Failed to get Iceberg table via reflection ===")
316-
} else {
317-
println(s"=== Got Iceberg table: ${tableOpt.get.getClass.getName} ===")
318-
}
319-
// scalastyle:on println
320288

321289
val metadataLocationOpt = tableOpt.flatMap { table =>
322-
val metadataLoc = IcebergReflection.getMetadataLocation(table)
323-
metadataLoc match {
324-
case Some(loc) =>
325-
logInfo(s"Iceberg metadata location: $loc")
326-
// scalastyle:off println
327-
println(s"=== Got metadata location: $loc ===")
328-
// scalastyle:on println
329-
case None =>
330-
logInfo(s"Iceberg metadata location not available (likely REST catalog)")
331-
// scalastyle:off println
332-
println(s"=== Metadata location not available ===")
333-
// scalastyle:on println
334-
}
335-
metadataLoc
290+
IcebergReflection.getMetadataLocation(table)
336291
}
337292

338293
val metadataOpt = metadataLocationOpt.flatMap { metadataLocation =>
339-
// scalastyle:off println
340-
println(s"=== Starting metadata extraction for location: $metadataLocation ===")
341-
// scalastyle:on println
342294
try {
343295
val session = org.apache.spark.sql.SparkSession.active
344296
val hadoopConf = session.sessionState.newHadoopConf()
345297

346298
// For REST catalogs, the metadata file may not exist on disk since metadata
347299
// is fetched via HTTP. Check if file exists; if not, use table location instead.
348300
val metadataUri = new java.net.URI(metadataLocation)
349-
// scalastyle:off println
350-
println(
351-
s"=== metadataUri: $metadataUri, scheme: ${metadataUri.getScheme}, " +
352-
s"path: ${metadataUri.getPath} ===")
353-
// scalastyle:on println
354301

355302
val metadataFile = new java.io.File(metadataUri.getPath)
356-
// scalastyle:off println
357-
println(
358-
s"=== metadataFile: ${metadataFile.getAbsolutePath}, " +
359-
s"exists: ${metadataFile.exists()} ===")
360-
// scalastyle:on println
361303

362304
val effectiveLocation =
363305
if (!metadataFile.exists() && metadataUri.getScheme == "file") {
364306
// Metadata file doesn't exist (REST catalog with InMemoryFileIO or similar)
365307
// Use table location instead for FileIO initialization
366-
// scalastyle:off println
367-
println(s"=== Metadata file doesn't exist, attempting to get table location ===")
368-
// scalastyle:on println
369308

370309
tableOpt
371310
.flatMap { table =>
372311
try {
373312
val locationMethod = table.getClass.getMethod("location")
374313
val tableLocation = locationMethod.invoke(table).asInstanceOf[String]
375-
// scalastyle:off println
376-
println(
377-
s"=== REST catalog detected: metadata file doesn't exist, " +
378-
s"using table location: $tableLocation ===")
379-
// scalastyle:on println
380314
Some(tableLocation)
381315
} catch {
382316
case e: Exception =>
383-
// scalastyle:off println
384-
println(
385-
s"=== Could not get table location, " +
386-
s"using metadata location anyway: ${e.getMessage} ===")
387-
e.printStackTrace()
388-
// scalastyle:on println
389317
Some(metadataLocation)
390318
}
391319
}
392320
.getOrElse(metadataLocation)
393321
} else {
394-
// scalastyle:off println
395-
println(
396-
s"=== Metadata file exists or not file:// scheme, " +
397-
s"using metadata location ===")
398-
// scalastyle:on println
399322
metadataLocation
400323
}
401324

402-
// scalastyle:off println
403-
println(s"=== effectiveLocation: $effectiveLocation ===")
404-
// scalastyle:on println
405-
406325
val effectiveUri = new java.net.URI(effectiveLocation)
407-
// scalastyle:off println
408-
println(s"=== effectiveUri: $effectiveUri ===")
409-
// scalastyle:on println
410326

411327
val hadoopS3Options = NativeConfig.extractObjectStoreOptions(hadoopConf, effectiveUri)
412-
// scalastyle:off println
413-
println(s"=== hadoopS3Options: $hadoopS3Options ===")
414-
// scalastyle:on println
415328

416329
val catalogProperties =
417330
org.apache.comet.serde.operator.CometIcebergNativeScan
418331
.hadoopToIcebergS3Properties(hadoopS3Options)
419-
// scalastyle:off println
420-
println(s"=== catalogProperties: $catalogProperties ===")
421-
// scalastyle:on println
422-
423-
// scalastyle:off println
424-
println(
425-
s"=== Calling CometIcebergNativeScanMetadata.extract " +
426-
s"with location: $effectiveLocation ===")
427-
// scalastyle:on println
428332

429333
val result = CometIcebergNativeScanMetadata
430334
.extract(scanExec.scan, effectiveLocation, catalogProperties)
431335

432-
// scalastyle:off println
433-
result match {
434-
case Some(metadata) =>
435-
println(s"=== CometIcebergNativeScanMetadata.extract returned Some(metadata) ===")
436-
case None =>
437-
println(s"=== CometIcebergNativeScanMetadata.extract returned None ===")
438-
}
439-
// scalastyle:on println
440-
441336
result
442337
} catch {
443338
case e: Exception =>
444-
// scalastyle:off println
445-
println(
446-
s"=== Failed to extract catalog properties from Iceberg scan: " +
447-
s"${e.getMessage} ===")
448-
e.printStackTrace()
449-
// scalastyle:on println
450339
logError(
451340
s"Failed to extract catalog properties from Iceberg scan: ${e.getMessage}",
452341
e)
453342
None
454343
}
455344
}
456345

457-
// scalastyle:off println
458-
if (metadataOpt.isEmpty) {
459-
println(s"=== metadataOpt is None, will fall back ===")
460-
} else {
461-
println(s"=== metadataOpt is Some, proceeding with validation ===")
462-
}
463-
// scalastyle:on println
464-
465346
// If metadata extraction failed, fall back to Spark
466347
val metadata = metadataOpt match {
467348
case Some(m) =>
468-
// scalastyle:off println
469-
println(s"=== Got metadata, proceeding with validation ===")
470-
// scalastyle:on println
471349
m
472350
case None =>
473351
fallbackReasons += "Failed to extract Iceberg metadata via reflection"
474-
// scalastyle:off println
475-
println(
476-
s"=== No metadata, falling back. " +
477-
s"Reasons: ${fallbackReasons.mkString(", ")} ===")
478-
// scalastyle:on println
479352
return withInfos(scanExec, fallbackReasons.toSet)
480353
}
481354

482355
// Now perform all validation using the pre-extracted metadata
483356
// Check if table uses a FileIO implementation compatible with iceberg-rust
484-
// scalastyle:off println
485-
println(s"=== Starting FileIO compatibility check ===")
486-
// scalastyle:on println
487357

488358
val fileIOCompatible = IcebergReflection.getFileIO(metadata.table) match {
489359
case Some(fileIO) =>
490360
val fileIOClassName = fileIO.getClass.getName
491-
// scalastyle:off println
492-
println(s"=== FileIO class: $fileIOClassName ===")
493-
// scalastyle:on println
494361
// InMemoryFileIO is now supported with table location fallback for REST catalogs
495362
true
496363
case None =>
497364
fallbackReasons += "Could not check FileIO compatibility"
498-
// scalastyle:off println
499-
println(s"=== Could not get FileIO, falling back ===")
500-
// scalastyle:on println
501365
false
502366
}
503367

504-
// scalastyle:off println
505-
println(s"=== FileIO compatible: $fileIOCompatible ===")
506-
// scalastyle:on println
507-
508368
// Check Iceberg table format version
509-
// scalastyle:off println
510-
println(s"=== Checking format version ===")
511-
// scalastyle:on println
512369

513370
val formatVersionSupported = IcebergReflection.getFormatVersion(metadata.table) match {
514371
case Some(formatVersion) =>
515-
// scalastyle:off println
516-
println(s"=== Format version: $formatVersion ===")
517-
// scalastyle:on println
518372
if (formatVersion > 2) {
519373
fallbackReasons += "Iceberg table format version " +
520374
s"$formatVersion is not supported. " +
521375
"Comet only supports Iceberg table format V1 and V2"
522-
// scalastyle:off println
523-
println(s"=== Format version $formatVersion not supported, falling back ===")
524-
// scalastyle:on println
525376
false
526377
} else {
527-
// scalastyle:off println
528-
println(s"=== Format version $formatVersion supported ===")
529-
// scalastyle:on println
530378
true
531379
}
532380
case None =>
533381
fallbackReasons += "Could not verify Iceberg table format version"
534-
// scalastyle:off println
535-
println(s"=== Could not get format version, falling back ===")
536-
// scalastyle:on println
537382
false
538383
}
539384

540-
// scalastyle:off println
541-
println(s"=== Checking file formats and schemes ===")
542-
// scalastyle:on println
543-
544385
// Check if all files are Parquet format and use supported filesystem schemes
545386
val (allParquetFiles, unsupportedSchemes) =
546387
IcebergReflection.validateFileFormatsAndSchemes(metadata.tasks)
547388

548-
// scalastyle:off println
549-
println(
550-
s"=== allParquetFiles: $allParquetFiles, " +
551-
s"unsupportedSchemes: ${unsupportedSchemes.mkString(", ")} ===")
552-
// scalastyle:on println
553-
554389
val allSupportedFilesystems = if (unsupportedSchemes.isEmpty) {
555390
true
556391
} else {
@@ -565,10 +400,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
565400
"Comet only supports Parquet files in Iceberg tables"
566401
}
567402

568-
// scalastyle:off println
569-
println(s"=== Checking partition types ===")
570-
// scalastyle:on println
571-
572403
// Partition values are deserialized via iceberg-rust's Literal::try_from_json()
573404
// which has incomplete type support (binary/fixed unimplemented, decimals limited)
574405
val partitionTypesSupported = (for {
@@ -577,10 +408,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
577408
val unsupportedTypes =
578409
IcebergReflection.validatePartitionTypes(partitionSpec, metadata.scanSchema)
579410

580-
// scalastyle:off println
581-
println(s"=== unsupportedTypes: ${unsupportedTypes.size} ===")
582-
// scalastyle:on println
583-
584411
if (unsupportedTypes.nonEmpty) {
585412
unsupportedTypes.foreach { case (fieldName, typeStr, reason) =>
586413
fallbackReasons +=
@@ -722,37 +549,15 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
722549
!hasUnsupportedDeletes
723550
}
724551

725-
// scalastyle:off println
726-
println(s"=== Final validation results: ===")
727-
println(s"=== schemaSupported: $schemaSupported ===")
728-
println(s"=== fileIOCompatible: $fileIOCompatible ===")
729-
println(s"=== formatVersionSupported: $formatVersionSupported ===")
730-
println(s"=== allParquetFiles: $allParquetFiles ===")
731-
println(s"=== allSupportedFilesystems: $allSupportedFilesystems ===")
732-
println(s"=== partitionTypesSupported: $partitionTypesSupported ===")
733-
println(s"=== complexTypePredicatesSupported: $complexTypePredicatesSupported ===")
734-
println(s"=== transformFunctionsSupported: $transformFunctionsSupported ===")
735-
println(s"=== deleteFileTypesSupported: $deleteFileTypesSupported ===")
736-
// scalastyle:on println
737-
738552
if (schemaSupported && fileIOCompatible && formatVersionSupported && allParquetFiles &&
739553
allSupportedFilesystems && partitionTypesSupported &&
740554
complexTypePredicatesSupported && transformFunctionsSupported &&
741555
deleteFileTypesSupported) {
742-
// scalastyle:off println
743-
println(
744-
s"=== ALL CHECKS PASSED - " +
745-
s"Creating CometBatchScanExec with native Iceberg scan ===")
746-
// scalastyle:on println
747556
CometBatchScanExec(
748557
scanExec.clone().asInstanceOf[BatchScanExec],
749558
runtimeFilters = scanExec.runtimeFilters,
750559
nativeIcebergScanMetadata = Some(metadata))
751560
} else {
752-
// scalastyle:off println
753-
println(s"=== Some checks failed - Falling back to Spark ===")
754-
println(s"=== Fallback reasons: ${fallbackReasons.mkString(", ")} ===")
755-
// scalastyle:on println
756561
withInfos(scanExec, fallbackReasons.toSet)
757562
}
758563

spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2268,22 +2268,11 @@ class CometIcebergNativeSuite extends CometTestBase {
22682268
) USING iceberg
22692269
""")
22702270

2271-
// Insert data
22722271
spark.sql("""
22732272
INSERT INTO rest_cat.db.test_table
22742273
VALUES (1, 'Alice', 10.5), (2, 'Bob', 20.3), (3, 'Charlie', 30.7)
22752274
""")
22762275

2277-
// Query the table
2278-
val df = spark.sql("SELECT * FROM rest_cat.db.test_table ORDER BY id")
2279-
2280-
// Print the explain to see fallback reasons
2281-
// scalastyle:off println
2282-
println("=== EXPLAIN OUTPUT ===")
2283-
df.explain(true)
2284-
println("=== END EXPLAIN ===")
2285-
// scalastyle:on println
2286-
22872276
checkIcebergNativeScan("SELECT * FROM rest_cat.db.test_table ORDER BY id")
22882277

22892278
spark.sql("DROP TABLE rest_cat.db.test_table")

0 commit comments

Comments
 (0)