[Spark] Delta Connect python client implementation + tests (ported from the branch-4.0-preview) #4514

Merged
12 changes: 10 additions & 2 deletions .github/workflows/spark_master_python_test.yaml
@@ -65,6 +65,7 @@ jobs:
pipenv run pip install pip==24.0 setuptools==69.5.1 wheel==0.43.0
pipenv run pip install flake8==3.9.0
pipenv run pip install black==23.12.1
pipenv run pip install importlib_metadata==3.10.0
pipenv run pip install mypy==1.8.0
pipenv run pip install mypy-protobuf==3.3.0
pipenv run pip install cryptography==37.0.4
@@ -74,8 +75,15 @@ jobs:
pipenv run pip install pydocstyle==3.0.0
pipenv run pip install pandas==2.2.0
pipenv run pip install pyarrow==11.0.0
pipenv run pip install numpy==1.21
pipenv run pip install https://dist.apache.org/repos/dist/dev/spark/v4.0.0-rc4-bin/pyspark-4.0.0.tar.gz
pipenv run pip install pypandoc==1.3.3
@longvu-db (Contributor), May 15, 2025:

Not sure why we pip install two different versions of wheel (0.43.0 and 0.33.4) right above.

pipenv run pip install numpy==1.22.4
pipenv run pip install grpcio==1.67.0
pipenv run pip install grpcio-status==1.67.0
pipenv run pip install googleapis-common-protos==1.65.0
pipenv run pip install protobuf==5.29.1
pipenv run pip install googleapis-common-protos-stubs==2.2.0
pipenv run pip install grpc-stubs==1.24.11
pipenv run pip install https://dist.apache.org/repos/dist/dev/spark/v4.0.0-rc6-bin/pyspark-4.0.0.tar.gz
Contributor comment:

I changed this from rc4 to rc6 if that's okay; feel free to put it back if it's not.

if: steps.git-diff.outputs.diff
- name: Run Python tests
# when changing TEST_PARALLELISM_COUNT make sure to also change it in spark_master_test.yaml
4 changes: 3 additions & 1 deletion dev/delta-connect-gen-protos.sh
@@ -71,8 +71,10 @@ for f in `find gen/proto/python/delta/connect -name "*.py*"`; do
if [[ $f == *_pb2.py || $f == *_pb2_grpc.py ]]; then
sed \
-e 's/import spark.connect./import pyspark.sql.connect.proto./g' \
-e 's/from delta.connect import/from delta.connect.proto import/g' \
-e "s/DESCRIPTOR, 'spark.connect/DESCRIPTOR, 'pyspark.sql.connect.proto/g" \
-e 's/from spark.connect import/from pyspark.sql.connect.proto import/g' \
-e "s/DESCRIPTOR, 'delta.connect/DESCRIPTOR, 'delta.connect.proto/g" \
-e 's/from delta.connect import/from delta.connect.proto import/g' \
$f > $f.tmp
mv $f.tmp $f
elif [[ $f == *.pyi ]]; then
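For readers unfamiliar with the generated sources, a hedged sketch (illustrative import names, not taken from this diff) of what the substitutions above do to raw protoc/grpc output; the relations_pb2.py changes further below show the real result:

# Before the sed pass (raw generated output; illustrative):
from spark.connect import types_pb2 as spark_dot_connect_dot_types__pb2
from delta.connect import base_pb2 as delta_dot_connect_dot_base__pb2

# After the sed pass:
from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2
from delta.connect.proto import base_pb2 as delta_dot_connect_dot_base__pb2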
7 changes: 6 additions & 1 deletion dev/requirements.txt
@@ -3,7 +3,12 @@ mypy==1.8.0
flake8==3.9.0

# Code Formatter
black==23.9.1
black==23.12.1

# Spark Connect (required)
grpcio>=1.67.0
grpcio-status>=1.67.0
googleapis-common-protos>=1.65.0
Contributor comment:
Suggested change
googleapis-common-protos>=1.65.0
googleapis-common-protos>=1.65.0
protobuf==5.29.1

https://github.com/apache/spark/blob/6f8fa6b08ff30c41c108fdc1e7af69befcc6915c/dev/requirements.txt#L64C1-L64C17

Contributor comment:
This can be done later


# Spark and Delta Connect python proto generation plugin (optional)
mypy-protobuf==3.3.0
100 changes: 100 additions & 0 deletions examples/python/delta_connect.py
@@ -0,0 +1,100 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
To run this example you must follow these steps:

Requirements:
- Using Java 17
- Spark 4.0.0-preview1+
- delta-spark (python package) 4.0.0rc1+ and pyspark 4.0.0.dev1+

(1) Start a local Spark connect server using this command:
sbin/start-connect-server.sh \
--packages org.apache.spark:spark-connect_2.13:4.0.0-preview1,io.delta:delta-connect-server_2.13:{DELTA_VERSION},io.delta:delta-spark_2.13:{DELTA_VERSION},com.google.protobuf:protobuf-java:3.25.1 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
--conf "spark.connect.extensions.relation.classes"="org.apache.spark.sql.connect.delta.DeltaRelationPlugin" \
--conf "spark.connect.extensions.command.classes"="org.apache.spark.sql.connect.delta.DeltaCommandPlugin"
* Be sure to replace DELTA_VERSION with the version you are using

(2) Set the SPARK_REMOTE environment variable to point to your local Spark server
export SPARK_REMOTE="sc://localhost:15002"

(3) Run this file i.e. python3 examples/python/delta_connect.py
"""

import os
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
import shutil

filePath = "/tmp/delta_connect"
tableName = "delta_connect_table"

def assert_dataframe_equals(df1, df2):
    # Use sorted(); list.sort() sorts in place and returns None, which would make the comparison vacuous.
    assert sorted(df1.collect()) == sorted(df2.collect())

def cleanup(spark):
    shutil.rmtree(filePath, ignore_errors=True)
    spark.sql(f"DROP TABLE IF EXISTS {tableName}")

# --------------------- Set up Spark Connect spark session ------------------------

assert os.getenv("SPARK_REMOTE"), "Must point to Spark Connect server using SPARK_REMOTE"

spark = SparkSession.builder \
    .appName("delta_connect") \
    .remote(os.getenv("SPARK_REMOTE")) \
    .getOrCreate()

# Clean up any previous runs
cleanup(spark)

# -------------- Try reading non-existent table (should fail with an exception) ----------------

# Using forPath
try:
    DeltaTable.forPath(spark, filePath).toDF().show()
except Exception as e:
    assert "DELTA_MISSING_DELTA_TABLE" in str(e)
else:
    assert False, "Expected exception to be thrown for missing table"

# Using forName
try:
    DeltaTable.forName(spark, tableName).toDF().show()
except Exception as e:
    assert "DELTA_MISSING_DELTA_TABLE" in str(e)
else:
    assert False, "Expected exception to be thrown for missing table"

# ------------------------ Write basic table and check that results match ----------------------

# By table name
spark.range(5).write.format("delta").saveAsTable(tableName)
assert_dataframe_equals(DeltaTable.forName(spark, tableName).toDF(), spark.range(5))
assert_dataframe_equals(spark.read.format("delta").table(tableName), spark.range(5))
assert_dataframe_equals(spark.sql(f"SELECT * FROM {tableName}"), spark.range(5))

# By table path
spark.range(10).write.format("delta").save(filePath)
assert_dataframe_equals(DeltaTable.forPath(spark, filePath).toDF(), spark.range(10))
assert_dataframe_equals(spark.read.format("delta").load(filePath), spark.range(10))
assert_dataframe_equals(spark.sql(f"SELECT * FROM delta.`{filePath}`"), spark.range(10))

# ---------------------------------- Clean up ----------------------------------------
cleanup(spark)
19 changes: 19 additions & 0 deletions python/delta/connect/__init__.py
@@ -0,0 +1,19 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from delta.connect.tables import DeltaTable

__all__ = ['DeltaTable']
55 changes: 55 additions & 0 deletions python/delta/connect/plan.py
@@ -0,0 +1,55 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import Optional

import delta.connect.proto as proto

from pyspark.sql.connect.client import SparkConnectClient
from pyspark.sql.connect.plan import LogicalPlan
import pyspark.sql.connect.proto as spark_proto


class DeltaLogicalPlan(LogicalPlan):
    def __init__(self, child: Optional[LogicalPlan]) -> None:
        super().__init__(child)

    def plan(self, session: SparkConnectClient) -> spark_proto.Relation:
        plan = spark_proto.Relation()
        plan.extension.Pack(self.to_delta_relation(session))
        return plan

    def to_delta_relation(self, session: SparkConnectClient) -> proto.DeltaRelation:
        ...

    def command(self, session: SparkConnectClient) -> spark_proto.Command:
        command = spark_proto.Command()
        command.extension.Pack(self.to_delta_command(session))
        return command

    def to_delta_command(self, session: SparkConnectClient) -> proto.DeltaCommand:
        ...


class DeltaScan(DeltaLogicalPlan):
    def __init__(self, table: proto.DeltaTable) -> None:
        super().__init__(None)
        self._table = table

    def to_delta_relation(self, client: SparkConnectClient) -> proto.DeltaRelation:
        relation = proto.DeltaRelation()
        relation.scan.table.CopyFrom(self._table)
        return relation
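
Side note on the extension mechanism: plan() and command() wrap the Delta-specific message in a protobuf Any via Pack(). A minimal, hedged sketch of that round trip, using a well-known wrapper type instead of the generated delta.connect.proto classes:

from google.protobuf import any_pb2, wrappers_pb2

# Stand-in payload; in plan.py the payload is a proto.DeltaRelation or proto.DeltaCommand.
payload = wrappers_pb2.StringValue(value="stand-in for a DeltaRelation")

holder = any_pb2.Any()
holder.Pack(payload)                      # same call as plan.extension.Pack(...) above

recovered = wrappers_pb2.StringValue()
assert holder.Is(wrappers_pb2.StringValue.DESCRIPTOR)
assert holder.Unpack(recovered)           # the server-side plugin unpacks the extension similarly
print(recovered.value)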
19 changes: 19 additions & 0 deletions python/delta/connect/proto/__init__.py
@@ -0,0 +1,19 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from delta.connect.proto.base_pb2 import *
from delta.connect.proto.commands_pb2 import *
from delta.connect.proto.relations_pb2 import *
6 changes: 3 additions & 3 deletions python/delta/connect/proto/relations_pb2.py
@@ -28,9 +28,9 @@


from delta.connect.proto import base_pb2 as delta_dot_connect_dot_base__pb2
from spark.connect import expressions_pb2 as spark_dot_connect_dot_expressions__pb2
from spark.connect import relations_pb2 as spark_dot_connect_dot_relations__pb2
from spark.connect import types_pb2 as spark_dot_connect_dot_types__pb2
from pyspark.sql.connect.proto import expressions_pb2 as spark_dot_connect_dot_expressions__pb2
from pyspark.sql.connect.proto import relations_pb2 as spark_dot_connect_dot_relations__pb2
from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2


DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
88 changes: 88 additions & 0 deletions python/delta/connect/tables.py
@@ -0,0 +1,88 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import Dict, Optional

from delta.connect.plan import DeltaScan
import delta.connect.proto as proto
from delta.tables import DeltaTable as LocalDeltaTable

from pyspark.sql.connect.dataframe import DataFrame
from pyspark.sql.connect.plan import LogicalPlan, SubqueryAlias
from pyspark.sql.connect.session import SparkSession


class DeltaTable(object):
    def __init__(
        self,
        spark: SparkSession,
        path: Optional[str] = None,
        tableOrViewName: Optional[str] = None,
        hadoopConf: Dict[str, str] = dict(),
        plan: Optional[LogicalPlan] = None
    ) -> None:
        self._spark = spark
        self._path = path
        self._tableOrViewName = tableOrViewName
        self._hadoopConf = hadoopConf
        if plan is not None:
            self._plan = plan
        else:
            self._plan = DeltaScan(self._to_proto())

    def toDF(self) -> DataFrame:
        return DataFrame(self._plan, session=self._spark)

    def alias(self, aliasName: str) -> "DeltaTable":
        return DeltaTable(
            self._spark,
            self._path,
            self._tableOrViewName,
            self._hadoopConf,
            SubqueryAlias(self._plan, aliasName)
        )

    @classmethod
    def forPath(
        cls,
        sparkSession: SparkSession,
        path: str,
        hadoopConf: Dict[str, str] = dict()
    ) -> "DeltaTable":
        assert sparkSession is not None
        return DeltaTable(sparkSession, path=path, hadoopConf=hadoopConf)

    @classmethod
    def forName(
        cls, sparkSession: SparkSession, tableOrViewName: str
    ) -> "DeltaTable":
        assert sparkSession is not None
        return DeltaTable(sparkSession, tableOrViewName=tableOrViewName)

    def _to_proto(self) -> proto.DeltaTable:
        result = proto.DeltaTable()
        if self._path is not None:
            result.path.path = self._path
        if self._tableOrViewName is not None:
            result.table_or_view_name = self._tableOrViewName
        return result


DeltaTable.__doc__ = LocalDeltaTable.__doc__
DeltaTable.toDF.__doc__ = LocalDeltaTable.toDF.__doc__
DeltaTable.alias.__doc__ = LocalDeltaTable.alias.__doc__
DeltaTable.forPath.__func__.__doc__ = LocalDeltaTable.forPath.__doc__
DeltaTable.forName.__func__.__doc__ = LocalDeltaTable.forName.__doc__
15 changes: 15 additions & 0 deletions python/delta/connect/testing/__init__.py
@@ -0,0 +1,15 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#