Iceberg 1.11 support for Spark 411, part (1/3): extract version-divergent scan APIs behind a shim#14881
Iceberg 1.11 support for Spark 411, part (1/3): extract version-divergent scan APIs behind a shim#14881res-life wants to merge 1 commit into
Conversation
gerashegalov
left a comment
There was a problem hiding this comment.
This should ideally build on top of #14866
|
build |
Greptile SummaryThis PR refactors the Iceberg integration's common layer to hide version-divergent scan APIs (
Confidence Score: 4/5Safe to merge; no behavior change for existing Iceberg 1.6/1.9/1.10 users, and the new dispatch infrastructure is straightforward. The refactoring is mechanically correct: the class hierarchy ensures the GpuSparkScan cast is safe, the hasNestedType rewrite maps Iceberg types to Spark equivalents correctly, and the Array[DataFile] type annotation fixes a real Scala 2.13 inference gap. The two tactical try/catch fallbacks (projection vs expectedSchema in GpuSparkBatch, runtimeFilters vs runtimeFilterExpressions in GpuSparkBatchQueryScan) each throw-and-catch an exception on every invocation for all current supported Iceberg versions, which is a minor hot-path overhead explicitly acknowledged in the PR as temporary pending a follow-up cleanup. GpuSparkBatch and GpuSparkBatchQueryScan carry the tactical field-name fallbacks that should be moved to IcebergShimUtils in the follow-up PR. Important Files Changed
Class Diagram%%{init: {'theme': 'neutral'}}%%
classDiagram
class GpuSparkScan {
<<abstract>>
+cpuScan: SparkScan
+rapidsConf: RapidsConf
+queryUsesInputFile: Boolean
+hasNestedType() Boolean
+readSchema() StructType
+toBatch() Batch
}
class GpuSparkPartitioningAwareScan {
<<abstract>>
+outputPartitioning() Partitioning
+groupingKeyType() Types.StructType
+taskGroups() Seq
}
class GpuSparkBatchQueryScan {
+cpuScan: SparkBatchQueryScan
-runtimeFilterExpressions: List~Expression~
+filter(predicates: Array~Predicate~)
+withInputFile() GpuScan
}
class GpuSparkCopyOnWriteScanBase {
<<abstract>>
+cpuScan: SparkCopyOnWriteScan
+filterAttributes() Array~NamedReference~
+estimateStatistics() Statistics
}
class GpuSparkCopyOnWriteScan_16 {
+filter(filters: Array~Filter~)
+withInputFile() GpuScan
}
class GpuSparkCopyOnWriteScan_19 {
+filter(filters: Array~Filter~)
+withInputFile() GpuScan
}
class GpuSparkCopyOnWriteScan_110 {
+filter(filters: Array~Filter~)
+withInputFile() GpuScan
}
class ShimUtils {
+newCopyOnWriteScan(Scan, RapidsConf, Boolean) GpuScan$
}
class IcebergShimUtils {
<<interface>>
+newCopyOnWriteScan(Scan, RapidsConf, Boolean) GpuScan
}
GpuSparkScan <|-- GpuSparkPartitioningAwareScan
GpuSparkPartitioningAwareScan <|-- GpuSparkBatchQueryScan
GpuSparkPartitioningAwareScan <|-- GpuSparkCopyOnWriteScanBase
GpuSparkCopyOnWriteScanBase <|-- GpuSparkCopyOnWriteScan_16 : iceberg-1-6-x
GpuSparkCopyOnWriteScanBase <|-- GpuSparkCopyOnWriteScan_19 : iceberg-1-9-x
GpuSparkCopyOnWriteScanBase <|-- GpuSparkCopyOnWriteScan_110 : iceberg-1-10-x
ShimUtils --> IcebergShimUtils : delegates
IcebergShimUtils <|.. GpuSparkCopyOnWriteScan_16 : create()
IcebergShimUtils <|.. GpuSparkCopyOnWriteScan_19 : create()
IcebergShimUtils <|.. GpuSparkCopyOnWriteScan_110 : create()
Reviews (1): Last reviewed commit: "Iceberg: extract version-divergent scan ..." | Re-trigger Greptile |
ffb4086 to
4647fc3
Compare
Refactors iceberg/common so the {SparkScan, SparkBatchQueryScan,
SparkCopyOnWriteScan, SparkBatch, DataWriteResult} APIs that diverge
between Iceberg 1.10.x and 1.11.x are hidden behind a small interface,
with per-version implementations in iceberg-1-6-x / iceberg-1-9-x /
iceberg-1-10-x. No behavior change for the existing Iceberg versions
this PR ships; sets the stage for a follow-up that adds iceberg-1-11-x.
Common:
- GpuSparkCopyOnWriteScan -> renamed to GpuSparkCopyOnWriteScanBase
(abstract); per-version concrete subclass mixes in the right runtime-
filter trait (SupportsRuntimeFiltering vs SupportsRuntimeV2Filtering)
and the matching filter() signature.
- GpuSparkScan: rewrite hasNestedType via Spark's readSchema() + Spark
types so it no longer depends on the Iceberg 1.10-only
cpuScan.expectedSchema(); dispatch SparkCopyOnWriteScan construction
through ShimUtils.newCopyOnWriteScan.
- GpuSparkBatchQueryScan: toString uses cpuScan.description() (public,
available in both Iceberg 1.10 and 1.11) instead of branch /
expectedSchema / filterExpressions which 1.11 removed.
runtimeFilterExpressions field read tolerates both 1.10 name
(runtimeFilterExpressions) and 1.11 name (runtimeFilters) — a tactical
fallback to be replaced with proper per-version shim methods.
- GpuSparkBatch: same tolerance for expectedSchema (1.10) vs projection
(1.11).
- GpuSparkWrite: type-annotate `new Array[DataFile](0)` so Scala 2.13
doesn't infer Array[Nothing] under 1.11's wildcarded
DataWriteResult.dataFiles().
- IcebergShimUtils / ShimUtils: add newCopyOnWriteScan(Scan, ...) factory
whose parameter is Spark's public Scan because Iceberg's
SparkCopyOnWriteScan is package-private — cross-package callers cannot
reference it directly.
Per-Iceberg-version module:
- New GpuSparkCopyOnWriteScan in org.apache.iceberg.spark.source (so it
can reference the package-private SparkCopyOnWriteScan). Companion
object exposes create(Scan, ...): GpuScan for cross-package callers.
1.6/1.9/1.10 mix in SupportsRuntimeFiltering + filter(Filter[]).
- ShimUtilsImpl.java: implement newCopyOnWriteScan via
GpuSparkCopyOnWriteScan.create.
Signed-off-by: Chong Gao <res_life@163.com>
4647fc3 to
ac790ba
Compare
Stacked work for #14853 (1/3) — common-code preparation for adding iceberg-1-11-x.
Depends on
Description
Refactors
iceberg/commonso theSparkScan/SparkCopyOnWriteScan/SparkBatch/DataWriteResultAPIs that diverge between Iceberg 1.10.x and 1.11.x are hidden behind a small interface, with per-Iceberg-version implementations iniceberg-1-6-x/iceberg-1-9-x/iceberg-1-10-x. No behavior change for the Iceberg versions this PR ships; sets the stage for the follow-up PR that addsiceberg-1-11-x.Common changes:
GpuSparkCopyOnWriteScan→ renamed toGpuSparkCopyOnWriteScanBase(abstract). The runtime-filter trait +filtermethod live in a per-version concrete subclass (1.6/1.9/1.10 mix inSupportsRuntimeFilteringwithfilter(Filter[]); 1.11 will mix inSupportsRuntimeV2Filteringwithfilter(Predicate[])).GpuSparkScan: rewritehasNestedTypevia Spark'sreadSchema()+ Spark types so it no longer depends on the 1.10-onlycpuScan.expectedSchema(). DispatchSparkCopyOnWriteScanconstruction through the newShimUtils.newCopyOnWriteScanfactory.GpuSparkBatchQueryScan.toStringusescpuScan.description()(available in both 1.10 and 1.11) instead ofbranch/expectedSchema/filterExpressions(1.11 removed these).GpuSparkBatchQueryScan.runtimeFilterExpressionsreflective field-read tolerates both the 1.10 name (runtimeFilterExpressions) and the 1.11 name (runtimeFilters).GpuSparkBatch: same tolerance forexpectedSchema(1.10) vsprojection(1.11).GpuSparkWrite: type-annotatenew Array[DataFile](0)so Scala 2.13 doesn't inferArray[Nothing]under 1.11's wildcardedDataWriteResult.dataFiles().IcebergShimUtils/ShimUtils: addnewCopyOnWriteScan(Scan, RapidsConf, Boolean): GpuScanfactory. The parameter is Spark's publicScanbecause Iceberg'sSparkCopyOnWriteScanis package-private — cross-package callers cannot reference it directly.Per-Iceberg-version module changes (1.6 / 1.9 / 1.10, all identical for the V1 path):
GpuSparkCopyOnWriteScaninorg.apache.iceberg.spark.source(so it can reference the package-privateSparkCopyOnWriteScan). Companion object exposescreate(Scan, ...): GpuScanfor cross-package callers.ShimUtilsImpl.javaimplementsnewCopyOnWriteScanviaGpuSparkCopyOnWriteScan.create.The two
try/catchfield-name fallbacks (inGpuSparkBatchQueryScanandGpuSparkBatch) are tactical and will be pushed behind proper per-versionIcebergShimUtilsmethods in a later cleanup PR.Checklists
Documentation
Testing
(3.5.x + 4.0.x iceberg integration tests in `integration_tests/src/main/python/iceberg/` — exercises the new dispatch path with no behavior change vs. before this PR.)
Performance