[Ignore] CI test for Python Delta Connect #4530


Status: Open. Wants to merge 91 commits into base: master.

Changes from all commits (91 commits):
29c81a8
SPARK-49025: Make Column implementation agnostic
hvanhovell Dec 2, 2024
7769b02
Merge remote-tracking branch 'delta/master' into SPARK-49025
hvanhovell Dec 3, 2024
3cd4522
Use shims to make cross compilation work.
hvanhovell Dec 3, 2024
1e42d57
Fix tests.
hvanhovell Dec 3, 2024
96b744e
Merge remote-tracking branch 'delta/master' into SPARK-49025
hvanhovell Dec 4, 2024
5f4b78e
Fix DataSkippingPredicateBuilder
hvanhovell Dec 4, 2024
a790e70
Merge remote-tracking branch 'delta/master' into SPARK-49025
hvanhovell Dec 5, 2024
ebfc764
CI
hvanhovell Dec 5, 2024
46cbac3
Fix Master
hvanhovell Dec 5, 2024
446b11d
Fix missing error
hvanhovell Dec 6, 2024
ecaac64
Actual bug
hvanhovell Dec 6, 2024
0342f3c
Fix build
hvanhovell Dec 6, 2024
e3f20ca
Attempt to fix errors
hvanhovell Dec 6, 2024
e668428
No style
hvanhovell Dec 6, 2024
de70e3d
...
hvanhovell Dec 6, 2024
3a1cb9e
Merge remote-tracking branch 'delta/master' into fix_mstr
hvanhovell Dec 10, 2024
732ab1c
toJson codegen fix
hvanhovell Dec 10, 2024
53af65a
Attempt to fix errors
hvanhovell Dec 11, 2024
bbecbee
Merge remote-tracking branch 'delta/master' into fix_mstr
hvanhovell Dec 11, 2024
4201898
Merge remote-tracking branch 'delta/master' into fix_mstr
hvanhovell Dec 13, 2024
cef032a
Merge remote-tracking branch 'delta/master' into fix_mstr
hvanhovell Feb 5, 2025
1baf1f5
Use optimizedPlan instead
hvanhovell Feb 5, 2025
246e186
Merge remote-tracking branch 'delta/master' into fix_mstr
hvanhovell Feb 13, 2025
512ff70
Fix Scala Refactor Errors
hvanhovell Feb 14, 2025
99c37f0
Fix Scala Refactor Errors
hvanhovell Feb 14, 2025
dc11079
Fix TableSpec collation
hvanhovell Feb 14, 2025
995d670
Make tests compile
hvanhovell Feb 14, 2025
810d4d8
Make 3.5 compile again
hvanhovell Feb 14, 2025
d0c9dc4
Undo contains change
hvanhovell Feb 14, 2025
312eb6a
Fix reflection magic
hvanhovell Feb 14, 2025
fbaa773
Fix reflection/to_json
hvanhovell Feb 14, 2025
11e4824
Fix stats collection
hvanhovell Feb 14, 2025
269c8ca
Fix tests
hvanhovell Feb 14, 2025
c6262ed
SPARK-51356
hvanhovell Feb 28, 2025
2713335
Merge remote-tracking branch 'delta/master' into fix_mstr
hvanhovell Feb 28, 2025
fcc5577
Structured Logging was disabled by default
hvanhovell Feb 28, 2025
7bfa849
Changed error message
hvanhovell Feb 28, 2025
4cb463e
Fix column resolution
hvanhovell Feb 28, 2025
cc30948
Fix delta connect plugin
hvanhovell Feb 28, 2025
f7f68c4
Merge remote-tracking branch 'delta/master' into fix_mstr
hvanhovell Feb 28, 2025
3929e67
Fix DeltaAlterTableTests
hvanhovell Feb 28, 2025
410b919
Add apache snapshots to repositories
hvanhovell Feb 28, 2025
ca734a3
Fix connect server compilation
hvanhovell Mar 1, 2025
1441c43
Fix connect test compilation. Disable them for now.
hvanhovell Mar 2, 2025
aff0892
Merge remote-tracking branch 'delta/master' into fix_mstr
hvanhovell Mar 2, 2025
3842d3b
WIP - fix connect client tests
hvanhovell Mar 3, 2025
2260fcc
Merge remote-tracking branch 'delta/master' into fix_mstr
hvanhovell Mar 3, 2025
d15f30d
Undo unneeded connect client changes
hvanhovell Mar 3, 2025
dc43bf6
Fix import
hvanhovell Mar 3, 2025
4c7edfd
Merge branch 'fix_mstr' into fix-connect-client
hvanhovell Mar 4, 2025
1a73d85
Merge remote-tracking branch 'delta/master' into fix-connect-client
hvanhovell Mar 4, 2025
e4ffadb
Merge remote-tracking branch 'delta/master' into fix-connect-client
hvanhovell Mar 13, 2025
16d9e7d
Merge remote-tracking branch 'delta/master' into fix-connect-client
hvanhovell Apr 28, 2025
af128c0
Fix build & re-enable tests
hvanhovell Apr 28, 2025
1b43275
Fix shims & remove stale comment
hvanhovell Apr 28, 2025
2bbfb08
Improve dist generation
hvanhovell Apr 28, 2025
df917e9
Another pass. CR
hvanhovell Apr 29, 2025
a40edf1
Merge remote-tracking branch 'delta/master' into fix-connect-client
hvanhovell Apr 30, 2025
19b41ae
Disable test.
hvanhovell Apr 30, 2025
0aa6a01
Merge remote-tracking branch 'delta/master' into fix-connect-client
hvanhovell May 8, 2025
0e81d63
Try to run python tests
allisonport-db May 8, 2025
c7d6d71
mypy
allisonport-db May 8, 2025
e1cf82e
Mypi + fix getQueryContext error
allisonport-db May 8, 2025
b976a38
Upgrade pandas
allisonport-db May 8, 2025
ad178c5
Ignore all to shorten line
allisonport-db May 8, 2025
2d0dfab
Make StagedTable support truncate
hvanhovell May 9, 2025
996a505
Fix a missed version + update python deps to match Spark
allisonport-db May 9, 2025
b73bfeb
Try to cherry-pick delta connect python changes
allisonport-db Aug 12, 2024
a7c1f84
Add remaining python packages from original PR
allisonport-db May 8, 2025
7218758
Upgrade python deps to match spark
allisonport-db May 8, 2025
4084ec7
Update requirements.txt
allisonport-db May 8, 2025
31cecb1
Update pyproject.toml
allisonport-db May 9, 2025
0bb8355
Update spark connect dependencies to be 4.0
allisonport-db May 9, 2025
1e6e561
Update spark_master_python_test.yaml
allisonport-db May 9, 2025
0b654c2
add staging repo to python tests
allisonport-db May 9, 2025
10565c4
Resolve merge conflicts
longvu-db May 9, 2025
b3b04fd
Update proto
longvu-db May 9, 2025
f4abd51
Try fix import
longvu-db May 9, 2025
11ec127
Merge remote-tracking branch 'delta/master' into fix-connect-client
hvanhovell May 12, 2025
0271be8
Enable tests
hvanhovell May 12, 2025
60a83ea
surgery
vicennial May 12, 2025
f94e9a7
try
vicennial May 13, 2025
1355866
Merge branch 'pr4468' into pr4522
vicennial May 13, 2025
5d64977
Merge branch 'master' into pr4522
vicennial May 14, 2025
3735787
try
vicennial May 14, 2025
9ddbf05
trigger CI
vicennial May 15, 2025
3fba41b
lint
vicennial May 15, 2025
a912a04
more testing
vicennial May 15, 2025
64c2789
lint
vicennial May 15, 2025
24d4e5c
lint
vicennial May 15, 2025
b3ea57a
path|
vicennial May 15, 2025
10 changes: 9 additions & 1 deletion .github/workflows/spark_master_python_test.yaml

@@ -65,6 +65,7 @@ jobs:
         pipenv run pip install pip==24.0 setuptools==69.5.1 wheel==0.43.0
         pipenv run pip install flake8==3.9.0
         pipenv run pip install black==23.12.1
+        pipenv run pip install importlib_metadata==3.10.0
         pipenv run pip install mypy==1.8.0
         pipenv run pip install mypy-protobuf==3.3.0
         pipenv run pip install cryptography==37.0.4
@@ -74,7 +75,14 @@
         pipenv run pip install pydocstyle==3.0.0
         pipenv run pip install pandas==2.2.0
         pipenv run pip install pyarrow==11.0.0
-        pipenv run pip install numpy==1.21
         pipenv run pip install pypandoc==1.3.3
+        pipenv run pip install numpy==1.22.4
+        pipenv run pip install grpcio==1.67.0
+        pipenv run pip install grpcio-status==1.67.0
+        pipenv run pip install googleapis-common-protos==1.65.0
+        pipenv run pip install protobuf==5.29.1
+        pipenv run pip install googleapis-common-protos-stubs==2.2.0
+        pipenv run pip install grpc-stubs==1.24.11
+        pipenv run pip install https://dist.apache.org/repos/dist/dev/spark/v4.0.0-rc4-bin/pyspark-4.0.0.tar.gz
       if: steps.git-diff.outputs.diff
     - name: Run Python tests
4 changes: 3 additions & 1 deletion dev/delta-connect-gen-protos.sh

@@ -71,8 +71,10 @@ for f in `find gen/proto/python/delta/connect -name "*.py*"`; do
   if [[ $f == *_pb2.py || $f == *_pb2_grpc.py ]]; then
     sed \
       -e 's/import spark.connect./import pyspark.sql.connect.proto./g' \
-      -e 's/from delta.connect import/from delta.connect.proto import/g' \
       -e "s/DESCRIPTOR, 'spark.connect/DESCRIPTOR, 'pyspark.sql.connect.proto/g" \
+      -e 's/from spark.connect import/from pyspark.sql.connect.proto import/g' \
+      -e "s/DESCRIPTOR, 'delta.connect/DESCRIPTOR, 'delta.connect.proto/g" \
+      -e 's/from delta.connect import/from delta.connect.proto import/g' \
       $f > $f.tmp
     mv $f.tmp $f
   elif [[ $f == *.pyi ]]; then
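These rewrites are needed because protoc emits imports against the raw proto package paths (spark.connect, delta.connect), while the installed Python modules live under pyspark.sql.connect.proto and delta.connect.proto. A before/after sketch of the effect on a generated module, matching the relations_pb2.py diff further down:

# Before (as emitted by protoc, against the raw proto package paths):
#   from spark.connect import types_pb2 as spark_dot_connect_dot_types__pb2
#   from delta.connect import base_pb2 as delta_dot_connect_dot_base__pb2
# After the sed pass above (rewritten to the installed package layout):
from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2
from delta.connect.proto import base_pb2 as delta_dot_connect_dot_base__pb2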
7 changes: 6 additions & 1 deletion dev/requirements.txt

@@ -3,7 +3,12 @@ mypy==1.8.0
 flake8==3.9.0
 
 # Code Formatter
-black==23.9.1
+black==23.12.1
+
+# Spark Connect (required)
+grpcio>=1.67.0
+grpcio-status>=1.67.0
+googleapis-common-protos>=1.65.0
 
 # Spark and Delta Connect python proto generation plugin (optional)
 mypy-protobuf==3.3.0
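Since a mismatched grpc/protobuf stack is a common source of Connect client failures, a quick interpreter check (illustration only, not part of this diff) that the installed versions line up with the pins above:

# Sanity-check the Connect client dependency versions.
import grpc
import google.protobuf

print(grpc.__version__)             # expect >= 1.67.0 per the pins above
print(google.protobuf.__version__)  # expect a protobuf 5.x runtime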
99 changes: 99 additions & 0 deletions examples/python/delta_connect.py (new file)

#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
To run this example you must follow these steps:

Requirements:
- Java 17
- Spark 4.0.0-preview1+
- delta-spark (Python package) 4.0.0rc1+ and pyspark 4.0.0.dev1+

(1) Start a local Spark connect server using this command:
sbin/start-connect-server.sh \
--packages org.apache.spark:spark-connect_2.13:4.0.0-preview1,io.delta:delta-connect-server_2.13:{DELTA_VERSION},io.delta:delta-spark_2.13:{DELTA_VERSION},com.google.protobuf:protobuf-java:3.25.1 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
--conf "spark.connect.extensions.relation.classes"="org.apache.spark.sql.connect.delta.DeltaRelationPlugin" \
--conf "spark.connect.extensions.command.classes"="org.apache.spark.sql.connect.delta.DeltaCommandPlugin"
* Be sure to replace DELTA_VERSION with the version you are using

(2) Set the SPARK_REMOTE environment variable to point to your local Spark server
export SPARK_REMOTE="sc://localhost:15002"

(3) Run this file, e.g. python3 examples/python/delta_connect.py
"""

import os
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
import shutil

filePath = "/tmp/delta_connect"
tableName = "delta_connect_table"

def assert_dataframe_equals(df1, df2):
    # list.sort() sorts in place and returns None, so compare with sorted().
    assert sorted(df1.collect()) == sorted(df2.collect())

def cleanup(spark):
shutil.rmtree(filePath, ignore_errors=True)
spark.sql(f"DROP TABLE IF EXISTS {tableName}")

# --------------------- Set up Spark Connect spark session ------------------------

assert os.getenv("SPARK_REMOTE"), "Must point to Spark Connect server using SPARK_REMOTE"

spark = SparkSession.builder \
.remote(os.getenv("SPARK_REMOTE")) \
.getOrCreate()

# Clean up any previous runs
cleanup(spark)

# -------------- Try reading non-existent table (should fail with an exception) ----------------

# Using forPath
try:
DeltaTable.forPath(spark, filePath).toDF().show()
except Exception as e:
assert "DELTA_MISSING_DELTA_TABLE" in str(e)
else:
assert False, "Expected exception to be thrown for missing table"

# Using forName
try:
DeltaTable.forName(spark, tableName).toDF().show()
except Exception as e:
assert "DELTA_MISSING_DELTA_TABLE" in str(e)
else:
assert False, "Expected exception to be thrown for missing table"

# ------------------------ Write basic table and check that results match ----------------------

# By table name
spark.range(5).write.format("delta").saveAsTable(tableName)
assert_dataframe_equals(DeltaTable.forName(spark, tableName).toDF(), spark.range(5))
assert_dataframe_equals(spark.read.format("delta").table(tableName), spark.range(5))
assert_dataframe_equals(spark.sql(f"SELECT * FROM {tableName}"), spark.range(5))

# By table path
spark.range(10).write.format("delta").save(filePath)
assert_dataframe_equals(DeltaTable.forPath(spark, filePath).toDF(), spark.range(10))
assert_dataframe_equals(spark.read.format("delta").load(filePath), spark.range(10))
assert_dataframe_equals(spark.sql(f"SELECT * FROM delta.`{filePath}`"), spark.range(10))

# ---------------------------------- Clean up ----------------------------------------
cleanup(spark)
19 changes: 19 additions & 0 deletions python/delta/connect/__init__.py (new file)

#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from delta.connect.tables import DeltaTable

__all__ = ['DeltaTable']
55 changes: 55 additions & 0 deletions python/delta/connect/plan.py (new file)

#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import Optional

import delta.connect.proto as proto

from pyspark.sql.connect.client import SparkConnectClient
from pyspark.sql.connect.plan import LogicalPlan
import pyspark.sql.connect.proto as spark_proto


class DeltaLogicalPlan(LogicalPlan):
def __init__(self, child: Optional[LogicalPlan]) -> None:
super().__init__(child)

def plan(self, session: SparkConnectClient) -> spark_proto.Relation:
plan = spark_proto.Relation()
plan.extension.Pack(self.to_delta_relation(session))
return plan

def to_delta_relation(self, session: SparkConnectClient) -> proto.DeltaRelation:
...

def command(self, session: SparkConnectClient) -> spark_proto.Command:
command = spark_proto.Command()
command.extension.Pack(self.to_delta_command(session))
return command

def to_delta_command(self, session: SparkConnectClient) -> proto.DeltaCommand:
...


class DeltaScan(DeltaLogicalPlan):
def __init__(self, table: proto.DeltaTable) -> None:
super().__init__(None)
self._table = table

def to_delta_relation(self, client: SparkConnectClient) -> proto.DeltaRelation:
relation = proto.DeltaRelation()
relation.scan.table.CopyFrom(self._table)
return relation
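The extension.Pack(...) calls above are the core of the Delta Connect plugin protocol: Spark Connect's Relation and Command messages carry a google.protobuf.Any extension field, and the server-side DeltaRelationPlugin / DeltaCommandPlugin recognize the payload by its recorded type URL. A minimal, self-contained sketch of that packing mechanism (illustration only, using a well-known wrapper type as a stand-in payload so it runs without a server):

from google.protobuf import any_pb2
from google.protobuf import wrappers_pb2  # stand-in payload for the demo

box = any_pb2.Any()
box.Pack(wrappers_pb2.StringValue(value="delta"))    # client side: serialize payload + record type URL

assert box.Is(wrappers_pb2.StringValue.DESCRIPTOR)   # server side: identify the payload type
unpacked = wrappers_pb2.StringValue()
box.Unpack(unpacked)                                 # recover the typed message
assert unpacked.value == "delta"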
19 changes: 19 additions & 0 deletions python/delta/connect/proto/__init__.py (new file)

#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from delta.connect.proto.base_pb2 import *
from delta.connect.proto.commands_pb2 import *
from delta.connect.proto.relations_pb2 import *
6 changes: 3 additions & 3 deletions python/delta/connect/proto/relations_pb2.py

@@ -28,9 +28,9 @@
 
 
 from delta.connect.proto import base_pb2 as delta_dot_connect_dot_base__pb2
-from spark.connect import expressions_pb2 as spark_dot_connect_dot_expressions__pb2
-from spark.connect import relations_pb2 as spark_dot_connect_dot_relations__pb2
-from spark.connect import types_pb2 as spark_dot_connect_dot_types__pb2
+from pyspark.sql.connect.proto import expressions_pb2 as spark_dot_connect_dot_expressions__pb2
+from pyspark.sql.connect.proto import relations_pb2 as spark_dot_connect_dot_relations__pb2
+from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
88 changes: 88 additions & 0 deletions python/delta/connect/tables.py (new file)

#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import Dict, Optional

from delta.connect.plan import DeltaScan
import delta.connect.proto as proto
from delta.tables import DeltaTable as LocalDeltaTable

from pyspark.sql.connect.dataframe import DataFrame
from pyspark.sql.connect.plan import LogicalPlan, SubqueryAlias
from pyspark.sql.connect.session import SparkSession


class DeltaTable(object):
def __init__(
self,
spark: SparkSession,
path: Optional[str] = None,
tableOrViewName: Optional[str] = None,
hadoopConf: Dict[str, str] = dict(),
plan: Optional[LogicalPlan] = None
) -> None:
self._spark = spark
self._path = path
self._tableOrViewName = tableOrViewName
self._hadoopConf = hadoopConf
if plan is not None:
self._plan = plan
else:
self._plan = DeltaScan(self._to_proto())

def toDF(self) -> DataFrame:
return DataFrame(self._plan, session=self._spark)

def alias(self, aliasName: str) -> "DeltaTable":
return DeltaTable(
self._spark,
self._path,
self._tableOrViewName,
self._hadoopConf,
SubqueryAlias(self._plan, aliasName)
)

@classmethod
def forPath(
cls,
sparkSession: SparkSession,
path: str,
hadoopConf: Dict[str, str] = dict()
) -> "DeltaTable":
assert sparkSession is not None
return DeltaTable(sparkSession, path=path, hadoopConf=hadoopConf)

@classmethod
def forName(
cls, sparkSession: SparkSession, tableOrViewName: str
) -> "DeltaTable":
assert sparkSession is not None
return DeltaTable(sparkSession, tableOrViewName=tableOrViewName)

def _to_proto(self) -> proto.DeltaTable:
result = proto.DeltaTable()
if self._path is not None:
result.path.path = self._path
if self._tableOrViewName is not None:
result.table_or_view_name = self._tableOrViewName
return result


DeltaTable.__doc__ = LocalDeltaTable.__doc__
DeltaTable.toDF.__doc__ = LocalDeltaTable.toDF.__doc__
DeltaTable.alias.__doc__ = LocalDeltaTable.alias.__doc__
DeltaTable.forPath.__func__.__doc__ = LocalDeltaTable.forPath.__doc__
DeltaTable.forName.__func__.__doc__ = LocalDeltaTable.forName.__doc__
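Taken together with plan.py, this class mirrors delta.tables.DeltaTable over Spark Connect. A minimal usage sketch, assuming a Connect server with the Delta plugins is running and SPARK_REMOTE is set (the table path is hypothetical; see examples/python/delta_connect.py above for the full walkthrough):

import os
from pyspark.sql import SparkSession
from delta.connect.tables import DeltaTable

# .remote() yields a Connect session; requires a server started with the
# Delta relation/command plugins (see the example's server command).
spark = SparkSession.builder.remote(os.environ["SPARK_REMOTE"]).getOrCreate()

spark.range(5).write.format("delta").save("/tmp/delta_connect_demo")  # hypothetical path
dt = DeltaTable.forPath(spark, "/tmp/delta_connect_demo")
dt.alias("t").toDF().show()  # DeltaScan (wrapped in SubqueryAlias) sent as a Relation extension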
15 changes: 15 additions & 0 deletions python/delta/connect/testing/__init__.py (new file)

#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#