
Conversation

@YannByron (Contributor):

Purpose

Introduce a Spark engine for #155.

This is the first PR, which includes:

  1. Introduce the basic Spark architecture, including the fluss-spark-common, fluss-spark-ut, and fluss-spark-3.x modules.
  2. Support the Spark catalog and table, based on Spark 3.5 and 3.4 for now.
  3. Support Spark CI.

Linked issue: close #228 ([Feature] Support spark catalog(base spark 3.3.3))

Brief change log

Tests

API and Format

Documentation

@YannByron (Contributor Author):

@wuchong please take a look, thanks.

@wuchong (Member) left a comment:

Thanks @YannByron for the great work! I’ve left a few comments for consideration.

Additionally, it would be great if we could add Javadoc or explanatory comments to the key classes and methods; this would greatly improve readability and maintainability for future contributors.

<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<argLine>-ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g -XX:ReservedCodeCacheSize=128m ${extraJavaTestArgs} -Dio.netty.tryReflectionSetAccessible=true</argLine>
<filereports>PaimonTestSuite.txt</filereports>
@wuchong (Member):

FlussTestSuite.txt?

<version>0.9-SNAPSHOT</version>
</parent>

<artifactId>fluss-spark-common</artifactId>
@wuchong (Member):

Should we add a Scala version suffix to the artifact ID (also for fluss-spark-3.4 and fluss-spark-3.5 modules)? This would ensure that the published JARs automatically include the Scala version in their artifact names during Maven deployment, following standard Scala cross-build conventions.

@YannByron (Contributor Author):

Sure. I was going to defer this until Scala 2.13 or Spark 4 is supported, but let me do it in this PR.

pom.xml Outdated
Comment on lines 467 to 479
<profile>
<id>spark3</id>
<modules>
<module>fluss-spark/fluss-spark-3.5</module>
<module>fluss-spark/fluss-spark-3.4</module>
</modules>
<activation>
<activeByDefault>true</activeByDefault>
<property>
<name>spark3</name>
</property>
</activation>
</profile>
@wuchong (Member):

I think we can enable these modules by default? That way, the license checker pipeline can verify these modules as well.


import scala.collection.JavaConverters._

class FlussCatalog extends TableCatalog with SupportsFlussNamespaces with WithFlussAdmin {
@wuchong (Member):

How about naming it SparkCatalog? Since these catalog implementations reside in the Fluss repository alongside those for other engines (such as Flink and Trino), including the engine name in the class name would make it easier to identify and distinguish between them.

@YannByron (Contributor Author):

ok
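For context, a Spark 3 catalog implementation like this is wired in through Spark's catalog plugin configuration. Below is a minimal sketch of how the renamed SparkCatalog might be registered; the catalog name `fluss`, the fully qualified class name, and the `bootstrap.servers` option key are illustrative assumptions, not anything confirmed in this PR.

```scala
// Sketch only: registering the (renamed) SparkCatalog as a Spark catalog plugin.
// The class FQN and the bootstrap.servers option key are assumptions.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("fluss-spark-sketch")
  .master("local[*]")
  // Spark resolves the "fluss" prefix in SQL identifiers to this TableCatalog.
  .config("spark.sql.catalog.fluss", "org.apache.fluss.spark.SparkCatalog")
  // Catalog-scoped options are passed as spark.sql.catalog.<name>.<key>.
  .config("spark.sql.catalog.fluss.bootstrap.servers", "localhost:9123")
  .getOrCreate()

spark.sql("SHOW NAMESPACES IN fluss").show()
```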

import org.apache.fluss.metadata.TableInfo
import org.apache.fluss.spark.catalog.{FlussTableInfo, SupportsFlussPartitionManagement}

case class FlussTable(table: TableInfo)
@wuchong (Member):

ditto. SparkTable?

SparkDataTypes.createMapType(
mapType.getKeyType.accept(this),
mapType.getValueType.accept(this),
mapType.isNullable
@wuchong (Member):

mapType.getValueType.isNullable
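The point of the fix: the third argument of Spark's `DataTypes.createMapType` is `valueContainsNull`, so it must come from the map's value type, not the map type itself. A minimal sketch, assuming `SparkDataTypes` aliases `org.apache.spark.sql.types.DataTypes` and that the Fluss type classes live under an `org.apache.fluss.types` package (package name assumed):

```scala
import org.apache.spark.sql.types.{DataType => SparkDataType, DataTypes => SparkDataTypes}

// Visitor signature and Fluss package are assumptions based on the diff above.
def toSparkMapType(
    mapType: org.apache.fluss.types.MapType,
    visitor: org.apache.fluss.types.DataTypeVisitor[SparkDataType]): SparkDataType =
  SparkDataTypes.createMapType(
    mapType.getKeyType.accept(visitor),
    mapType.getValueType.accept(visitor),
    mapType.getValueType.isNullable // was mapType.isNullable in the diff
  )
```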


import scala.collection.JavaConverters._

object FlussDataTypeToSparkDataType extends DataTypeVisitor[SparkDataType] {
@wuchong (Member):

FlussDataTypeToSparkDataType -> FlussToSparkTypeVisitor to align with SparkToFlussTypeVisitor.

@YannByron (Contributor Author):

OK. The other subclasses of DataTypeVisitor can follow this rule as well.


val (tableProps, customProps) =
caseInsensitiveProps.filterNot(SPARK_TABLE_OPTIONS.contains).partition {
case (key, _) => FlussConfigUtils.TABLE_OPTIONS.containsKey(key)
@wuchong (Member):

FlussConfigUtils.TABLE_OPTIONS is a static set; however, a Fluss table created by a newer client version may carry additional table options. Therefore, it would be more robust to check whether the config key starts with the table. prefix, as in the sketch below.
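A sketch of that prefix-based split; the `Map[String, String]` shape of `caseInsensitiveProps` and the literal `"table."` prefix are assumptions for illustration:

```scala
// Assumed shapes, for illustration only.
val caseInsensitiveProps: Map[String, String] = ??? // from the surrounding code
val SPARK_TABLE_OPTIONS: Set[String] = ???          // Spark-only options to drop

// Split by the "table." prefix instead of a static key set, so options
// introduced by newer clients still land in tableProps.
val (tableProps, customProps) =
  caseInsensitiveProps
    .filterNot { case (key, _) => SPARK_TABLE_OPTIONS.contains(key) }
    .partition { case (key, _) => key.startsWith("table.") }
```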

.scalafmt.conf Outdated
Comment on lines 50 to 51
["org.apache.paimon\\..*"],
["org.apache.paimon.shade\\..*"],
@wuchong (Member):

Change these to the fluss package.

.field("pt", DataTypes.STRING())
.build())
assertThat(testPartitionedTable.getPartitionKeys.get(0)).isEqualTo("pt")
assertThat(testPartitionedTable.getCustomProperties.containsKey("key")).isEqualTo(true)
@wuchong (Member):

What is the key to verify?

@YannByron force-pushed the fluss-spark branch 2 times, most recently from 6c21e44 to 4f8e790 (December 26, 2025 10:28).
@wuchong (Member) left a comment:

LGTM. I rebased the branch to trigger tests on the latest main and appended a commit mapping Spark BinaryType to Fluss BytesType, because BinaryType in Fluss is a fixed-size binary, not a VarbinaryType.
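A sketch of that mapping; the `org.apache.fluss.types.DataTypes` factory (assumed to mirror the `DataTypes.STRING()` style used in the test above) is an assumption:

```scala
// Sketch: Spark's BinaryType is variable-length, so it should map to
// Fluss BYTES (BytesType), not the fixed-size Fluss BINARY(n).
import org.apache.spark.sql.types.{BinaryType, DataType => SparkType}
import org.apache.fluss.types.{DataType => FlussType, DataTypes}

def binaryToFluss(sparkType: SparkType): FlussType = sparkType match {
  case BinaryType => DataTypes.BYTES() // variable-length byte sequence
  case other =>
    throw new UnsupportedOperationException(s"Not covered in this sketch: $other")
}
```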

@wuchong merged commit 5ed3b37 into apache:main on December 26, 2025. 6 checks passed.