Commit 065e0f4

feat: route structured-text functions through codegen dispatcher
Register the CSV / JSON / XPath / XML structured-text functions that previously fell back to Spark so they stay native via the codegen dispatcher. None have a native (rust) implementation; they extend Spark's CodegenFallback.

- from_csv, schema_of_csv, schema_of_json, json_object_keys, xpath/xpath_*
- from_xml, to_xml, schema_of_xml (Spark 4.0+ only)

On Spark 3.4/3.5 these are plain expressions, registered directly in the serde maps. On Spark 4.x they are RuntimeReplaceable and the optimizer rewrites them to Invoke(evaluator)/StaticInvoke before Comet sees the plan, so they are dispatched from CometExprShim4x.convertStructuredText, which matches the backing evaluators by simple name to stay robust across 4.0/4.1/4.2. When the dispatcher is disabled they fall back to Spark.

Adds CometStructuredTextSuite (XML tests gated to Spark 4.0+). Verified on the spark-3.4, 3.5, 4.0, 4.1, and 4.2 profiles.
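The idea of matching evaluators by simple name can be illustrated with a minimal, self-contained sketch. It is not the code in `CometExprShim4x`: the object name, the evaluator names (`FooEvaluator`, `BarEvaluator`), and the `com.example` packages are all hypothetical. It only shows why comparing the unqualified class name keeps a match stable if a class moves between packages from one release to the next.

```scala
object SimpleNameDispatch {
  // Hypothetical evaluator simple names, chosen only for illustration.
  val knownEvaluators: Set[String] = Set("FooEvaluator", "BarEvaluator")

  // Drop the package prefix from a fully qualified JVM class name.
  // If there is no '.', lastIndexOf returns -1 and the whole string is kept.
  def simpleName(className: String): String =
    className.substring(className.lastIndexOf('.') + 1)

  // True when the class is one of the known evaluators, whatever its package.
  def isStructuredText(className: String): Boolean =
    knownEvaluators.contains(simpleName(className))
}
```

Under these assumptions, `com.example.v40.FooEvaluator` and `com.example.v41.moved.FooEvaluator` both match, while an unrelated class does not. The trade-off is that an unrelated class sharing the same simple name would also match.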
1 parent 4f8791e commit 065e0f4

12 files changed

Lines changed: 355 additions & 8 deletions

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
@@ -386,6 +386,7 @@ jobs:
 org.apache.comet.CometCodegenSuite
 org.apache.comet.CometCodegenSourceSuite
 org.apache.comet.CometCodegenHOFSuite
+org.apache.comet.CometStructuredTextSuite
 org.apache.comet.CometFuzzMathSuite
 org.apache.comet.CometCodegenFuzzSuite
 fail-fast: false

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
@@ -202,6 +202,7 @@ jobs:
 org.apache.comet.CometCodegenSuite
 org.apache.comet.CometCodegenSourceSuite
 org.apache.comet.CometCodegenHOFSuite
+org.apache.comet.CometStructuredTextSuite
 org.apache.comet.CometFuzzMathSuite
 org.apache.comet.CometCodegenFuzzSuite

docs/source/user-guide/latest/expressions.md

Lines changed: 31 additions & 4 deletions
@@ -52,11 +52,9 @@ Comet focuses acceleration on mainstream relational, string, datetime, math, and
 expressions. The following function families are **not currently planned** for native acceleration (they are not on the 1.0 roadmap): specialized functionality with narrow real-world analytics use and high implementation cost. They fall back to Spark and may be reconsidered based on demand:

 - **Probabilistic sketches and approximate top-k** (`kll_sketch_*`, `hll_*`, `theta_*`, `count_min_sketch`, `bitmap_*`, `approx_top_k*`): specialized data structures with exact-correctness traps.
-- **XML / XPath** (`from_xml`, `to_xml`, `schema_of_xml`, `xpath*`): legacy text format, rare in accelerated workloads.
 - **Geospatial** (`st_*`): brand-new Spark 4.1 functionality, specialized.
 - **Avro / Protobuf codecs** (`from_avro`, `to_avro`, `from_protobuf`, `to_protobuf`, `schema_of_avro`): format conversion belongs at the IO layer, not expression evaluation.
 - **JVM reflection** (`java_method`, `reflect`): niche, and they invoke arbitrary JVM methods (a security concern).
-- **CSV functions** (`from_csv`, `to_csv`, `schema_of_csv`): row-level CSV parsing and formatting in expressions is niche and better handled at the data source layer.
 - **UTF-8 validation** (`is_valid_utf8`, `make_valid_utf8`, `validate_utf8`, `try_validate_utf8`): niche Spark 4.x string-validation helpers.
 - **File metadata** (`input_file_name`, `input_file_block_start`, `input_file_block_length`): require scan-internal per-row file information, outside the expression layer.
 - **Miscellaneous niche** (`histogram_numeric`, `version`, `sentences`, `quote`): low-value or specialized functions with little benefit from native acceleration.
@@ -220,6 +218,16 @@ The type-name conversion functions (`bigint`, `binary`, `boolean`, `date`, `deci

 ---

+## csv_funcs
+
+| Function | Status | Notes |
+| --- | --- | --- |
+| `from_csv` || |
+| `schema_of_csv` || |
+| `to_csv` || |
+
+---
+
 ## datetime_funcs

 | Function | Status | Notes |
@@ -339,9 +347,9 @@ expression-level). The `outer` variants are wired but marked `Incompatible`; the
 | `from_json` || Falls back by default; opt-in via allowIncompatible ([audit](../../contributor-guide/expression-audits/json_funcs.md#from_json)) |
 | `get_json_object` || Some inputs need allowIncompatible ([audit](../../contributor-guide/expression-audits/json_funcs.md#get_json_object)) |
 | `json_array_length` || Single-quoted/trailing JSON needs allowIncompatible ([audit](../../contributor-guide/expression-audits/json_funcs.md#json_array_length)) |
-| `json_object_keys` | 🔜 | [#3161](https://github.com/apache/datafusion-comet/issues/3161) |
+| `json_object_keys` | | |
 | `json_tuple` | 🔜 | [#3160](https://github.com/apache/datafusion-comet/issues/3160) |
-| `schema_of_json` | 🔜 | [#3163](https://github.com/apache/datafusion-comet/issues/3163) |
+| `schema_of_json` | | |
 | `to_json` || Options and map/array inputs fall back ([audit](../../contributor-guide/expression-audits/json_funcs.md#to_json)) |

 ---
@@ -639,6 +647,25 @@ fall back to Spark.

 ---

+## xml_funcs
+
+| Function | Status | Notes |
+| --- | --- | --- |
+| `from_xml` || Spark 4.0+ |
+| `schema_of_xml` || Spark 4.0+ |
+| `to_xml` || Spark 4.0+ |
+| `xpath` || |
+| `xpath_boolean` || |
+| `xpath_double` || |
+| `xpath_float` || |
+| `xpath_int` || |
+| `xpath_long` || |
+| `xpath_number` || Alias of `xpath_double` |
+| `xpath_short` || |
+| `xpath_string` || |
+
+---
+
 ## Beyond SQL functions

 Comet also accelerates a number of Catalyst expressions that have no Spark SQL function name and therefore do not appear in the tables above. These arise from the DataFrame API, from SQL syntax other than function calls, or from the query optimizer. They include:

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 19 additions & 2 deletions
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
+import org.apache.spark.sql.catalyst.expressions.xml.{XPathBoolean, XPathDouble, XPathFloat, XPathInt, XPathList, XPathLong, XPathShort, XPathString}
 import org.apache.spark.sql.comet.DecimalPrecision
 import org.apache.spark.sql.execution.{ScalarSubquery, SparkPlan}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
@@ -273,7 +274,22 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
     classOf[Cast] -> CometCast)

   private val jsonExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
-    classOf[LengthOfJsonArray] -> CometLengthOfJsonArray)
+    classOf[LengthOfJsonArray] -> CometLengthOfJsonArray,
+    classOf[SchemaOfJson] -> CometSchemaOfJson,
+    classOf[JsonObjectKeys] -> CometJsonObjectKeys)
+
+  private val csvExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] =
+    Map(classOf[CsvToStructs] -> CometCsvToStructs, classOf[SchemaOfCsv] -> CometSchemaOfCsv)
+
+  private val xpathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
+    classOf[XPathBoolean] -> CometXPathBoolean,
+    classOf[XPathShort] -> CometXPathShort,
+    classOf[XPathInt] -> CometXPathInt,
+    classOf[XPathLong] -> CometXPathLong,
+    classOf[XPathFloat] -> CometXPathFloat,
+    classOf[XPathDouble] -> CometXPathDouble,
+    classOf[XPathString] -> CometXPathString,
+    classOf[XPathList] -> CometXPathList)

   private[comet] val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     // TODO PromotePrecision
@@ -301,7 +317,8 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
     mathExpressions ++ hashExpressions ++ stringExpressions ++
     conditionalExpressions ++ mapExpressions ++ predicateExpressions ++
     structExpressions ++ bitwiseExpressions ++ miscExpressions ++ arrayExpressions ++
-    temporalExpressions ++ conversionExpressions ++ urlExpressions ++ jsonExpressions
+    temporalExpressions ++ conversionExpressions ++ urlExpressions ++ jsonExpressions ++
+    csvExpressions ++ xpathExpressions

   /**
    * Mapping of Spark aggregate expression class to Comet expression handler.
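
The registration pattern in the hunks above (one class-keyed map per function family, concatenated with `++` into a single lookup table) can be sketched in isolation. The following is a simplified illustration, not Comet's code: `Expression`, `Serde`, and every class and object in it are hypothetical stand-ins defined locally so the snippet compiles without Spark on the classpath.

```scala
object SerdeRegistrySketch {
  // Hypothetical stand-ins; the real expression and serde types live in Spark and Comet.
  trait Expression
  final class LengthOfJsonArray extends Expression
  final class SchemaOfCsv extends Expression
  final class Unregistered extends Expression

  trait Serde { def label: String }
  object JsonLengthSerde extends Serde { val label = "json_array_length" }
  object SchemaOfCsvSerde extends Serde { val label = "schema_of_csv" }

  // One map per function family, keyed by the expression's class.
  val jsonExpressions: Map[Class[_ <: Expression], Serde] =
    Map(classOf[LengthOfJsonArray] -> JsonLengthSerde)
  val csvExpressions: Map[Class[_ <: Expression], Serde] =
    Map(classOf[SchemaOfCsv] -> SchemaOfCsvSerde)

  // Concatenating the family maps yields a single table keyed by runtime class.
  val all: Map[Class[_ <: Expression], Serde] = jsonExpressions ++ csvExpressions

  // An expression with no entry yields None, which a caller could treat as "fall back".
  def lookup(e: Expression): Option[Serde] = all.get(e.getClass)
}
```

With this shape, adding a function family means defining one more map and appending it to the `++` chain, which is what the hunk does for `csvExpressions` and `xpathExpressions`.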
Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.serde
+
+import org.apache.spark.sql.catalyst.expressions.{CsvToStructs, SchemaOfCsv}
+
+// `from_csv` and `schema_of_csv` have no native (rust) implementation; they run through the
+// codegen dispatcher. See `CometStructuredTextSuite`.
+object CometCsvToStructs extends CometCodegenDispatch[CsvToStructs]
+
+object CometSchemaOfCsv extends CometCodegenDispatch[SchemaOfCsv]

spark/src/main/scala/org/apache/comet/serde/json.scala

Lines changed: 7 additions & 1 deletion
@@ -19,7 +19,7 @@

 package org.apache.comet.serde

-import org.apache.spark.sql.catalyst.expressions.{Attribute, LengthOfJsonArray}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, JsonObjectKeys, LengthOfJsonArray, SchemaOfJson}

 import org.apache.comet.CometConf
 import org.apache.comet.serde.ExprOuterClass.Expr
@@ -46,3 +46,9 @@ object CometLengthOfJsonArray extends CometCodegenDispatch[LengthOfJsonArray] {
     super.convert(expr, inputs, binding)
   }
 }
+
+// `schema_of_json` and `json_object_keys` have no native (rust) implementation; they run through
+// the codegen dispatcher. See `CometStructuredTextSuite`.
+object CometSchemaOfJson extends CometCodegenDispatch[SchemaOfJson]
+
+object CometJsonObjectKeys extends CometCodegenDispatch[JsonObjectKeys]
Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.serde
+
+import org.apache.spark.sql.catalyst.expressions.xml.{XPathBoolean, XPathDouble, XPathFloat, XPathInt, XPathList, XPathLong, XPathShort, XPathString}
+
+// The `xpath`/`xpath_*` functions have no native (rust) implementation; they run through the
+// codegen dispatcher. See `CometStructuredTextSuite`.
+object CometXPathBoolean extends CometCodegenDispatch[XPathBoolean]
+
+object CometXPathShort extends CometCodegenDispatch[XPathShort]
+
+object CometXPathInt extends CometCodegenDispatch[XPathInt]
+
+object CometXPathLong extends CometCodegenDispatch[XPathLong]
+
+object CometXPathFloat extends CometCodegenDispatch[XPathFloat]
+
+object CometXPathDouble extends CometCodegenDispatch[XPathDouble]
+
+object CometXPathString extends CometCodegenDispatch[XPathString]
+
+object CometXPathList extends CometCodegenDispatch[XPathList]

spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala

Lines changed: 7 additions & 0 deletions
@@ -59,6 +59,13 @@ trait CometExprShim extends CommonStringExprs with CometExprShim4x {
       inputs: Seq[Attribute],
       binding: Boolean): Option[Expr] = {
     expr match {
+      // RuntimeReplaceable structured-text functions (schema_of_csv/json, json_object_keys,
+      // xpath_*, schema_of_xml) and from_xml/to_xml route through the codegen dispatcher; see
+      // CometExprShim4x.convertStructuredText. Guarded so non-structured-text Invoke/StaticInvoke
+      // nodes still reach their existing handlers below.
+      case e if isStructuredTextDispatch(e) =>
+        convertStructuredText(e, inputs, binding)
+
       case knc: KnownNotContainsNull =>
         // On Spark 4.0, array_compact rewrites to KnownNotContainsNull(ArrayFilter(IsNotNull)).
         // Strip the wrapper and serialize the inner ArrayFilter as spark_array_compact.

spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala

Lines changed: 7 additions & 0 deletions
@@ -59,6 +59,13 @@ trait CometExprShim extends CommonStringExprs with CometExprShim4x {
       inputs: Seq[Attribute],
       binding: Boolean): Option[Expr] = {
     expr match {
+      // RuntimeReplaceable structured-text functions (schema_of_csv/json, json_object_keys,
+      // xpath_*, schema_of_xml) and from_xml/to_xml route through the codegen dispatcher; see
+      // CometExprShim4x.convertStructuredText. Guarded so non-structured-text Invoke/StaticInvoke
+      // nodes still reach their existing handlers below.
+      case e if isStructuredTextDispatch(e) =>
+        convertStructuredText(e, inputs, binding)
+
       case knc: KnownNotContainsNull =>
         // On Spark 4.0, array_compact rewrites to KnownNotContainsNull(ArrayFilter(IsNotNull)).
         // Strip the wrapper and serialize the inner ArrayFilter as spark_array_compact.

spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala

Lines changed: 7 additions & 0 deletions
@@ -59,6 +59,13 @@ trait CometExprShim extends CommonStringExprs with CometExprShim4x {
       inputs: Seq[Attribute],
       binding: Boolean): Option[Expr] = {
     expr match {
+      // RuntimeReplaceable structured-text functions (schema_of_csv/json, json_object_keys,
+      // xpath_*, schema_of_xml) and from_xml/to_xml route through the codegen dispatcher; see
+      // CometExprShim4x.convertStructuredText. Guarded so non-structured-text Invoke/StaticInvoke
+      // nodes still reach their existing handlers below.
+      case e if isStructuredTextDispatch(e) =>
+        convertStructuredText(e, inputs, binding)
+
       case knc: KnownNotContainsNull =>
         // On Spark 4.0, array_compact rewrites to KnownNotContainsNull(ArrayFilter(IsNotNull)).
         // Strip the wrapper and serialize the inner ArrayFilter as spark_array_compact.
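
The guard-first ordering used in the three shims can be illustrated with a small, self-contained sketch. It models only the control flow: the node types, the evaluator name `FooEvaluator`, the handler labels, and the `dispatcherEnabled` flag are hypothetical, and the bodies of the real `isStructuredTextDispatch` and `convertStructuredText` are not shown in this diff. Returning `None` stands in for "fall back to Spark" here, which is an assumption about how the fallback is signalled.

```scala
object GuardedDispatch {
  // Hypothetical stand-ins for plan nodes; not Spark's real classes.
  sealed trait Node
  final case class Invoke(evaluatorName: String) extends Node
  final case class KnownNotContainsNull(child: String) extends Node

  // Hypothetical evaluator names, for illustration only.
  private val structuredText = Set("FooEvaluator")

  def isStructuredTextDispatch(n: Node): Boolean = n match {
    case Invoke(name) => structuredText.contains(name)
    case _ => false
  }

  // Returns a label for the handler that would take the node, or None to signal fallback.
  def convert(n: Node, dispatcherEnabled: Boolean): Option[String] = n match {
    // The guarded case comes first, so only structured-text nodes are captured here.
    case e if isStructuredTextDispatch(e) =>
      if (dispatcherEnabled) Some("codegen-dispatch") else None
    // Every other Invoke still reaches the handler it had before.
    case Invoke(_) => Some("existing-invoke-handler")
    case KnownNotContainsNull(_) => Some("array-compact-handler")
  }
}
```

Because the first case is guarded by a predicate instead of matching on the node type alone, an `Invoke` that does not back a structured-text function falls through to the later cases unchanged.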
