74 commits
e6e374f  save (huan233usc, Oct 9, 2025)
9560950  save (huan233usc, Oct 9, 2025)
8b9fda1  save (huan233usc, Oct 9, 2025)
4f45983  save (huan233usc, Oct 9, 2025)
d6dc7ef  fix (huan233usc, Oct 9, 2025)
3a6472b  fix (huan233usc, Oct 9, 2025)
b55bfaa  fix (huan233usc, Oct 10, 2025)
29a9dbe  fix (huan233usc, Oct 10, 2025)
5cbf64f  fix (huan233usc, Oct 10, 2025)
9a6acdd  fix (huan233usc, Oct 10, 2025)
f8d4862  fix (huan233usc, Oct 10, 2025)
6c7c972  fix (huan233usc, Oct 10, 2025)
565f9cb  fix (huan233usc, Oct 10, 2025)
74a1f5c  fix (huan233usc, Oct 10, 2025)
c6c306d  fix (huan233usc, Oct 10, 2025)
dbf686b  fix (huan233usc, Oct 10, 2025)
2929d72  fix (huan233usc, Oct 10, 2025)
642a42a  fix (huan233usc, Oct 11, 2025)
e485cda  fix (huan233usc, Oct 11, 2025)
f6ef579  fix (huan233usc, Oct 11, 2025)
5f0dbc8  fix (huan233usc, Oct 11, 2025)
812ba1d  fix (huan233usc, Oct 11, 2025)
7dd8d18  fix (huan233usc, Oct 11, 2025)
1658b8f  fix (huan233usc, Oct 12, 2025)
743ca99  fix (huan233usc, Oct 12, 2025)
33f484e  Fix test working directory: use baseDirectory instead of Test/baseDir… (huan233usc, Oct 12, 2025)
6f833ef  Refactor: use delta-spark-v1's baseDirectory directly for better clarity (huan233usc, Oct 12, 2025)
5c05ad4  Fix: use delta-spark-v1's baseDirectory for all test paths (huan233usc, Oct 12, 2025)
fb98c0c  Add debug output for Test/javaOptions user.dir to diagnose GitHub Act… (huan233usc, Oct 12, 2025)
8c4dd7e  Add more debug output for forkOptions working directory (huan233usc, Oct 12, 2025)
a0af2a9  Fix: remove duplicate javaOptions - only add user.dir (huan233usc, Oct 12, 2025)
57312d9  Fix: TestParallelization should use Test/baseDirectory for workingDir… (huan233usc, Oct 12, 2025)
e639b18  fix (huan233usc, Oct 13, 2025)
92b6326  fix (huan233usc, Oct 13, 2025)
1238ef8  Fix: avoid duplicate symlinks in connectClient test setup (huan233usc, Oct 13, 2025)
021582e  Simplify connectClient symlink fix - remove try-catch (huan233usc, Oct 13, 2025)
89206cf  Use local delta-spark-v1 in kernelDefaults tests (huan233usc, Oct 13, 2025)
ad155d1  try minimize change (huan233usc, Oct 13, 2025)
7864c77  fix test (huan233usc, Oct 13, 2025)
8716b18  fix test (huan233usc, Oct 13, 2025)
86c0186  fix test (huan233usc, Oct 13, 2025)
3a19590  fix test (huan233usc, Oct 13, 2025)
a000aa8  fix test (huan233usc, Oct 13, 2025)
163b123  fix test (huan233usc, Oct 13, 2025)
a63fd0d  fix test (huan233usc, Oct 13, 2025)
680787e  Merge from master and resolve conflicts (huan233usc, Oct 15, 2025)
6c3f89b  merge (huan233usc, Oct 15, 2025)
98295d3  save (huan233usc, Oct 15, 2025)
6fea63c  save (huan233usc, Oct 15, 2025)
ba9f416  save (huan233usc, Oct 15, 2025)
42d09ae  save (huan233usc, Oct 15, 2025)
94dcf97  revert to a working version (huan233usc, Oct 16, 2025)
4f07eb9  update (huan233usc, Oct 16, 2025)
758d35e  Merge master: resolve conflicts in build.sbt and StreamingHelperTest.… (huan233usc, Oct 16, 2025)
322cb0b  fix (huan233usc, Oct 16, 2025)
573ed9e  simplify (huan233usc, Oct 16, 2025)
85173cc  simplify (huan233usc, Oct 16, 2025)
b52d0a8  fix comments (huan233usc, Oct 16, 2025)
7860a01  fix import (huan233usc, Oct 17, 2025)
6650ff7  fix import (huan233usc, Oct 17, 2025)
68802d9  remove unnecessary change (huan233usc, Oct 17, 2025)
4f48696  remove unnecessary change (huan233usc, Oct 17, 2025)
3479837  fix comments (huan233usc, Oct 17, 2025)
9a039ab  fix test (huan233usc, Oct 17, 2025)
43ced15  fix test (huan233usc, Oct 17, 2025)
64d5723  simplify (huan233usc, Oct 17, 2025)
8cceaed  simplify (huan233usc, Oct 17, 2025)
8afe5fd  simplify (huan233usc, Oct 17, 2025)
ccfc1c3  resolve comment (huan233usc, Oct 21, 2025)
8f5d42e  comments (huan233usc, Oct 21, 2025)
e2820ee  comments (huan233usc, Oct 21, 2025)
0157dda  rename (huan233usc, Oct 21, 2025)
08f3401  Test: Add intentional duplicate class to verify detection mechanism (huan233usc, Oct 21, 2025)
754cebd  Format test files (huan233usc, Oct 21, 2025)
306 changes: 243 additions & 63 deletions build.sbt

Large diffs are not rendered by default.

@@ -0,0 +1,27 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta;

/**
* Intentionally duplicate class in sparkV2 to test duplicate detection. This should cause a build
* failure when building the spark-unified module.
*/
public class DuplicateTestClass {
public static String getVersion() {
return "v2";
}
}
@@ -43,10 +43,10 @@ public void setUp(@TempDir File tempDir) {
         new SparkConf()
             .set("spark.sql.catalog.dsv2", "io.delta.kernel.spark.catalog.TestCatalog")
             .set("spark.sql.catalog.dsv2.base_path", tempDir.getAbsolutePath())
-            .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+            .set("spark.sql.extensions", "io.delta.sql.LegacyDeltaSparkSessionExtension")
             .set(
                 "spark.sql.catalog.spark_catalog",
-                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+                "org.apache.spark.sql.delta.catalog.LegacyDeltaCatalog")
             .setMaster("local[*]")
             .setAppName("Dsv2BasicTest");
     spark = SparkSession.builder().config(conf).getOrCreate();
@@ -32,10 +32,10 @@ public static void setUpSparkAndEngine() {
         SparkSession.builder()
             .master("local[*]")
             .appName("SparkKernelDsv2Tests")
-            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+            .config("spark.sql.extensions", "io.delta.sql.LegacyDeltaSparkSessionExtension")
             .config(
                 "spark.sql.catalog.spark_catalog",
-                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+                "org.apache.spark.sql.delta.catalog.LegacyDeltaCatalog")
             .getOrCreate();
     defaultEngine = DefaultEngine.create(spark.sessionState().newHadoopConf());
   }
@@ -60,10 +60,10 @@ public void setUp(@TempDir File tempDir) {
         new SparkConf()
             .set("spark.sql.catalog.dsv2", "io.delta.kernel.spark.catalog.TestCatalog")
             .set("spark.sql.catalog.dsv2.base_path", tempDir.getAbsolutePath())
-            .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+            .set("spark.sql.extensions", "io.delta.sql.LegacyDeltaSparkSessionExtension")
             .set(
                 "spark.sql.catalog.spark_catalog",
-                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+                "org.apache.spark.sql.delta.catalog.LegacyDeltaCatalog")
             .setMaster("local[*]")
             .setAppName("SparkGoldenTableTest");
     spark = SparkSession.builder().config(conf).getOrCreate();
@@ -140,7 +140,7 @@ public void testGetActiveCommitAtTime_pastTimestamp(@TempDir File tempDir) throw
         .history()
         .getActiveCommitAtTime(
             timestamp,
-            Option.empty() /* catalogTable */,
+            deltaLog.initialCatalogTable() /* catalogTableOpt */,
             false /* canReturnLastCommit */,
             true /* mustBeRecreatable */,
             false /* canReturnEarliestCommit */);
@@ -172,7 +172,7 @@ public void testGetActiveCommitAtTime_futureTimestamp_canReturnLast(@TempDir Fil
         .history()
         .getActiveCommitAtTime(
             futureTimestamp,
-            Option.empty() /* catalogTable */,
+            deltaLog.initialCatalogTable() /* catalogTableOpt */,
             true /* canReturnLastCommit */,
             true /* mustBeRecreatable */,
             false /* canReturnEarliestCommit */);
@@ -204,7 +204,7 @@ public void testGetActiveCommitAtTime_futureTimestamp_notMustBeRecreatable(@Temp
         .history()
         .getActiveCommitAtTime(
             futureTimestamp,
-            Option.empty() /* catalogTable */,
+            deltaLog.initialCatalogTable() /* catalogTableOpt */,
             true /* canReturnLastCommit */,
             false /* mustBeRecreatable */,
             false /* canReturnEarliestCommit */);
@@ -236,7 +236,7 @@ public void testGetActiveCommitAtTime_earlyTimestamp_canReturnEarliest(@TempDir
         .history()
         .getActiveCommitAtTime(
             earlyTimestamp,
-            Option.empty() /* catalogTable */,
+            deltaLog.initialCatalogTable() /* catalogTableOpt */,
             false /* canReturnLastCommit */,
             true /* mustBeRecreatable */,
             true /* canReturnEarliestCommit */);
@@ -268,7 +268,7 @@ public void testGetActiveCommitAtTime_earlyTimestamp_notMustBeRecreatable_canRet
         .history()
         .getActiveCommitAtTime(
             earlyTimestamp,
-            Option.empty() /* catalogTable */,
+            deltaLog.initialCatalogTable() /* catalogTableOpt */,
             false /* canReturnLastCommit */,
             false /* mustBeRecreatable */,
             true /* canReturnEarliestCommit */);
@@ -353,12 +353,19 @@ public void testCheckVersionExists(
         deltaLog
             .history()
             .checkVersionExists(
-                versionToCheck, Option.empty(), mustBeRecreatable, allowOutOfRange));
+                versionToCheck,
+                deltaLog.initialCatalogTable() /* catalogTableOpt */,
+                mustBeRecreatable,
+                allowOutOfRange));
     } else {
       streamingHelper.checkVersionExists(versionToCheck, mustBeRecreatable, allowOutOfRange);
       deltaLog
           .history()
-          .checkVersionExists(versionToCheck, Option.empty(), mustBeRecreatable, allowOutOfRange);
+          .checkVersionExists(
+              versionToCheck,
+              deltaLog.initialCatalogTable() /* catalogTableOpt */,
+              mustBeRecreatable,
+              allowOutOfRange);
     }
   }
 }
@@ -99,7 +99,9 @@ trait AbstractTestUtils
     .appName("Spark Test Writer for Delta Kernel")
     .config("spark.master", "local")
     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
-    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+    .config(
+        "spark.sql.catalog.spark_catalog",
+        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
     // Set this conf to an empty string so that golden tables generated
     // without the test-prefix (i.e. there is no DELTA_TESTING set) can still work
     .config(DeltaSQLConf.TEST_DV_NAME_PREFIX.key, "")
5 changes: 4 additions & 1 deletion project/TestParallelization.scala
@@ -54,6 +54,7 @@ object TestParallelization {
     Test / testGroupingStrategy := {
       val groupsCount = (Test / forkTestJVMCount).value
       val shard = (Test / shardId).value
+      // Use regular baseDirectory for target directory (not Test/baseDirectory)
       val baseJvmDir = baseDirectory.value
       MinShardGroupDurationStrategy(groupsCount, baseJvmDir, shard, defaultForkOptions.value)
     },
@@ -81,7 +82,9 @@
       javaHome = javaHome.value,
       outputStrategy = outputStrategy.value,
       bootJars = Vector.empty,
-      workingDirectory = Some(baseDirectory.value),
+      // Use Test/baseDirectory instead of baseDirectory to support modules where these differ
+      // (e.g. spark-combined module where Test/baseDirectory points to spark/ source directory)
+      workingDirectory = Some((Test / baseDirectory).value),
       runJVMOptions = (Test / javaOptions).value.toVector,
       connectInput = connectInput.value,
       envVars = (Test / envVars).value
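For context, the distinction matters in a module whose tests are compiled from another directory's sources. The following is a minimal sketch of the kind of wiring that makes `Test / baseDirectory` differ from `baseDirectory`; the `spark-combined` project id and path are assumptions based on the comment in the diff above, not the actual build.sbt code.

```scala
// Hypothetical sketch: a combined module that reuses the spark/ test sources.
// With this setting, forked test JVMs start in spark/ rather than spark-combined/,
// so tests that resolve fixtures relative to the working directory still pass.
lazy val sparkCombined = (project in file("spark-combined"))
  .settings(
    Test / baseDirectory := (ThisBuild / baseDirectory).value / "spark"
  )
```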
63 changes: 63 additions & 0 deletions spark-unified/README.md
@@ -0,0 +1,63 @@
# Delta Spark Unified Module

This module contains the final, published `delta-spark` JAR that unifies both:
- **V1 (DSv1)**: The traditional Delta Lake connector with full `DeltaLog` support
- **V2 (DSv2)**: The new Kernel-backed connector for improved performance

## Architecture

The unified module provides a single pair of entry points that covers both V1 and V2:
- `DeltaCatalog`: Extends `AbstractDeltaCatalog` from the `spark` module
- `DeltaSparkSessionExtension`: Extends `AbstractDeltaSparkSessionExtension` from the `spark` module

## Module Structure

```
spark-unified/ (This module - final published artifact)
├── src/main/java/
│ └── org.apache.spark.sql.delta.catalog.DeltaCatalog.java
└── src/main/scala/
└── io.delta.sql.DeltaSparkSessionExtension.scala

spark/ (sparkV1 - V1 implementation)
├── Core Delta Lake classes with DeltaLog
└── AbstractDeltaCatalog, AbstractDeltaSparkSessionExtension

kernel-spark/ (sparkV2 - V2 implementation)
└── Kernel-backed DSv2 connector
```

## How It Works

1. **sparkV1** (`spark/`): Contains production code for the V1 connector including DeltaLog
2. **sparkV1Filtered** (`spark-v1-shaded/`): Filtered version of V1 excluding DeltaLog, Snapshot, OptimisticTransaction, and actions.scala
3. **sparkV2** (`kernel-spark/`): Kernel-backed V2 connector that depends on sparkV1Filtered
4. **spark** (`spark-unified/`, this module): Final JAR that merges the V1, V2, and storage classes

The final JAR includes:
- All classes from sparkV1, sparkV2, and storage
- Python files
- No internal module dependencies in the published POM
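With the unified JAR on the classpath, both connectors are enabled through the usual two Spark settings, now pointing at the unified entry points. A minimal sketch (the master and app name are placeholders):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: wire the unified Delta entry points into a local session.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("delta-unified-example")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
```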

## Internal vs Published Modules

**Internal modules** (not published to Maven):
- `delta-spark-v1`
- `delta-spark-v1-filtered`
- `delta-spark-v2`

**Published module**:
- `delta-spark` (this module): contains the merged classes from all internal modules; its published POM is filtered accordingly, as sketched below
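Because the internal artifacts are never published, the unified module's POM must not reference them. A minimal sketch of how such filtering is commonly done in sbt, with an illustrative `internalModules` set (the actual build.sbt implementation may differ):

```scala
import scala.xml.{Elem, Node, NodeSeq}
import scala.xml.transform.{RewriteRule, RuleTransformer}

// Illustrative list of internal artifact ids to strip from the published POM.
val internalModules = Set("delta-spark-v1", "delta-spark-v1-filtered", "delta-spark-v2")

pomPostProcess := { node =>
  new RuleTransformer(new RewriteRule {
    override def transform(n: Node): NodeSeq = n match {
      // Drop any <dependency> whose artifactId matches an internal module.
      case e: Elem if e.label == "dependency" &&
          internalModules.exists(m => (e \ "artifactId").text.startsWith(m)) =>
        NodeSeq.Empty
      case other => other
    }
  }).transform(node).head
}
```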

## Build

The module automatically:
1. Merges classes from the V1, V2, and storage modules
2. Detects duplicate classes, failing the build if any are found (see the sketch below)
3. Filters internal modules out of the published POM's dependencies
4. Exports itself as a JAR (rather than loose class directories) to avoid classpath conflicts
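Step 2 can be pictured as a task that walks the merged class directories and fails when the same relative class-file path appears more than once. A sketch with hypothetical key names (`mergedClassDirs`, `checkDuplicateClasses`), not the actual build.sbt code:

```scala
import sbt._
import sbt.Keys._

// Hypothetical keys for illustration only.
val mergedClassDirs = taskKey[Seq[File]]("class directories merged into the unified JAR")
val checkDuplicateClasses = taskKey[Unit]("fail the build if two modules ship the same class")

checkDuplicateClasses := {
  val dirs = mergedClassDirs.value
  // Group every class file by its path relative to its module's class directory.
  val byRelativePath = dirs
    .flatMap { dir =>
      (dir ** "*.class").get.map(f => dir.toPath.relativize(f.toPath).toString)
    }
    .groupBy(identity)
  val duplicates = byRelativePath.collect { case (path, hits) if hits.size > 1 => path }
  if (duplicates.nonEmpty) {
    sys.error(s"Duplicate classes found in unified JAR:\n${duplicates.mkString("\n")}")
  }
}
```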

## Testing

Tests are located in `spark/src/test/` and run against the combined JAR to ensure V1+V2 integration works correctly.

@@ -0,0 +1,27 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.catalog;

/**
* Delta Catalog implementation that can delegate to both V1 and V2 implementations.
* This class sits in the delta-spark (unified) module and can access:
* - V1: org.apache.spark.sql.delta.* (full version with DeltaLog)
* - V2: io.delta.kernel.spark.*
*/
public class DeltaCatalog extends AbstractDeltaCatalog {
}

@@ -0,0 +1,37 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sql

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

/**
* Delta Spark Session Extension that can register both V1 and V2 implementations.
* This class sits in the delta-spark (unified) module and can access:
* - V1: org.apache.spark.sql.delta.* (full version with DeltaLog)
* - V2: io.delta.kernel.spark.*
*/
class DeltaSparkSessionExtension extends AbstractDeltaSparkSessionExtension {

/**
* NoOpRule for binary compatibility with Delta 3.3.0
* This class must remain here to satisfy MiMa checks
*/
class NoOpRule extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan
}
}
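For context on the MiMa note above: binary-compatibility checks compare the new artifact against a previously released one, so a class that was public in Delta 3.3.0 must either remain present or be explicitly excluded. A sketch of typical sbt-mima configuration; the exact settings used by this repo's build are not shown in this diff.

```scala
import sbt._
import com.typesafe.tools.mima.core._
import com.typesafe.tools.mima.plugin.MimaPlugin.autoImport._

// Compare against the last released delta-spark artifact.
mimaPreviousArtifacts := Set("io.delta" %% "delta-spark" % "3.3.0")

// Would only be needed if NoOpRule were actually removed instead of kept above.
mimaBinaryIssueFilters ++= Seq(
  ProblemFilters.exclude[MissingClassProblem](
    "io.delta.sql.DeltaSparkSessionExtension$NoOpRule")
)
```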
@@ -0,0 +1,28 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta;

/**
* Intentionally duplicate class in sparkV1 to test duplicate detection.
* This should cause a build failure when building the spark-unified module.
*/
public class DuplicateTestClass {
public static String getVersion() {
return "v1";
}
}

@@ -80,7 +80,13 @@ import org.apache.spark.sql.internal.SQLConf
  *
  * @since 0.4.0
  */
-class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
+// Legacy entry point class for backwards compatibility. Use DeltaSparkSessionExtension instead.
+class LegacyDeltaSparkSessionExtension extends AbstractDeltaSparkSessionExtension
+
+// Abstract base class that contains the core Delta Spark Session extension logic.
+// This is extended by both LegacyDeltaSparkSessionExtension (V1-only) and
+// DeltaSparkSessionExtension (unified V1+V2) in the spark-unified module.
+class AbstractDeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
   override def apply(extensions: SparkSessionExtensions): Unit = {
     extensions.injectParser { (_, parser) =>
       new DeltaSqlParser(parser)
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TimeTravel
 import org.apache.spark.sql.delta.DataFrameUtils
 import org.apache.spark.sql.delta.DeltaErrors.{TemporallyUnstableInputException, TimestampEarlierThanCommitRetentionException}
 import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
-import org.apache.spark.sql.delta.catalog.DeltaCatalog
+import org.apache.spark.sql.delta.catalog.LegacyDeltaCatalog
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.catalog.IcebergTablePlaceHolder
 import org.apache.spark.sql.delta.commands._
@@ -245,7 +245,7 @@ class DeltaAnalysis(session: SparkSession)
       case _ =>
         protocol
     }
-    val newDeltaCatalog = new DeltaCatalog()
+    val newDeltaCatalog = new LegacyDeltaCatalog()
     val existingTableOpt = newDeltaCatalog.getExistingTableIfExists(catalogTableTarget.identifier)
     val newTable = newDeltaCatalog
       .verifyTableAndSolidify(
17 changes: 7 additions & 10 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -20,12 +20,10 @@ package org.apache.spark.sql.delta
 import java.io.{FileNotFoundException, IOException}
 import java.nio.file.FileAlreadyExistsException
 import java.util.{ConcurrentModificationException, UUID}
-
 import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBySpec}
+import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
 import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol, TableFeatureProtocolUtils}
-import org.apache.spark.sql.delta.catalog.DeltaCatalog
+import org.apache.spark.sql.delta.catalog.AbstractDeltaCatalog
 import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, DeltaGenerateCommand}
 import org.apache.spark.sql.delta.constraints.Constraints
 import org.apache.spark.sql.delta.hooks.AutoCompactType
@@ -37,9 +35,8 @@ import org.apache.spark.sql.delta.redirect.RedirectState
 import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.JsonUtils
-import io.delta.sql.DeltaSparkSessionExtension
+import io.delta.sql.AbstractDeltaSparkSessionExtension
 import org.apache.hadoop.fs.{ChecksumException, Path}
-
 import org.apache.spark.{SparkConf, SparkEnv, SparkException}
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -1881,10 +1878,10 @@ trait DeltaErrorsBase
     val catalogImplConfig = SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key
     new DeltaAnalysisException(
       errorClass = "DELTA_CONFIGURE_SPARK_SESSION_WITH_EXTENSION_AND_CATALOG",
-      messageParameters = Array(classOf[DeltaSparkSessionExtension].getName,
-        catalogImplConfig, classOf[DeltaCatalog].getName,
-        classOf[DeltaSparkSessionExtension].getName,
-        catalogImplConfig, classOf[DeltaCatalog].getName),
+      messageParameters = Array(classOf[AbstractDeltaSparkSessionExtension].getName,
+        catalogImplConfig, classOf[AbstractDeltaCatalog].getName,
+        classOf[AbstractDeltaSparkSessionExtension].getName,
+        catalogImplConfig, classOf[AbstractDeltaCatalog].getName),
       cause = originalException)