Commit a47d978

schenksj and claude committed
feat(contrib): SPI surface additions for contribs that need core helpers
Format-agnostic surface additions that PR2's Delta port (and any future file-format contrib) needs, so that contribs can avoid duplicating ~600 lines of CometScanRule's existing schema-check / encryption-gate / marker-dispatch helpers. No specific format is named anywhere in core; contribs register their own tags through the SPI.

1. CometScanRule.isSchemaSupported is widened from private to private[comet] so contrib extensions under org.apache.comet.contrib.* can reuse the same schema check + fallback-reason emission rather than duplicating the 25-line body. Zero behaviour change for in-tree callers.

2. CometOperatorSerdeExtension.matchOperator is a new method, returning None by default, for predicate-based serde dispatch. The class-keyed `serdes` map can't disambiguate a marker pattern like `CometScanExec(scanImpl="<contrib-specific-tag>")` (the class is shared with core's generic CometScanExec). Contribs using such markers override matchOperator. Backwards compatible: existing contribs that only populate `serdes` see no change. `serdes` now defaults to Map.empty so contribs that ONLY use matchOperator don't need to override both.

3. CometExecRule uses a three-step dispatch: allExecs (core class map) -> mergedSerdes (contrib class map) -> matchOperator iteration (contrib predicate). The extensions' matchOperator hooks are tried in registration order; the first Some wins.

4. CometOperatorSerdeExtension.nativeParquetScanImpls is a new method, returning Set.empty by default. Contribs that use the CometScanExec marker pattern AND go through Comet's tuned ParquetSource declare their scanImpl tag(s) here. CometScanExec.supportedDataFilters consults the merged set (via CometExtensionRegistry.nativeParquetScanImpls) to decide whether to apply native-parquet filter exclusions. Core no longer needs to hard-code any contrib's tag name.

5. CometExtensionRegistry.nativeParquetScanImpls publishes the merged tag set at load() time. It is populated and reset alongside mergedSerdesCache under the same monitor.

The contributor guide is updated with the matchOperator + nativeParquetScanImpls patterns and explicit guidance that contribs define their own scanImpl strings in their own code -- core's CometConf only carries SCAN_NATIVE_DATAFUSION / SCAN_NATIVE_ICEBERG_COMPAT for core's own variants.

Verified: cargo check (default features) is green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 29f685c commit a47d978

6 files changed

Lines changed: 125 additions & 8 deletions

docs/source/contributor-guide/contrib-extensions.md

Lines changed: 40 additions & 5 deletions
@@ -277,9 +277,10 @@ trait CometOperatorSerdeExtension {
 }
 ```
 
-Contribs that need a custom physical operator (e.g., a contrib-specific scan exec
-carrying contrib-private state) define their own `SparkPlan` subclass and register a
-serde keyed on the new class:
+Two dispatch shapes are supported:
+
+**Class-keyed** — the contrib defines its own `SparkPlan` subclass (typical for
+operator-style contribs):
 
 ```scala
 case class CometMyFormatScanExec(...) extends CometNativeExec { /* ... */ }
@@ -296,8 +297,42 @@ The merged map across all extensions is computed once at registry load time;
 contribs are logged as a warning at load — the convention is **one contrib defines a
 class, that contrib owns its serde**.
 
-Avoid relying on the legacy `scanImpl: String` tag pattern on a generic `CometScanExec`
-— the SPI dispatches by class, not by tag.
+**Predicate-keyed (marker-class with scanImpl tag)** — required when the contrib uses
+core's `CometScanExec` as a marker disambiguated by a `scanImpl` string. `CometScanExec`
+is a Scala case class shared with core, so two contribs marking different tag values
+on the same class would otherwise collide. Override `matchOperator` instead of (or in
+addition to) populating `serdes`, and declare your tag(s) via `nativeParquetScanImpls`
+if your scan goes through Comet's tuned ParquetSource:
+
+```scala
+class MyFormatSerdeExtension extends CometOperatorSerdeExtension {
+  override def name: String = "myformat"
+
+  // Your contrib's scanImpl marker. Pick a stable string; no central registry of these
+  // exists in core, but conventionally contribs use snake-case like "native_<name>_compat".
+  private val MyScanImpl = "native_myformat_compat"
+
+  override def matchOperator(op: SparkPlan): Option[CometOperatorSerde[_]] = op match {
+    case s: CometScanExec if s.scanImpl == MyScanImpl => Some(CometMyFormatScan)
+    case _ => None
+  }
+
+  // Tell core's CometScanExec.supportedDataFilters to apply DataFusion-style filter
+  // exclusions to this tag. Required when your scan goes through Comet's tuned
+  // ParquetSource (the same path SCAN_NATIVE_DATAFUSION uses).
+  override def nativeParquetScanImpls: Set[String] = Set(MyScanImpl)
+}
+```
+
+`CometExecRule` checks `matchOperator` only after the class-keyed `serdes` map misses,
+so the two patterns coexist. Multiple registered extensions' `matchOperator` calls are
+tried in registration order; the first `Some` wins.
+
+Core's CometConf defines `SCAN_NATIVE_DATAFUSION` / `SCAN_NATIVE_ICEBERG_COMPAT` for
+core's own scan variants. Contribs are expected to define their own scanImpl strings
+inside their own code (not in `CometConf`); registering via `nativeParquetScanImpls`
+is the SPI hook that lets `CometScanExec.supportedDataFilters` apply the right filter
+treatment without core needing to know the contrib's tag name.
 
 ##### `CometOperatorSerde[T <: SparkPlan]` contract
 
spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 11 additions & 0 deletions
@@ -357,9 +357,20 @@ case class CometExecRule(session: SparkSession)
 // that aren't in `allExecs`, so this merge never overrides a core mapping in
 // practice; duplicate-class detection at load() time logs a warning if it
 // does happen.
+// Three-step dispatch:
+// 1. core's built-in class-keyed map (allExecs)
+// 2. contrib serde-extensions' class-keyed map (mergedSerdes)
+// 3. contrib serde-extensions' predicate-based matchOperator hook
+// (for marker-class patterns where one shared SparkPlan class --
+// e.g. CometScanExec -- is disambiguated by a runtime tag)
 val handler = allExecs
   .get(op.getClass)
   .orElse(CometExtensionRegistry.mergedSerdes.get(op.getClass))
+  .orElse {
+    CometExtensionRegistry.serdeExtensions.iterator
+      .flatMap(_.matchOperator(op))
+      .nextOption()
+  }
   .map(_.asInstanceOf[CometOperatorSerde[SparkPlan]])
 handler match {
   case Some(handler) =>
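
The lookup order in this hunk can be sketched without Spark on the classpath. This is a minimal illustration, not the project's code: `Plan`, `Serde`, and the three extension objects are hypothetical stand-ins for `SparkPlan`, `CometOperatorSerde[_]`, and registered contribs. It uses `collectFirst` over a lazy iterator in place of `flatMap(...).nextOption()`. Both stop at the first extension that returns a `Some`, and `collectFirst` keeps the sketch usable on Scala 2.12 (`Iterator.nextOption()` appears to be a 2.13 addition).

```scala
object DispatchSketch {
  // Stand-ins for SparkPlan subclasses; all names here are hypothetical.
  sealed trait Plan
  final case class ProjectLike() extends Plan
  final case class MyFormatExec() extends Plan
  final case class ScanMarker(scanImpl: String) extends Plan

  // Stand-in for CometOperatorSerde[_]; a label is enough to show which step won.
  type Serde = String

  trait SerdeExtension {
    def serdes: Map[Class[_ <: Plan], Serde] = Map.empty
    def matchOperator(op: Plan): Option[Serde] = None
  }

  // Class-keyed contrib: owns its own plan class, never touches matchOperator.
  object ClassKeyedExt extends SerdeExtension {
    override def serdes: Map[Class[_ <: Plan], Serde] =
      Map(classOf[MyFormatExec] -> "contrib-class-serde")
  }

  // Predicate-keyed contribs: both inspect the shared marker class.
  object FirstMarkerExt extends SerdeExtension {
    override def matchOperator(op: Plan): Option[Serde] = op match {
      case ScanMarker("native_myformat_compat") => Some("first-marker-serde")
      case _ => None
    }
  }
  object SecondMarkerExt extends SerdeExtension {
    override def matchOperator(op: Plan): Option[Serde] = op match {
      case ScanMarker(tag) if tag.startsWith("native_") => Some("second-marker-serde")
      case _ => None
    }
  }

  val allExecs: Map[Class[_ <: Plan], Serde] = Map(classOf[ProjectLike] -> "core-serde")
  val extensions: Seq[SerdeExtension] = Seq(ClassKeyedExt, FirstMarkerExt, SecondMarkerExt)
  val mergedSerdes: Map[Class[_ <: Plan], Serde] = extensions.flatMap(_.serdes).toMap

  // Step 1: core class map. Step 2: merged contrib class map.
  // Step 3: predicates in registration order, first Some wins.
  def resolve(op: Plan): Option[Serde] =
    allExecs
      .get(op.getClass)
      .orElse(mergedSerdes.get(op.getClass))
      .orElse(extensions.iterator.map(_.matchOperator(op)).collectFirst { case Some(s) => s })
}
```

In this sketch `ScanMarker("native_myformat_compat")` satisfies both predicates, and registration order decides the winner. Overlapping predicates are therefore resolved silently, unlike duplicate class keys, which the registry logs as a warning.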

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 3 additions & 1 deletion
@@ -780,7 +780,9 @@ case class CometScanRule(session: SparkSession)
   case _ => false
 }
 
-private def isSchemaSupported(
+// private[comet] so contrib extensions (under org.apache.comet.contrib.*) can reuse
+// the same schema check + fallback-reason emission rather than duplicating the body.
+private[comet] def isSchemaSupported(
     scanExec: FileSourceScanExec,
     scanImpl: String,
     r: HadoopFsRelation): Boolean = {
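
The effect of the `private[comet]` qualifier can be illustrated in isolation. This is a rough sketch under stated assumptions: nested objects stand in for the `org.apache.comet` package hierarchy so the example fits in one file, and `ScanRuleLike`, its simplified one-argument `isSchemaSupported`, and `myformat.canConvert` are all hypothetical. The real method takes a `FileSourceScanExec`, a `scanImpl` string, and a `HadoopFsRelation`.

```scala
object comet {
  object rules {
    final class ScanRuleLike {
      // Qualified private: callable from anywhere nested inside `comet`,
      // hidden from code outside it.
      private[comet] def isSchemaSupported(typeNames: Seq[String]): Boolean =
        typeNames.forall(_ != "unsupported")
    }
  }

  object contrib {
    object myformat {
      // A contrib nested under `comet` reuses the helper instead of copying its body.
      def canConvert(typeNames: Seq[String]): Boolean =
        new rules.ScanRuleLike().isSchemaSupported(typeNames)
    }
  }
}
```

Code outside `comet` can still reach the helper indirectly through `comet.contrib.myformat.canConvert`, but a direct call to `isSchemaSupported` from outside would not compile.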

spark/src/main/scala/org/apache/comet/spi/CometExtensionRegistry.scala

Lines changed: 13 additions & 0 deletions
@@ -71,12 +71,14 @@ object CometExtensionRegistry extends Logging {
 val newScanExts = loadOne[CometScanRuleExtension]("CometScanRuleExtension")
 val newSerdeExts = loadOne[CometOperatorSerdeExtension]("CometOperatorSerdeExtension")
 val newMerged = newSerdeExts.flatMap(_.serdes).toMap
+val newNativeParquetTags = newSerdeExts.flatMap(_.nativeParquetScanImpls).toSet
 // Publish the @volatile fields BEFORE flipping `loaded` so other threads either see
 // the empty defaults (and may re-enter -- benign, blocked by the monitor) or the
 // fully-populated state (and may skip -- also benign).
 scanExts = newScanExts
 serdeExts = newSerdeExts
 mergedSerdesCache = newMerged
+nativeParquetScanImplsCache = newNativeParquetTags
 loaded.set(true)
 if (newScanExts.nonEmpty || newSerdeExts.nonEmpty) {
   logInfo(
@@ -114,6 +116,16 @@ object CometExtensionRegistry extends Logging {
 : Map[Class[_ <: org.apache.spark.sql.execution.SparkPlan],
   org.apache.comet.serde.CometOperatorSerde[_]] = Map.empty
 
+/**
+ * Union of every registered extension's `nativeParquetScanImpls`. Consumed by
+ * `CometScanExec.supportedDataFilters` to decide whether the marker scan's filter set
+ * should get the same native-parquet exclusions as `SCAN_NATIVE_DATAFUSION`. Computed
+ * once at `load()` time; empty until `load()` has run.
+ */
+def nativeParquetScanImpls: Set[String] = nativeParquetScanImplsCache
+
+@volatile private var nativeParquetScanImplsCache: Set[String] = Set.empty
+
 /**
  * Log a warning when two registered contribs claim the same `Class[_ <: SparkPlan]` for serde
  * dispatch. The convention documented in `contrib-extensions.md` is that each contrib defines
@@ -162,6 +174,7 @@ object CometExtensionRegistry extends Logging {
   scanExts = Seq.empty
   serdeExts = Seq.empty
   mergedSerdesCache = Map.empty
+  nativeParquetScanImplsCache = Set.empty
 }
 
 private def loadOne[T](label: String)(implicit ct: scala.reflect.ClassTag[T]): Seq[T] = {
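
The publish-then-flip pattern these hunks extend can be sketched as follows. This is a simplified illustration rather than the registry's actual code: `RegistrySketch` and its `SerdeExtension` trait are hypothetical, `load` takes the extensions as an argument instead of discovering them through `loadOne`, and the body of `reset()` is an assumption because the diff shows only part of it.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object RegistrySketch {
  // Hypothetical stand-in for CometOperatorSerdeExtension.
  trait SerdeExtension {
    def nativeParquetScanImpls: Set[String] = Set.empty
  }

  private val loaded = new AtomicBoolean(false)

  // Readers never take the monitor; they only read the volatile field.
  @volatile private var tagsCache: Set[String] = Set.empty

  def nativeParquetScanImpls: Set[String] = tagsCache

  def load(extensions: Seq[SerdeExtension]): Unit = synchronized {
    if (!loaded.get()) {
      // Union of every extension's declared tags; duplicates collapse in the Set.
      val merged = extensions.flatMap(_.nativeParquetScanImpls).toSet
      // Publish the cache BEFORE flipping the flag, so a thread that observes
      // loaded == true also observes the populated set.
      tagsCache = merged
      loaded.set(true)
    }
  }

  // Assumed shape of the reset path: clear the flag and the cache under the same monitor.
  def reset(): Unit = synchronized {
    loaded.set(false)
    tagsCache = Set.empty
  }
}
```

A reader that calls `nativeParquetScanImpls` before `load()` has run sees the empty set, matching the "empty until `load()` has run" note in the scaladoc above.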

spark/src/main/scala/org/apache/comet/spi/CometOperatorSerdeExtension.scala

Lines changed: 40 additions & 1 deletion
@@ -49,5 +49,44 @@ trait CometOperatorSerdeExtension {
    * Convention: each contrib's mapping should reference only classes the contrib itself defines,
    * so two contribs never claim ownership of the same operator class.
    */
-  def serdes: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]]
+  def serdes: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] = Map.empty
+
+  /**
+   * Predicate-based dispatch hook for contribs whose serde key cannot be expressed as a unique
+   * `SparkPlan` class. The canonical case is the `CometScanExec` marker-with-`scanImpl`-tag
+   * pattern: a contrib's `CometScanRuleExtension.transformV1` returns
+   * `CometScanExec(scanExec, session, "native_delta_compat")`, but `CometScanExec` is a case
+   * class shared with core, so a class-keyed map can't disambiguate by the tag. The contrib
+   * overrides this method to inspect the plan and return its serde:
+   *
+   * {{{
+   * override def matchOperator(op: SparkPlan): Option[CometOperatorSerde[_]] = op match {
+   *   case s: CometScanExec if s.scanImpl == CometConf.SCAN_NATIVE_DELTA_COMPAT =>
+   *     Some(CometDeltaNativeScan)
+   *   case _ => None
+   * }
+   * }}}
+   *
+   * `CometExecRule` consults `matchOperator` only after the class-keyed `serdes` map misses, so
+   * contribs with a unique exec class never need to implement this. Multiple registered
+   * extensions' `matchOperator` returns are tried in registration order; the first `Some` wins.
+   */
+  def matchOperator(op: SparkPlan): Option[CometOperatorSerde[_]] = None
+
+  /**
+   * Declares which `scanImpl` string tags this contrib produces from
+   * `CometScanRuleExtension.transformV1` when using the `CometScanExec(marker, scanImpl=X)`
+   * pattern. Tags listed here get `CometScanExec.supportedDataFilters`'s native-parquet
+   * filter exclusions (drop dynamic pruning + IsNull/IsNotNull on ArrayType columns), the
+   * same treatment `SCAN_NATIVE_DATAFUSION` receives.
+   *
+   * Override only if your contrib uses the marker-class pattern AND your native side goes
+   * through Comet's tuned `ParquetSource`. Contribs that define their own `SparkPlan`
+   * subclass (rather than reusing `CometScanExec`) don't need this; they control filter
+   * selection themselves.
+   *
+   * Example: a Delta contrib that uses `CometScanExec(..., scanImpl="native_delta_compat")`
+   * would override this to `Set("native_delta_compat")`.
+   */
+  def nativeParquetScanImpls: Set[String] = Set.empty
 }

spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala

Lines changed: 18 additions & 1 deletion
@@ -159,7 +159,13 @@ case class CometScanExec(
  * on array columns (see [[isNullCheckOnArrayColumn]]).
  */
 lazy val supportedDataFilters: Seq[Expression] = {
-  if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION) {
+  // Contribs that use the CometScanExec marker pattern with their own scanImpl
+  // string can declare that their scan goes through Comet's tuned ParquetSource
+  // (and therefore wants DataFusion-style filter exclusions) by registering the
+  // tag via `CometOperatorSerdeExtension.nativeParquetScanImpls`. Core doesn't
+  // need to know any contrib's marker name; the registry is the source of truth.
+  if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION ||
+      CometScanExec.contribNativeParquetScanImpls.contains(scanImpl)) {
     dataFilters
       .filterNot(isDynamicPruningFilter)
       .filterNot(isNullCheckOnArrayColumn)
@@ -534,6 +540,17 @@ case class CometScanExec(
 
 object CometScanExec {
 
+  /**
+   * Set of contrib-registered scanImpl tags whose CometScanExec should use Comet's
+   * native-parquet filter exclusion semantics (drop dynamic pruning + IsNull/IsNotNull on
+   * ArrayType columns). Populated lazily from
+   * `CometExtensionRegistry.serdeExtensions.flatMap(_.nativeParquetScanImpls)`. Each access
+   * re-reads the volatile field on `CometExtensionRegistry`; the cost is one HashSet
+   * lookup per CometScanExec construction, which is dwarfed by Spark's own per-plan work.
+   */
+  private[comet] def contribNativeParquetScanImpls: Set[String] =
+    org.apache.comet.spi.CometExtensionRegistry.nativeParquetScanImpls
+
   def apply(
       scanExec: FileSourceScanExec,
       session: SparkSession,
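
The widened condition in `supportedDataFilters` can be sketched with stand-in filter types. Several parts are assumptions and are marked as such in the code: the `Filter` hierarchy is hypothetical, the literal `"native_datafusion"` is an assumed value for `CometConf.SCAN_NATIVE_DATAFUSION`, and the `else` branch simply passes filters through, because the hunk does not show what the real non-native branch does.

```scala
object FilterSketch {
  // Hypothetical stand-ins for Spark Catalyst expressions.
  sealed trait Filter
  final case class DynamicPruning(column: String) extends Filter
  final case class NullCheck(column: String, onArrayColumn: Boolean) extends Filter
  final case class GreaterThan(column: String, value: Int) extends Filter

  // Assumed literal for core's own native scan tag.
  val CoreNativeTag = "native_datafusion"

  private def isDynamicPruningFilter(f: Filter): Boolean = f.isInstanceOf[DynamicPruning]

  private def isNullCheckOnArrayColumn(f: Filter): Boolean = f match {
    case NullCheck(_, onArray) => onArray
    case _ => false
  }

  def supportedDataFilters(
      scanImpl: String,
      contribTags: Set[String],
      dataFilters: Seq[Filter]): Seq[Filter] =
    if (scanImpl == CoreNativeTag || contribTags.contains(scanImpl)) {
      // Native-parquet path: drop dynamic pruning and null checks on array columns.
      dataFilters
        .filterNot(isDynamicPruningFilter)
        .filterNot(isNullCheckOnArrayColumn)
    } else {
      // Assumption: the real non-native branch is outside this hunk.
      dataFilters
    }
}
```

One consequence visible even in this sketch: a contrib that uses the marker pattern but does not register its tag falls through to the non-native branch without any warning, so the registration in `nativeParquetScanImpls` is easy to forget.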
