Iceberg 1.11: accelerate SparkIncrementalAppendScan on GPU#14883
Draft
res-life wants to merge 3 commits into
Force-pushed from 1b3c034 to e2a0367
added 3 commits May 29, 2026 11:36
Refactors iceberg/common so the {SparkScan, SparkBatchQueryScan,
SparkCopyOnWriteScan, SparkBatch, DataWriteResult} APIs that diverge
between Iceberg 1.10.x and 1.11.x are hidden behind a small interface,
with per-version implementations in iceberg-1-6-x / iceberg-1-9-x /
iceberg-1-10-x. No behavior change for the existing Iceberg versions
this PR ships; sets the stage for a follow-up that adds iceberg-1-11-x.
Common:
- GpuSparkCopyOnWriteScan renamed to GpuSparkCopyOnWriteScanBase
(abstract); each per-version concrete subclass mixes in the right
runtime-filter trait (SupportsRuntimeFiltering vs
SupportsRuntimeV2Filtering) and the matching filter() signature.
- GpuSparkScan: rewrite hasNestedType via Spark's readSchema() + Spark
types so it no longer depends on the Iceberg 1.10-only
cpuScan.expectedSchema(); dispatch SparkCopyOnWriteScan construction
through ShimUtils.newCopyOnWriteScan.
- GpuSparkBatchQueryScan: toString uses cpuScan.description() (public,
available in both Iceberg 1.10 and 1.11) instead of branch /
expectedSchema / filterExpressions which 1.11 removed.
The runtimeFilterExpressions field read tolerates both the 1.10 name
(runtimeFilterExpressions) and the 1.11 name (runtimeFilters); this is a
tactical fallback, to be replaced with proper per-version shim methods.
- GpuSparkBatch: same tolerance for expectedSchema (1.10) vs projection
(1.11).
- GpuSparkWrite: type-annotate `new Array[DataFile](0)` so Scala 2.13
doesn't infer Array[Nothing] under 1.11's wildcarded
DataWriteResult.dataFiles().
- IcebergShimUtils / ShimUtils: add newCopyOnWriteScan(Scan, ...) factory
whose parameter is Spark's public Scan because Iceberg's
SparkCopyOnWriteScan is package-private — cross-package callers cannot
reference it directly.
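The field-name tolerance described for GpuSparkBatchQueryScan and GpuSparkBatch can be sketched roughly as below. This is only an illustration under stated assumptions, not the PR's code: `Scan110Like`, `Scan111Like`, and `FieldFallback.readFirstPresent` are hypothetical stand-ins, and only the two field names come from the commit message.

```java
import java.lang.reflect.Field;
import java.util.List;

// Hypothetical stand-ins for the two CPU scan shapes; the real Iceberg classes are not used.
final class Scan110Like {
  private final List<String> runtimeFilterExpressions = List.of("id > 5");
}

final class Scan111Like {
  private final List<String> runtimeFilters = List.of("id > 5");
}

final class FieldFallback {
  private FieldFallback() {}

  /** Returns the value of the first candidate field found on target or one of its superclasses. */
  static Object readFirstPresent(Object target, String... candidateNames) {
    for (String name : candidateNames) {
      for (Class<?> c = target.getClass(); c != null; c = c.getSuperclass()) {
        try {
          Field field = c.getDeclaredField(name);
          field.setAccessible(true); // the field is private in both stand-ins
          return field.get(target);
        } catch (NoSuchFieldException e) {
          // Not declared on this class: keep walking up, then try the next name.
        } catch (IllegalAccessException e) {
          throw new IllegalStateException("Cannot read field " + name, e);
        }
      }
    }
    throw new IllegalStateException(
        "None of the fields exist: " + String.join(", ", candidateNames));
  }
}
```

Calling `FieldFallback.readFirstPresent(scan, "runtimeFilterExpressions", "runtimeFilters")` tries the 1.10 name first and falls back to the 1.11 name. A per-version shim method would avoid the reflection entirely, which is presumably why the commit message calls this a tactical fallback.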
Per-Iceberg-version module:
- New GpuSparkCopyOnWriteScan in org.apache.iceberg.spark.source (so it
can reference the package-private SparkCopyOnWriteScan). Companion
object exposes create(Scan, ...): GpuScan for cross-package callers.
1.6/1.9/1.10 mix in SupportsRuntimeFiltering + filter(Filter[]).
- ShimUtilsImpl.java: implement newCopyOnWriteScan via
GpuSparkCopyOnWriteScan.create.
Signed-off-by: Chong Gao <res_life@163.com>
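The split between an abstract base and per-version subclasses might look roughly like the following. It is a simplified sketch, written in Java rather than the plugin's Scala: `LegacyRuntimeFiltering`, `V2RuntimeFiltering`, `LegacyFilter`, `V2Predicate`, and the `CopyOnWriteScan*` classes are hypothetical stand-ins for Spark's SupportsRuntimeFiltering / SupportsRuntimeV2Filtering and the GPU scan classes, and they model only the differing filter() parameter type.

```java
import java.util.Arrays;

// Hypothetical, simplified stand-ins for the two Spark runtime-filter interfaces.
final class LegacyFilter {
  final String sql;
  LegacyFilter(String sql) { this.sql = sql; }
}

final class V2Predicate {
  final String sql;
  V2Predicate(String sql) { this.sql = sql; }
}

interface LegacyRuntimeFiltering {
  void filter(LegacyFilter[] filters);
}

interface V2RuntimeFiltering {
  void filter(V2Predicate[] predicates);
}

// Version-independent logic lives in the abstract base.
abstract class CopyOnWriteScanBase {
  private String[] applied = new String[0];

  protected void applyRuntimeFilter(String[] conditions) {
    applied = conditions.clone();
  }

  String describe() {
    return "filters=" + Arrays.toString(applied);
  }
}

// Stand-in for the 1.6 / 1.9 / 1.10 modules: the older filter(Filter[]) shape.
final class CopyOnWriteScan110 extends CopyOnWriteScanBase implements LegacyRuntimeFiltering {
  @Override
  public void filter(LegacyFilter[] filters) {
    applyRuntimeFilter(Arrays.stream(filters).map(f -> f.sql).toArray(String[]::new));
  }
}

// Stand-in for the 1.11 module: the filter(Predicate[]) shape.
final class CopyOnWriteScan111 extends CopyOnWriteScanBase implements V2RuntimeFiltering {
  @Override
  public void filter(V2Predicate[] predicates) {
    applyRuntimeFilter(Arrays.stream(predicates).map(p -> p.sql).toArray(String[]::new));
  }
}
```

Each concrete class only adapts its own filter() signature and delegates to the shared base, so the common module never has to name either interface.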
Iceberg published iceberg-spark-runtime-4.1_2.13 starting at version
1.11.0; iceberg-1-10-x is not an option on Spark 4.1 because Iceberg
never released a 4.1 runtime before 1.11. Adds a new Maven submodule
iceberg/iceberg-1-11-x and switches the release411 profile from
iceberg/iceberg-stub to use it.
Module skeleton (mirrors iceberg-1-10-x with the iceberg111x sub-package
and a spark411 shim source dir):
- iceberg/iceberg-1-11-x/pom.xml + scala2.13 mirror
- iceberg111x/IcebergProviderImpl, ShimUtilsImpl, GpuParquetIOShim
- org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan (Iceberg 1.11
copy-on-write scan: SupportsRuntimeV2Filtering with filter(Predicate[]))
- spark411/.../GpuInternalRow overriding the new SpecializedGetters
methods Spark 4.1 added (getGeometry, getGeography) alongside the
existing getVariant
Wiring:
- parent pom: add spark41x.iceberg.artifact.suffix=4.1 and
iceberg.111x.version=1.11.0 properties; swap release411 from
iceberg-stub to iceberg-1-11-x with the 4.1 runtime suffix
- scala2.13 mirror regenerated via build/make-scala-version-build-files.sh
- IcebergProbeImpl: lift the < 4.1.0 Spark cap to < 4.2.0; add 1.11.0
commit-id mapping and "1.11" -> "iceberg111x" shim sub-package
- README: document the new row in the Iceberg/Spark support matrix
Integration tests: enable iceberg suite on Spark 4.1.x:
- spark_session.py: add is_spark_41x() helper and include 4.1.x in
is_iceberg_supported_spark() so the @iceberg-marked tests no longer
skip on 4.1.
- iceberg/__init__.py: update the skip reason to mention 4.1.x.
- iceberg_test.py::test_iceberg_read_timetravel: Iceberg 1.11 removed
the `.option("snapshot-id", ...)` read path and directs users at
Spark's built-in `versionAsOf`. Switching to versionAsOf works on both
1.10 and 1.11.
Signed-off-by: Chong Gao <res_life@163.com>
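The IcebergProbeImpl wiring above (the `< 4.2.0` Spark cap and the "1.11" -> "iceberg111x" mapping) could be modeled along these lines. This is a hedged sketch, not the probe's real code: `IcebergShimLookup` and its methods are hypothetical names, only the 1.11 entry is taken from the commit message, and the Spark lower bound and the commit-id mapping are left out.

```java
import java.util.Map;
import java.util.Optional;

final class IcebergShimLookup {
  private IcebergShimLookup() {}

  // Only the 1.11 entry comes from the commit message; entries for other versions are omitted.
  private static final Map<String, String> SHIM_SUB_PACKAGES = Map.of("1.11", "iceberg111x");

  /** Reduces a version such as "1.11.0" to its "major.minor" prefix. */
  static String majorMinor(String version) {
    String[] parts = version.split("\\.");
    if (parts.length < 2) {
      throw new IllegalArgumentException("Expected major.minor[.patch], got: " + version);
    }
    return parts[0] + "." + parts[1];
  }

  /** Looks up the shim sub-package for an Iceberg version, if one is registered. */
  static Optional<String> shimSubPackage(String icebergVersion) {
    return Optional.ofNullable(SHIM_SUB_PACKAGES.get(majorMinor(icebergVersion)));
  }

  /** True when the Spark version is strictly below 4.2.0 (upper cap only). */
  static boolean belowSparkCap(String sparkVersion) {
    String[] parts = sparkVersion.split("\\.");
    int major = Integer.parseInt(parts[0]);
    int minor = Integer.parseInt(parts[1]);
    return major < 4 || (major == 4 && minor < 2);
  }
}
```

With this shape, `shimSubPackage("1.11.0")` yields `iceberg111x`, and `belowSparkCap("4.1.1")` holds while `belowSparkCap("4.2.0")` does not.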
Iceberg 1.11 split SparkBatchQueryScan into a new class hierarchy and
the incremental-append query path (.option("start-snapshot-id", ...) +
.option("end-snapshot-id", ...)) moved into a brand-new class
org.apache.iceberg.spark.source.SparkIncrementalAppendScan
(package-private, 1.11-only, extends SparkRuntimeFilterableScan).
Before 1.11 the same query path went through SparkBatchQueryScan and
was matched by the existing batch-query ScanRule in IcebergProviderBase.
Without a rule for the new class, the leaf falls back to CPU and the
GpuTransitionOverrides partial-columnar assertion fires, breaking
test_iceberg_incremental_read.
Adds:
- GpuSparkIncrementalAppendScan in org.apache.iceberg.spark.source —
mirrors GpuSparkBatchQueryScan since both extend SparkRuntimeFilterableScan
(a SparkPartitioningAwareScan<PartitionScanTask>). Companion object
exposes create(Scan, ...) for cross-package callers.
- iceberg111x.IcebergProviderImpl overrides getScans to register a third
ScanRule for SparkIncrementalAppendScan on top of the base provider's
two rules. The CPU class is loaded by string since it is
package-private and not directly referenceable from outside
org.apache.iceberg.spark.source.
Signed-off-by: Chong Gao <res_life@163.com>
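To illustrate why a missing rule sends the leaf back to CPU, and how a rule can be keyed on a class that is only known by name, here is a minimal sketch. `ScanRuleRegistry`, `register`, and `plan` are hypothetical and much simpler than the plugin's ScanRule machinery; `java.util.ArrayList` stands in for the package-private `SparkIncrementalAppendScan`, which cannot be referenced in a self-contained example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class ScanRuleRegistry {
  // Rules are keyed on the exact CPU scan class; subclasses do not match.
  private final Map<Class<?>, Function<Object, String>> rules = new LinkedHashMap<>();

  /** Registers a rule for a class that callers can only name by string. */
  void register(String cpuClassName, Function<Object, String> toGpu) {
    try {
      rules.put(Class.forName(cpuClassName), toGpu);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("CPU scan class not on classpath: " + cpuClassName, e);
    }
  }

  /** Applies the matching rule, or reports a CPU fallback when no rule is registered. */
  String plan(Object cpuScan) {
    Function<Object, String> rule = rules.get(cpuScan.getClass());
    if (rule == null) {
      return "CPU:" + cpuScan.getClass().getSimpleName();
    }
    return rule.apply(cpuScan);
  }
}
```

After `registry.register("java.util.ArrayList", scan -> "GPU:ArrayList")`, planning an `ArrayList` takes the GPU branch, while an unregistered class such as `LinkedList` takes the CPU branch. That second case is the analogue of the situation the commit message describes for the 1.11 class before the third rule was added.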
Force-pushed from e2a0367 to d24b3f7
Stacked work for #14853 (3/3) — restores GPU acceleration for the incremental-append read path on Iceberg 1.11. Stacked on top of #14882 — please review only the latest commit; the diff vs. `main` includes #14881 and #14882's changes too.
Description
Iceberg 1.11 split `SparkBatchQueryScan` into a new class hierarchy and the incremental-append query path (`.option("start-snapshot-id", ...)` + `.option("end-snapshot-id", ...)`) moved into a brand-new class `org.apache.iceberg.spark.source.SparkIncrementalAppendScan` (package-private, 1.11-only, extends `SparkRuntimeFilterableScan`).
Before 1.11 the same query path went through `SparkBatchQueryScan` and was matched by the existing batch-query `ScanRule` in `IcebergProviderBase`. Without a rule for the new class, the leaf falls back to CPU and the `GpuTransitionOverrides` partial-columnar assertion fires, breaking `test_iceberg_incremental_read` (3 variants: PERFILE / MULTITHREADED / COALESCING).
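Adds:
- `GpuSparkIncrementalAppendScan` in `org.apache.iceberg.spark.source`: mirrors `GpuSparkBatchQueryScan` since both extend `SparkRuntimeFilterableScan` (a `SparkPartitioningAwareScan<PartitionScanTask>`). Companion object exposes `create(Scan, ...)` for cross-package callers.
- `iceberg111x.IcebergProviderImpl` overrides `getScans` to register a third `ScanRule` for `SparkIncrementalAppendScan` on top of the base provider's two rules. The CPU class is loaded by string since it is package-private and not directly referenceable from outside `org.apache.iceberg.spark.source`.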
Checklists
Documentation
Testing
(`integration_tests/.../iceberg/iceberg_test.py::test_iceberg_incremental_read` — 3 variants for PERFILE / MULTITHREADED / COALESCING. These tests already existed and were exercising the equivalent code path on 1.10; without this PR they fail on Spark 4.1.1 + Iceberg 1.11.0.)
Performance