[Build] Run the Spark master PYTHON tests using the Spark 4.0 RC4 #4513


Merged
85 changes: 85 additions & 0 deletions .github/workflows/spark_master_python_test.yaml
@@ -0,0 +1,85 @@
name: "Delta Spark Master Python"
on: [push, pull_request]
jobs:
test:
name: "DSP"
Collaborator: DSMP

runs-on: ubuntu-24.04
strategy:
matrix:
# These Scala versions must match those in the build.sbt
scala: [2.13.13]
env:
SCALA_VERSION: ${{ matrix.scala }}
steps:
- uses: actions/checkout@v3
- uses: technote-space/get-diff-action@v4
id: git-diff
with:
PATTERNS: |
**
.github/workflows/**
!kernel/**
!connectors/**
- name: install java
uses: actions/setup-java@v3
with:
distribution: "zulu"
java-version: "17"
- name: Cache Scala, SBT
uses: actions/cache@v3
with:
path: |
~/.sbt
~/.ivy2
~/.cache/coursier
!~/.cache/coursier/v1/https/repository.apache.org/content/groups/snapshots
# Change the key if dependencies are changed. For each key, GitHub Actions will cache
# the above directories when we use the key for the first time. After that, each run will
# just use the cache. The cache is immutable, so we need to use a new key when we want to
# cache new dependencies.
key: delta-sbt-cache-spark-master-scala${{ matrix.scala }}
- name: Install Job dependencies
# TODO: update pyspark installation once Spark preview is formally released
run: |
sudo apt-get update
sudo apt-get install -y make build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev python3-openssl git
sudo apt install libedit-dev
curl -LO https://github.com/bufbuild/buf/releases/download/v1.28.1/buf-Linux-x86_64.tar.gz
mkdir -p ~/buf
tar -xvzf buf-Linux-x86_64.tar.gz -C ~/buf --strip-components 1
rm buf-Linux-x86_64.tar.gz
sudo apt install python3-pip --fix-missing
sudo pip3 install pipenv==2024.4.1
curl https://pyenv.run | bash
export PATH="~/.pyenv/bin:$PATH"
eval "$(pyenv init -)"
eval "$(pyenv virtualenv-init -)"
pyenv install 3.9
pyenv global system 3.9
pipenv --python 3.9 install
# Update the pip version to 24.0. By default `pyenv.run` installs the latest pip version
# available. Starting with version 24.1, `pip` no longer allows installing Python packages
# whose version string contains `-`. In the Delta Spark case, the generated PyPI package has
# `-SNAPSHOT` in its version (e.g. `3.3.0-SNAPSHOT`) because the version is picked up from
# the `version.sbt` file.
pipenv run pip install pip==24.0 setuptools==69.5.1 wheel==0.43.0
pipenv run pip install flake8==3.9.0
pipenv run pip install black==23.9.1
pipenv run pip install mypy==1.8.0
pipenv run pip install mypy-protobuf==3.3.0
pipenv run pip install cryptography==37.0.4
pipenv run pip install twine==4.0.1
pipenv run pip install wheel==0.33.4
pipenv run pip install setuptools==41.1.0
pipenv run pip install pydocstyle==3.0.0
pipenv run pip install pandas==2.0.0
pipenv run pip install pyarrow==8.0.0
pipenv run pip install numpy==1.21
pipenv run pip install https://dist.apache.org/repos/dist/dev/spark/v4.0.0-rc4-bin//pyspark-4.0.0.tar.gz
if: steps.git-diff.outputs.diff
- name: Run Python tests
# when changing TEST_PARALLELISM_COUNT make sure to also change it in spark_master_test.yaml
run: |
echo 'ThisBuild / version := "4.0.0-SNAPSHOT"' > version.sbt
TEST_PARALLELISM_COUNT=4 USE_SPARK_MASTER=true pipenv run python run-tests.py --group spark-python
if: steps.git-diff.outputs.diff
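The `pip==24.0` pin in the install step above matters because pip 24.1 and later enforce PEP 440 version strings and reject the `-SNAPSHOT` suffix that `version.sbt` produces for local builds. A minimal sketch of that behavior, using the `packaging` library that pip vendors for version parsing; the version strings are examples only:

from packaging.version import Version, InvalidVersion

# "-SNAPSHOT" is not a valid PEP 440 suffix, so strict parsers reject it.
for candidate in ["3.3.0", "3.3.0-SNAPSHOT"]:
    try:
        Version(candidate)
        print(f"{candidate}: valid PEP 440 version")
    except InvalidVersion:
        print(f"{candidate}: rejected (not PEP 440 compliant)")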
4 changes: 3 additions & 1 deletion .github/workflows/spark_python_test.yaml
@@ -65,7 +65,9 @@ jobs:
pipenv run pip install flake8==3.5.0 pypandoc==1.3.3
pipenv run pip install black==23.9.1
pipenv run pip install importlib_metadata==3.10.0
pipenv run pip install mypy==0.982
# The mypy versions 0.982 and 1.8.0 have conflicting rules (style checks cannot pass for
# both versions on the same file), so we upgrade to 1.8.0 here to match Spark 4.0
pipenv run pip install mypy==1.8.0
pipenv run pip install mypy-protobuf==3.3.0
pipenv run pip install cryptography==37.0.4
pipenv run pip install twine==4.0.1
2 changes: 1 addition & 1 deletion dev/requirements.txt
@@ -1,5 +1,5 @@
# Linter
mypy==0.982
mypy==1.8.0
flake8==3.9.0

# Code Formatter
2 changes: 1 addition & 1 deletion examples/python/table_exists.py
@@ -23,7 +23,7 @@ def exists(spark, filepath):
try:
spark.read.load(path=filepath, format="delta")
except AnalysisException as exception:
if "is not a Delta table" in exception.desc or "Path does not exist" in exception.desc:
if "is not a Delta table" in exception.getMessage() or "Path does not exist" in exception.getMessage():
return False
raise exception
return True
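For context, a minimal hypothetical caller of the `exists` helper shown above; the path is a placeholder and an active SparkSession named `spark` is assumed:

# Hypothetical usage of exists(); not part of this PR.
path = "/tmp/delta/events"
if exists(spark, path):
    df = spark.read.format("delta").load(path)
else:
    print(f"No Delta table found at {path}")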
14 changes: 10 additions & 4 deletions project/Mima.scala
@@ -44,7 +44,11 @@ object Mima {
def getPrevSparkName(currentVersion: String): String = {
val (major, minor, patch) = getMajorMinorPatch(currentVersion)
// The artifact was renamed in version 3.0.0, so versions > 3.0.0 should use delta-spark as the prev version.
if (major >= 3 && (minor > 0 || patch > 0)) "delta-spark" else "delta-core"
if (major < 3 || (major == 3 && minor == 0 && patch == 0)) {
"delta-core"
} else {
"delta-spark"
}
}

def getPrevSparkVersion(currentVersion: String): String = {
@@ -53,9 +57,10 @@
val lastVersionInMajorVersion = Map(
0 -> "0.8.0",
1 -> "1.2.1",
2 -> "2.4.0"
2 -> "2.4.0",
3 -> "3.3.1"
)
if (minor == 0) { // 1.0.0 or 2.0.0 or 3.0.0
if (minor == 0) { // 1.0.0 or 2.0.0 or 3.0.0 or 4.0.0
lastVersionInMajorVersion.getOrElse(major - 1, {
throw new Exception(s"Last version of ${major - 1}.x.x not configured.")
})
@@ -73,7 +78,8 @@
// We skip from 0.6.0 to 3.0.0 when migrating connectors to the main delta repo
0 -> "0.6.0",
1 -> "0.6.0",
2 -> "0.6.0"
2 -> "0.6.0",
3 -> "3.3.1"
)
if (minor == 0) { // 1.0.0
majorToLastMinorVersions.getOrElse(major - 1, {
6 changes: 5 additions & 1 deletion python/delta/pip_utils.py
@@ -74,7 +74,11 @@ def configure_spark_with_delta_pip(
'''
raise Exception(msg) from e

scala_version = "2.12"
if int(delta_version.split(".")[0]) >= 4:
# For Delta 4.0+ (and thus Spark 4.0+), Scala 2.12 is not supported
scala_version = "2.13"
else:
scala_version = "2.12"
maven_artifact = f"io.delta:delta-spark_{scala_version}:{delta_version}"

extra_packages = extra_packages if extra_packages is not None else []
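For reference, a typical way `configure_spark_with_delta_pip` is invoked (a sketch following the standard Delta Lake quickstart pattern; the app name and configs are illustrative, not part of this PR). The Scala suffix chosen above determines whether `delta-spark_2.12` or `delta-spark_2.13` ends up on `spark.jars.packages`:

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Illustrative session setup; configure_spark_with_delta_pip appends
# io.delta:delta-spark_<scala_version>:<delta_version> to spark.jars.packages.
builder = (
    SparkSession.builder.appName("delta-example")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()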
16 changes: 7 additions & 9 deletions python/delta/tests/test_deltatable.py
@@ -14,9 +14,7 @@
# limitations under the License.
#

# mypy: disable-error-code="union-attr"
# mypy: disable-error-code="attr-defined"
# type: ignore[union-attr]
# mypy: disable-error-code="union-attr, attr-defined"

import unittest
import os
@@ -159,7 +157,7 @@ def reset_table() -> None:
.whenNotMatchedBySourceUpdate(set={"value": "value + 0"}) \
.execute()
self.__checkAnswer(merge_output,
([Row(6, # affected rows
([Row(6, # type: ignore[call-overload]
4, # updated rows (a and b in WHEN MATCHED
# and c and d in WHEN NOT MATCHED BY SOURCE)
0, # deleted rows
@@ -502,7 +500,7 @@ def test_merge_with_inconsistent_sessions(self) -> None:
target_path = os.path.join(self.tempFile, "target")
spark = self.spark

def f(spark):
def f(spark): # type: ignore[no-untyped-def]
spark.range(20) \
.withColumn("x", col("id")) \
.withColumn("y", col("id")) \
@@ -542,7 +540,7 @@ def test_history(self) -> None:
[Row("Overwrite")],
StructType([StructField("operationParameters.mode", StringType(), True)]))

def test_cdc(self):
def test_cdc(self) -> None:
self.spark.range(0, 5).write.format("delta").save(self.tempFile)
deltaTable = DeltaTable.forPath(self.spark, self.tempFile)
# Enable Change Data Feed
@@ -976,7 +974,7 @@ def test_verify_paritionedBy_compatibility(self) -> None:
from pyspark.sql.column import _to_seq # type: ignore[attr-defined]
except ImportError:
# Spark 4
from pyspark.sql.classic.column import _to_seq # type: ignore[attr-defined]
from pyspark.sql.classic.column import _to_seq # type: ignore

with self.table("testTable"):
tableBuilder = DeltaTable.create(self.spark).tableName("testTable") \
@@ -1102,8 +1100,8 @@ def test_delta_table_builder_with_bad_args(self) -> None:
builder.addColumn(
"a",
"bigint",
generatedByDefaultAs=""
) # type: ignore[arg-type]
generatedByDefaultAs="" # type: ignore[arg-type]
)

# bad generatedByDefaultAs - identity column data type must be Long
with self.assertRaises(UnsupportedOperationException):
3 changes: 2 additions & 1 deletion python/delta/tests/test_sql.py
@@ -99,11 +99,12 @@ def test_convert(self) -> None:
def test_ddls(self) -> None:
table = "deltaTable"
table2 = "deltaTable2"
with self.table(table, table2):
with self.table(table, table + "_part", table2):
def read_table() -> DataFrame:
return self.spark.sql(f"SELECT * FROM {table}")

self.spark.sql(f"DROP TABLE IF EXISTS {table}")
self.spark.sql(f"DROP TABLE IF EXISTS {table}_part")
self.spark.sql(f"DROP TABLE IF EXISTS {table2}")

self.spark.sql(f"CREATE TABLE {table}(a LONG, b String NOT NULL) USING delta")
41 changes: 28 additions & 13 deletions python/run-tests.py
@@ -22,16 +22,16 @@
from os import path


def test(root_dir, package):
# Run all of the test under test/python directory, each of them
# has main entry point to execute, which is python's unittest testing
def test(root_dir, code_dir, packages):
# Test the code in the code_dir directory using its "tests" subdirectory. Each test file
# has its own main entry point to execute, which uses Python's unittest testing
# framework.
python_root_dir = path.join(root_dir, "python")
test_dir = path.join(python_root_dir, path.join("delta", "tests"))
test_dir = path.join(python_root_dir, path.join(code_dir, "tests"))
test_files = [os.path.join(test_dir, f) for f in os.listdir(test_dir)
if os.path.isfile(os.path.join(test_dir, f)) and
f.endswith(".py") and not f.startswith("_")]
extra_class_path = path.join(python_root_dir, path.join("delta", "testing"))
extra_class_path = path.join(python_root_dir, path.join(code_dir, "testing"))

for test_file in test_files:
try:
@@ -40,7 +40,7 @@ def test(root_dir, package):
"--repositories",
("https://maven-central.storage-download.googleapis.com/maven2/,"
"https://repo1.maven.org/maven2/"),
"--packages", package, test_file]
"--packages", ",".join(packages), test_file]
print("Running tests in %s\n=============" % test_file)
print("Command: %s" % str(cmd))
run_cmd(cmd, stream_output=True)
@@ -56,20 +56,30 @@ def delete_if_exists(path):
print("Deleted %s " % path)


def prepare(root_dir):
def prepare(root_dir, use_spark_master):
print("##### Preparing python tests & building packages #####")
# Build package with python files in it
sbt_path = path.join(root_dir, path.join("build", "sbt"))
delete_if_exists(os.path.expanduser("~/.ivy2/cache/io.delta"))
delete_if_exists(os.path.expanduser("~/.m2/repository/io/delta/"))
run_cmd([sbt_path, "clean", "sparkGroup/publishM2"], stream_output=True)
sbt_command = [sbt_path]
packages = ["spark/publishM2", "storage/publishM2"]
if use_spark_master:
sbt_command = sbt_command + ["-DsparkVersion=master"]
packages = packages + ["connectCommon/publishM2", "connectServer/publishM2"]
run_cmd(sbt_command + ["clean"] + packages, stream_output=True)


def get_local_package(package_name, use_spark_master):
# Get current release which is required to be loaded
version = '0.0.0'
with open(os.path.join(root_dir, "version.sbt")) as fd:
version = fd.readline().split('"')[1]
package = "io.delta:delta-spark_2.12:" + version
return package

if use_spark_master:
return f"io.delta:{package_name}_2.13:" + version
else:
return f"io.delta:{package_name}_2.12:" + version


def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, print_cmd=True, **kwargs):
@@ -179,10 +189,15 @@ def run_delta_connect_codegen_python(root_dir):
if __name__ == "__main__":
print("##### Running python tests #####")
root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
package = prepare(root_dir)
use_spark_master = os.getenv("USE_SPARK_MASTER") or False
prepare(root_dir, use_spark_master)
delta_spark_package = get_local_package("delta-spark", use_spark_master)

run_python_style_checks(root_dir)
run_mypy_tests(root_dir)
run_pypi_packaging_tests(root_dir)
run_delta_connect_codegen_python(root_dir)
test(root_dir, package)
test(root_dir, "delta", [delta_spark_package])

# For versions 4.0+ run Delta Connect tests as well
if use_spark_master:
run_delta_connect_codegen_python(root_dir)
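To make the effect of `get_local_package` concrete, here is a small self-contained sketch of the Maven coordinates it produces; the `4.0.0-SNAPSHOT` value is an example matching what the workflow above writes into `version.sbt`, not something read from the repo:

# Hypothetical mirror of the coordinate formatting in get_local_package above.
def expected_coordinate(package_name: str, version: str, use_spark_master: bool) -> str:
    scala_suffix = "2.13" if use_spark_master else "2.12"
    return f"io.delta:{package_name}_{scala_suffix}:{version}"

assert expected_coordinate("delta-spark", "4.0.0-SNAPSHOT", True) == \
    "io.delta:delta-spark_2.13:4.0.0-SNAPSHOT"
assert expected_coordinate("delta-spark", "4.0.0-SNAPSHOT", False) == \
    "io.delta:delta-spark_2.12:4.0.0-SNAPSHOT"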
19 changes: 13 additions & 6 deletions setup.py
@@ -15,7 +15,17 @@ def get_version_from_sbt():


VERSION = get_version_from_sbt()
MAJOR_VERSION = int(VERSION.split(".")[0])

if MAJOR_VERSION < 4:
packages_arg = ['delta']
install_requires_arg = ['pyspark>=3.5.2,<3.6.0', 'importlib_metadata>=1.0.0']
python_requires_arg = '>=3.6'
else: # MAJOR_VERSION >= 4
# Delta 4.0+ contains Delta Connect code and uses Spark 4.0+
packages_arg = ['delta', 'delta.connect', 'delta.connect.proto']
install_requires_arg = ['pyspark>=4.0.0.dev1', 'importlib_metadata>=1.0.0']
python_requires_arg = '>=3.9'

class VerifyVersionCommand(install):
"""Custom command to verify that the git tag matches our version"""
@@ -60,15 +70,12 @@ def run(self):
],
keywords='delta.io',
package_dir={'': 'python'},
packages=['delta'],
packages=packages_arg,
package_data={
'delta': ['py.typed'],
},
install_requires=[
'pyspark>=3.5.3,<3.6.0',
'importlib_metadata>=1.0.0',
],
python_requires='>=3.6',
install_requires=install_requires_arg,
python_requires=python_requires_arg,
cmdclass={
'verify': VerifyVersionCommand,
}
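A quick, self-contained sanity check of the `MAJOR_VERSION` branching above; the version strings are examples only:

# Purely illustrative; mirrors the major-version split in setup.py above.
for version in ("3.3.0", "4.0.0.dev1"):
    major = int(version.split(".")[0])
    if major < 4:
        print(f"{version}: classic packaging (pyspark 3.5.x, Python >= 3.6)")
    else:
        print(f"{version}: adds delta.connect packages (pyspark >= 4.0.0.dev1, Python >= 3.9)")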
@@ -23,6 +23,7 @@ import org.antlr.v4.runtime.ParserRuleContext
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.QueryContext

class DeltaAnalysisException(
errorClass: String,
@@ -60,6 +61,8 @@ class DeltaIllegalArgumentException(
override def getMessageParameters: java.util.Map[String, String] = {
DeltaThrowableHelper.getMessageParameters(errorClass, errorSubClass = null, messageParameters)
}

override def getQueryContext: Array[QueryContext] = new Array(0);
}

class DeltaUnsupportedOperationException(
@@ -74,6 +77,8 @@
override def getMessageParameters: java.util.Map[String, String] = {
DeltaThrowableHelper.getMessageParameters(errorClass, errorSubClass = null, messageParameters)
}

override def getQueryContext: Array[QueryContext] = new Array(0);
}

class DeltaParseException(